From 0d2be519a7967b72ab2edb4c239a539b113501f4 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Thu, 29 Aug 2024 13:13:24 +0530 Subject: [PATCH] refactor --- .../org/apache/iceberg/PartitionStats.java | 339 ++++++++++++++++++ .../apache/iceberg/PartitionStatsUtil.java | 264 +++++--------- .../data/IdentityPartitionConverters.java | 5 + .../iceberg/data/PartitionStatsRecord.java | 176 +++++++++ .../PartitionStatsGeneratorBenchmark.java | 14 +- .../iceberg/data/PartitionStatsGenerator.java | 152 -------- .../data/PartitionStatsGeneratorUtil.java | 139 ------- .../iceberg/data/PartitionStatsHandler.java | 321 +++++++++++++++++ .../data/TestPartitionStatsGeneratorUtil.java | 258 ------------- ...or.java => TestPartitionStatsHandler.java} | 217 ++++++++--- 10 files changed, 1102 insertions(+), 783 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/PartitionStats.java create mode 100644 core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java delete mode 100644 data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java delete mode 100644 data/src/main/java/org/apache/iceberg/data/PartitionStatsGeneratorUtil.java create mode 100644 data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java delete mode 100644 data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java rename data/src/test/java/org/apache/iceberg/data/{TestPartitionStatsGenerator.java => TestPartitionStatsHandler.java} (69%) diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java new file mode 100644 index 000000000000..a7c16c6eb59b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.Objects; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class PartitionStats implements StructLike { + + private Record partition; // PartitionData as Record + private int specId; + private long dataRecordCount; + private int dataFileCount; + private long totalDataFileSizeInBytes; + private long positionDeleteRecordCount; + private int positionDeleteFileCount; + private long equalityDeleteRecordCount; + private int equalityDeleteFileCount; + private long totalRecordCount; + private Long lastUpdatedAt; // null by default + private Long lastUpdatedSnapshotId; // null by default + + private static final int STATS_COUNT = 12; + + public PartitionStats(Record partition) { + this.partition = partition; + } + + public Record partition() { + return partition; + } + + public int specId() { + return specId; + } + + public void setSpecId(int specId) { + this.specId = specId; + } + + public long dataRecordCount() { + return dataRecordCount; + } + + public void setDataRecordCount(long dataRecordCount) { + this.dataRecordCount = dataRecordCount; + } + + public int dataFileCount() { + return dataFileCount; + } + + public void setDataFileCount(int dataFileCount) { + this.dataFileCount = dataFileCount; + } + + public long totalDataFileSizeInBytes() { + return totalDataFileSizeInBytes; + } + + public void setTotalDataFileSizeInBytes(long totalDataFileSizeInBytes) { + this.totalDataFileSizeInBytes = totalDataFileSizeInBytes; + } + + public long positionDeleteRecordCount() { + return positionDeleteRecordCount; + } + + public void setPositionDeleteRecordCount(long positionDeleteRecordCount) { + this.positionDeleteRecordCount = positionDeleteRecordCount; + } + + public int positionDeleteFileCount() { + return positionDeleteFileCount; + } + + public void setPositionDeleteFileCount(int positionDeleteFileCount) { + this.positionDeleteFileCount = positionDeleteFileCount; + } + + public long equalityDeleteRecordCount() { + return equalityDeleteRecordCount; + } + + public void setEqualityDeleteRecordCount(long equalityDeleteRecordCount) { + this.equalityDeleteRecordCount = equalityDeleteRecordCount; + } + + public int equalityDeleteFileCount() { + return equalityDeleteFileCount; + } + + public void setEqualityDeleteFileCount(int equalityDeleteFileCount) { + this.equalityDeleteFileCount = equalityDeleteFileCount; + } + + public long totalRecordCount() { + return totalRecordCount; + } + + public void setTotalRecordCount(long totalRecordCount) { + this.totalRecordCount = totalRecordCount; + } + + public Long lastUpdatedAt() { + return lastUpdatedAt; + } + + public void setLastUpdatedAt(Long lastUpdatedAt) { + this.lastUpdatedAt = lastUpdatedAt; + } + + public Long lastUpdatedSnapshotId() { + return lastUpdatedSnapshotId; + } + + public void setLastUpdatedSnapshotId(Long lastUpdatedSnapshotId) { + this.lastUpdatedSnapshotId = lastUpdatedSnapshotId; + } + + /** + * Updates the partition stats from the data/delete file. + * + * @param file the ContentFile from the manifest entry. + * @param snapshot the snapshot corresponding to the live entry. 
+ */ + public void liveEntry(ContentFile file, Snapshot snapshot) { + Preconditions.checkState(file != null, "content file cannot be null"); + + specId = file.specId(); + + switch (file.content()) { + case DATA: + dataRecordCount = file.recordCount(); + dataFileCount = 1; + totalDataFileSizeInBytes = file.fileSizeInBytes(); + break; + case POSITION_DELETES: + positionDeleteRecordCount = file.recordCount(); + positionDeleteFileCount = 1; + break; + case EQUALITY_DELETES: + equalityDeleteRecordCount = file.recordCount(); + equalityDeleteFileCount = 1; + break; + default: + throw new UnsupportedOperationException("Unsupported file content type: " + file.content()); + } + + if (snapshot != null) { + lastUpdatedSnapshotId = snapshot.snapshotId(); + lastUpdatedAt = snapshot.timestampMillis(); + } + + // Note: Not computing the `TOTAL_RECORD_COUNT` for now as it needs scanning the data. + } + + /** + * Updates the modified time and snapshot ID for the deleted manifest entry. + * + * @param snapshot the snapshot corresponding to the deleted manifest entry. + */ + public void deletedEntry(Snapshot snapshot) { + if (snapshot != null && lastUpdatedAt != null && snapshot.timestampMillis() > lastUpdatedAt) { + lastUpdatedAt = snapshot.timestampMillis(); + lastUpdatedSnapshotId = snapshot.snapshotId(); + } + } + + /** + * Appends statistics from given entry to current entry. + * + * @param entry the entry from which statistics will be sourced. + */ + public void appendStats(PartitionStats entry) { + Preconditions.checkState(entry != null, "entry to update from cannot be null"); + + specId = Math.max(specId, entry.specId); + dataRecordCount += entry.dataRecordCount; + dataFileCount += entry.dataFileCount; + totalDataFileSizeInBytes += entry.totalDataFileSizeInBytes; + positionDeleteRecordCount += entry.positionDeleteRecordCount; + positionDeleteFileCount += entry.positionDeleteFileCount; + equalityDeleteRecordCount += entry.equalityDeleteRecordCount; + equalityDeleteFileCount += entry.equalityDeleteFileCount; + totalRecordCount += entry.totalRecordCount; + + if (entry.lastUpdatedAt != null) { + if (lastUpdatedAt == null || (lastUpdatedAt < entry.lastUpdatedAt)) { + lastUpdatedAt = entry.lastUpdatedAt; + lastUpdatedSnapshotId = entry.lastUpdatedSnapshotId; + } + } + } + + @Override + public int size() { + return STATS_COUNT; + } + + @Override + public T get(int pos, Class javaClass) { + switch (pos) { + case 0: + return javaClass.cast(partition); + case 1: + return javaClass.cast(specId); + case 2: + return javaClass.cast(dataRecordCount); + case 3: + return javaClass.cast(dataFileCount); + case 4: + return javaClass.cast(totalDataFileSizeInBytes); + case 5: + return javaClass.cast(positionDeleteRecordCount); + case 6: + return javaClass.cast(positionDeleteFileCount); + case 7: + return javaClass.cast(equalityDeleteRecordCount); + case 8: + return javaClass.cast(equalityDeleteFileCount); + case 9: + return javaClass.cast(totalRecordCount); + case 10: + return javaClass.cast(lastUpdatedAt); + case 11: + return javaClass.cast(lastUpdatedSnapshotId); + default: + throw new UnsupportedOperationException("Unknown position: " + pos); + } + } + + @Override + public void set(int pos, T value) { + switch (pos) { + case 0: + partition = (Record) value; + break; + case 1: + specId = (int) value; + break; + case 2: + dataRecordCount = (long) value; + break; + case 3: + dataFileCount = (int) value; + break; + case 4: + totalDataFileSizeInBytes = (long) value; + break; + case 5: + // optional field as per spec, 
the implementation initializes counters to 0
+        positionDeleteRecordCount = value == null ? 0L : (long) value;
+        break;
+      case 6:
+        // optional field as per spec, the implementation initializes counters to 0
+        positionDeleteFileCount = value == null ? 0 : (int) value;
+        break;
+      case 7:
+        // optional field as per spec, the implementation initializes counters to 0
+        equalityDeleteRecordCount = value == null ? 0L : (long) value;
+        break;
+      case 8:
+        // optional field as per spec, the implementation initializes counters to 0
+        equalityDeleteFileCount = value == null ? 0 : (int) value;
+        break;
+      case 9:
+        // optional field as per spec, the implementation initializes counters to 0
+        totalRecordCount = value == null ? 0L : (long) value;
+        break;
+      case 10:
+        lastUpdatedAt = (Long) value;
+        break;
+      case 11:
+        lastUpdatedSnapshotId = (Long) value;
+        break;
+      default:
+        throw new UnsupportedOperationException("Unknown position: " + pos);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    } else if (!(other instanceof PartitionStats)) {
+      return false;
+    }
+
+    PartitionStats that = (PartitionStats) other;
+    return Objects.equals(partition, that.partition)
+        && specId == that.specId
+        && dataRecordCount == that.dataRecordCount
+        && dataFileCount == that.dataFileCount
+        && totalDataFileSizeInBytes == that.totalDataFileSizeInBytes
+        && positionDeleteRecordCount == that.positionDeleteRecordCount
+        && positionDeleteFileCount == that.positionDeleteFileCount
+        && equalityDeleteRecordCount == that.equalityDeleteRecordCount
+        && equalityDeleteFileCount == that.equalityDeleteFileCount
+        && totalRecordCount == that.totalRecordCount
+        && Objects.equals(lastUpdatedAt, that.lastUpdatedAt)
+        && Objects.equals(lastUpdatedSnapshotId, that.lastUpdatedSnapshotId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        partition,
+        specId,
+        dataRecordCount,
+        dataFileCount,
+        totalDataFileSizeInBytes,
+        positionDeleteRecordCount,
+        positionDeleteFileCount,
+        equalityDeleteRecordCount,
+        equalityDeleteFileCount,
+        totalRecordCount,
+        lastUpdatedAt,
+        lastUpdatedSnapshotId);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
index 8c21bccf6b18..ac50f6d78fe2 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -18,180 +18,112 @@
  */
 package org.apache.iceberg;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PartitionStatsUtil {
 
   private PartitionStatsUtil() {}
 
-  public enum Column {
-    PARTITION,
-    SPEC_ID,
-    DATA_RECORD_COUNT,
-    DATA_FILE_COUNT,
-    TOTAL_DATA_FILE_SIZE_IN_BYTES,
-
POSITION_DELETE_RECORD_COUNT, - POSITION_DELETE_FILE_COUNT, - EQUALITY_DELETE_RECORD_COUNT, - EQUALITY_DELETE_FILE_COUNT, - TOTAL_RECORD_COUNT, - LAST_UPDATED_AT, - LAST_UPDATED_SNAPSHOT_ID - } + private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsUtil.class); /** - * Generates the Partition Stats Files Schema based on a given partition type. - * - *
<p>
Note: Provide the unified partition tuple as mentioned in the spec. + * Computes the partition stats for the given snapshot of the table. * - * @param partitionType the struct type that defines the structure of the partition. - * @return a schema that corresponds to the provided unified partition type. + * @param table the table for which partition stats to be computed. + * @param snapshot the snapshot for which partition stats is computed. + * @return iterable {@link PartitionStats} */ - public static Schema schema(Types.StructType partitionType) { - Preconditions.checkState( - !partitionType.fields().isEmpty(), "getting schema for an unpartitioned table"); - - return new Schema( - Types.NestedField.required(1, Column.PARTITION.name(), partitionType), - Types.NestedField.required(2, Column.SPEC_ID.name(), Types.IntegerType.get()), - Types.NestedField.required(3, Column.DATA_RECORD_COUNT.name(), Types.LongType.get()), - Types.NestedField.required(4, Column.DATA_FILE_COUNT.name(), Types.IntegerType.get()), - Types.NestedField.required( - 5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()), - Types.NestedField.optional( - 6, Column.POSITION_DELETE_RECORD_COUNT.name(), Types.LongType.get()), - Types.NestedField.optional( - 7, Column.POSITION_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), - Types.NestedField.optional( - 8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), Types.LongType.get()), - Types.NestedField.optional( - 9, Column.EQUALITY_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), - Types.NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), Types.LongType.get()), - Types.NestedField.optional(11, Column.LAST_UPDATED_AT.name(), Types.LongType.get()), - Types.NestedField.optional( - 12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), Types.LongType.get())); + public static Iterable computeStats(Table table, Snapshot snapshot) { + Preconditions.checkState(table != null, "table cannot be null"); + Preconditions.checkState(snapshot != null, "snapshot cannot be null"); + + Types.StructType partitionType = Partitioning.partitionType(table); + Map partitionEntryMap = Maps.newConcurrentMap(); + + List manifestFiles = snapshot.allManifests(table.io()); + Tasks.foreach(manifestFiles) + .stopOnFailure() + .executeWith(ThreadPools.getWorkerPool()) + .onFailure( + (file, thrown) -> + LOG.warn( + "Failed to compute the partition stats for the manifest file: {}", + file.path(), + thrown)) + .run( + manifest -> { + try (CloseableIterable entries = + PartitionStatsUtil.fromManifest(table, manifest, partitionType)) { + entries.forEach( + entry -> { + Record partitionKey = entry.partition(); + partitionEntryMap.merge( + partitionKey, + entry, + (existingEntry, newEntry) -> { + existingEntry.appendStats(newEntry); + return existingEntry; + }); + }); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + return partitionEntryMap.values(); } /** - * Creates an iterable of partition stats records from a given manifest file, using the specified - * table and record schema. + * Sorts the {@link PartitionStats} based on the partition data. * - * @param table the table from which the manifest file is derived. - * @param manifest the manifest file containing metadata about the records. - * @param recordSchema the schema defining the structure of the records. - * @return a CloseableIterable of partition stats records as defined by the manifest file and - * record schema. + * @param stats iterable {@link PartitionStats} which needs to be sorted. 
+ * @param partitionType unified partition schema. + * @return Iterator of {@link PartitionStats} */ - public static CloseableIterable fromManifest( - Table table, ManifestFile manifest, Schema recordSchema) { - Preconditions.checkState( - !recordSchema.findField(Column.PARTITION.name()).type().asStructType().fields().isEmpty(), - "record schema should not be unpartitioned"); + public static Iterator sortStats( + Iterable stats, Types.StructType partitionType) { + List entries = Lists.newArrayList(stats.iterator()); + entries.sort( + Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType))); + return entries.iterator(); + } + private static CloseableIterable fromManifest( + Table table, ManifestFile manifest, Types.StructType partitionType) { return CloseableIterable.transform( ManifestFiles.open(manifest, table.io(), table.specs()) .select(BaseScan.scanColumns(manifest.content())) - .liveEntries(), - entry -> fromManifestEntry(entry, table, recordSchema)); - } - - /** - * Appends statistics from one Record to another. - * - * @param toRecord the Record to which statistics will be appended. - * @param fromRecord the Record from which statistics will be sourced. - */ - public static void appendStats(Record toRecord, Record fromRecord) { - Preconditions.checkState(toRecord != null, "Record to update cannot be null"); - Preconditions.checkState(fromRecord != null, "Record to update from cannot be null"); - - toRecord.set( - Column.SPEC_ID.ordinal(), - Math.max( - (int) toRecord.get(Column.SPEC_ID.ordinal()), - (int) fromRecord.get(Column.SPEC_ID.ordinal()))); - checkAndIncrementLong(toRecord, fromRecord, Column.DATA_RECORD_COUNT); - checkAndIncrementInt(toRecord, fromRecord, Column.DATA_FILE_COUNT); - checkAndIncrementLong(toRecord, fromRecord, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES); - checkAndIncrementLong(toRecord, fromRecord, Column.POSITION_DELETE_RECORD_COUNT); - checkAndIncrementInt(toRecord, fromRecord, Column.POSITION_DELETE_FILE_COUNT); - checkAndIncrementLong(toRecord, fromRecord, Column.EQUALITY_DELETE_RECORD_COUNT); - checkAndIncrementInt(toRecord, fromRecord, Column.EQUALITY_DELETE_FILE_COUNT); - checkAndIncrementLong(toRecord, fromRecord, Column.TOTAL_RECORD_COUNT); - if (fromRecord.get(Column.LAST_UPDATED_AT.ordinal()) != null) { - if (toRecord.get(Column.LAST_UPDATED_AT.ordinal()) == null - || ((long) toRecord.get(Column.LAST_UPDATED_AT.ordinal()) - < (long) fromRecord.get(Column.LAST_UPDATED_AT.ordinal()))) { - toRecord.set( - Column.LAST_UPDATED_AT.ordinal(), fromRecord.get(Column.LAST_UPDATED_AT.ordinal())); - toRecord.set( - Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), - fromRecord.get(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal())); - } - } - } - - private static Record fromManifestEntry( - ManifestEntry entry, Table table, Schema recordSchema) { - GenericRecord record = GenericRecord.create(recordSchema); - Types.StructType partitionType = - recordSchema.findField(Column.PARTITION.name()).type().asStructType(); - Record partitionData = coercedPartitionData(entry.file(), table.specs(), partitionType); - record.set(Column.PARTITION.ordinal(), partitionData); - record.set(Column.SPEC_ID.ordinal(), entry.file().specId()); - - Snapshot snapshot = table.snapshot(entry.snapshotId()); - if (snapshot != null) { - record.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), snapshot.snapshotId()); - record.set(Column.LAST_UPDATED_AT.ordinal(), snapshot.timestampMillis()); - } - - switch (entry.file().content()) { - case DATA: - 
record.set(Column.DATA_RECORD_COUNT.ordinal(), entry.file().recordCount()); - record.set(Column.DATA_FILE_COUNT.ordinal(), 1); - record.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), entry.file().fileSizeInBytes()); - // default values - record.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), 0L); - record.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), 0); - record.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), 0L); - record.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), 0); - break; - case POSITION_DELETES: - record.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), entry.file().recordCount()); - record.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), 1); - // default values - record.set(Column.DATA_RECORD_COUNT.ordinal(), 0L); - record.set(Column.DATA_FILE_COUNT.ordinal(), 0); - record.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 0L); - record.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), 0L); - record.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), 0); - break; - case EQUALITY_DELETES: - record.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), entry.file().recordCount()); - record.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), 1); - // default values - record.set(Column.DATA_RECORD_COUNT.ordinal(), 0L); - record.set(Column.DATA_FILE_COUNT.ordinal(), 0); - record.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 0L); - record.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), 0L); - record.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), 0); - break; - default: - throw new UnsupportedOperationException( - "Unsupported file content type: " + entry.file().content()); - } - - // Note: Not computing the `TOTAL_RECORD_COUNT` for now as it needs scanning the data. - record.set(Column.TOTAL_RECORD_COUNT.ordinal(), 0L); - - return record; + .entries(), + entry -> { + // partition data as per unified partition spec + Record partitionData = coercedPartitionData(entry.file(), table.specs(), partitionType); + PartitionStats partitionStats = new PartitionStats(partitionData); + if (entry.isLive()) { + partitionStats.liveEntry(entry.file(), table.snapshot(entry.snapshotId())); + } else { + partitionStats.deletedEntry(table.snapshot(entry.snapshotId())); + } + + return partitionStats; + }); } private static Record coercedPartitionData( @@ -199,35 +131,15 @@ private static Record coercedPartitionData( // keep the partition data as per the unified spec by coercing StructLike partition = PartitionUtil.coercePartition(partitionType, specs.get(file.specId()), file.partition()); - GenericRecord genericRecord = GenericRecord.create(partitionType); + GenericRecord record = GenericRecord.create(partitionType); for (int index = 0; index < partitionType.fields().size(); index++) { - genericRecord.set( - index, - partition.get(index, partitionType.fields().get(index).type().typeId().javaClass())); - } - - return genericRecord; - } - - private static void checkAndIncrementLong(Record toUpdate, Record fromRecord, Column column) { - if ((fromRecord.get(column.ordinal()) != null) && (toUpdate.get(column.ordinal()) != null)) { - toUpdate.set( - column.ordinal(), - toUpdate.get(column.ordinal(), Long.class) - + fromRecord.get(column.ordinal(), Long.class)); - } else if (fromRecord.get(column.ordinal()) != null) { - toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Long.class)); + Object val = + partition.get(index, partitionType.fields().get(index).type().typeId().javaClass()); + if (val != null) { + record.set(index, val); + } } - } - private static void 
checkAndIncrementInt(Record toUpdate, Record fromRecord, Column column) { - if ((fromRecord.get(column.ordinal()) != null) && (toUpdate.get(column.ordinal()) != null)) { - toUpdate.set( - column.ordinal(), - toUpdate.get(column.ordinal(), Integer.class) - + fromRecord.get(column.ordinal(), Integer.class)); - } else if (fromRecord.get(column.ordinal()) != null) { - toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Integer.class)); - } + return record; } } diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java index b3b4c75d1c3e..114820443382 100644 --- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java +++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java @@ -60,6 +60,11 @@ public static Object convertConstant(Type type, Object value) { return ByteBuffers.toByteArray((ByteBuffer) value); } return value; + case BINARY: + if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); + } + return value; default: } return value; diff --git a/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java b/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java new file mode 100644 index 000000000000..4f4cb9da68e7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; + +/** + * Wraps the {@link PartitionStats} as {@link Record}. Used by generic file format writers and + * readers. 
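+ *
+ * <p>A minimal usage sketch (assumes {@code partitionStats} and the stats file {@code schema}
+ * come from the partition stats computation; position 3 maps to DATA_FILE_COUNT in the stats
+ * schema):
+ *
+ * <pre>{@code
+ * PartitionStatsRecord record = PartitionStatsRecord.create(schema, partitionStats);
+ * Integer dataFileCount = record.get(3, Integer.class); // delegates to PartitionStats.get()
+ * PartitionStats stats = record.unwrap();               // recover the wrapped stats entry
+ * }</pre>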
+ */ +public class PartitionStatsRecord implements Record, StructLike { + private static final LoadingCache> NAME_MAP_CACHE = + Caffeine.newBuilder() + .weakKeys() + .build( + struct -> { + Map idToPos = Maps.newHashMap(); + List fields = struct.fields(); + for (int index = 0; index < fields.size(); index += 1) { + idToPos.put(fields.get(index).name(), index); + } + return idToPos; + }); + + private final StructType struct; + private final PartitionStats partitionStats; + private final Map nameToPos; + + public static PartitionStatsRecord create(Schema schema, PartitionStats partitionStats) { + return new PartitionStatsRecord(schema.asStruct(), partitionStats); + } + + public static PartitionStatsRecord create(StructType struct, PartitionStats partitionStats) { + return new PartitionStatsRecord(struct, partitionStats); + } + + public PartitionStats unwrap() { + return partitionStats; + } + + private PartitionStatsRecord(StructType struct, PartitionStats partitionStats) { + this.struct = struct; + this.partitionStats = partitionStats; + this.nameToPos = NAME_MAP_CACHE.get(struct); + } + + private PartitionStatsRecord(PartitionStatsRecord toCopy) { + this.struct = toCopy.struct; + this.partitionStats = toCopy.partitionStats; + this.nameToPos = toCopy.nameToPos; + } + + private PartitionStatsRecord(PartitionStatsRecord toCopy, Map overwrite) { + this.struct = toCopy.struct; + this.partitionStats = toCopy.partitionStats; + this.nameToPos = toCopy.nameToPos; + for (Map.Entry entry : overwrite.entrySet()) { + setField(entry.getKey(), entry.getValue()); + } + } + + @Override + public StructType struct() { + return struct; + } + + @Override + public Object getField(String name) { + Integer pos = nameToPos.get(name); + if (pos != null) { + return partitionStats.get(pos, Object.class); + } + + return null; + } + + @Override + public void setField(String name, Object value) { + Integer pos = nameToPos.get(name); + Preconditions.checkArgument(pos != null, "Cannot set unknown field named: %s", name); + partitionStats.set(pos, value); + } + + @Override + public int size() { + return partitionStats.size(); + } + + @Override + public Object get(int pos) { + return partitionStats.get(pos, Object.class); + } + + @Override + public T get(int pos, Class javaClass) { + Object value = get(pos); + if (value == null || javaClass.isInstance(value)) { + return javaClass.cast(value); + } else { + throw new IllegalStateException("Not an instance of " + javaClass.getName() + ": " + value); + } + } + + @Override + public void set(int pos, T value) { + partitionStats.set(pos, value); + } + + @Override + public PartitionStatsRecord copy() { + return new PartitionStatsRecord(this); + } + + @Override + public PartitionStatsRecord copy(Map overwriteValues) { + return new PartitionStatsRecord(this, overwriteValues); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Record("); + for (int index = 0; index < partitionStats.size(); index += 1) { + if (index != 0) { + sb.append(", "); + } + sb.append(partitionStats.get(index, Object.class)); + } + sb.append(")"); + return sb.toString(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (!(other instanceof PartitionStatsRecord)) { + return false; + } + + PartitionStatsRecord that = (PartitionStatsRecord) other; + return this.partitionStats.equals(that.partitionStats); + } + + @Override + public int hashCode() { + return Objects.hashCode(partitionStats); + } +} diff --git 
a/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java b/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java index 5aec62193a2e..8d1c282023be 100644 --- a/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java +++ b/data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java @@ -30,9 +30,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.apache.iceberg.data.PartitionStatsGenerator; -import org.apache.iceberg.data.PartitionStatsGeneratorUtil; -import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.PartitionStatsHandler; +import org.apache.iceberg.data.PartitionStatsRecord; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; import org.openjdk.jmh.annotations.Benchmark; @@ -119,15 +118,14 @@ public void tearDownBenchmark() throws IOException { public void benchmarkPartitionStats() throws IOException { Snapshot currentSnapshot = table.currentSnapshot(); - PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(table); - PartitionStatisticsFile result = partitionStatsGenerator.generate(); + PartitionStatisticsFile result = PartitionStatsHandler.computeAndWritePartitionStatsFile(table); table.updatePartitionStatistics().setPartitionStatistics(result).commit(); assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); // validate row count - try (CloseableIterable recordIterator = - PartitionStatsGeneratorUtil.readPartitionStatsFile( - PartitionStatsUtil.schema(Partitioning.partitionType(table)), + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + PartitionStatsHandler.schema(Partitioning.partitionType(table)), Files.localInput(result.path()))) { assertThat(recordIterator).hasSize(PARTITION_PER_MANIFEST); } diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java deleted file mode 100644 index ad167660c4eb..000000000000 --- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.data; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.PartitionStatisticsFile; -import org.apache.iceberg.PartitionStatsUtil; -import org.apache.iceberg.Partitioning; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.Table; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Iterators; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.Comparators; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.SnapshotUtil; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Generates {@link PartitionStatisticsFile} file as per the spec for the given table. Computes the - * stats by going through the partition info stored in each manifest file. - */ -public class PartitionStatsGenerator { - private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsGenerator.class); - - private final Table table; - private String branch; - private Types.StructType partitionType; - // Map of PartitionData, partition-stats-entry per partitionData. - private Map partitionEntryMap; - - public PartitionStatsGenerator(Table table) { - this.table = table; - } - - public PartitionStatsGenerator(Table table, String branch) { - this.table = table; - this.branch = branch; - } - - /** - * Computes the partition stats for the current snapshot and writes it into the metadata folder. - * - * @return {@link PartitionStatisticsFile} for the latest snapshot id or null if table doesn't - * have any snapshot. - */ - public PartitionStatisticsFile generate() { - Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); - if (currentSnapshot == null) { - Preconditions.checkArgument( - branch == null, "Couldn't find the snapshot for the branch %s", branch); - return null; - } - - partitionType = Partitioning.partitionType(table); - partitionEntryMap = Maps.newConcurrentMap(); - - Schema dataSchema = PartitionStatsUtil.schema(partitionType); - List manifestFiles = currentSnapshot.allManifests(table.io()); - Tasks.foreach(manifestFiles) - .stopOnFailure() - .executeWith(ThreadPools.getWorkerPool()) - .onFailure( - (file, thrown) -> - LOG.warn( - "Failed to compute the partition stats for the manifest file: {}", - file.path(), - thrown)) - .run( - manifest -> { - try (CloseableIterable entries = - PartitionStatsUtil.fromManifest(table, manifest, dataSchema)) { - entries.forEach( - entry -> { - Record partitionKey = - (Record) entry.get(PartitionStatsUtil.Column.PARTITION.ordinal()); - partitionEntryMap.merge( - partitionKey, - entry, - (existingEntry, newEntry) -> { - PartitionStatsUtil.appendStats(existingEntry, newEntry); - return existingEntry; - }); - }); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - - // Sorting the records based on partition as per spec. 
- List sortedKeys = Lists.newArrayList(partitionEntryMap.keySet()); - sortedKeys.sort(Comparators.forType(partitionType)); - Iterator entriesForWriter = - Iterators.transform(sortedKeys.iterator(), this::convertPartitionRecords); - - OutputFile outputFile = - PartitionStatsGeneratorUtil.newPartitionStatsFile(table, currentSnapshot.snapshotId()); - PartitionStatsGeneratorUtil.writePartitionStatsFile(table, entriesForWriter, outputFile); - return ImmutableGenericPartitionStatisticsFile.builder() - .snapshotId(currentSnapshot.snapshotId()) - .path(outputFile.location()) - .fileSizeInBytes(outputFile.toInputFile().getLength()) - .build(); - } - - private Record convertPartitionRecords(Record key) { - Record record = partitionEntryMap.get(key); - Record partitionRecord = (Record) record.get(PartitionStatsUtil.Column.PARTITION.ordinal()); - if (partitionRecord != null) { - for (int index = 0; index < partitionType.fields().size(); index++) { - Object val = partitionRecord.get(index); - if (val != null) { - partitionRecord.set( - index, - IdentityPartitionConverters.convertConstant( - partitionType.fields().get(index).type(), val)); - } - } - } - - return record; - } -} diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsGeneratorUtil.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsGeneratorUtil.java deleted file mode 100644 index feeadcb3b5e7..000000000000 --- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsGeneratorUtil.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.data; - -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; - -import java.io.Closeable; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Iterator; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.HasTableOperations; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PartitionStatisticsFile; -import org.apache.iceberg.PartitionStatsUtil; -import org.apache.iceberg.Partitioning; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.avro.DataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; -import org.apache.iceberg.data.parquet.GenericParquetReaders; -import org.apache.iceberg.encryption.EncryptedFiles; -import org.apache.iceberg.encryption.EncryptionKeyMetadata; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.FileWriterFactory; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; - -/** - * Util to write and read the {@link PartitionStatisticsFile}. Uses generic readers and writes to - * support writing and reading of the stats in table default format. - */ -public final class PartitionStatsGeneratorUtil { - - private PartitionStatsGeneratorUtil() {} - - /** - * Creates a new {@link OutputFile} for storing partition statistics for the specified table. The - * output file extension will be same as table's default format. - * - * @param table The {@link Table} for which the partition statistics file is being created. - * @param snapshotId The ID of the snapshot associated with the partition statistics. - * @return A new {@link OutputFile} that can be used to write partition statistics. - */ - public static OutputFile newPartitionStatsFile(Table table, long snapshotId) { - FileFormat fileFormat = - FileFormat.fromString( - table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)); - return table - .io() - .newOutputFile( - ((HasTableOperations) table) - .operations() - .metadataFileLocation( - fileFormat.addExtension(String.format("partition-stats-%d", snapshotId)))); - } - - /** - * Writes partition statistics to the specified {@link OutputFile} for the given table. - * - * @param table The {@link Table} for which the partition statistics are being written. - * @param records An {@link Iterator} of {@link Record} to be written into the file. - * @param outputFile The {@link OutputFile} where the partition statistics will be written. Should - * include the file extension. 
- */ - public static void writePartitionStatsFile( - Table table, Iterator records, OutputFile outputFile) { - Schema dataSchema = PartitionStatsUtil.schema(Partitioning.partitionType(table)); - FileFormat fileFormat = - FileFormat.fromString( - outputFile.location().substring(outputFile.location().lastIndexOf(".") + 1)); - FileWriterFactory factory = - GenericFileWriterFactory.builderFor(table) - .dataSchema(dataSchema) - .dataFileFormat(fileFormat) - .build(); - DataWriter writer = - factory.newDataWriter( - EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY), - PartitionSpec.unpartitioned(), - null); - try (Closeable toClose = writer) { - records.forEachRemaining(writer::write); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - /** - * Reads partition statistics from the specified {@link InputFile} using given schema. - * - * @param schema The {@link Schema} of the partition statistics file. - * @param inputFile An {@link InputFile} pointing to the partition stats file. - */ - public static CloseableIterable readPartitionStatsFile( - Schema schema, InputFile inputFile) { - FileFormat fileFormat = - FileFormat.fromString( - inputFile.location().substring(inputFile.location().lastIndexOf(".") + 1)); - - switch (fileFormat) { - case PARQUET: - return Parquet.read(inputFile) - .project(schema) - .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) - .build(); - case ORC: - return ORC.read(inputFile) - .project(schema) - .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema)) - .build(); - case AVRO: - return Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build(); - default: - throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name()); - } - } -} diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java new file mode 100644 index 000000000000..411f5f5f5d4f --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.function.BiFunction; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; +import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.PartitionStatsUtil; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SnapshotUtil; + +/** + * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers + * to support writing and reading of the stats in table default format. + */ +public final class PartitionStatsHandler { + + private PartitionStatsHandler() {} + + public enum Column { + PARTITION(0), + SPEC_ID(1), + DATA_RECORD_COUNT(2), + DATA_FILE_COUNT(3), + TOTAL_DATA_FILE_SIZE_IN_BYTES(4), + POSITION_DELETE_RECORD_COUNT(5), + POSITION_DELETE_FILE_COUNT(6), + EQUALITY_DELETE_RECORD_COUNT(7), + EQUALITY_DELETE_FILE_COUNT(8), + TOTAL_RECORD_COUNT(9), + LAST_UPDATED_AT(10), + LAST_UPDATED_SNAPSHOT_ID(11); + + private final int id; + + Column(int id) { + this.id = id; + } + + public int id() { + return id; + } + } + + /** + * Generates the Partition Stats Files Schema based on a given partition type. + * + *
<p>Note: Provide the unified partition tuple as mentioned in the spec.
+   *
+   * @param partitionType the struct type that defines the structure of the partition.
+   * @return a schema that corresponds to the provided unified partition type.
+   */
+  public static Schema schema(Types.StructType partitionType) {
+    Preconditions.checkState(
+        !partitionType.fields().isEmpty(), "getting schema for an unpartitioned table");
+
+    return new Schema(
+        Types.NestedField.required(1, Column.PARTITION.name(), partitionType),
+        Types.NestedField.required(2, Column.SPEC_ID.name(), Types.IntegerType.get()),
+        Types.NestedField.required(3, Column.DATA_RECORD_COUNT.name(), Types.LongType.get()),
+        Types.NestedField.required(4, Column.DATA_FILE_COUNT.name(), Types.IntegerType.get()),
+        Types.NestedField.required(
+            5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()),
+        Types.NestedField.optional(
+            6, Column.POSITION_DELETE_RECORD_COUNT.name(), Types.LongType.get()),
+        Types.NestedField.optional(
+            7, Column.POSITION_DELETE_FILE_COUNT.name(), Types.IntegerType.get()),
+        Types.NestedField.optional(
+            8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), Types.LongType.get()),
+        Types.NestedField.optional(
+            9, Column.EQUALITY_DELETE_FILE_COUNT.name(), Types.IntegerType.get()),
+        Types.NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), Types.LongType.get()),
+        Types.NestedField.optional(11, Column.LAST_UPDATED_AT.name(), Types.LongType.get()),
+        Types.NestedField.optional(
+            12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), Types.LongType.get()));
+  }
+
+  /**
+   * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
+   *
+   * @param table The {@link Table} for which the partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the current snapshot.
+   */
+  public static PartitionStatisticsFile computeAndWritePartitionStatsFile(Table table) {
+    return computeAndWritePartitionStatsFile(table, null);
+  }
+
+  /**
+   * Computes and writes the {@link PartitionStatisticsFile} for a given table and branch.
+   *
+   * @param table The {@link Table} for which the partition statistics are computed.
+   * @param branch The branch whose latest snapshot is used to compute the partition statistics.
+   * @return {@link PartitionStatisticsFile} for the given branch.
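+   *
+   * <p>A usage sketch (assumes {@code table} is a partitioned table with at least one snapshot;
+   * committing the returned file to table metadata is the caller's responsibility, as in the
+   * benchmark in this patch):
+   *
+   * <pre>{@code
+   * PartitionStatisticsFile statsFile =
+   *     PartitionStatsHandler.computeAndWritePartitionStatsFile(table, "main");
+   * if (statsFile != null) { // null when the branch has no snapshot
+   *   table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
+   * }
+   * }</pre>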
+   */
+  public static PartitionStatisticsFile computeAndWritePartitionStatsFile(
+      Table table, String branch) {
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    if (currentSnapshot == null) {
+      Preconditions.checkArgument(
+          branch == null, "Couldn't find the snapshot for the branch %s", branch);
+      return null;
+    }
+
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    Schema schema = schema(partitionType);
+
+    Iterable<PartitionStats> stats = PartitionStatsUtil.computeStats(table, currentSnapshot);
+    Iterator<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
+    Iterator<PartitionStatsRecord> convertedRecords = statsToRecords(sortedStats, schema);
+    return writePartitionStatsFile(table, currentSnapshot.snapshotId(), schema, convertedRecords);
+  }
+
+  @VisibleForTesting
+  static ImmutableGenericPartitionStatisticsFile writePartitionStatsFile(
+      Table table, long snapshotId, Schema dataSchema, Iterator<PartitionStatsRecord> records) {
+    OutputFile outputFile = newPartitionStatsFile(table, snapshotId);
+    FileWriterFactory<Record> factory =
+        GenericFileWriterFactory.builderFor(table)
+            .dataSchema(dataSchema)
+            .dataFileFormat(fileFormat(outputFile.location()))
+            .build();
+    DataWriter<Record> writer =
+        factory.newDataWriter(
+            EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY),
+            PartitionSpec.unpartitioned(),
+            null);
+    try (Closeable toClose = writer) {
+      records.forEachRemaining(writer::write);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return ImmutableGenericPartitionStatisticsFile.builder()
+        .snapshotId(snapshotId)
+        .path(outputFile.location())
+        .fileSizeInBytes(outputFile.toInputFile().getLength())
+        .build();
+  }
+
+  /**
+   * Reads partition statistics from the specified {@link InputFile} using given schema.
+   *
+   * @param schema The {@link Schema} of the partition statistics file.
+   * @param inputFile An {@link InputFile} pointing to the partition stats file.
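+   *
+   * <p>A reading sketch (assumes {@code statsFile} was produced by this handler for a
+   * partitioned {@code table}; mirrors the read path used in the benchmark in this patch):
+   *
+   * <pre>{@code
+   * Schema schema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
+   * try (CloseableIterable<PartitionStatsRecord> records =
+   *     PartitionStatsHandler.readPartitionStatsFile(
+   *         schema, table.io().newInputFile(statsFile.path()))) {
+   *   for (PartitionStatsRecord record : records) {
+   *     PartitionStats stats = record.unwrap(); // one entry per partition
+   *   }
+   * }
+   * }</pre>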
+ */ + public static CloseableIterable readPartitionStatsFile( + Schema schema, InputFile inputFile) { + CloseableIterable records; + FileFormat fileFormat = fileFormat(inputFile.location()); + switch (fileFormat) { + case PARQUET: + records = + Parquet.read(inputFile) + .project(schema) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build(); + break; + case ORC: + records = + ORC.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema)) + .build(); + break; + case AVRO: + records = Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build(); + break; + default: + throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name()); + } + + return CloseableIterable.transform( + records, PartitionStatsHandler::recordToPartitionStatsRecord); + } + + private static FileFormat fileFormat(String fileLocation) { + return FileFormat.fromString(fileLocation.substring(fileLocation.lastIndexOf(".") + 1)); + } + + private static OutputFile newPartitionStatsFile(Table table, long snapshotId) { + FileFormat fileFormat = + fileFormat( + table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)); + return table + .io() + .newOutputFile( + ((HasTableOperations) table) + .operations() + .metadataFileLocation( + fileFormat.addExtension(String.format("partition-stats-%d", snapshotId)))); + } + + private static PartitionStatsRecord recordToPartitionStatsRecord(Record record) { + PartitionStats partitionStats = + new PartitionStats(record.get(Column.PARTITION.id(), Record.class)); + partitionStats.setSpecId(record.get(Column.SPEC_ID.id(), Integer.class)); + partitionStats.setDataRecordCount(record.get(Column.DATA_RECORD_COUNT.id(), Long.class)); + partitionStats.setDataFileCount(record.get(Column.DATA_FILE_COUNT.id(), Integer.class)); + partitionStats.setTotalDataFileSizeInBytes( + record.get(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), Long.class)); + partitionStats.setPositionDeleteRecordCount( + record.get(Column.POSITION_DELETE_RECORD_COUNT.id(), Long.class)); + partitionStats.setPositionDeleteFileCount( + record.get(Column.POSITION_DELETE_FILE_COUNT.id(), Integer.class)); + partitionStats.setEqualityDeleteRecordCount( + record.get(Column.EQUALITY_DELETE_RECORD_COUNT.id(), Long.class)); + partitionStats.setEqualityDeleteFileCount( + record.get(Column.EQUALITY_DELETE_FILE_COUNT.id(), Integer.class)); + partitionStats.setTotalRecordCount(record.get(Column.TOTAL_RECORD_COUNT.id(), Long.class)); + partitionStats.setLastUpdatedAt(record.get(Column.LAST_UPDATED_AT.id(), Long.class)); + partitionStats.setLastUpdatedSnapshotId( + record.get(Column.LAST_UPDATED_SNAPSHOT_ID.id(), Long.class)); + + return PartitionStatsRecord.create(record.struct(), partitionStats); + } + + @VisibleForTesting + static Iterator statsToRecords( + Iterator partitionStatsIterator, Schema recordSchema) { + return new TransformIteratorWithBiFunction<>( + partitionStatsIterator, + (partitionStats, schema) -> { + PartitionStatsRecord record = PartitionStatsRecord.create(schema, partitionStats); + record.set( + Column.PARTITION.id(), + convertPartitionValues(record.get(Column.PARTITION.id(), Record.class))); + return record; + }, + recordSchema); + } + + private static Record convertPartitionValues(Record partitionRecord) { + if (partitionRecord == null) { + return null; + } + + GenericRecord converted = GenericRecord.create(partitionRecord.struct()); + for (int index = 0; 
index < partitionRecord.size(); index++) { + Object val = partitionRecord.get(index); + if (val != null) { + converted.set( + index, + IdentityPartitionConverters.convertConstant( + partitionRecord.struct().fields().get(index).type(), val)); + } + } + + return converted; + } + + private static class TransformIteratorWithBiFunction implements Iterator { + private final Iterator iterator; + private final BiFunction transformer; + private final U additionalInput; + + TransformIteratorWithBiFunction( + Iterator iterator, BiFunction transformer, U additionalInput) { + this.iterator = iterator; + this.transformer = transformer; + this.additionalInput = additionalInput; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public R next() { + T nextElement = iterator.next(); + return transformer.apply(nextElement, additionalInput); + } + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java deleted file mode 100644 index 30d5a28bf1f0..000000000000 --- a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java
deleted file mode 100644
index 30d5a28bf1f0..000000000000
--- a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGeneratorUtil.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.data;
-
-import static org.apache.iceberg.PartitionStatsUtil.Column;
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatIterable;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.PartitionData;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionStatsUtil;
-import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TestTables;
-import org.apache.iceberg.expressions.Literal;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.UUIDUtil;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-public class TestPartitionStatsGeneratorUtil {
-  private static final Schema SCHEMA =
-      new Schema(
-          required(1, "id", Types.LongType.get()),
-          optional(2, "data", Types.StringType.get()),
-          optional(3, "binary", Types.BinaryType.get()));
-
-  private static final Random RANDOM = ThreadLocalRandom.current();
-
-  @TempDir public File temp;
-
-  @Test
-  public void testPartitionStats() throws Exception {
-    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
-    Table testTable =
-        TestTables.create(
-            tempDir("test_partition_stats"),
-            "test_partition_stats",
-            SCHEMA,
-            spec,
-            SortOrder.unsorted(),
-            2);
-
-    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
-    Schema dataSchema = PartitionStatsUtil.schema(partitionSchema);
-
-    ImmutableList.Builder<Record> partitionListBuilder = ImmutableList.builder();
-
-    for (int i = 0; i < 42; i++) {
-      PartitionData partitionData =
-          new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
-      partitionData.set(0, RANDOM.nextLong());
-
-      Record record = GenericRecord.create(dataSchema);
-      record.set(Column.PARTITION.ordinal(), partitionDataToRecord(partitionSchema, partitionData));
-      record.set(Column.SPEC_ID.ordinal(), RANDOM.nextInt(10));
-      record.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong());
-      record.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt());
-      record.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20));
-      record.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), RANDOM.nextLong());
-      record.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), RANDOM.nextInt());
-      record.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), RANDOM.nextLong());
-      record.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), RANDOM.nextInt());
-      record.set(Column.TOTAL_RECORD_COUNT.ordinal(), RANDOM.nextLong());
-      record.set(Column.LAST_UPDATED_AT.ordinal(), RANDOM.nextLong());
-      record.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), RANDOM.nextLong());
-
-      partitionListBuilder.add(record);
-    }
-
-    testPartitionStats(testTable, partitionListBuilder.build(), dataSchema);
-  }
-
-  @Test
-  public void testOptionalFields() throws Exception {
-    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
-    Table testTable =
-        TestTables.create(
-            tempDir("test_partition_stats_optional"),
-            "test_partition_stats_optional",
-            SCHEMA,
-            spec,
-            SortOrder.unsorted(),
-            2);
-
-    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
-    Schema dataSchema = PartitionStatsUtil.schema(partitionSchema);
-
-    ImmutableList.Builder<Record> partitionListBuilder = ImmutableList.builder();
-
-    for (int i = 0; i < 5; i++) {
-      PartitionData partitionData =
-          new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
-      partitionData.set(0, RANDOM.nextLong());
-
-      Record record = GenericRecord.create(dataSchema);
-      record.set(Column.PARTITION.ordinal(), partitionDataToRecord(partitionSchema, partitionData));
-      record.set(Column.SPEC_ID.ordinal(), RANDOM.nextInt(10));
-      record.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong());
-      record.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt());
-      record.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20));
-      record.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), null);
-      record.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), null);
-      record.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), null);
-      record.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), null);
-      record.set(Column.TOTAL_RECORD_COUNT.ordinal(), null);
-      record.set(Column.LAST_UPDATED_AT.ordinal(), null);
-      record.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), null);
-
-      partitionListBuilder.add(record);
-    }
-
-    testPartitionStats(testTable, partitionListBuilder.build(), dataSchema);
-  }
-
-  @Test
-  public void testAllDatatypePartition() throws Exception {
-    Schema schema =
-        new Schema(
-            required(100, "id", Types.LongType.get()),
-            optional(101, "data", Types.StringType.get()),
-            required(102, "b", Types.BooleanType.get()),
-            optional(103, "i", Types.IntegerType.get()),
-            required(104, "l", Types.LongType.get()),
-            optional(105, "f", Types.FloatType.get()),
-            required(106, "d", Types.DoubleType.get()),
-            optional(107, "date", Types.DateType.get()),
-            required(108, "ts", Types.TimestampType.withoutZone()),
-            required(110, "s", Types.StringType.get()),
-            required(111, "uuid", Types.UUIDType.get()),
-            required(112, "fixed", Types.FixedType.ofLength(7)),
-            optional(113, "bytes", Types.BinaryType.get()),
-            required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
-            required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
-            required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision
-            required(117, "time", Types.TimeType.get()));
-
-    PartitionSpec spec =
-        PartitionSpec.builderFor(schema)
-            .identity("b")
-            .identity("i")
-            .identity("l")
-            .identity("f")
-            .identity("d")
-            .identity("date")
-            .identity("ts")
-            .identity("s")
-            .identity("uuid")
-            .identity("fixed")
-            .identity("bytes")
-            .identity("dec_9_0")
-            .identity("dec_11_2")
-            .identity("dec_38_10")
-            .identity("time")
-            .build();
-    Table testTable =
-        TestTables.create(
-            tempDir("test_all_type"), "test_all_type", schema, spec, SortOrder.unsorted(), 2);
-
-    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
-    Schema dataSchema = PartitionStatsUtil.schema(partitionSchema);
-
-    PartitionData partitionData =
-        new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
-    partitionData.set(0, true);
-    partitionData.set(1, 42);
-    partitionData.set(2, 42L);
-    partitionData.set(3, 3.14f);
-    partitionData.set(4, 3.141592653589793);
-    partitionData.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value());
-    partitionData.set(
-        6, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value());
-    partitionData.set(7, "string");
-    partitionData.set(8, UUIDUtil.convertToByteBuffer(UUID.randomUUID()));
-    partitionData.set(9, new byte[] {0, 1, 2, 3, 4, 5, 6});
-    partitionData.set(10, new byte[] {1, 2, 3});
-    partitionData.set(11, Literal.of("123456789").to(Types.DecimalType.of(9, 0)).value());
-    partitionData.set(12, Literal.of("1234567.89").to(Types.DecimalType.of(11, 2)).value());
-    partitionData.set(
-        13, Literal.of("12345678901234567890.1234567890").to(Types.DecimalType.of(38, 10)).value());
-    partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value());
-
-    Record record = GenericRecord.create(dataSchema);
-    record.set(Column.PARTITION.ordinal(), partitionDataToRecord(partitionSchema, partitionData));
-    record.set(Column.SPEC_ID.ordinal(), RANDOM.nextInt(10));
-    record.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong());
-    record.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt());
-    record.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20));
-
-    testPartitionStats(testTable, Collections.singletonList(record), dataSchema);
-  }
-
-  private static void testPartitionStats(
-      Table testTable, List<Record> expectedRecords, Schema dataSchema) throws IOException {
-    OutputFile outputFile = PartitionStatsGeneratorUtil.newPartitionStatsFile(testTable, 42L);
-    PartitionStatsGeneratorUtil.writePartitionStatsFile(
-        testTable, expectedRecords.iterator(), outputFile);
-    assertThat(Paths.get(outputFile.location())).exists();
-
-    List<Record> writtenRecords;
-    try (CloseableIterable<Record> recordIterator =
-        PartitionStatsGeneratorUtil.readPartitionStatsFile(
-            dataSchema, Files.localInput(outputFile.location()))) {
-      writtenRecords = Lists.newArrayList(recordIterator);
-    }
-    assertThatIterable(writtenRecords).isEqualTo(expectedRecords);
-  }
-
-  private static Record partitionDataToRecord(
-      Types.StructType partitionSchema, PartitionData partitionData) {
-    GenericRecord genericRecord = GenericRecord.create(partitionSchema);
-    for (int index = 0; index < partitionData.size(); index++) {
-      genericRecord.set(
-          index,
-          IdentityPartitionConverters.convertConstant(
-              partitionSchema.fields().get(index).type(), partitionData.get(index)));
-    }
-
-    return genericRecord;
-  }
-
-  private File tempDir(String folderName) throws IOException {
-    return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile();
-  }
-}
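The rename that follows folds the PartitionStatsGenerator/PartitionStatsGeneratorUtil pair into the single PartitionStatsHandler entry point. A rough before/after sketch of the call-site migration, not part of the patch; the branch name "b1" and the table setup are assumed, as in the tests, and the sketch class name is hypothetical:

import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.PartitionStatsHandler;

class PartitionStatsMigrationSketch {
  static PartitionStatisticsFile computeForBranch(Table table) throws Exception {
    // Before this patch (generator object plus util class, both removed above):
    //   PartitionStatsGenerator generator = new PartitionStatsGenerator(table, "b1");
    //   PartitionStatisticsFile result = generator.generate();
    // After this patch (one static entry point on the handler):
    return PartitionStatsHandler.computeAndWritePartitionStatsFile(table, "b1");
  }
}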
diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGenerator.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
similarity index 69%
rename from data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGenerator.java
rename to data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
index 1f2e2448db4c..1a8ce4304308 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsGenerator.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
@@ -18,14 +18,20 @@
  */
 package org.apache.iceberg.data;
 
+import static org.apache.iceberg.data.PartitionStatsHandler.Column;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
@@ -35,7 +41,7 @@
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionStatisticsFile;
-import org.apache.iceberg.PartitionStatsUtil;
+import org.apache.iceberg.PartitionStats;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -47,6 +53,7 @@
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
@@ -57,7 +64,7 @@
 import org.junit.jupiter.api.io.TempDir;
 
 @ExtendWith(ParameterizedTestExtension.class)
-public class TestPartitionStatsGenerator {
+public class TestPartitionStatsHandler {
   private static final Schema SCHEMA =
       new Schema(
           optional(1, "c1", Types.IntegerType.get()),
@@ -69,6 +76,8 @@ public class TestPartitionStatsGenerator {
 
   @TempDir public File temp;
 
+  private static final Random RANDOM = ThreadLocalRandom.current();
+
   @Parameters(name = "fileFormat = {0}")
   public static List<FileFormat> parameters() {
     return Arrays.asList(FileFormat.PARQUET, FileFormat.ORC, FileFormat.AVRO);
@@ -79,24 +88,18 @@ public static List<FileFormat> parameters() {
   @Test
   public void testPartitionStatsOnEmptyTable() throws Exception {
     Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2);
-
-    PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(testTable);
-    assertThat(partitionStatsGenerator.generate()).isNull();
+    assertThat(PartitionStatsHandler.computeAndWritePartitionStatsFile(testTable)).isNull();
   }
 
   @Test
   public void testPartitionStatsOnEmptyBranch() throws Exception {
     Table testTable = TestTables.create(tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2);
-
     testTable.manageSnapshots().createBranch("b1").commit();
-
-    PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(testTable, "b1");
+    PartitionStatisticsFile partitionStatisticsFile =
+        PartitionStatsHandler.computeAndWritePartitionStatsFile(testTable, "b1");
     // creates an empty stats file since the dummy snapshot exists
-    assertThat(partitionStatsGenerator.generate())
-        .extracting(PartitionStatisticsFile::fileSizeInBytes)
-        .isEqualTo(0L);
-    assertThat(partitionStatsGenerator.generate())
-        .extracting(PartitionStatisticsFile::snapshotId)
+    assertThat(partitionStatisticsFile.fileSizeInBytes()).isEqualTo(0L);
+    assertThat(partitionStatisticsFile.snapshotId())
         .isEqualTo(testTable.refs().get("b1").snapshotId());
   }
 
@@ -104,10 +107,10 @@ public void testPartitionStatsOnEmptyBranch() throws Exception {
   public void testPartitionStatsOnInvalidSnapshot() throws Exception {
     Table testTable =
         TestTables.create(tempDir("invalid_snapshot"), "invalid_snapshot", SCHEMA, SPEC, 2);
-
-    PartitionStatsGenerator partitionStatsGenerator =
-        new PartitionStatsGenerator(testTable, "INVALID_BRANCH");
-    assertThatThrownBy(partitionStatsGenerator::generate)
+    assertThatThrownBy(
+        () ->
+            PartitionStatsHandler.computeAndWritePartitionStatsFile(
+                testTable, "INVALID_BRANCH"))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Couldn't find the snapshot for the branch INVALID_BRANCH");
   }
@@ -126,8 +129,7 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception {
     DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records);
     testTable.newAppend().appendFile(dataFile).commit();
 
-    PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(testTable);
-    assertThatThrownBy(partitionStatsGenerator::generate)
+    assertThatThrownBy(() -> PartitionStatsHandler.computeAndWritePartitionStatsFile(testTable))
        .isInstanceOf(IllegalStateException.class)
        .hasMessage("getting schema for an unpartitioned table");
   }
@@ -170,9 +172,9 @@ public void testPartitionStats() throws Exception {
     }
 
     Snapshot snapshot1 = testTable.currentSnapshot();
-    Schema recordSchema = PartitionStatsUtil.schema(Partitioning.partitionType(testTable));
+    Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
     Types.StructType partitionType =
-        recordSchema.findField(PartitionStatsUtil.Column.PARTITION.name()).type().asStructType();
+        recordSchema.findField(Column.PARTITION.name()).type().asStructType();
     computeAndValidatePartitionStats(
         testTable,
         recordSchema,
@@ -235,9 +237,8 @@ public void testPartitionStats() throws Exception {
     DeleteFile eqDeletes = commitEqualityDeletes(testTable);
     Snapshot snapshot3 = testTable.currentSnapshot();
 
-    recordSchema = PartitionStatsUtil.schema(Partitioning.partitionType(testTable));
-    partitionType =
-        recordSchema.findField(PartitionStatsUtil.Column.PARTITION.name()).type().asStructType();
+    recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
+    partitionType = recordSchema.findField(Column.PARTITION.name()).type().asStructType();
     computeAndValidatePartitionStats(
         testTable,
         recordSchema,
@@ -323,9 +324,9 @@ public void testPartitionStatsWithSchemaEvolution() throws Exception {
     }
 
     Snapshot snapshot1 = testTable.currentSnapshot();
-    Schema recordSchema = PartitionStatsUtil.schema(Partitioning.partitionType(testTable));
+    Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
     Types.StructType partitionType =
-        recordSchema.findField(PartitionStatsUtil.Column.PARTITION.name()).type().asStructType();
+        recordSchema.findField(Column.PARTITION.name()).type().asStructType();
 
     computeAndValidatePartitionStats(
         testTable,
@@ -390,15 +391,14 @@ public void testPartitionStatsWithSchemaEvolution() throws Exception {
         .commit();
 
     Snapshot snapshot2 = testTable.currentSnapshot();
-    recordSchema = PartitionStatsUtil.schema(Partitioning.partitionType(testTable));
-    partitionType =
-        recordSchema.findField(PartitionStatsUtil.Column.PARTITION.name()).type().asStructType();
+    recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
+    partitionType = recordSchema.findField(Column.PARTITION.name()).type().asStructType();
     computeAndValidatePartitionStats(
         testTable,
         recordSchema,
         Tuple.tuple(
             partitionRecord(partitionType, "foo", null),
-            0,
+            0, // old spec id as the record is unmodified
             8L,
             2,
             2 * dataFile1.fileSizeInBytes(),
             0L,
             0,
             0L,
             0,
             0L,
             snapshot1.timestampMillis(),
             snapshot1.snapshotId()),
         Tuple.tuple(
             partitionRecord(partitionType, "bar", null),
-            1, // observe the old spec id as the record is unmodified
+            1,
             7L,
             3,
             2 * dataFile2.fileSizeInBytes() + dataFile7.fileSizeInBytes(),
@@ -476,6 +476,122 @@ public void testPartitionStatsWithSchemaEvolution() throws Exception {
             snapshot2.snapshotId()));
   }
 
+  @Test
+  public void testAllDatatypePartitionWriting() throws Exception {
+    Schema schema =
+        new Schema(
+            required(100, "id", Types.LongType.get()),
+            optional(101, "data", Types.StringType.get()),
+            optional(113, "bytes", Types.BinaryType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("bytes").build();
+    Table testTable =
+        TestTables.create(
+            tempDir("test_all_type"), "test_all_type", schema, spec, SortOrder.unsorted(), 2);
+
+    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
+
+    Record partitionData =
+        GenericRecord.create(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
+    partitionData.set(0, new byte[] {1, 2, 3});
+
+    PartitionStats partitionStats = new PartitionStats(partitionData);
+    partitionStats.set(Column.SPEC_ID.id(), RANDOM.nextInt(10));
+    partitionStats.set(Column.DATA_RECORD_COUNT.id(), RANDOM.nextLong());
+    partitionStats.set(Column.DATA_FILE_COUNT.id(), RANDOM.nextInt());
+    partitionStats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), 1024L * RANDOM.nextInt(20));
+
+    Iterator<PartitionStatsRecord> convertedRecords =
+        PartitionStatsHandler.statsToRecords(
+            Collections.singletonList(partitionStats).iterator(), dataSchema);
+
+    List<PartitionStatsRecord> expectedRecords = Lists.newArrayList(convertedRecords);
+
+    PartitionStatisticsFile statisticsFile =
+        PartitionStatsHandler.writePartitionStatsFile(
+            testTable, 42L, dataSchema, expectedRecords.iterator());
+
+    List<PartitionStatsRecord> writtenRecords;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            dataSchema, Files.localInput(statisticsFile.path()))) {
+      writtenRecords = Lists.newArrayList(recordIterator);
+    }
+    assertThat(writtenRecords).isEqualTo(expectedRecords);
+  }
+
+  @Test
+  public void testOptionalFieldsWriting() throws Exception {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+    Table testTable =
+        TestTables.create(
+            tempDir("test_partition_stats_optional"),
+            "test_partition_stats_optional",
+            SCHEMA,
+            spec,
+            SortOrder.unsorted(),
+            2);
+
+    Types.StructType partitionSchema = Partitioning.partitionType(testTable);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
+
+    ImmutableList.Builder<PartitionStats> partitionListBuilder = ImmutableList.builder();
+
+    for (int i = 0; i < 5; i++) {
+      GenericRecord partitionData =
+          GenericRecord.create(
+              dataSchema.findField(Column.PARTITION.name()).type().asStructType());
+      partitionData.set(0, RANDOM.nextInt());
+
+      PartitionStats partitionStats = new PartitionStats(partitionData);
+
+      partitionStats.set(Column.PARTITION.ordinal(), partitionData);
+      partitionStats.set(Column.SPEC_ID.ordinal(), RANDOM.nextInt(10));
+      partitionStats.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong());
+      partitionStats.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt());
+      partitionStats.set(
+          Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20));
+      partitionStats.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), null);
+      partitionStats.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), null);
+      partitionStats.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), null);
+      partitionStats.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), null);
+      partitionStats.set(Column.TOTAL_RECORD_COUNT.ordinal(), null);
+      partitionStats.set(Column.LAST_UPDATED_AT.ordinal(), null);
+      partitionStats.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), null);
+
+      partitionListBuilder.add(partitionStats);
+    }
+
+    Iterator<PartitionStatsRecord> convertedRecords =
+        PartitionStatsHandler.statsToRecords(partitionListBuilder.build().iterator(), dataSchema);
+
+    List<PartitionStatsRecord> expectedRecords = Lists.newArrayList(convertedRecords);
+
+    PartitionStatisticsFile statisticsFile =
+        PartitionStatsHandler.writePartitionStatsFile(
+            testTable, 42L, dataSchema, expectedRecords.iterator());
+
+    List<PartitionStatsRecord> writtenRecords;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
+            dataSchema, Files.localInput(statisticsFile.path()))) {
+      writtenRecords = Lists.newArrayList(recordIterator);
+    }
+    assertThat(writtenRecords).isEqualTo(expectedRecords);
+    assertThat(expectedRecords.get(0).unwrap())
+        .extracting(
+            PartitionStats::positionDeleteRecordCount,
+            PartitionStats::positionDeleteFileCount,
+            PartitionStats::equalityDeleteRecordCount,
+            PartitionStats::equalityDeleteFileCount,
+            PartitionStats::totalRecordCount,
+            PartitionStats::lastUpdatedAt,
+            PartitionStats::lastUpdatedSnapshotId)
+        .isEqualTo(
+            Arrays.asList(
+                0L, 0, 0L, 0, 0L, null, null)); // null counters should be initialized to zero.
+  }
+
   private OutputFile outputFile() throws IOException {
     return Files.localOutput(File.createTempFile("data", null, tempDir("stats")));
   }
@@ -515,32 +631,33 @@ private static void computeAndValidatePartitionStats(
       Table testTable, Schema recordSchema, Tuple... expectedValues) throws IOException {
     // compute and commit partition stats file
     Snapshot currentSnapshot = testTable.currentSnapshot();
-    PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(testTable);
-    PartitionStatisticsFile result = partitionStatsGenerator.generate();
+    PartitionStatisticsFile result =
+        PartitionStatsHandler.computeAndWritePartitionStatsFile(testTable);
     testTable.updatePartitionStatistics().setPartitionStatistics(result).commit();
     assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId());
 
     // read the partition entries from the stats file
-    List<Record> rows;
-    try (CloseableIterable<Record> recordIterator =
-        PartitionStatsGeneratorUtil.readPartitionStatsFile(
+    List<PartitionStatsRecord> partitionStats;
+    try (CloseableIterable<PartitionStatsRecord> recordIterator =
+        PartitionStatsHandler.readPartitionStatsFile(
             recordSchema, Files.localInput(result.path()))) {
-      rows = Lists.newArrayList(recordIterator);
+      partitionStats = Lists.newArrayList(recordIterator);
    }
-    assertThat(rows)
+    assertThat(partitionStats)
+        .extracting(PartitionStatsRecord::unwrap)
         .extracting(
-            row -> row.get(PartitionStatsUtil.Column.PARTITION.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.SPEC_ID.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.DATA_RECORD_COUNT.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.DATA_FILE_COUNT.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.POSITION_DELETE_RECORD_COUNT.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.POSITION_DELETE_FILE_COUNT.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.EQUALITY_DELETE_RECORD_COUNT.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.EQUALITY_DELETE_FILE_COUNT.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.TOTAL_RECORD_COUNT.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.LAST_UPDATED_AT.ordinal()),
-            row -> row.get(PartitionStatsUtil.Column.LAST_UPDATED_SNAPSHOT_ID.ordinal()))
+            PartitionStats::partition,
+            PartitionStats::specId,
+            PartitionStats::dataRecordCount,
+            PartitionStats::dataFileCount,
+            PartitionStats::totalDataFileSizeInBytes,
+            PartitionStats::positionDeleteRecordCount,
+            PartitionStats::positionDeleteFileCount,
+            PartitionStats::equalityDeleteRecordCount,
+            PartitionStats::equalityDeleteFileCount,
+            PartitionStats::totalRecordCount,
+            PartitionStats::lastUpdatedAt,
+            PartitionStats::lastUpdatedSnapshotId)
        .containsExactlyInAnyOrder(expectedValues);
   }
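To close the loop, here is a sketch of hand-building a single stats entry and persisting it, mirroring testOptionalFieldsWriting and testAllDatatypePartitionWriting above. It is illustrative only: statsToRecords is @VisibleForTesting and package-private, so the sketch assumes it lives in org.apache.iceberg.data; it also assumes a table identity-partitioned on an int column, and the snapshot id 42L is arbitrary, as in the tests.

package org.apache.iceberg.data;

import static org.apache.iceberg.data.PartitionStatsHandler.Column;

import java.util.Collections;
import java.util.Iterator;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

class PartitionStatsRoundTripSketch {
  // Writes one hand-built entry for the given table; position 0 of the partition
  // tuple takes an Integer because an identity partition on an int column is assumed.
  static PartitionStatisticsFile writeOneEntry(Table table) throws Exception {
    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));

    Record partition =
        GenericRecord.create(dataSchema.findField(Column.PARTITION.name()).type().asStructType());
    partition.set(0, 1);

    PartitionStats stats = new PartitionStats(partition);
    stats.set(Column.SPEC_ID.ordinal(), table.spec().specId());
    stats.set(Column.DATA_RECORD_COUNT.ordinal(), 100L);
    stats.set(Column.DATA_FILE_COUNT.ordinal(), 1);
    stats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L);

    // statsToRecords converts PartitionStats structs into writable records.
    Iterator<PartitionStatsRecord> records =
        PartitionStatsHandler.statsToRecords(
            Collections.singletonList(stats).iterator(), dataSchema);

    // 42L stands in for a real snapshot id in this sketch.
    return PartitionStatsHandler.writePartitionStatsFile(table, 42L, dataSchema, records);
  }
}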