diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java
index e4cbd1f6b9bd..2b779a11a12e 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStats.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg;
 
+import java.util.Objects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public class PartitionStats implements StructLike {
@@ -249,4 +250,45 @@ public <T> void set(int pos, T value) {
         throw new UnsupportedOperationException("Unknown position: " + pos);
     }
   }
+
+  @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    } else if (!(other instanceof PartitionStats)) {
+      return false;
+    }
+
+    PartitionStats that = (PartitionStats) other;
+    return Objects.equals(partition, that.partition)
+        && specId == that.specId
+        && dataRecordCount == that.dataRecordCount
+        && dataFileCount == that.dataFileCount
+        && totalDataFileSizeInBytes == that.totalDataFileSizeInBytes
+        && positionDeleteRecordCount == that.positionDeleteRecordCount
+        && positionDeleteFileCount == that.positionDeleteFileCount
+        && equalityDeleteRecordCount == that.equalityDeleteRecordCount
+        && equalityDeleteFileCount == that.equalityDeleteFileCount
+        && totalRecordCount == that.totalRecordCount
+        && Objects.equals(lastUpdatedAt, that.lastUpdatedAt)
+        && Objects.equals(lastUpdatedSnapshotId, that.lastUpdatedSnapshotId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        partition,
+        specId,
+        dataRecordCount,
+        dataFileCount,
+        totalDataFileSizeInBytes,
+        positionDeleteRecordCount,
+        positionDeleteFileCount,
+        equalityDeleteRecordCount,
+        equalityDeleteFileCount,
+        totalRecordCount,
+        lastUpdatedAt,
+        lastUpdatedSnapshotId);
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
index 1fe4e6767fe6..a2b4a5ea2275 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -25,10 +25,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.PartitionMap;
 import org.apache.iceberg.util.PartitionUtil;
@@ -87,13 +90,10 @@ private static PartitionMap<PartitionStats> collectStats(
     PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
     int specId = manifest.partitionSpecId();
     PartitionSpec spec = table.specs().get(specId);
-    PartitionData keyTemplate = new PartitionData(partitionType);
 
     for (ManifestEntry<?> entry : reader.entries()) {
       ContentFile<?> file = entry.file();
-      StructLike coercedPartition =
-          PartitionUtil.coercePartition(partitionType, spec, file.partition());
-      StructLike key = keyTemplate.copyFor(coercedPartition);
+      Record key = coercedPartitionRecord(file, spec, partitionType);
       Snapshot snapshot = table.snapshot(entry.snapshotId());
       PartitionStats stats =
           statsMap.computeIfAbsent(specId, key, () -> new PartitionStats(key, specId));
@@ -133,4 +133,19 @@ private static Collection<PartitionStats> mergeStats(
 
     return statsMap.values();
   }
+
+  private static Record coercedPartitionRecord(
+      ContentFile<?> file, PartitionSpec spec, StructType partitionType) {
+    // keep the partition data as per the unified spec by coercing
+    StructLike partition = PartitionUtil.coercePartition(partitionType, spec, file.partition());
+
+    GenericRecord record = GenericRecord.create(partitionType);
+    List<Types.NestedField> fields = partitionType.fields();
+    for (int index = 0; index < fields.size(); index++) {
+      Object val = partition.get(index, fields.get(index).type().typeId().javaClass());
+      record.set(index, val);
+    }
+
+    return record;
+  }
 }
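The helper above is what keeps partition keys comparable across spec evolution: every file's partition tuple is coerced into the table's unified partition type before it is used as a map key. A minimal sketch of the idea, not part of this diff (`table`, `spec`, and `file` stand in for the surrounding loop variables, and the field loop mirrors `coercedPartitionRecord`):

    // A table that first partitioned by c2 and later by (c2, c3) has the
    // unified partition type struct<c2, c3>. Coercing an old-spec partition
    // into that struct leaves c3 null, so entries from both specs land in a
    // single, comparable key space.
    StructType unified = Partitioning.partitionType(table);
    StructLike coerced = PartitionUtil.coercePartition(unified, spec, file.partition());
    GenericRecord key = GenericRecord.create(unified);
    for (int pos = 0; pos < unified.fields().size(); pos++) {
      key.set(pos, coerced.get(pos, unified.fields().get(pos).type().typeId().javaClass()));
    }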
diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
index 4cb41263152d..114820443382 100644
--- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
+++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
@@ -18,9 +18,12 @@
  */
 package org.apache.iceberg.data;
 
+import java.nio.ByteBuffer;
+import java.util.UUID;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.DateTimeUtil;
 
 public class IdentityPartitionConverters {
@@ -48,6 +51,18 @@ public static Object convertConstant(Type type, Object value) {
       case FIXED:
         if (value instanceof GenericData.Fixed) {
           return ((GenericData.Fixed) value).bytes();
+        } else if (value instanceof ByteBuffer) {
+          return ByteBuffers.toByteArray((ByteBuffer) value);
+        }
+        return value;
+      case UUID:
+        if (value instanceof ByteBuffer) {
+          return ByteBuffers.toByteArray((ByteBuffer) value);
+        }
+        return value;
+      case BINARY:
+        if (value instanceof byte[]) {
+          return ByteBuffer.wrap((byte[]) value);
         }
         return value;
       default:
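These new branches normalize partition values that cross the generic read/write boundary. A small sketch of the expected round trips, with illustrative values that are not taken from this diff:

    // uuid/fixed values carried as a ByteBuffer come back as byte[], while
    // raw byte[] binary values are wrapped back into a ByteBuffer.
    Object fixed =
        IdentityPartitionConverters.convertConstant(
            Types.FixedType.ofLength(3), ByteBuffer.wrap(new byte[] {1, 2, 3})); // -> byte[]
    Object binary =
        IdentityPartitionConverters.convertConstant(
            Types.BinaryType.get(), new byte[] {1, 2, 3}); // -> ByteBuffer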
diff --git a/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java b/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java
new file mode 100644
index 000000000000..1b4e40148e73
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.PartitionStats;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.StructType;
+
+/** Wraps the {@link PartitionStats} as {@link Record}. Used by generic writers and readers. */
+public class PartitionStatsRecord implements Record, StructLike {
+  private static final LoadingCache<StructType, Map<String, Integer>> NAME_MAP_CACHE =
+      Caffeine.newBuilder()
+          .weakKeys()
+          .build(
+              struct -> {
+                Map<String, Integer> idToPos = Maps.newHashMap();
+                List<Types.NestedField> fields = struct.fields();
+                for (int index = 0; index < fields.size(); index++) {
+                  idToPos.put(fields.get(index).name(), index);
+                }
+                return idToPos;
+              });
+
+  private final StructType struct;
+  private final PartitionStats partitionStats;
+  private final Map<String, Integer> nameToPos;
+
+  public static PartitionStatsRecord create(Schema schema, PartitionStats partitionStats) {
+    return new PartitionStatsRecord(schema.asStruct(), partitionStats);
+  }
+
+  public static PartitionStatsRecord create(StructType struct, PartitionStats partitionStats) {
+    return new PartitionStatsRecord(struct, partitionStats);
+  }
+
+  public PartitionStats unwrap() {
+    return partitionStats;
+  }
+
+  private PartitionStatsRecord(StructType struct, PartitionStats partitionStats) {
+    this.struct = struct;
+    this.partitionStats = partitionStats;
+    this.nameToPos = NAME_MAP_CACHE.get(struct);
+  }
+
+  private PartitionStatsRecord(PartitionStatsRecord toCopy) {
+    this.struct = toCopy.struct;
+    this.partitionStats = toCopy.partitionStats;
+    this.nameToPos = toCopy.nameToPos;
+  }
+
+  private PartitionStatsRecord(PartitionStatsRecord toCopy, Map<String, Object> overwrite) {
+    this.struct = toCopy.struct;
+    this.partitionStats = toCopy.partitionStats;
+    this.nameToPos = toCopy.nameToPos;
+    for (Map.Entry<String, Object> entry : overwrite.entrySet()) {
+      setField(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public StructType struct() {
+    return struct;
+  }
+
+  @Override
+  public Object getField(String name) {
+    Integer pos = nameToPos.get(name);
+    Preconditions.checkArgument(pos != null, "Cannot get unknown field named: %s", name);
+    return partitionStats.get(pos, Object.class);
+  }
+
+  @Override
+  public void setField(String name, Object value) {
+    Integer pos = nameToPos.get(name);
+    Preconditions.checkArgument(pos != null, "Cannot set unknown field named: %s", name);
+    partitionStats.set(pos, value);
+  }
+
+  @Override
+  public int size() {
+    return partitionStats.size();
+  }
+
+  @Override
+  public Object get(int pos) {
+    return partitionStats.get(pos, Object.class);
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    Object value = get(pos);
+    if (value == null || javaClass.isInstance(value)) {
+      return javaClass.cast(value);
+    } else {
+      throw new IllegalStateException("Not an instance of " + javaClass.getName() + ": " + value);
+    }
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    partitionStats.set(pos, value);
+  }
+
+  @Override
+  public PartitionStatsRecord copy() {
+    return new PartitionStatsRecord(this);
+  }
+
+  @Override
+  public PartitionStatsRecord copy(Map<String, Object> overwriteValues) {
+    return new PartitionStatsRecord(this, overwriteValues);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Record(");
+    for (int index = 0; index < partitionStats.size(); index++) {
+      if (index != 0) {
+        sb.append(", ");
+      }
+      sb.append(partitionStats.get(index, Object.class));
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    } else if (!(other instanceof PartitionStatsRecord)) {
+      return false;
+    }
+
+    PartitionStatsRecord that = (PartitionStatsRecord) other;
+    return this.partitionStats.equals(that.partitionStats);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(partitionStats);
+  }
+}
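Because PartitionStatsRecord delegates every positional get/set to the wrapped PartitionStats, generic writers see a plain Record while the stats object stays the single source of truth. A minimal usage sketch, assuming `statsSchema` (the stats file schema) and `stats` already exist:

    PartitionStatsRecord record = PartitionStatsRecord.create(statsSchema, stats);
    long dataRecords = (long) record.getField("DATA_RECORD_COUNT"); // name-based access
    PartitionStats unwrapped = record.unwrap(); // the same object the record wraps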
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
index 541fcd2ca22d..cb9f5851da31 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.groups.Tuple;
@@ -370,17 +371,17 @@ public void testPartitionStatsWithSchemaEvolution() throws Exception {
         snapshot2.snapshotId()));
   }
 
-  private static PartitionData partitionData(Types.StructType partitionType, String c2, String c3) {
-    PartitionData partitionData = new PartitionData(partitionType);
-    partitionData.set(0, c2);
-    partitionData.set(1, c3);
-    return partitionData;
+  private static StructLike partitionData(Types.StructType partitionType, String c2, String c3) {
+    GenericRecord record = GenericRecord.create(partitionType);
+    record.set(0, c2);
+    record.set(1, c3);
+    return record;
   }
 
-  private static PartitionData partitionData(Types.StructType partitionType, String c2) {
-    PartitionData partitionData = new PartitionData(partitionType);
-    partitionData.set(0, c2);
-    return partitionData;
+  private static StructLike partitionData(Types.StructType partitionType, String c2) {
+    GenericRecord record = GenericRecord.create(partitionType);
+    record.set(0, c2);
+    return record;
   }
 
   private static List<DataFile> prepareDataFiles(Table table) {
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index eeff5db8e5a6..ca9dbe5e9fcb 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -93,6 +93,26 @@ public static TestTable create(
     return new TestTable(ops, name, reporter);
   }
 
+  public static TestTable create(
+      File temp,
+      String name,
+      Schema schema,
+      PartitionSpec spec,
+      int formatVersion,
+      Map<String, String> properties) {
+    TestTableOperations ops = new TestTableOperations(name, temp);
+    if (ops.current() != null) {
+      throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
+    }
+
+    ops.commit(
+        null,
+        newTableMetadata(
+            schema, spec, SortOrder.unsorted(), temp.toString(), properties, formatVersion));
+
+    return new TestTable(ops, name);
+  }
+
   public static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) {
     return beginCreate(temp, name, schema, spec, SortOrder.unsorted());
   }
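The new TestTables.create overload exists so a test can pin table properties at creation time. A hedged sketch of how the handler tests below use it (the variable names are illustrative):

    Table testTable =
        TestTables.create(
            tempDir, "stats_table", SCHEMA, SPEC, 2,
            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, "orc"));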
diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
new file mode 100644
index 000000000000..6c80ff982815
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStats;
+import org.apache.iceberg.PartitionStatsUtil;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.SnapshotUtil;
+
+/**
+ * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
+ * to read and write the stats in the table's default file format.
+ */
+public final class PartitionStatsHandler {
+
+  private PartitionStatsHandler() {}
+
+  public enum Column {
+    PARTITION(0),
+    SPEC_ID(1),
+    DATA_RECORD_COUNT(2),
+    DATA_FILE_COUNT(3),
+    TOTAL_DATA_FILE_SIZE_IN_BYTES(4),
+    POSITION_DELETE_RECORD_COUNT(5),
+    POSITION_DELETE_FILE_COUNT(6),
+    EQUALITY_DELETE_RECORD_COUNT(7),
+    EQUALITY_DELETE_FILE_COUNT(8),
+    TOTAL_RECORD_COUNT(9),
+    LAST_UPDATED_AT(10),
+    LAST_UPDATED_SNAPSHOT_ID(11);
+
+    private final int id;
+
+    Column(int id) {
+      this.id = id;
+    }
+
+    public int id() {
+      return id;
+    }
+  }
+
+  /**
+   * Generates the partition stats file schema based on a given partition type.
+   *
+   * <p>Note: Provide the unified partition schema type as mentioned in the spec.
+   *
+   * @param partitionType unified partition schema type.
+   * @return a schema that corresponds to the provided unified partition type.
+   */
+  public static Schema schema(StructType partitionType) {
+    Preconditions.checkState(!partitionType.fields().isEmpty(), "table must be partitioned");
+
+    return new Schema(
+        NestedField.required(1, Column.PARTITION.name(), partitionType),
+        NestedField.required(2, Column.SPEC_ID.name(), IntegerType.get()),
+        NestedField.required(3, Column.DATA_RECORD_COUNT.name(), LongType.get()),
+        NestedField.required(4, Column.DATA_FILE_COUNT.name(), IntegerType.get()),
+        NestedField.required(5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), LongType.get()),
+        NestedField.optional(6, Column.POSITION_DELETE_RECORD_COUNT.name(), LongType.get()),
+        NestedField.optional(7, Column.POSITION_DELETE_FILE_COUNT.name(), IntegerType.get()),
+        NestedField.optional(8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), LongType.get()),
+        NestedField.optional(9, Column.EQUALITY_DELETE_FILE_COUNT.name(), IntegerType.get()),
+        NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), LongType.get()),
+        NestedField.optional(11, Column.LAST_UPDATED_AT.name(), LongType.get()),
+        NestedField.optional(12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), LongType.get()));
+  }
+
+  /**
+   * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
+   *
+   * @param table The {@link Table} for which the partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the current snapshot, or null if the table has no
+   *     snapshots.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) {
+    return computeAndWriteStatsFile(table, null);
+  }
+
+  /**
+   * Computes and writes the {@link PartitionStatisticsFile} for a given table and branch.
+   *
+   * @param table The {@link Table} for which the partition statistics are computed.
+   * @param branch The branch to select the required snapshot.
+   * @return {@link PartitionStatisticsFile} for the given branch.
+   */
+  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, String branch) {
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    if (currentSnapshot == null) {
+      Preconditions.checkArgument(
+          branch == null, "Couldn't find the snapshot for the branch %s", branch);
+      return null;
+    }
+
+    StructType partitionType = Partitioning.partitionType(table);
+    Schema schema = schema(partitionType);
+
+    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, currentSnapshot);
+    List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
+    Iterator<PartitionStatsRecord> convertedRecords = statsToRecords(sortedStats, schema);
+    return writePartitionStatsFile(table, currentSnapshot.snapshotId(), schema, convertedRecords);
+  }
+
+  @VisibleForTesting
+  static PartitionStatisticsFile writePartitionStatsFile(
+      Table table, long snapshotId, Schema dataSchema, Iterator<PartitionStatsRecord> records) {
+    OutputFile outputFile = newPartitionStatsFile(table, snapshotId);
+    FileWriterFactory<Record> factory =
+        GenericFileWriterFactory.builderFor(table)
+            .dataSchema(dataSchema)
+            .dataFileFormat(fileFormat(outputFile.location()))
+            .build();
+    DataWriter<Record> writer =
+        factory.newDataWriter(
+            EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY),
+            PartitionSpec.unpartitioned(),
+            null);
+    try (Closeable toClose = writer) {
+      records.forEachRemaining(writer::write);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return ImmutableGenericPartitionStatisticsFile.builder()
+        .snapshotId(snapshotId)
+        .path(outputFile.location())
+        .fileSizeInBytes(outputFile.toInputFile().getLength())
+        .build();
+  }
+
+  /**
+   * Reads partition statistics from the specified {@link InputFile} using the given schema.
+   *
+   * @param schema The {@link Schema} of the partition statistics file.
+   * @param inputFile An {@link InputFile} pointing to the partition stats file.
+   * @return an iterable of {@link PartitionStatsRecord} read from the file.
+   */
+  public static CloseableIterable<PartitionStatsRecord> readPartitionStatsFile(
+      Schema schema, InputFile inputFile) {
+    CloseableIterable<Record> records;
+    FileFormat fileFormat = fileFormat(inputFile.location());
+    switch (fileFormat) {
+      case PARQUET:
+        records =
+            Parquet.read(inputFile)
+                .project(schema)
+                .createReaderFunc(
+                    fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+                .build();
+        break;
+      case ORC:
+        records =
+            ORC.read(inputFile)
+                .project(schema)
+                .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+                .build();
+        break;
+      case AVRO:
+        records = Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build();
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + fileFormat.name());
+    }
+
+    return CloseableIterable.transform(
+        records, PartitionStatsHandler::recordToPartitionStatsRecord);
+  }
+
+  private static FileFormat fileFormat(String fileLocation) {
+    return FileFormat.fromString(fileLocation.substring(fileLocation.lastIndexOf(".") + 1));
+  }
+
+  private static OutputFile newPartitionStatsFile(Table table, long snapshotId) {
+    FileFormat fileFormat =
+        fileFormat(
+            table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+    return table
+        .io()
+        .newOutputFile(
+            ((HasTableOperations) table)
+                .operations()
+                .metadataFileLocation(
+                    fileFormat.addExtension(String.format("partition-stats-%d", snapshotId))));
+  }
+
+  private static PartitionStatsRecord recordToPartitionStatsRecord(Record record) {
+    PartitionStats stats =
+        new PartitionStats(
+            record.get(Column.PARTITION.id(), StructLike.class),
+            record.get(Column.SPEC_ID.id(), Integer.class));
+    stats.set(Column.DATA_RECORD_COUNT.id(), record.get(Column.DATA_RECORD_COUNT.id(), Long.class));
+    stats.set(Column.DATA_FILE_COUNT.id(), record.get(Column.DATA_FILE_COUNT.id(), Integer.class));
+    stats.set(
+        Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(),
+        record.get(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), Long.class));
+    stats.set(
+        Column.POSITION_DELETE_RECORD_COUNT.id(),
+        record.get(Column.POSITION_DELETE_RECORD_COUNT.id(), Long.class));
+    stats.set(
+        Column.POSITION_DELETE_FILE_COUNT.id(),
+        record.get(Column.POSITION_DELETE_FILE_COUNT.id(), Integer.class));
+    stats.set(
+        Column.EQUALITY_DELETE_RECORD_COUNT.id(),
+        record.get(Column.EQUALITY_DELETE_RECORD_COUNT.id(), Long.class));
+    stats.set(
+        Column.EQUALITY_DELETE_FILE_COUNT.id(),
+        record.get(Column.EQUALITY_DELETE_FILE_COUNT.id(), Integer.class));
+    stats.set(
+        Column.TOTAL_RECORD_COUNT.id(), record.get(Column.TOTAL_RECORD_COUNT.id(), Long.class));
+    stats.set(Column.LAST_UPDATED_AT.id(), record.get(Column.LAST_UPDATED_AT.id(), Long.class));
+    stats.set(
+        Column.LAST_UPDATED_SNAPSHOT_ID.id(),
+        record.get(Column.LAST_UPDATED_SNAPSHOT_ID.id(), Long.class));
+
+    return PartitionStatsRecord.create(record.struct(), stats);
+  }
+
+  @VisibleForTesting
+  static Iterator<PartitionStatsRecord> statsToRecords(
+      List<PartitionStats> stats, Schema recordSchema) {
+    StructType partitionType = (StructType) recordSchema.findField(Column.PARTITION.name()).type();
+    return new TransformIteratorWithBiFunction<>(
+        stats.iterator(),
+        (partitionStats, schema) -> {
+          PartitionStatsRecord record = PartitionStatsRecord.create(schema, partitionStats);
+          record.set(
+              Column.PARTITION.id(),
+              convertPartitionValues(
+                  record.get(Column.PARTITION.id(), StructLike.class), partitionType));
+          return record;
+        },
+        recordSchema);
+  }
+
+  private static Record convertPartitionValues(
+      StructLike partitionRecord, StructType partitionType) {
+    if (partitionRecord == null) {
+      return null;
+    }
+
+    GenericRecord converted = GenericRecord.create(partitionType);
+    for (int index = 0; index < partitionRecord.size(); index++) {
+      Object val = partitionRecord.get(index, Object.class);
+      if (val != null) {
+        converted.set(
+            index,
+            IdentityPartitionConverters.convertConstant(
+                partitionType.fields().get(index).type(), val));
+      }
+    }
+
+    return converted;
+  }
+
+  private static class TransformIteratorWithBiFunction<T, U, R> implements Iterator<R> {
+    private final Iterator<T> iterator;
+    private final BiFunction<T, U, R> transformer;
+    private final U additionalInput;
+
+    TransformIteratorWithBiFunction(
+        Iterator<T> iterator, BiFunction<T, U, R> transformer, U additionalInput) {
+      this.iterator = iterator;
+      this.transformer = transformer;
+      this.additionalInput = additionalInput;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public R next() {
+      T nextElement = iterator.next();
+      return transformer.apply(nextElement, additionalInput);
+    }
+  }
+}
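Taken together, the handler's public surface is small. A sketch of the intended end-to-end flow, assuming `table` is a partitioned table with at least one snapshot and that the caller handles the IOException from closing the iterable:

    // compute stats for the current snapshot and register the file in table metadata
    PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
    table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();

    // read the entries back with the schema derived from the unified partition type
    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
    try (CloseableIterable<PartitionStatsRecord> records =
        PartitionStatsHandler.readPartitionStatsFile(
            dataSchema, table.io().newInputFile(statsFile.path()))) {
      records.forEach(record -> System.out.println(record.unwrap().partition()));
    }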
diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
new file mode 100644
index 000000000000..ff293114c030
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.apache.iceberg.data.PartitionStatsHandler.Column;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStats;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
+import org.assertj.core.groups.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestPartitionStatsHandler {
+  private static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();
+
+  @TempDir public File temp;
+
+  private static final Random RANDOM = ThreadLocalRandom.current();
+
+  @Parameters(name = "fileFormat = {0}")
+  public static List<FileFormat> parameters() {
+    return Arrays.asList(FileFormat.PARQUET, FileFormat.ORC, FileFormat.AVRO);
+  }
+
+  @Parameter private FileFormat format;
+
+  @Test
+  public void testPartitionStatsOnEmptyTable() throws Exception {
+    Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2);
+    assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable)).isNull();
+  }
+
+  @Test
+  public void testPartitionStatsOnEmptyBranch() throws Exception {
+    Table testTable = TestTables.create(tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2);
+    testTable.manageSnapshots().createBranch("b1").commit();
+    PartitionStatisticsFile partitionStatisticsFile =
+        PartitionStatsHandler.computeAndWriteStatsFile(testTable, "b1");
+    // creates an empty stats file since the dummy snapshot exists
+    assertThat(partitionStatisticsFile.fileSizeInBytes()).isEqualTo(0L);
+    assertThat(partitionStatisticsFile.snapshotId())
+        .isEqualTo(testTable.refs().get("b1").snapshotId());
+  }
+
+  @Test
+  public void testPartitionStatsOnInvalidSnapshot() throws Exception {
+    Table testTable =
+        TestTables.create(tempDir("invalid_snapshot"), "invalid_snapshot", SCHEMA, SPEC, 2);
+    assertThatThrownBy(
+            () -> PartitionStatsHandler.computeAndWriteStatsFile(testTable, "INVALID_BRANCH"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Couldn't find the snapshot for the branch INVALID_BRANCH");
+  }
+
+  @Test
+  public void testPartitionStatsOnUnPartitionedTable() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("unpartitioned_table"),
+            "unpartitioned_table",
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            2);
+
+    List<Record> records = prepareRecords(testTable.schema());
+    DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records);
+    testTable.newAppend().appendFile(dataFile).commit();
+
+    assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("table must be partitioned");
+  }
+
+  @Test
+  public void testAllDatatypePartitionWriting() throws Exception {
+    Schema schema =
+        new Schema(
+            required(100, "id", Types.LongType.get()),
+            optional(101, "data", Types.StringType.get()),
+            required(102, "b", Types.BooleanType.get()),
+            optional(103, "i", Types.IntegerType.get()),
+            required(104, "l", Types.LongType.get()),
+            optional(105, "f", Types.FloatType.get()),
+            required(106, "d", Types.DoubleType.get()),
+            optional(107, "date", Types.DateType.get()),
+            required(108, "ts", Types.TimestampType.withoutZone()),
+            required(110, "s", Types.StringType.get()),
+            required(111, "uuid", Types.UUIDType.get()),
+            required(112, "fixed", Types.FixedType.ofLength(7)),
+            optional(113, "bytes", Types.BinaryType.get()),
+            required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
+            required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
+            required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision
+            required(117, "time", Types.TimeType.get()));
+
+    PartitionSpec spec =
+        PartitionSpec.builderFor(schema)
+            .identity("b")
+            .identity("i")
+            .identity("l")
+            .identity("f")
+            .identity("d")
+            .identity("date")
+            .identity("ts")
+            .identity("s")
+            .identity("uuid")
+            .identity("fixed")
+            .identity("bytes")
+            .identity("dec_9_0")
+            .identity("dec_11_2")
+            .identity("dec_38_10")
+            .identity("time")
+            .build();
+
+    Table testTable =
+        TestTables.create(
+            tempDir("test_all_type"), "test_all_type", schema, spec, SortOrder.unsorted(), 2);
+
+    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
+
+    Record partitionData =
+        GenericRecord.create(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
+    partitionData.set(0, true);
+    partitionData.set(1, 42);
+    partitionData.set(2, 42L);
+    partitionData.set(3, 3.14f);
+    partitionData.set(4, 3.141592653589793);
+    partitionData.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value());
+    partitionData.set(
+        6, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value());
+    partitionData.set(7, "string");
+    partitionData.set(8, UUIDUtil.convertToByteBuffer(UUID.randomUUID()));
+    partitionData.set(9, new byte[] {0, 1, 2, 3, 4, 5, 6});
+    partitionData.set(10, new byte[] {1, 2, 3});
+    partitionData.set(11, Literal.of("123456789").to(Types.DecimalType.of(9, 0)).value());
+    partitionData.set(12, Literal.of("1234567.89").to(Types.DecimalType.of(11, 2)).value());
+    partitionData.set(
+        13, Literal.of("12345678901234567890.1234567890").to(Types.DecimalType.of(38, 10)).value());
+    partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value());
+
+    PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10));
+    partitionStats.set(Column.DATA_RECORD_COUNT.id(), RANDOM.nextLong());
+    partitionStats.set(Column.DATA_FILE_COUNT.id(), RANDOM.nextInt());
+    partitionStats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), 1024L * RANDOM.nextInt(20));
+
+    Iterator<PartitionStatsRecord> convertedRecords =
+        PartitionStatsHandler.statsToRecords(Collections.singletonList(partitionStats), dataSchema);
+    List<PartitionStatsRecord> expectedRecords = Lists.newArrayList(convertedRecords);
+    PartitionStatisticsFile statisticsFile =
+        PartitionStatsHandler.writePartitionStatsFile(
+            testTable, 42L, dataSchema, expectedRecords.iterator());
+
+    List<PartitionStatsRecord> writtenRecords;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            dataSchema, Files.localInput(statisticsFile.path()))) {
+      writtenRecords = Lists.newArrayList(recordIterator);
+    }
+    assertThat(writtenRecords).isEqualTo(expectedRecords);
+  }
+
+  @Test
+  public void testOptionalFieldsWriting() throws Exception {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+    Table testTable =
+        TestTables.create(
+            tempDir("test_partition_stats_optional"),
+            "test_partition_stats_optional",
+            SCHEMA,
+            spec,
+            SortOrder.unsorted(),
+            2);
+
+    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
+
+    ImmutableList.Builder<PartitionStats> partitionListBuilder = ImmutableList.builder();
+
+    for (int i = 0; i < 5; i++) {
+      GenericRecord partitionData =
+          GenericRecord.create(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
+      partitionData.set(0, RANDOM.nextInt());
+
+      PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10));
+
+      stats.set(Column.PARTITION.ordinal(), partitionData);
+      stats.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong());
+      stats.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt());
+      stats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20));
+      stats.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), null);
+      stats.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), null);
+      stats.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), null);
+      stats.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), null);
+      stats.set(Column.TOTAL_RECORD_COUNT.ordinal(), null);
+      stats.set(Column.LAST_UPDATED_AT.ordinal(), null);
+      stats.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), null);
+
+      partitionListBuilder.add(stats);
+    }
+
+    Iterator<PartitionStatsRecord> convertedRecords =
+        PartitionStatsHandler.statsToRecords(partitionListBuilder.build(), dataSchema);
+
+    List<PartitionStatsRecord> expectedRecords = Lists.newArrayList(convertedRecords);
+
+    PartitionStatisticsFile statisticsFile =
+        PartitionStatsHandler.writePartitionStatsFile(
+            testTable, 42L, dataSchema, expectedRecords.iterator());
+
+    List<PartitionStatsRecord> writtenRecords;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            dataSchema, Files.localInput(statisticsFile.path()))) {
+      writtenRecords = Lists.newArrayList(recordIterator);
+    }
+    assertThat(writtenRecords).isEqualTo(expectedRecords);
+    assertThat(expectedRecords.get(0).unwrap())
+        .extracting(
+            PartitionStats::positionDeleteRecordCount,
+            PartitionStats::positionDeleteFileCount,
+            PartitionStats::equalityDeleteRecordCount,
+            PartitionStats::equalityDeleteFileCount,
+            PartitionStats::totalRecordCount,
+            PartitionStats::lastUpdatedAt,
+            PartitionStats::lastUpdatedSnapshotId)
+        .isEqualTo(
+            Arrays.asList(
+                0L, 0, 0L, 0, 0L, null, null)); // null counters should be initialized to zero.
+  }
+
+  @SuppressWarnings("checkstyle:MethodLength")
+  @TestTemplate // Tests for all the table formats (PARQUET, ORC, AVRO)
+  public void testPartitionStats() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("partition_stats_" + format.name()),
+            "partition_stats_compute_" + format.name(),
+            SCHEMA,
+            SPEC,
+            2,
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    List<Record> records = prepareRecords(testTable.schema());
+    DataFile dataFile1 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("foo", "A"), records.subList(0, 3));
+    DataFile dataFile2 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("foo", "B"), records.subList(3, 4));
+    DataFile dataFile3 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("bar", "A"), records.subList(4, 5));
+    DataFile dataFile4 =
+        FileHelpers.writeDataFile(
+            testTable, outputFile(), TestHelpers.Row.of("bar", "B"), records.subList(5, 7));
+
+    for (int i = 0; i < 3; i++) {
+      // insert the same set of seven records three times to create new manifest files
+      testTable
+          .newAppend()
+          .appendFile(dataFile1)
+          .appendFile(dataFile2)
+          .appendFile(dataFile3)
+          .appendFile(dataFile4)
+          .commit();
+    }
+
+    Snapshot snapshot1 = testTable.currentSnapshot();
+    Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
+    Types.StructType partitionType =
+        recordSchema.findField(Column.PARTITION.name()).type().asStructType();
+    computeAndValidatePartitionStats(
+        testTable,
+        recordSchema,
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            0,
+            9L,
+            3,
+            3 * dataFile1.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            0,
+            3L,
+            3,
+            3 * dataFile2.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "A"),
+            0,
+            3L,
+            3,
+            3 * dataFile3.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "B"),
+            0,
+            6L,
+            3,
+            3 * dataFile4.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()));
+
+    DeleteFile posDeletes = commitPositionDeletes(testTable, dataFile1);
+    Snapshot snapshot2 = testTable.currentSnapshot();
+
+    DeleteFile eqDeletes = commitEqualityDeletes(testTable);
+    Snapshot snapshot3 = testTable.currentSnapshot();
+
+    recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
+    partitionType = recordSchema.findField(Column.PARTITION.name()).type().asStructType();
+    computeAndValidatePartitionStats(
+        testTable,
+        recordSchema,
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "A"),
+            0,
+            9L,
+            3,
+            3 * dataFile1.fileSizeInBytes(),
+            0L,
+            0,
+            eqDeletes.recordCount(),
+            1,
+            0L,
+            snapshot3.timestampMillis(),
+            snapshot3.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "foo", "B"),
+            0,
+            3L,
+            3,
+            3 * dataFile2.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "A"),
+            0,
+            3L,
+            3,
+            3 * dataFile3.fileSizeInBytes(),
+            posDeletes.recordCount(),
+            1,
+            0L,
+            0,
+            0L,
+            snapshot2.timestampMillis(),
+            snapshot2.snapshotId()),
+        Tuple.tuple(
+            partitionRecord(partitionType, "bar", "B"),
+            0,
+            6L,
+            3,
+            3 * dataFile4.fileSizeInBytes(),
+            0L,
+            0,
+            0L,
+            0,
+            0L,
+            snapshot1.timestampMillis(),
+            snapshot1.snapshotId()));
+  }
+
+  private OutputFile outputFile() throws IOException {
+    return Files.localOutput(File.createTempFile("data", null, tempDir("stats")));
+  }
+
+  private static Record partitionRecord(Types.StructType partitionType, String c2, String c3) {
+    Record partitionData = GenericRecord.create(partitionType);
+    partitionData.set(0, c2);
+    partitionData.set(1, c3);
+    return partitionData;
+  }
+
+  private static List<Record> prepareRecords(Schema schema) {
+    GenericRecord record = GenericRecord.create(schema);
+    List<Record> records = Lists.newArrayList();
+    // foo 4 records, bar 3 records
+    // foo, A -> 3 records
+    records.add(record.copy("c1", 0, "c2", "foo", "c3", "A"));
+    records.add(record.copy("c1", 1, "c2", "foo", "c3", "A"));
+    records.add(record.copy("c1", 2, "c2", "foo", "c3", "A"));
+    // foo, B -> 1 record
+    records.add(record.copy("c1", 3, "c2", "foo", "c3", "B"));
+    // bar, A -> 1 record
+    records.add(record.copy("c1", 4, "c2", "bar", "c3", "A"));
+    // bar, B -> 2 records
+    records.add(record.copy("c1", 5, "c2", "bar", "c3", "B"));
+    records.add(record.copy("c1", 6, "c2", "bar", "c3", "B"));
+    return records;
+  }
+
+  private static void computeAndValidatePartitionStats(
+      Table testTable, Schema recordSchema, Tuple... expectedValues) throws IOException {
+    // compute and commit partition stats file
+    Snapshot currentSnapshot = testTable.currentSnapshot();
+    PartitionStatisticsFile result = PartitionStatsHandler.computeAndWriteStatsFile(testTable);
+    testTable.updatePartitionStatistics().setPartitionStatistics(result).commit();
+    assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+
+    // read the partition entries from the stats file
+    List<PartitionStatsRecord> partitionStats;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            recordSchema, Files.localInput(result.path()))) {
+      partitionStats = Lists.newArrayList(recordIterator);
+    }
+    assertThat(partitionStats)
+        .extracting(PartitionStatsRecord::unwrap)
+        .extracting(
+            PartitionStats::partition,
+            PartitionStats::specId,
+            PartitionStats::dataRecordCount,
+            PartitionStats::dataFileCount,
+            PartitionStats::totalDataFileSizeInBytes,
+            PartitionStats::positionDeleteRecordCount,
+            PartitionStats::positionDeleteFileCount,
+            PartitionStats::equalityDeleteRecordCount,
+            PartitionStats::equalityDeleteFileCount,
+            PartitionStats::totalRecordCount,
+            PartitionStats::lastUpdatedAt,
+            PartitionStats::lastUpdatedSnapshotId)
+        .containsExactlyInAnyOrder(expectedValues);
+  }
+
+  private DeleteFile commitEqualityDeletes(Table testTable) throws IOException {
+    Schema deleteRowSchema = testTable.schema().select("c1");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(dataDelete.copy("c1", 1), dataDelete.copy("c1", 2));
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            testTable,
+            Files.localOutput(File.createTempFile("junit", null, tempDir("eq_delete"))),
+            TestHelpers.Row.of("foo", "A"),
+            dataDeletes,
+            deleteRowSchema);
+
+    testTable.newRowDelta().addDeletes(eqDeletes).commit();
+    return eqDeletes;
+  }
+
+  private DeleteFile commitPositionDeletes(Table testTable, DataFile dataFile1) throws IOException {
+    List<PositionDelete<?>> deletes = Lists.newArrayList();
+    for (long i = 0; i < 2; i++) {
+      deletes.add(
+          positionDelete(testTable.schema(), dataFile1.location(), i, (int) i, String.valueOf(i)));
+    }
+
+    DeleteFile posDeletes =
+        FileHelpers.writePosDeleteFile(
+            testTable,
+            Files.localOutput(File.createTempFile("junit", null, tempDir("pos_delete"))),
+            TestHelpers.Row.of("bar", "A"),
+            deletes);
+
+    testTable.newRowDelta().addDeletes(posDeletes).commit();
+    return posDeletes;
+  }
+
+  private static PositionDelete<GenericRecord> positionDelete(
+      Schema tableSchema, CharSequence path, Long position, Object... values) {
+    PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+    GenericRecord nested = GenericRecord.create(tableSchema);
+    for (int i = 0; i < values.length; i++) {
+      nested.set(i, values[i]);
+    }
+
+    posDelete.set(path, position, nested);
+    return posDelete;
+  }
+
+  private File tempDir(String folderName) throws IOException {
+    return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile();
+  }
+}