diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
index a2faecb24826..ef8fa35f5419 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -46,10 +46,12 @@ public enum Column {
   }
 
   /**
-   * Generates a schema as per partition statistics spec based on the given partition type.
+   * Generates the partition stats file schema based on a given partition type.
+   *
+   * <p>Note: Provide the unified partition tuple as mentioned in the spec.
    *
    * @param partitionType the struct type that defines the structure of the partition.
-   * @return a schema that corresponds to the provided partition type.
+   * @return a schema that corresponds to the provided unified partition type.
    */
   public static Schema schema(Types.StructType partitionType) {
     Preconditions.checkState(
@@ -88,6 +90,10 @@ public static Schema schema(Types.StructType partitionType) {
    */
   public static CloseableIterable<Record> fromManifest(
       Table table, ManifestFile manifest, Schema recordSchema) {
+    Preconditions.checkState(
+        !recordSchema.findField(Column.PARTITION.name()).type().asStructType().fields().isEmpty(),
+        "record schema should not be unpartitioned");
+
     return CloseableIterable.transform(
         ManifestFiles.open(manifest, table.io(), table.specs())
             .select(BaseScan.scanColumns(manifest.content()))
@@ -191,28 +197,24 @@ private static Record coercedPartitionData(
   }
 
   private static void checkAndIncrementLong(Record toUpdate, Record fromRecord, Column column) {
-    if (fromRecord.get(column.ordinal()) != null) {
-      if (toUpdate.get(column.ordinal()) != null) {
-        toUpdate.set(
-            column.ordinal(),
-            toUpdate.get(column.ordinal(), Long.class)
-                + fromRecord.get(column.ordinal(), Long.class));
-      } else {
-        toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Long.class));
-      }
+    if ((fromRecord.get(column.ordinal()) != null) && (toUpdate.get(column.ordinal()) != null)) {
+      toUpdate.set(
+          column.ordinal(),
+          toUpdate.get(column.ordinal(), Long.class)
+              + fromRecord.get(column.ordinal(), Long.class));
+    } else if (fromRecord.get(column.ordinal()) != null) {
+      toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Long.class));
     }
   }
 
   private static void checkAndIncrementInt(Record toUpdate, Record fromRecord, Column column) {
-    if (fromRecord.get(column.ordinal()) != null) {
-      if (toUpdate.get(column.ordinal()) != null) {
-        toUpdate.set(
-            column.ordinal(),
-            toUpdate.get(column.ordinal(), Integer.class)
-                + fromRecord.get(column.ordinal(), Integer.class));
-      } else {
-        toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Integer.class));
-      }
+    if ((fromRecord.get(column.ordinal()) != null) && (toUpdate.get(column.ordinal()) != null)) {
+      toUpdate.set(
+          column.ordinal(),
+          toUpdate.get(column.ordinal(), Integer.class)
+              + fromRecord.get(column.ordinal(), Integer.class));
+    } else if (fromRecord.get(column.ordinal()) != null) {
+      toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Integer.class));
     }
   }
 }
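Review note: `checkAndIncrementLong` and `checkAndIncrementInt` still duplicate the null handling and differ only in the boxed type. One possible follow-up, shown as an untested sketch (the generic helper and its name are not part of this change; it relies on `java.util.function.BinaryOperator`):

```java
import java.util.function.BinaryOperator;

// Hypothetical replacement for both checkAndIncrementLong and checkAndIncrementInt.
private static <T> void checkAndIncrement(
    Record toUpdate, Record fromRecord, Column column, Class<T> javaType, BinaryOperator<T> sum) {
  T fromValue = fromRecord.get(column.ordinal(), javaType);
  if (fromValue == null) {
    return; // nothing to accumulate
  }

  T current = toUpdate.get(column.ordinal(), javaType);
  // take the incoming value when there is no current value, otherwise add them
  toUpdate.set(column.ordinal(), current == null ? fromValue : sum.apply(current, fromValue));
}
```

Call sites would then become `checkAndIncrement(toUpdate, fromRecord, column, Long.class, Long::sum)` and `checkAndIncrement(toUpdate, fromRecord, column, Integer.class, Integer::sum)`.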
diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
index e90e61e0ba5a..b3b4c75d1c3e 100644
--- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
+++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
@@ -23,6 +23,7 @@
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.DateTimeUtil;
 
 public class IdentityPartitionConverters {
@@ -51,12 +52,12 @@ public static Object convertConstant(Type type, Object value) {
         if (value instanceof GenericData.Fixed) {
           return ((GenericData.Fixed) value).bytes();
         } else if (value instanceof ByteBuffer) {
-          return ((ByteBuffer) value).array();
+          return ByteBuffers.toByteArray((ByteBuffer) value);
         }
         return value;
       case UUID:
         if (value instanceof ByteBuffer) {
-          return ((ByteBuffer) value).array();
+          return ByteBuffers.toByteArray((ByteBuffer) value);
         }
         return value;
       default:
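The switch from `ByteBuffer.array()` to `ByteBuffers.toByteArray(...)` fixes a real correctness issue: `array()` returns the entire backing array and ignores the buffer's position and limit (and throws for direct or read-only buffers, which have no accessible array). A minimal, self-contained illustration of the difference:

```java
import java.nio.ByteBuffer;

public class SlicedBufferDemo {
  public static void main(String[] args) {
    byte[] backing = {1, 2, 3, 4, 5};
    // a view over the two bytes at index 1..2 of the backing array
    ByteBuffer slice = ByteBuffer.wrap(backing, 1, 2);

    // array() ignores position/limit and returns all 5 backing bytes
    System.out.println(slice.array().length); // 5

    // roughly what ByteBuffers.toByteArray does: copy only the remaining bytes
    byte[] copy = new byte[slice.remaining()];
    slice.duplicate().get(copy);
    System.out.println(copy.length); // 2 -> {2, 3}
  }
}
```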
diff --git a/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java b/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java
index 8bbfcfffe996..5aec62193a2e 100644
--- a/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java
+++ b/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java
@@ -28,9 +28,10 @@
 import java.nio.file.Paths;
 import java.util.Comparator;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.iceberg.data.PartitionStatsGenerator;
-import org.apache.iceberg.data.PartitionStatsWriterUtil;
+import org.apache.iceberg.data.PartitionStatsGeneratorUtil;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
@@ -66,50 +67,45 @@ public class PartitionStatsGeneratorBenchmark {
 
   private String baseDir;
 
-  // create 3 manifests with same partition values
-  private static final int ITERATION_COUNTER = 3;
+  // Create 10k manifests
+  private static final int MANIFEST_COUNTER = 10000;
 
-  // create 3 * 10000 manifests
-  private static final int PARTITION_COUNT = 10000;
+  // Each manifest has 100 partition values; all manifests reuse the same values
+  private static final int PARTITION_PER_MANIFEST = 100;
 
-  // one data file manifest
-  private static final int DATA_FILES_PER_PARTITION_COUNT = 1;
+  // 20 data files per partition, which results in 2k data files per manifest
+  private static final int DATA_FILES_PER_PARTITION_COUNT = 20;
 
   private Table table;
 
-  private PartitionStatisticsFile result;
-
   @Setup
-  public void setupBenchmark() throws IOException {
+  public void setupBenchmark() {
     baseDir =
         Paths.get(new File(System.getProperty("java.io.tmpdir")).getAbsolutePath()).toString();
     table = TestTables.create(new File(baseDir), "foo", SCHEMA, SPEC, SortOrder.unsorted(), 2);
 
-    for (int interations = 0; interations < ITERATION_COUNTER; interations++) {
-      for (int partitionOrdinal = 0; partitionOrdinal < PARTITION_COUNT; partitionOrdinal++) {
-        StructLike partition = TestHelpers.Row.of(partitionOrdinal);
-        AppendFiles appendFiles = table.newAppend();
-        for (int fileOrdinal = 0; fileOrdinal < DATA_FILES_PER_PARTITION_COUNT; fileOrdinal++) {
-          DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
-          appendFiles.appendFile(dataFile);
-        }
-
-        appendFiles.commit();
-      }
-    }
+    IntStream.range(0, MANIFEST_COUNTER)
+        .forEach(
+            manifestCount -> {
+              AppendFiles appendFiles = table.newAppend();
+
+              IntStream.range(0, PARTITION_PER_MANIFEST)
+                  .forEach(
+                      partitionOrdinal -> {
+                        StructLike partition = TestHelpers.Row.of(partitionOrdinal);
+                        IntStream.range(0, DATA_FILES_PER_PARTITION_COUNT)
+                            .forEach(
+                                fileOrdinal ->
+                                    appendFiles.appendFile(
+                                        FileGenerationUtil.generateDataFile(table, partition)));
+                      });
+
+              appendFiles.commit();
+            });
   }
 
   @TearDown
   public void tearDownBenchmark() throws IOException {
-    // validate row count
-    try (CloseableIterable<Record> recordIterator =
-        PartitionStatsWriterUtil.readPartitionStatsFile(
-            PartitionStatsUtil.schema(Partitioning.partitionType(table)),
-            Files.localInput(result.path()))) {
-      assertThat(recordIterator).hasSize(PARTITION_COUNT);
-    }
-
-    // clean up the temp folder
     if (baseDir != null) {
       try (Stream<Path> walk = java.nio.file.Files.walk(Paths.get(baseDir))) {
         walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
@@ -120,12 +116,20 @@ public void tearDownBenchmark() throws IOException {
 
   @Benchmark
   @Threads(1)
-  public void writePartitionStats() {
+  public void benchmarkPartitionStats() throws IOException {
     Snapshot currentSnapshot = table.currentSnapshot();
     PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(table);
-    result = partitionStatsGenerator.generate();
+    PartitionStatisticsFile result = partitionStatsGenerator.generate();
     table.updatePartitionStatistics().setPartitionStatistics(result).commit();
     assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+
+    // validate row count (every manifest reuses the same partition values)
+    try (CloseableIterable<Record> recordIterator =
+        PartitionStatsGeneratorUtil.readPartitionStatsFile(
+            PartitionStatsUtil.schema(Partitioning.partitionType(table)),
+            Files.localInput(result.path()))) {
+      assertThat(recordIterator).hasSize(PARTITION_PER_MANIFEST);
+    }
   }
 }
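For scale context: the new setup produces 10,000 manifests × 100 partitions × 20 files, i.e. 2,000 data file entries per manifest and 20 million overall, but only 100 distinct partition values, since every manifest reuses partition ordinals 0..99. That is why the row-count validation expects `PARTITION_PER_MANIFEST` rows rather than a number proportional to the manifest count.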
diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java
index 213586fcf6aa..ad167660c4eb 100644
--- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java
+++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java
@@ -45,12 +45,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Generates a {@link PartitionStatisticsFile} for the given table as per the spec. Computes the
+ * stats by going through the partition info stored in each manifest file.
+ */
 public class PartitionStatsGenerator {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsGenerator.class);
 
   private final Table table;
   private String branch;
 
   private Types.StructType partitionType;
+  // Map from partition data to the partition-stats entry for that partition.
   private Map<Record, Record> partitionEntryMap;
 
   public PartitionStatsGenerator(Table table) {
@@ -77,7 +82,6 @@ public PartitionStatisticsFile generate() {
     }
 
     partitionType = Partitioning.partitionType(table);
-    // Map of partitionData, partition-stats-entry per partitionData.
     partitionEntryMap = Maps.newConcurrentMap();
 
     Schema dataSchema = PartitionStatsUtil.schema(partitionType);
@@ -119,8 +123,8 @@ public PartitionStatisticsFile generate() {
         Iterators.transform(sortedKeys.iterator(), this::convertPartitionRecords);
 
     OutputFile outputFile =
-        PartitionStatsWriterUtil.newPartitionStatsFile(table, currentSnapshot.snapshotId());
-    PartitionStatsWriterUtil.writePartitionStatsFile(table, entriesForWriter, outputFile);
+        PartitionStatsGeneratorUtil.newPartitionStatsFile(table, currentSnapshot.snapshotId());
+    PartitionStatsGeneratorUtil.writePartitionStatsFile(table, entriesForWriter, outputFile);
     return ImmutableGenericPartitionStatisticsFile.builder()
         .snapshotId(currentSnapshot.snapshotId())
         .path(outputFile.location())
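A usage sketch for the generator (the wrapper method is illustrative; `generate()` and the `updatePartitionStatistics()` commit are exactly the calls exercised by the benchmark and tests in this PR):

```java
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.PartitionStatsGenerator;

public class PartitionStatsExample {
  // Computes partition stats for the table's current snapshot and registers the file.
  static PartitionStatisticsFile computeAndCommit(Table table) {
    PartitionStatsGenerator generator = new PartitionStatsGenerator(table);
    PartitionStatisticsFile statsFile = generator.generate();
    table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
    return statsFile;
  }
}
```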
diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsWriterUtil.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsGeneratorUtil.java
similarity index 69%
rename from data/src/main/java/org/apache/iceberg/data/PartitionStatsWriterUtil.java
rename to data/src/main/java/org/apache/iceberg/data/PartitionStatsGeneratorUtil.java
index 1fe1c1886a8a..103c70a9dd53 100644
--- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsWriterUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsGeneratorUtil.java
@@ -28,6 +28,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.PartitionStatsUtil;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
@@ -42,10 +43,22 @@
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
 
-public final class PartitionStatsWriterUtil {
+/**
+ * Util to write and read the {@link PartitionStatisticsFile}. Uses generic readers and writers, so
+ * the stats can be written and read in the table's default file format.
+ */
+public final class PartitionStatsGeneratorUtil {
 
-  private PartitionStatsWriterUtil() {}
+  private PartitionStatsGeneratorUtil() {}
 
+  /**
+   * Creates a new {@link OutputFile} for storing partition statistics for the specified table.
+   * The output file extension matches the table's default file format.
+   *
+   * @param table The {@link Table} for which the partition statistics file is being created.
+   * @param snapshotId The ID of the snapshot associated with the partition statistics.
+   * @return A new {@link OutputFile} that can be used to write partition statistics.
+   */
   public static OutputFile newPartitionStatsFile(Table table, long snapshotId) {
     FileFormat fileFormat =
         FileFormat.fromString(
@@ -59,12 +72,20 @@ public static OutputFile newPartitionStatsFile(Table table, long snapshotId) {
             fileFormat.addExtension(String.format("partition-stats-%d", snapshotId))));
   }
 
+  /**
+   * Writes partition statistics to the specified {@link OutputFile} for the given table.
+   *
+   * @param table The {@link Table} for which the partition statistics are being written.
+   * @param records An {@link Iterator} of {@link Record} to be written into the file.
+   * @param outputFile The {@link OutputFile} where the partition statistics will be written. Should
+   *     include the file extension.
+   */
   public static void writePartitionStatsFile(
       Table table, Iterator<Record> records, OutputFile outputFile) {
     Schema dataSchema = PartitionStatsUtil.schema(Partitioning.partitionType(table));
     FileFormat fileFormat =
         FileFormat.fromString(
-            table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+            outputFile.location().substring(outputFile.location().lastIndexOf(".") + 1));
     FileWriterFactory<Record> factory =
         GenericFileWriterFactory.builderFor(table)
             .dataSchema(dataSchema)
@@ -82,6 +103,12 @@ public static void writePartitionStatsFile(
     }
   }
 
+  /**
+   * Reads partition statistics from the specified {@link InputFile} using the given schema.
+   *
+   * @param schema The {@link Schema} of the partition statistics file.
+   * @param inputFile An {@link InputFile} pointing to the partition stats file.
+   */
   public static CloseableIterable<Record> readPartitionStatsFile(
       Schema schema, InputFile inputFile) {
     // TODO: support other formats or introduce GenericFileReader
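A round-trip sketch of the renamed util, mirroring the test helper below (snapshot id 42 and the wrapper method are illustrative). Note that `writePartitionStatsFile` now derives the file format from the output file's extension, so a file created by `newPartitionStatsFile` is written in the table's default format:

```java
import java.util.List;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionStatsUtil;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.PartitionStatsGeneratorUtil;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class PartitionStatsRoundTrip {
  // Writes the given stats records for snapshot 42, then reads them back.
  static List<Record> roundTrip(Table table, List<Record> statsRecords) throws Exception {
    OutputFile outputFile = PartitionStatsGeneratorUtil.newPartitionStatsFile(table, 42L);
    PartitionStatsGeneratorUtil.writePartitionStatsFile(
        table, statsRecords.iterator(), outputFile);

    Schema schema = PartitionStatsUtil.schema(Partitioning.partitionType(table));
    try (CloseableIterable<Record> reader =
        PartitionStatsGeneratorUtil.readPartitionStatsFile(
            schema, Files.localInput(outputFile.location()))) {
      return Lists.newArrayList(reader);
    }
  }
}
```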
diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGenerator.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGenerator.java
index 1dd478732118..e022287519d8 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGenerator.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGenerator.java
@@ -106,6 +106,27 @@ public void testPartitionStatsOnInvalidSnapshot() throws Exception {
         .hasMessage("Couldn't find the snapshot for the branch INVALID_BRANCH");
   }
 
+  @Test
+  public void testPartitionStatsOnUnPartitionedTable() throws Exception {
+    Table testTable =
+        TestTables.create(
+            temp.newFolder("unpartitioned_table"),
+            "unpartitioned_table",
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            SortOrder.unsorted(),
+            2);
+
+    List<Record> records = prepareRecords(testTable.schema());
+    DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records);
+    testTable.newAppend().appendFile(dataFile).commit();
+
+    PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(testTable);
+    assertThatThrownBy(partitionStatsGenerator::generate)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("getting schema for an unpartitioned table");
+  }
+
   @SuppressWarnings("checkstyle:MethodLength")
   @Test
   public void testPartitionStats() throws Exception {
@@ -497,7 +518,7 @@ private static void computeAndValidatePartitionStats(
     // read the partition entries from the stats file
     List<Record> rows;
     try (CloseableIterable<Record> recordIterator =
-        PartitionStatsWriterUtil.readPartitionStatsFile(
+        PartitionStatsGeneratorUtil.readPartitionStatsFile(
            recordSchema, Files.localInput(result.path()))) {
       rows = Lists.newArrayList(recordIterator);
     }
diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsWriterUtil.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java
similarity index 97%
rename from data/src/test/java/org/apache/iceberg/data/TestPartitionStatsWriterUtil.java
rename to data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java
index c972b3bb8200..a853362c2f57 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsWriterUtil.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java
@@ -51,7 +51,7 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestPartitionStatsWriterUtil {
+public class TestPartitionStatsGeneratorUtil {
   private static final Schema SCHEMA =
       new Schema(
           required(1, "id", Types.LongType.get()),
@@ -105,7 +105,7 @@ public void testPartitionStats() throws Exception {
   }
 
   @Test
-  public void testPartitionStatsOptionalFields() throws Exception {
+  public void testOptionalFields() throws Exception {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
     Table testTable =
         TestTables.create(
@@ -230,14 +230,14 @@ public void testAllDatatypePartition() throws Exception {
 
   private static void testPartitionStats(
       Table testTable, List<Record> expectedRecords, Schema dataSchema) throws IOException {
-    OutputFile outputFile = PartitionStatsWriterUtil.newPartitionStatsFile(testTable, 42L);
-    PartitionStatsWriterUtil.writePartitionStatsFile(
+    OutputFile outputFile = PartitionStatsGeneratorUtil.newPartitionStatsFile(testTable, 42L);
+    PartitionStatsGeneratorUtil.writePartitionStatsFile(
         testTable, expectedRecords.iterator(), outputFile);
     assertThat(Paths.get(outputFile.location())).exists();
 
     List<Record> writtenRecords;
     try (CloseableIterable<Record> recordIterator =
-        PartitionStatsWriterUtil.readPartitionStatsFile(
+        PartitionStatsGeneratorUtil.readPartitionStatsFile(
            dataSchema, Files.localInput(outputFile.location()))) {
       writtenRecords = Lists.newArrayList(recordIterator);
     }
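The new unpartitioned-table test pins the failure mode at schema construction: for a table with no partition fields, `Partitioning.partitionType(table)` yields an empty struct and `PartitionStatsUtil.schema` rejects it before anything is written. An equivalent assertion against the util directly (AssertJ, as in the tests; the wrapper method is illustrative):

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.iceberg.PartitionStatsUtil;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Table;

public class UnpartitionedGuardExample {
  // Expects the schema() precondition to fire for a table with no partition fields.
  static void assertRejectsUnpartitioned(Table unpartitionedTable) {
    assertThatThrownBy(
            () -> PartitionStatsUtil.schema(Partitioning.partitionType(unpartitionedTable)))
        .isInstanceOf(IllegalStateException.class)
        .hasMessage("getting schema for an unpartitioned table");
  }
}
```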