Skip to content

Commit

Permalink
Other format read support and testcase
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Aug 23, 2024
1 parent 827cb20 commit d3345b5
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 45 deletions.
20 changes: 20 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,26 @@ public static TestTable create(
return create(temp, name, schema, spec, SortOrder.unsorted(), formatVersion);
}

/**
 * Creates a test table at the given location with the supplied table properties.
 *
 * @param temp directory that backs the table location
 * @param name table name, used as the key for the test table operations
 * @param schema table schema
 * @param spec partition spec
 * @param formatVersion table format version to write into the metadata
 * @param properties table properties to seed the initial metadata with
 * @return a new {@link TestTable} handle over freshly committed metadata
 * @throws AlreadyExistsException if metadata already exists for {@code name} at {@code temp}
 */
public static TestTable create(
    File temp,
    String name,
    Schema schema,
    PartitionSpec spec,
    int formatVersion,
    Map<String, String> properties) {
  TestTableOperations tableOps = new TestTableOperations(name, temp);

  // Refuse to clobber an existing table at the same location.
  if (tableOps.current() != null) {
    throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
  }

  // First commit: base metadata is null, new metadata carries the caller's properties.
  tableOps.commit(
      null,
      newTableMetadata(
          schema, spec, SortOrder.unsorted(), temp.toString(), properties, formatVersion));

  return new TestTable(tableOps, name);
}

