Refactor and handle comments
ajantha-bhat committed Aug 21, 2024
1 parent 9c380e2 commit 827cb20
Showing 7 changed files with 126 additions and 67 deletions.
core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java (42 changes: 22 additions & 20 deletions)
@@ -46,10 +46,12 @@ public enum Column {
   }
 
   /**
-   * Generates a schema as per partition statistics spec based on the given partition type.
+   * Generates the Partition Stats Files Schema based on a given partition type.
    *
+   * <p>Note: Provide the unified partition tuple as mentioned in the spec.
+   *
    * @param partitionType the struct type that defines the structure of the partition.
-   * @return a schema that corresponds to the provided partition type.
+   * @return a schema that corresponds to the provided unified partition type.
    */
   public static Schema schema(Types.StructType partitionType) {
     Preconditions.checkState(
@@ -88,6 +90,10 @@ public static Schema schema(Types.StructType partitionType) {
    */
   public static CloseableIterable<Record> fromManifest(
       Table table, ManifestFile manifest, Schema recordSchema) {
+    Preconditions.checkState(
+        !recordSchema.findField(Column.PARTITION.name()).type().asStructType().fields().isEmpty(),
+        "record schema should not be unpartitioned");
+
     return CloseableIterable.transform(
         ManifestFiles.open(manifest, table.io(), table.specs())
             .select(BaseScan.scanColumns(manifest.content()))
@@ -191,28 +197,24 @@ private static Record coercedPartitionData(
   }
 
   private static void checkAndIncrementLong(Record toUpdate, Record fromRecord, Column column) {
-    if (fromRecord.get(column.ordinal()) != null) {
-      if (toUpdate.get(column.ordinal()) != null) {
-        toUpdate.set(
-            column.ordinal(),
-            toUpdate.get(column.ordinal(), Long.class)
-                + fromRecord.get(column.ordinal(), Long.class));
-      } else {
-        toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Long.class));
-      }
+    if ((fromRecord.get(column.ordinal()) != null) && (toUpdate.get(column.ordinal()) != null)) {
+      toUpdate.set(
+          column.ordinal(),
+          toUpdate.get(column.ordinal(), Long.class)
+              + fromRecord.get(column.ordinal(), Long.class));
+    } else if (fromRecord.get(column.ordinal()) != null) {
+      toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Long.class));
     }
   }
 
   private static void checkAndIncrementInt(Record toUpdate, Record fromRecord, Column column) {
-    if (fromRecord.get(column.ordinal()) != null) {
-      if (toUpdate.get(column.ordinal()) != null) {
-        toUpdate.set(
-            column.ordinal(),
-            toUpdate.get(column.ordinal(), Integer.class)
-                + fromRecord.get(column.ordinal(), Integer.class));
-      } else {
-        toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Integer.class));
-      }
+    if ((fromRecord.get(column.ordinal()) != null) && (toUpdate.get(column.ordinal()) != null)) {
+      toUpdate.set(
+          column.ordinal(),
+          toUpdate.get(column.ordinal(), Integer.class)
+              + fromRecord.get(column.ordinal(), Integer.class));
+    } else if (fromRecord.get(column.ordinal()) != null) {
+      toUpdate.set(column.ordinal(), fromRecord.get(column.ordinal(), Integer.class));
     }
   }
 }
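
Aside: the merged null handling in checkAndIncrementLong and checkAndIncrementInt reduces to the standalone sketch below (hypothetical NullableSum class, not part of this commit). The two values are summed only when both are non-null; otherwise the non-null side, if any, wins.

    public class NullableSum {
      // Mirrors the merged branches of checkAndIncrementLong above.
      static Long sumNullable(Long current, Long increment) {
        if (current != null && increment != null) {
          return current + increment; // both present: accumulate
        } else if (increment != null) {
          return increment; // only the incoming side is present
        }
        return current; // incoming side is null: keep current (possibly null)
      }

      public static void main(String[] args) {
        System.out.println(sumNullable(null, 5L)); // 5
        System.out.println(sumNullable(3L, 5L)); // 8
        System.out.println(sumNullable(3L, null)); // 3
      }
    }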
IdentityPartitionConverters.java
@@ -23,6 +23,7 @@
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.DateTimeUtil;
 
 public class IdentityPartitionConverters {
@@ -51,12 +52,12 @@ public static Object convertConstant(Type type, Object value) {
       if (value instanceof GenericData.Fixed) {
         return ((GenericData.Fixed) value).bytes();
       } else if (value instanceof ByteBuffer) {
-        return ((ByteBuffer) value).array();
+        return ByteBuffers.toByteArray((ByteBuffer) value);
       }
       return value;
     case UUID:
       if (value instanceof ByteBuffer) {
-        return ((ByteBuffer) value).array();
+        return ByteBuffers.toByteArray((ByteBuffer) value);
       }
       return value;
     default:
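Note on the switch to ByteBuffers.toByteArray: ByteBuffer.array() returns the entire backing array and ignores the buffer's position and limit (and fails for direct or read-only buffers), so it is incorrect for sliced buffers. A JDK-only sketch of the difference; the explicit copy stands in for what a utility like ByteBuffers.toByteArray is expected to do:

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class ByteBufferCopy {
      public static void main(String[] args) {
        byte[] backing = {1, 2, 3, 4, 5};
        // A buffer viewing only bytes 1..3 of the backing array.
        ByteBuffer slice = ByteBuffer.wrap(backing, 1, 3).slice();

        // array() exposes the whole backing array, ignoring the view.
        System.out.println(Arrays.toString(slice.array())); // [1, 2, 3, 4, 5]

        // Copying only the remaining bytes respects position and limit.
        byte[] copy = new byte[slice.remaining()];
        slice.duplicate().get(copy);
        System.out.println(Arrays.toString(copy)); // [2, 3, 4]
      }
    }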
PartitionStatsGeneratorBenchmark.java
@@ -28,9 +28,10 @@
 import java.nio.file.Paths;
 import java.util.Comparator;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.iceberg.data.PartitionStatsGenerator;
-import org.apache.iceberg.data.PartitionStatsWriterUtil;
+import org.apache.iceberg.data.PartitionStatsGeneratorUtil;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
@@ -66,50 +67,45 @@ public class PartitionStatsGeneratorBenchmark {

   private String baseDir;
 
-  // create 3 manifests with same partition values
-  private static final int ITERATION_COUNTER = 3;
+  // Create 10k manifests
+  private static final int MANIFEST_COUNTER = 10000;
 
-  // create 3 * 10000 manifests
-  private static final int PARTITION_COUNT = 10000;
+  // each manifest with 100 partition values
+  private static final int PARTITION_PER_MANIFEST = 100;
 
-  // one data file manifest
-  private static final int DATA_FILES_PER_PARTITION_COUNT = 1;
+  // 20 data files per partition, which results in 2k data files per manifest
+  private static final int DATA_FILES_PER_PARTITION_COUNT = 20;
 
   private Table table;
 
-  private PartitionStatisticsFile result;
-
   @Setup
-  public void setupBenchmark() throws IOException {
+  public void setupBenchmark() {
     baseDir =
         Paths.get(new File(System.getProperty("java.io.tmpdir")).getAbsolutePath()).toString();
     table = TestTables.create(new File(baseDir), "foo", SCHEMA, SPEC, SortOrder.unsorted(), 2);
 
-    for (int interations = 0; interations < ITERATION_COUNTER; interations++) {
-      for (int partitionOrdinal = 0; partitionOrdinal < PARTITION_COUNT; partitionOrdinal++) {
-        StructLike partition = TestHelpers.Row.of(partitionOrdinal);
-        AppendFiles appendFiles = table.newAppend();
-        for (int fileOrdinal = 0; fileOrdinal < DATA_FILES_PER_PARTITION_COUNT; fileOrdinal++) {
-          DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
-          appendFiles.appendFile(dataFile);
-        }
-
-        appendFiles.commit();
-      }
-    }
+    IntStream.range(0, MANIFEST_COUNTER)
+        .forEach(
+            manifestCount -> {
+              AppendFiles appendFiles = table.newAppend();
+
+              IntStream.range(0, PARTITION_PER_MANIFEST)
+                  .forEach(
+                      partitionOrdinal -> {
+                        StructLike partition = TestHelpers.Row.of(partitionOrdinal);
+                        IntStream.range(0, DATA_FILES_PER_PARTITION_COUNT)
+                            .forEach(
+                                fileOrdinal ->
+                                    appendFiles.appendFile(
+                                        FileGenerationUtil.generateDataFile(table, partition)));
+                      });
+
+              appendFiles.commit();
+            });
   }
 
   @TearDown
   public void tearDownBenchmark() throws IOException {
-    // validate row count
-    try (CloseableIterable<Record> recordIterator =
-        PartitionStatsWriterUtil.readPartitionStatsFile(
-            PartitionStatsUtil.schema(Partitioning.partitionType(table)),
-            Files.localInput(result.path()))) {
-      assertThat(recordIterator).hasSize(PARTITION_COUNT);
-    }
-
-    // clean up the temp folder
     if (baseDir != null) {
       try (Stream<Path> walk = java.nio.file.Files.walk(Paths.get(baseDir))) {
         walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
@@ -120,12 +116,20 @@ public void tearDownBenchmark() throws IOException {

   @Benchmark
   @Threads(1)
-  public void writePartitionStats() {
+  public void benchmarkPartitionStats() throws IOException {
     Snapshot currentSnapshot = table.currentSnapshot();
 
     PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(table);
-    result = partitionStatsGenerator.generate();
+    PartitionStatisticsFile result = partitionStatsGenerator.generate();
     table.updatePartitionStatistics().setPartitionStatistics(result).commit();
     assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId());
+
+    // validate row count
+    try (CloseableIterable<Record> recordIterator =
+        PartitionStatsGeneratorUtil.readPartitionStatsFile(
+            PartitionStatsUtil.schema(Partitioning.partitionType(table)),
+            Files.localInput(result.path()))) {
+      assertThat(recordIterator).hasSize(PARTITION_PER_MANIFEST);
+    }
   }
 }
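
Scale check for the constants above: every manifest reuses the same PARTITION_PER_MANIFEST = 100 partition values, so the stats file should collapse to 100 rows no matter how many manifests are scanned, which is exactly what the row-count assertion verifies. The table itself accumulates 100 partitions × 20 files = 2,000 data files per manifest, and 10,000 manifests × 2,000 files = 20,000,000 data file entries in total.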
PartitionStatsGenerator.java
@@ -45,12 +45,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Generates {@link PartitionStatisticsFile} file as per the spec for the given table. Computes the
+ * stats by going through the partition info stored in each manifest file.
+ */
 public class PartitionStatsGenerator {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsGenerator.class);
 
   private final Table table;
   private String branch;
   private Types.StructType partitionType;
+  // Map of PartitionData, partition-stats-entry per partitionData.
   private Map<Record, Record> partitionEntryMap;
 
   public PartitionStatsGenerator(Table table) {
@@ -77,7 +82,6 @@ public PartitionStatisticsFile generate() {
     }
 
     partitionType = Partitioning.partitionType(table);
-    // Map of partitionData, partition-stats-entry per partitionData.
     partitionEntryMap = Maps.newConcurrentMap();
 
     Schema dataSchema = PartitionStatsUtil.schema(partitionType);
@@ -119,8 +123,8 @@ public PartitionStatisticsFile generate() {
         Iterators.transform(sortedKeys.iterator(), this::convertPartitionRecords);
 
     OutputFile outputFile =
-        PartitionStatsWriterUtil.newPartitionStatsFile(table, currentSnapshot.snapshotId());
-    PartitionStatsWriterUtil.writePartitionStatsFile(table, entriesForWriter, outputFile);
+        PartitionStatsGeneratorUtil.newPartitionStatsFile(table, currentSnapshot.snapshotId());
+    PartitionStatsGeneratorUtil.writePartitionStatsFile(table, entriesForWriter, outputFile);
     return ImmutableGenericPartitionStatisticsFile.builder()
         .snapshotId(currentSnapshot.snapshotId())
         .path(outputFile.location())
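Typical usage of the generator, matching the call sequence in the benchmark above (a sketch assuming an existing partitioned table; as the new test below shows, an unpartitioned table is rejected with an IllegalStateException):

    import org.apache.iceberg.PartitionStatisticsFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.PartitionStatsGenerator;

    public class GeneratePartitionStats {
      public static void commitPartitionStats(Table table) {
        // Compute stats for the current snapshot from the manifest metadata.
        PartitionStatsGenerator generator = new PartitionStatsGenerator(table);
        PartitionStatisticsFile statsFile = generator.generate();

        // Register the stats file against the snapshot in the table metadata.
        table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
      }
    }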
PartitionStatsGeneratorUtil.java (renamed from PartitionStatsWriterUtil.java)
@@ -28,6 +28,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.PartitionStatsUtil;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
@@ -42,10 +43,22 @@
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
 
-public final class PartitionStatsWriterUtil {
+/**
+ * Util to write and read the {@link PartitionStatisticsFile}. Uses generic readers and writes to
+ * support writing and reading of the stats in table default format.
+ */
+public final class PartitionStatsGeneratorUtil {
 
-  private PartitionStatsWriterUtil() {}
+  private PartitionStatsGeneratorUtil() {}
 
+  /**
+   * Creates a new {@link OutputFile} for storing partition statistics for the specified table. The
+   * output file extension will be same as table's default format.
+   *
+   * @param table The {@link Table} for which the partition statistics file is being created.
+   * @param snapshotId The ID of the snapshot associated with the partition statistics.
+   * @return A new {@link OutputFile} that can be used to write partition statistics.
+   */
   public static OutputFile newPartitionStatsFile(Table table, long snapshotId) {
     FileFormat fileFormat =
         FileFormat.fromString(
@@ -59,12 +72,20 @@ public static OutputFile newPartitionStatsFile(Table table, long snapshotId) {
             fileFormat.addExtension(String.format("partition-stats-%d", snapshotId))));
   }
 
+  /**
+   * Writes partition statistics to the specified {@link OutputFile} for the given table.
+   *
+   * @param table The {@link Table} for which the partition statistics are being written.
+   * @param records An {@link Iterator} of {@link Record} to be written into the file.
+   * @param outputFile The {@link OutputFile} where the partition statistics will be written. Should
+   *     include the file extension.
+   */
   public static void writePartitionStatsFile(
       Table table, Iterator<Record> records, OutputFile outputFile) {
     Schema dataSchema = PartitionStatsUtil.schema(Partitioning.partitionType(table));
     FileFormat fileFormat =
         FileFormat.fromString(
-            table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+            outputFile.location().substring(outputFile.location().lastIndexOf(".") + 1));
     FileWriterFactory<Record> factory =
         GenericFileWriterFactory.builderFor(table)
             .dataSchema(dataSchema)
@@ -82,6 +103,12 @@ public static void writePartitionStatsFile(
     }
   }
 
+  /**
+   * Reads partition statistics from the specified {@link InputFile} using given schema.
+   *
+   * @param schema The {@link Schema} of the partition statistics file.
+   * @param inputFile An {@link InputFile} pointing to the partition stats file.
+   */
   public static CloseableIterable<Record> readPartitionStatsFile(
       Schema schema, InputFile inputFile) {
     // TODO: support other formats or introduce GenericFileReader
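A write-then-read round trip through the renamed utility, mirroring what the tests below do (a sketch assuming a partitioned table and an iterator of stats records matching the stats schema):

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.PartitionStatsUtil;
    import org.apache.iceberg.Partitioning;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.PartitionStatsGeneratorUtil;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.OutputFile;

    public class PartitionStatsRoundTrip {
      public static void roundTrip(Table table, Iterator<Record> records, long snapshotId)
          throws IOException {
        // Write the stats; the file extension follows the table's default format.
        OutputFile outputFile = PartitionStatsGeneratorUtil.newPartitionStatsFile(table, snapshotId);
        PartitionStatsGeneratorUtil.writePartitionStatsFile(table, records, outputFile);

        // Read them back using the unified partition-type schema.
        Schema dataSchema = PartitionStatsUtil.schema(Partitioning.partitionType(table));
        try (CloseableIterable<Record> reader =
            PartitionStatsGeneratorUtil.readPartitionStatsFile(
                dataSchema, Files.localInput(outputFile.location()))) {
          reader.forEach(System.out::println);
        }
      }
    }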
TestPartitionStatsGenerator.java
@@ -106,6 +106,27 @@ public void testPartitionStatsOnInvalidSnapshot() throws Exception {
         .hasMessage("Couldn't find the snapshot for the branch INVALID_BRANCH");
   }
 
+  @Test
+  public void testPartitionStatsOnUnPartitionedTable() throws Exception {
+    Table testTable =
+        TestTables.create(
+            temp.newFolder("unpartitioned_table"),
+            "unpartitioned_table",
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            SortOrder.unsorted(),
+            2);
+
+    List<Record> records = prepareRecords(testTable.schema());
+    DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records);
+    testTable.newAppend().appendFile(dataFile).commit();
+
+    PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(testTable);
+    assertThatThrownBy(partitionStatsGenerator::generate)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("getting schema for an unpartitioned table");
+  }
+
   @SuppressWarnings("checkstyle:MethodLength")
   @Test
   public void testPartitionStats() throws Exception {
@@ -497,7 +518,7 @@ private static void computeAndValidatePartitionStats(
     // read the partition entries from the stats file
     List<Record> rows;
     try (CloseableIterable<Record> recordIterator =
-        PartitionStatsWriterUtil.readPartitionStatsFile(
+        PartitionStatsGeneratorUtil.readPartitionStatsFile(
             recordSchema, Files.localInput(result.path()))) {
       rows = Lists.newArrayList(recordIterator);
     }
TestPartitionStatsGeneratorUtil.java (renamed from TestPartitionStatsWriterUtil.java)
@@ -51,7 +51,7 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestPartitionStatsWriterUtil {
+public class TestPartitionStatsGeneratorUtil {
   private static final Schema SCHEMA =
       new Schema(
           required(1, "id", Types.LongType.get()),
@@ -105,7 +105,7 @@ public void testPartitionStats() throws Exception {
   }
 
   @Test
-  public void testPartitionStatsOptionalFields() throws Exception {
+  public void testOptionalFields() throws Exception {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
     Table testTable =
         TestTables.create(
@@ -230,14 +230,14 @@ public void testAllDatatypePartition() throws Exception {

   private static void testPartitionStats(
       Table testTable, List<Record> expectedRecords, Schema dataSchema) throws IOException {
-    OutputFile outputFile = PartitionStatsWriterUtil.newPartitionStatsFile(testTable, 42L);
-    PartitionStatsWriterUtil.writePartitionStatsFile(
+    OutputFile outputFile = PartitionStatsGeneratorUtil.newPartitionStatsFile(testTable, 42L);
+    PartitionStatsGeneratorUtil.writePartitionStatsFile(
         testTable, expectedRecords.iterator(), outputFile);
     assertThat(Paths.get(outputFile.location())).exists();
 
     List<Record> writtenRecords;
     try (CloseableIterable<Record> recordIterator =
-        PartitionStatsWriterUtil.readPartitionStatsFile(
+        PartitionStatsGeneratorUtil.readPartitionStatsFile(
             dataSchema, Files.localInput(outputFile.location()))) {
       writtenRecords = Lists.newArrayList(recordIterator);
     }
