
Data: Add a util to read write partition stats #10176

Closed
wants to merge 10 commits

Conversation

@ajantha-bhat (Member) commented Apr 18, 2024

Based on the conclusion from #9437, we are going with a local, multi-threaded implementation instead of a distributed algorithm.

The iceberg-core module cannot write stats because it does not depend on the Parquet code.
So, the iceberg-data module introduces an API to write the stats file, and the core module just registers it to the table.

The syntax looks like this:

PartitionStatisticsFile result = PartitionStatsHandler.computeAndWritePartitionStatsFile(table);
table.updatePartitionStatistics().setPartitionStatistics(result).commit();
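
For context, here is a minimal end-to-end sketch of how this could look from user code. The catalog setup, warehouse path, and table names are illustrative assumptions and not part of this PR; only the two calls above are the API being added.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class PartitionStatsExample {
  public static void main(String[] args) {
    // Illustrative: load a table from a Hadoop catalog at a hypothetical warehouse path.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "tbl"));

    // Compute and write the partition stats file for the table (API introduced by this PR,
    // assumed to live in the iceberg-data module).
    PartitionStatisticsFile result = PartitionStatsHandler.computeAndWritePartitionStatsFile(table);

    // Register the stats file in the table metadata.
    table.updatePartitionStatistics().setPartitionStatistics(result).commit();
  }
}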

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GeneratePartitionStats {
Member Author:

The iceberg-core module cannot write stats because it does not depend on the Parquet code.
So, the iceberg-data module introduces an API to write the stats file, and the core module just registers it to the table.


public static CloseableIterable<Record> readPartitionStatsFile(
Schema schema, InputFile inputFile) {
// TODO: support other formats or introduce GenericFileReader
Member Author:

We don't have a GenericFileReader; all reading is currently based on table scans. For this we may need a GenericFileReader, similar to the generic writer we use.
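
For illustration, a minimal sketch (not the PR's final code) of how the Parquet-backed stats file could be read with the generic readers already available in iceberg-data; the class name here is hypothetical:

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

public class PartitionStatsReadSketch {
  // Read partition stats records from a Parquet file as generic Records.
  public static CloseableIterable<Record> readParquetStats(Schema schema, InputFile inputFile) {
    return Parquet.read(inputFile)
        .project(schema)
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build();
  }
}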

Types.NestedField.required(3, Column.DATA_RECORD_COUNT.name(), Types.LongType.get()),
Types.NestedField.required(4, Column.DATA_FILE_COUNT.name(), Types.IntegerType.get()),
Types.NestedField.required(5, Column.DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()),
Types.NestedField.optional(
Member Author:

Note which fields are optional and which are required, as per the spec.

Contributor:

Question: we made these counts nullable because we think not all implementations will populate these values? We will still write 0 where applicable in the current implementation, though? Except the total record count, which may be expensive to compute.

Member Author:

The last computed snapshot may be unavailable at the time the stats are computed (due to snapshot expiration). So, LAST_UPDATED_SNAPSHOT_ID and LAST_UPDATED_AT can be null; hence they are optional. Also, it doesn't make sense to fill them with 0, as snapshot id 0 is not meaningful in Iceberg.

To keep the behaviour of all optional fields uniform, stats like TOTAL_RECORD_COUNT, EQUALITY_DELETE_FILE_COUNT, EQUALITY_DELETE_RECORD_COUNT, POSITION_DELETE_FILE_COUNT, and POSITION_DELETE_RECORD_COUNT will be set to null when absent instead of 0.

I have added a test case which validates all of this now.

Also, the schema (optional & required fields) follows the spec: https://iceberg.apache.org/spec/#partition-statistics-file
If we decide to change optional to required now, it has to go through a vote :(
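
For reference, a sketch of that schema as described on the spec page. The field IDs, names, and required/optional flags are summarized from the spec; the helper class below is illustrative rather than the PR's code, and unifiedPartitionType stands for the partition type that combines all partition specs of the table.

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionStatsSchemaSketch {
  // Partition statistics file schema, following the Iceberg table spec.
  public static Schema schema(Types.StructType unifiedPartitionType) {
    return new Schema(
        Types.NestedField.required(1, "partition", unifiedPartitionType),
        Types.NestedField.required(2, "spec_id", Types.IntegerType.get()),
        Types.NestedField.required(3, "data_record_count", Types.LongType.get()),
        Types.NestedField.required(4, "data_file_count", Types.IntegerType.get()),
        Types.NestedField.required(5, "total_data_file_size_in_bytes", Types.LongType.get()),
        Types.NestedField.optional(6, "position_delete_record_count", Types.LongType.get()),
        Types.NestedField.optional(7, "position_delete_file_count", Types.IntegerType.get()),
        Types.NestedField.optional(8, "equality_delete_record_count", Types.LongType.get()),
        Types.NestedField.optional(9, "equality_delete_file_count", Types.IntegerType.get()),
        Types.NestedField.optional(10, "total_record_count", Types.LongType.get()),
        Types.NestedField.optional(11, "last_updated_at", Types.LongType.get()),
        Types.NestedField.optional(12, "last_updated_snapshot_id", Types.LongType.get()));
  }
}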

@aokolnychyi (Contributor) commented Aug 22, 2024:

I understand why we mark LAST_UPDATED_SNAPSHOT_ID and LAST_UPDATED_AT as optional. My question was about counts. In my view, null means unknown and 0 means absent. If I read a partition stats file and see null as the number of position deletes, it won't be informative.

Contributor:

It makes sense to keep these fields optional, but I would expect implementations to write 0 for the position delete record count if a partition has no deletes.

Member Author:

Done.

record.set(Column.SPEC_ID.ordinal(), entry.file().specId());

Snapshot snapshot = table.snapshot(entry.snapshotId());
if (snapshot != null) {
Member Author:

If the snapshot is expired, the last updated time and last updated snapshot id will be null.

"Unsupported file content type: " + entry.file().content());
}

// TODO: optionally compute TOTAL_RECORD_COUNT based on the flag
Member Author:

The idea of this field is to apply the equality or position delete filters and report the effective record count. Logic to compute it needs to be added in a follow-up.

Member:

That would be tricky; not sure we can do it without actually scanning rows.

Member Author:

True. But we decided during spec design that we need this optional field.

The plan is to have a follow-up PR with a flag that enables computing this. By default it won't be computed, as it is an expensive operation.

Contributor:

I am not sure we would want to extend this path to compute deletes. We may also reconsider how position deletes are handled in V3, so it may be possible to compute this without scanning the data.

I'd probably drop this TODO for now.

@ajantha-bhat (Member Author) commented Jul 31, 2024:

True. Maybe we didn't think this field through well during spec design. We can drop the TODO (add a note instead) and deprecate this field later if required.

@ajantha-bhat (Member Author):

ping @aokolnychyi

@ajantha-bhat (Member Author):

cc: @RussellSpitzer, @szehon-ho: can you please take a look? I need reviews to take this forward.
Previous discussions can be found at #9437.

ajantha-bhat marked this pull request as ready for review May 16, 2024 14:21
@ajantha-bhat (Member Author):

@RussellSpitzer, @szehon-ho: It seems Anton is on holiday for 2-3 weeks. Is it possible for either of you to help with the review?

cc: @jbonofre

}

public static void updateRecord(Record toUpdate, Record fromRecord) {
toUpdate.set(
Contributor:

Does this mean we allow merging records with different spec IDs? Suppose we start with Spec1: {Field#1}, and we write a partition Field#1 = 1. Then we change the spec to Spec2: {Field#1, Field#2}, and write another partition Field#1 = 1 / Field#2 = null. Will we only have one entry in the partition stats file with Partition_Data: {Field#1 = 1 / Field#2 = null}, Spec_ID: 2?

Member Author:

Yes. Please refer to the last point of #9437 (comment).

Also, it is similar to how the partitions metadata table is implemented currently.

Contributor:

Hmm.. but how can such a stats entry be useful? From a user's perspective, there's no way to tell if an entry is merged. Suppose the entry says we have 10 records for partition {Field#1 = 1 / Field#2 = null}. This doesn't mean there are 10 records satisfying the filter Field#1 = 1 and Field#2 is null, right?

@ajantha-bhat (Member Author) commented Jun 18, 2024:

Let's go through an example:
the table is partitioned by col1, and 5 rows are inserted for partition col1 = x.
Now the spec has evolved to partition by col1/col2, and 3 rows are inserted for col1 = x and col2 = y,
and 7 rows are inserted for col1 = x and col2 = null.

So, when a query runs with col1 = x and col2 = null, it scans 12 (5+7) rows. The partition stats will likewise carry stats from 12 rows for col1 = x and col2 = null (unified tuple). So I think it is fine.

Can you elaborate on why you think a unified tuple is a problem?

Contributor:

We're planning to use partition-level stats to accelerate queries fetching partition metadata. I think the unified tuple can be confusing for users. For example, if the query says there are 12 rows in partition col1 = x / col2 = null, a user cannot tell whether we inserted 12 rows into that partition with spec2, or we inserted 5 and 7 rows with spec1 and spec2 respectively.

So maybe my question should be whether the partition stats are only intended for query optimizers, and not suitable for our use case? I thought it was user-facing because it has information such as last_updated_at, which optimizers usually don't care about.

Member Author:

> So maybe my question should be whether the partition stats are only intended for query optimizers, and not suitable for our use case

It was designed mainly with query planners in mind. The spec was approved with the unified tuple. The reason is that the query planner has to scan both partitions in this case [(c1 = 1), (c1 = 1 and c2 = null)], so the unified tuple includes stats from both specs. Keeping stats per individual spec might slow down planning, since the unified stats would then have to be computed at planning time.

Stats like file count and record count are mainly used for CBO; the last modified time and snapshot info were added later so users can decide whether or not to run compaction.
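
For illustration, a rough sketch of grouping data-file stats under the unified partition tuple described above. Partitioning.partitionType and StructLikeMap are existing Iceberg utilities; coerceToUnified is a hypothetical helper shown only to make the idea concrete, and this is not the PR's actual code:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

public class UnifiedTupleSketch {
  // Aggregate data-file record counts keyed by the unified partition tuple.
  public static StructLikeMap<AtomicLong> recordCountsByUnifiedPartition(Table table) throws Exception {
    Types.StructType unifiedType = Partitioning.partitionType(table);
    StructLikeMap<AtomicLong> counts = StructLikeMap.create(unifiedType);

    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        DataFile file = task.file();
        PartitionSpec spec = table.specs().get(file.specId());
        StructLike unifiedPartition = coerceToUnified(unifiedType, spec, file.partition());
        counts.computeIfAbsent(unifiedPartition, key -> new AtomicLong()).addAndGet(file.recordCount());
      }
    }

    return counts;
  }

  // Hypothetical coercion: copy the file's partition values into the unified tuple
  // by matching partition field ids; fields missing in the file's spec stay null.
  private static StructLike coerceToUnified(
      Types.StructType unifiedType, PartitionSpec spec, StructLike partition) {
    GenericRecord unified = GenericRecord.create(unifiedType);
    Types.StructType specType = spec.partitionType();
    for (int pos = 0; pos < specType.fields().size(); pos++) {
      int fieldId = specType.fields().get(pos).fieldId();
      int unifiedPos = unifiedType.fields().indexOf(unifiedType.field(fieldId));
      unified.set(unifiedPos, partition.get(pos, Object.class));
    }
    return unified;
  }
}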

Contributor:

@ajantha-bhat Thanks for clarifying! Personally I still think this feature can be used to improve the user experience of investigating partitions. Maybe we just need to explain the ambiguity for partitions with null values to users.

Member Author:

@lirui-apache: You were right that we shouldn't combine the stats from different specs. It was just that we didn't have a strong use case.

Anton mentioned that

> If we combine delete stats across specs, they will not be accurate. A delete file from one spec never applies to a data file in another spec.

So, now we are not combining them.

More details in #11146.

Contributor:

OK, thanks for the pointer

private static final int PARTITION_COUNT = 10000;

// one data file manifest
private static final int DATA_FILES_PER_PARTITION_COUNT = 1;
Member:

Seems like this would be very unrealistic for benchmarking? Shouldn't we have multiple data files in each manifest (and also multiple partitions in the same manifest)?

Member Author:

Updated now.

2 million data file entries.

100 partitions * 20 data files per partition = 2k data files per manifest.
10K manifests.

It took around 10 seconds to write, commit, and read the partition stats file.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionStatsGenerator {
Member:

Could we document this and the writer util? I'm mostly interested in the separation of these two classes and what their responsibilities should be.

Member Author:

Updated the javadocs.

PartitionStatsGenerator generates the stats file using the util classes; the util class is a helper for that.

@ajantha-bhat (Member Author) commented Aug 21, 2024:

Thanks @RussellSpitzer for the review. I have addressed the comments.

@aokolnychyi (Contributor):

I took a detailed look at the implementation/spec today. Here are my high-level points:

  • The local algorithm was the right choice. I am on board with using ConcurrentHashMap and coerced partitions as keys.
  • The logic is split between 3 utility classes that are hard to navigate.
  • In my view, Record is not the best abstraction to expose when computing stats in core.
    • Projecting and getting values from Record is very fragile.
    • It is necessary to have Record to use generic writers but PartitionStatsUtil is in core and is supposed to be helpful for arbitrary implementations, not only those that rely on generic writers.
    • The fromManifest method in PartitionStatsUtil does not return a valid Record as it doesn’t transform values (it shouldn’t, the writer has to do that). We need a better abstraction in core.

I'd consider restructuring the code as below (names are suggestions):

  1. Add a wrapper class to represent partition stats in core.
public class PartitionStats {

  private final int specId;
  private final StructLike partition;
  ...
  private Long lastUpdatedSnapshotId = null;

  public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
    // move the existing logic for updating stats from live entries here
  }

  public void deletedEntry(Snapshot snapshot) {
    if (snapshot != null) {
      // update last modified info to have accurate stats
    }
  }
}
  2. Keep PartitionStatsUtil in core but change its purpose. It does everything except writing.
public class PartitionStatsUtil {

  public static final int PARTITION_FIELD_ID = 1;
  public static final String PARTITION_FIELD_NAME = "partition";
  public static final NestedField SPEC_ID = required(2, "spec_id", IntegerType.get());
  ...
 
  public static Schema schema(StructType partitionType) {
    // return the schema of records in partition stats files
  }

  public static Iterable<PartitionStats> computeStats(Table table, Snapshot snapshot) {
    // compute stats like in this PR using ConcurrentHashMap
    // don't convert ManifestEntry to PartitionStats, invoke liveEntry/deletedEntry to update stats as needed
  }

  public static List<PartitionStats> sortStats(Iterable<PartitionStats> stats, StructType partitionType) {
    // sort stats per spec if they have to be written (having this separately is optional)
  }
}
  3. Add PartitionStatsWriter that would wrap PartitionStats as Record internally and write using generic writers (like in this PR). The writer will be responsible for converting values using IdentityPartitionConverters. (A sketch of this writing step follows below.)

What do you think, @ajantha-bhat?
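
To make item 3 concrete, a minimal sketch of what that writing step could look like with the existing generic Parquet writer. The class name is hypothetical, and it assumes the PartitionStats values have already been converted to generic Records (e.g. via IdentityPartitionConverters) before this point; this is not the PR's actual code:

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;

public class PartitionStatsWriterSketch {
  // Append already-converted stats Records to a Parquet file using the generic writer.
  public static void write(Schema statsSchema, List<Record> statsRecords, OutputFile outputFile)
      throws IOException {
    try (FileAppender<Record> appender =
        Parquet.write(outputFile)
            .schema(statsSchema)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .overwrite()
            .build()) {
      appender.addAll(statsRecords);
    }
  }
}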

ajantha-bhat force-pushed the write_util branch 4 times, most recently from 00fe16e to 521c2d7 on August 30, 2024 16:44
import org.apache.iceberg.types.Types.StructType;

public class PartitionStatsRecord implements Record, StructLike {
private static final LoadingCache<StructType, Map<String, Integer>> NAME_MAP_CACHE =
Member Author:

Similar to the existing GenericRecord class, but it wraps the existing PartitionStats and is used only for partition stats.

@aokolnychyi (Contributor):

Let me take a look, hopefully over the weekend or on Monday.

@aokolnychyi (Contributor):

Okay, I played around with this a bit over the weekend. I suggest we split this PR into two parts: PartitionStatsUtil that computes and sorts the stats (1), and the conversion and writing of the stats into a file (2). The reason for the split is that we may need to reconsider the writing a bit. The current problem is the need to translate to generic records for writing into Parquet. I just learned that there is an ongoing effort to support writes with the internal Iceberg object model, which will nicely solve this problem for us (given that it is already in progress, it shouldn't delay this work).

While we look into the writing part, it would be nice to get PartitionStatsUtil in. For that, I would suggest making some tweaks to PartitionStats and PartitionStatsUtil:

  • Use StructLike instead of Record.
  • Don't create a temporary PartitionStats from a manifest entry to simply merge it with others.

@ajantha-bhat (Member Author):

> I just learned that there is an ongoing effort to support writes with the internal Iceberg object model, which will nicely solve this problem for us (given that it is already in progress, it shouldn't delay this work).

Could you please point me to that work?

@ajantha-bhat (Member Author):

Split this into two PRs as suggested.

Compute: #11146

Write: #11216