public static TestTable create(
File temp,
String name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
Expand All @@ -41,6 +44,7 @@
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

/**
Expand Down Expand Up @@ -111,10 +115,25 @@ public static void writePartitionStatsFile(
*/
/**
 * Reads a partition statistics file, dispatching on the file format derived from the file's
 * extension.
 *
 * @param schema projection schema for the records to read
 * @param inputFile the stats file to read; its location must end with a recognized extension
 *     (parquet, orc, or avro)
 * @return a closeable iterable over the partition stats records
 * @throws UnsupportedOperationException if the file extension maps to an unsupported format
 */
public static CloseableIterable<Record> readPartitionStatsFile(
    Schema schema, InputFile inputFile) {
  // Infer the format from the extension, e.g. ".../stats.parquet" -> PARQUET.
  FileFormat fileFormat =
      FileFormat.fromString(
          inputFile.location().substring(inputFile.location().lastIndexOf(".") + 1));

  switch (fileFormat) {
    case PARQUET:
      return Parquet.read(inputFile)
          .project(schema)
          .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
          .build();
    case ORC:
      return ORC.read(inputFile)
          .project(schema)
          .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
          .build();
    case AVRO:
      return Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build();
    default:
      // NOTE(review): added the missing space after the colon in the message.
      throw new UnsupportedOperationException("Unsupported file format: " + fileFormat.name());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStatsUtil;
Expand All @@ -36,18 +41,22 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.assertj.core.groups.Tuple;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(ParameterizedTestExtension.class)
public class TestPartitionStatsGenerator {
private static final Schema SCHEMA =
new Schema(
Expand All @@ -58,23 +67,26 @@ public class TestPartitionStatsGenerator {
protected static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();

@Rule public TemporaryFolder temp = new TemporaryFolder();
@TempDir public File temp;

@Parameters(name = "fileFormat = {0}")
public static List<Object> parameters() {
return Arrays.asList(FileFormat.PARQUET, FileFormat.ORC, FileFormat.AVRO);
}

@Parameter private FileFormat format;

@Test
public void testPartitionStatsOnEmptyTable() throws Exception {
  // An empty table has no current snapshot, so the generator has nothing to aggregate.
  Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2);

  PartitionStatsGenerator partitionStatsGenerator = new PartitionStatsGenerator(testTable);
  assertThat(partitionStatsGenerator.generate()).isNull();
}

@Test
public void testPartitionStatsOnEmptyBranch() throws Exception {
Table testTable =
TestTables.create(
temp.newFolder("empty_branch"), "empty_branch", SCHEMA, SPEC, SortOrder.unsorted(), 2);
Table testTable = TestTables.create(tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2);

testTable.manageSnapshots().createBranch("b1").commit();

Expand All @@ -91,13 +103,7 @@ public void testPartitionStatsOnEmptyBranch() throws Exception {
@Test
public void testPartitionStatsOnInvalidSnapshot() throws Exception {
Table testTable =
TestTables.create(
temp.newFolder("invalid_snapshot"),
"invalid_snapshot",
SCHEMA,
SPEC,
SortOrder.unsorted(),
2);
TestTables.create(tempDir("invalid_snapshot"), "invalid_snapshot", SCHEMA, SPEC, 2);

PartitionStatsGenerator partitionStatsGenerator =
new PartitionStatsGenerator(testTable, "INVALID_BRANCH");
Expand All @@ -110,11 +116,10 @@ public void testPartitionStatsOnInvalidSnapshot() throws Exception {
public void testPartitionStatsOnUnPartitionedTable() throws Exception {
Table testTable =
TestTables.create(
temp.newFolder("unpartitioned_table"),
tempDir("unpartitioned_table"),
"unpartitioned_table",
SCHEMA,
PartitionSpec.unpartitioned(),
SortOrder.unsorted(),
2);

List<Record> records = prepareRecords(testTable.schema());
Expand All @@ -128,16 +133,16 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception {
}

@SuppressWarnings("checkstyle:MethodLength")
@Test
@TestTemplate // Tests all the file formats (PARQUET, ORC, AVRO)
public void testPartitionStats() throws Exception {
Table testTable =
TestTables.create(
temp.newFolder("partition_stats"),
"partition_stats_compute",
tempDir("partition_stats_" + format.name()),
"partition_stats_compute_" + format.name(),
SCHEMA,
SPEC,
SortOrder.unsorted(),
2);
2,
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));

List<Record> records = prepareRecords(testTable.schema());
DataFile dataFile1 =
Expand Down Expand Up @@ -297,7 +302,7 @@ public void testPartitionStatsWithSchemaEvolution() throws Exception {

Table testTable =
TestTables.create(
temp.newFolder("partition_stats_schema_evolve"),
tempDir("partition_stats_schema_evolve"),
"partition_stats_schema_evolve",
SCHEMA,
specBefore,
Expand Down Expand Up @@ -472,7 +477,7 @@ public void testPartitionStatsWithSchemaEvolution() throws Exception {
}

/** Creates a fresh local output file inside a per-call temporary "stats" directory. */
private OutputFile outputFile() throws IOException {
  return Files.localOutput(File.createTempFile("data", null, tempDir("stats")));
}

private static Record partitionRecord(Types.StructType partitionType, String c2, String c3) {
Expand Down Expand Up @@ -548,7 +553,7 @@ private DeleteFile commitEqualityDeletes(Table testTable) throws IOException {
DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
testTable,
Files.localOutput(File.createTempFile("junit", null, temp.newFolder())),
Files.localOutput(File.createTempFile("junit", null, tempDir("eq_delete"))),
TestHelpers.Row.of("foo", "A"),
dataDeletes,
deleteRowSchema);
Expand All @@ -566,7 +571,7 @@ private DeleteFile commitPositionDeletes(Table testTable, DataFile dataFile1) th
DeleteFile posDeletes =
FileHelpers.writePosDeleteFile(
testTable,
Files.localOutput(File.createTempFile("junit", null, temp.newFolder())),
Files.localOutput(File.createTempFile("junit", null, tempDir("pos_delete"))),
TestHelpers.Row.of("bar", "A"),
deletes);
testTable.newRowDelta().addDeletes(posDeletes).commit();
Expand All @@ -583,4 +588,8 @@ private static PositionDelete<GenericRecord> positionDelete(
posDelete.set(path, position, nested);
return posDelete;
}

/**
 * Creates a uniquely named subdirectory under the JUnit-managed {@code temp} directory.
 * Fully qualifies {@code java.nio.file.Files} to avoid clashing with Iceberg's {@code Files}.
 */
private File tempDir(String folderName) throws IOException {
  java.nio.file.Path created =
      java.nio.file.Files.createTempDirectory(temp.toPath(), folderName);
  return created.toFile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIterable;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
Expand All @@ -47,9 +48,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestPartitionStatsGeneratorUtil {
private static final Schema SCHEMA =
Expand All @@ -60,14 +60,14 @@ public class TestPartitionStatsGeneratorUtil {

private static final Random RANDOM = ThreadLocalRandom.current();

@Rule public TemporaryFolder temp = new TemporaryFolder();
@TempDir public File temp;

@Test
public void testPartitionStats() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
Table testTable =
TestTables.create(
temp.newFolder("test_partition_stats"),
tempDir("test_partition_stats"),
"test_partition_stats",
SCHEMA,
spec,
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testOptionalFields() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
Table testTable =
TestTables.create(
temp.newFolder("test_partition_stats_optional"),
tempDir("test_partition_stats_optional"),
"test_partition_stats_optional",
SCHEMA,
spec,
Expand Down Expand Up @@ -188,12 +188,7 @@ public void testAllDatatypePartition() throws Exception {
.build();
Table testTable =
TestTables.create(
temp.newFolder("test_all_type"),
"test_all_type",
schema,
spec,
SortOrder.unsorted(),
2);
tempDir("test_all_type"), "test_all_type", schema, spec, SortOrder.unsorted(), 2);

Types.StructType partitionSchema = Partitioning.partitionType(testTable);
Schema dataSchema = PartitionStatsUtil.schema(partitionSchema);
Expand Down Expand Up @@ -256,4 +251,8 @@ private static Record partitionDataToRecord(

return genericRecord;
}

/**
 * Creates a uniquely named subdirectory under the JUnit-managed {@code temp} directory.
 * Uses the fully qualified {@code java.nio.file.Files} because Iceberg's own {@code Files}
 * class is imported in this file.
 */
private File tempDir(String folderName) throws IOException {
return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile();
}
}

0 comments on commit d3345b5

Please sign in to comment.