-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Data: Add partition stats writer and reader #11216
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,10 +25,13 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Queue; | ||
import org.apache.iceberg.data.GenericRecord; | ||
import org.apache.iceberg.data.Record; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Queues; | ||
import org.apache.iceberg.types.Comparators; | ||
import org.apache.iceberg.types.Types; | ||
import org.apache.iceberg.types.Types.StructType; | ||
import org.apache.iceberg.util.PartitionMap; | ||
import org.apache.iceberg.util.PartitionUtil; | ||
|
@@ -87,13 +90,10 @@ private static PartitionMap<PartitionStats> collectStats( | |
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs()); | ||
int specId = manifest.partitionSpecId(); | ||
PartitionSpec spec = table.specs().get(specId); | ||
PartitionData keyTemplate = new PartitionData(partitionType); | ||
|
||
for (ManifestEntry<?> entry : reader.entries()) { | ||
ContentFile<?> file = entry.file(); | ||
StructLike coercedPartition = | ||
PartitionUtil.coercePartition(partitionType, spec, file.partition()); | ||
StructLike key = keyTemplate.copyFor(coercedPartition); | ||
Record key = coercedPartitionRecord(file, spec, partitionType); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need Cannot keep this conversion in the data module as it just to wraps the same |
||
Snapshot snapshot = table.snapshot(entry.snapshotId()); | ||
PartitionStats stats = | ||
statsMap.computeIfAbsent(specId, key, () -> new PartitionStats(key, specId)); | ||
|
@@ -133,4 +133,19 @@ private static Collection<PartitionStats> mergeStats( | |
|
||
return statsMap.values(); | ||
} | ||
|
||
private static Record coercedPartitionRecord( | ||
ContentFile<?> file, PartitionSpec spec, StructType partitionType) { | ||
// keep the partition data as per the unified spec by coercing | ||
StructLike partition = PartitionUtil.coercePartition(partitionType, spec, file.partition()); | ||
|
||
GenericRecord record = GenericRecord.create(partitionType); | ||
List<Types.NestedField> fields = partitionType.fields(); | ||
for (int index = 0; index < fields.size(); index++) { | ||
Object val = partition.get(index, fields.get(index).type().typeId().javaClass()); | ||
record.set(index, val); | ||
} | ||
|
||
return record; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.data; | ||
|
||
import com.github.benmanes.caffeine.cache.Caffeine; | ||
import com.github.benmanes.caffeine.cache.LoadingCache; | ||
import java.util.List; | ||
import java.util.Map; | ||
import org.apache.iceberg.PartitionStats; | ||
import org.apache.iceberg.Schema; | ||
import org.apache.iceberg.StructLike; | ||
import org.apache.iceberg.relocated.com.google.common.base.Objects; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
import org.apache.iceberg.types.Types; | ||
import org.apache.iceberg.types.Types.StructType; | ||
|
||
/** Wraps the {@link PartitionStats} as {@link Record}. Used by generic writers and readers. */ | ||
public class PartitionStatsRecord implements Record, StructLike { | ||
private static final LoadingCache<StructType, Map<String, Integer>> NAME_MAP_CACHE = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Class is similar to |
||
Caffeine.newBuilder() | ||
.weakKeys() | ||
.build( | ||
struct -> { | ||
Map<String, Integer> idToPos = Maps.newHashMap(); | ||
List<Types.NestedField> fields = struct.fields(); | ||
for (int index = 0; index < fields.size(); index++) { | ||
idToPos.put(fields.get(index).name(), index); | ||
} | ||
return idToPos; | ||
}); | ||
|
||
private final StructType struct; | ||
private final PartitionStats partitionStats; | ||
private final Map<String, Integer> nameToPos; | ||
|
||
public static PartitionStatsRecord create(Schema schema, PartitionStats partitionStats) { | ||
return new PartitionStatsRecord(schema.asStruct(), partitionStats); | ||
} | ||
|
||
public static PartitionStatsRecord create(StructType struct, PartitionStats partitionStats) { | ||
return new PartitionStatsRecord(struct, partitionStats); | ||
} | ||
|
||
public PartitionStats unwrap() { | ||
return partitionStats; | ||
} | ||
|
||
private PartitionStatsRecord(StructType struct, PartitionStats partitionStats) { | ||
this.struct = struct; | ||
this.partitionStats = partitionStats; | ||
this.nameToPos = NAME_MAP_CACHE.get(struct); | ||
} | ||
|
||
private PartitionStatsRecord(PartitionStatsRecord toCopy) { | ||
this.struct = toCopy.struct; | ||
this.partitionStats = toCopy.partitionStats; | ||
this.nameToPos = toCopy.nameToPos; | ||
} | ||
|
||
private PartitionStatsRecord(PartitionStatsRecord toCopy, Map<String, Object> overwrite) { | ||
this.struct = toCopy.struct; | ||
this.partitionStats = toCopy.partitionStats; | ||
this.nameToPos = toCopy.nameToPos; | ||
for (Map.Entry<String, Object> entry : overwrite.entrySet()) { | ||
setField(entry.getKey(), entry.getValue()); | ||
} | ||
} | ||
|
||
@Override | ||
public StructType struct() { | ||
return struct; | ||
} | ||
|
||
@Override | ||
public Object getField(String name) { | ||
Integer pos = nameToPos.get(name); | ||
Preconditions.checkArgument(pos != null, "Cannot get unknown field named: %s", name); | ||
return partitionStats.get(pos, Object.class); | ||
} | ||
|
||
@Override | ||
public void setField(String name, Object value) { | ||
Integer pos = nameToPos.get(name); | ||
Preconditions.checkArgument(pos != null, "Cannot set unknown field named: %s", name); | ||
partitionStats.set(pos, value); | ||
} | ||
|
||
@Override | ||
public int size() { | ||
return partitionStats.size(); | ||
} | ||
|
||
@Override | ||
public Object get(int pos) { | ||
return partitionStats.get(pos, Object.class); | ||
} | ||
|
||
@Override | ||
public <T> T get(int pos, Class<T> javaClass) { | ||
Object value = get(pos); | ||
if (value == null || javaClass.isInstance(value)) { | ||
return javaClass.cast(value); | ||
} else { | ||
throw new IllegalStateException("Not an instance of " + javaClass.getName() + ": " + value); | ||
} | ||
} | ||
|
||
@Override | ||
public <T> void set(int pos, T value) { | ||
partitionStats.set(pos, value); | ||
} | ||
|
||
@Override | ||
public PartitionStatsRecord copy() { | ||
return new PartitionStatsRecord(this); | ||
} | ||
|
||
@Override | ||
public PartitionStatsRecord copy(Map<String, Object> overwriteValues) { | ||
return new PartitionStatsRecord(this, overwriteValues); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
StringBuilder sb = new StringBuilder(); | ||
sb.append("Record("); | ||
for (int index = 0; index < partitionStats.size(); index++) { | ||
if (index != 0) { | ||
sb.append(", "); | ||
} | ||
sb.append(partitionStats.get(index, Object.class)); | ||
} | ||
sb.append(")"); | ||
return sb.toString(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object other) { | ||
if (this == other) { | ||
return true; | ||
} else if (!(other instanceof PartitionStatsRecord)) { | ||
return false; | ||
} | ||
|
||
PartitionStatsRecord that = (PartitionStatsRecord) other; | ||
return this.partitionStats.equals(that.partitionStats); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hashCode(partitionStats); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import java.io.IOException; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import org.apache.iceberg.data.GenericRecord; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
import org.apache.iceberg.types.Types; | ||
import org.assertj.core.groups.Tuple; | ||
|
@@ -370,17 +371,17 @@ public void testPartitionStatsWithSchemaEvolution() throws Exception { | |
snapshot2.snapshotId())); | ||
} | ||
|
||
private static PartitionData partitionData(Types.StructType partitionType, String c2, String c3) { | ||
PartitionData partitionData = new PartitionData(partitionType); | ||
partitionData.set(0, c2); | ||
partitionData.set(1, c3); | ||
return partitionData; | ||
private static StructLike partitionData(Types.StructType partitionType, String c2, String c3) { | ||
GenericRecord record = GenericRecord.create(partitionType); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stores the |
||
record.set(0, c2); | ||
record.set(1, c3); | ||
return record; | ||
} | ||
|
||
private static PartitionData partitionData(Types.StructType partitionType, String c2) { | ||
PartitionData partitionData = new PartitionData(partitionType); | ||
partitionData.set(0, c2); | ||
return partitionData; | ||
private static StructLike partitionData(Types.StructType partitionType, String c2) { | ||
GenericRecord record = GenericRecord.create(partitionType); | ||
record.set(0, c2); | ||
return record; | ||
} | ||
|
||
private static List<DataFile> prepareDataFiles(Table table) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,6 +93,26 @@ public static TestTable create( | |
return new TestTable(ops, name, reporter); | ||
} | ||
|
||
public static TestTable create( | ||
File temp, | ||
String name, | ||
Schema schema, | ||
PartitionSpec spec, | ||
int formatVersion, | ||
Map<String, String> properties) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was no option to pass the table properties before. |
||
TestTableOperations ops = new TestTableOperations(name, temp); | ||
if (ops.current() != null) { | ||
throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp); | ||
} | ||
|
||
ops.commit( | ||
null, | ||
newTableMetadata( | ||
schema, spec, SortOrder.unsorted(), temp.toString(), properties, formatVersion)); | ||
|
||
return new TestTable(ops, name); | ||
} | ||
|
||
public static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) { | ||
return beginCreate(temp, name, schema, spec, SortOrder.unsorted()); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StructLikeMap
was previously handling this implicitly. But whenPartitionStatsRecord
wrapsPartitionStats
now for the writers, it needs to overrideequals
andhashcode