Row Tracking
jaceklaskowski committed May 24, 2024
1 parent 95a98b0 commit 1fd7139
Showing 18 changed files with 266 additions and 18 deletions.
7 changes: 7 additions & 0 deletions docs/CheckUnresolvedRelationTimeTravel.md
@@ -0,0 +1,7 @@
---
title: CheckUnresolvedRelationTimeTravel
---

# CheckUnresolvedRelationTimeTravel Logical Check Analysis Rule

`CheckUnresolvedRelationTimeTravel` is...FIXME
2 changes: 2 additions & 0 deletions docs/DeltaParquetFileFormat.md
@@ -125,6 +125,8 @@ prepareSchema(

`metadataSchemaFields` is part of the `FileFormat` ([Spark SQL]({{ book.spark_sql }}/connectors/FileFormat/#metadataSchemaFields)) abstraction.

!!! note "Review Me"

    Due to an issue in Spark SQL (to be reported), this `metadataSchemaFields` override removes `row_index` from the default `metadataSchemaFields` ([Spark SQL]({{ book.spark_sql }}/parquet/ParquetFileFormat/#metadataSchemaFields)).
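
    A possible shape of that override (a sketch only; it assumes Spark's `ParquetFileFormat.ROW_INDEX_FIELD` constant and is not the verbatim Delta Lake source):

    ```scala
    // Sketch: drop row_index from the parent ParquetFileFormat's
    // default metadata schema fields.
    override def metadataSchemaFields: Seq[StructField] =
      super.metadataSchemaFields.filterNot(_ == ParquetFileFormat.ROW_INDEX_FIELD)
    ```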

!!! note "ParquetFileFormat"
8 changes: 6 additions & 2 deletions docs/DeltaSparkSessionExtension.md
@@ -1,15 +1,19 @@
# DeltaSparkSessionExtension

`DeltaSparkSessionExtension` is used to register (_inject_) Delta Lake-specific Spark SQL extensions as part of its [installation](installation.md).

* [CheckUnresolvedRelationTimeTravel](CheckUnresolvedRelationTimeTravel.md)
* [Delta SQL](sql/index.md) support (using [DeltaSqlParser](sql/DeltaSqlParser.md))
* [DeltaAnalysis](DeltaAnalysis.md)
* [DeltaUnsupportedOperationsCheck](DeltaUnsupportedOperationsCheck.md)
* [GenerateRowIDs](row-tracking/GenerateRowIDs.md)
* [PostHocResolveUpCast](PostHocResolveUpCast.md)
* [PrepareDeltaScan](data-skipping/PrepareDeltaScan.md)
* [PreprocessTableWithDVsStrategy](deletion-vectors/PreprocessTableWithDVsStrategy.md)
* [PreprocessTableDelete](PreprocessTableDelete.md)
* [PreprocessTableMerge](PreprocessTableMerge.md)
* [PreprocessTableUpdate](PreprocessTableUpdate.md)
* [PreprocessTimeTravel](PreprocessTimeTravel.md) resolution rule
* [RangePartitionIdRewrite](commands/optimize/RangePartitionIdRewrite.md)
* [Table-Valued Functions](table-valued-functions/index.md)
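
A minimal sketch of such an installation (the app name and local master are illustrative; the two config keys are the documented way to enable Delta Lake):

```scala
import org.apache.spark.sql.SparkSession

// Register DeltaSparkSessionExtension (and the Delta catalog)
// while building a SparkSession.
val spark = SparkSession.builder()
  .appName("delta-demo")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
```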

30 changes: 23 additions & 7 deletions docs/DeltaTableUtils.md
@@ -1,6 +1,6 @@
# DeltaTableUtils

## <span id="extractIfPathContainsTimeTravel"> extractIfPathContainsTimeTravel
## extractIfPathContainsTimeTravel { #extractIfPathContainsTimeTravel }

```scala
extractIfPathContainsTimeTravel(
@@ -16,7 +16,7 @@ extractIfPathContainsTimeTravel(

* `DeltaDataSource` is requested to [sourceSchema](spark-connector/DeltaDataSource.md#sourceSchema) and [parsePathIdentifier](spark-connector/DeltaDataSource.md#parsePathIdentifier)

## <span id="findDeltaTableRoot"> findDeltaTableRoot
## findDeltaTableRoot { #findDeltaTableRoot }

```scala
findDeltaTableRoot(
@@ -36,7 +36,7 @@ For `_delta_log` or `_samples` directories, `findDeltaTableRoot` returns the par
* `DeltaTableUtils` utility is used to [isDeltaTable](#isDeltaTable)
* `DeltaDataSource` utility is used to [parsePathIdentifier](spark-connector/DeltaDataSource.md#parsePathIdentifier)

## <span id="isPredicatePartitionColumnsOnly"> isPredicatePartitionColumnsOnly
## isPredicatePartitionColumnsOnly { #isPredicatePartitionColumnsOnly }

```scala
isPredicatePartitionColumnsOnly(
@@ -53,7 +53,7 @@ isPredicatePartitionColumnsOnly(
* `OptimisticTransactionImpl` is requested for the [filterFiles](OptimisticTransactionImpl.md#filterFiles)
* `DeltaSourceSnapshot` is requested for the [partition](spark-connector/DeltaSourceSnapshot.md#partitionFilters) and [data](spark-connector/DeltaSourceSnapshot.md#dataFilters) filters

## <span id="isDeltaTable"> isDeltaTable
## isDeltaTable { #isDeltaTable }

```scala
isDeltaTable(
@@ -75,7 +75,7 @@ isDeltaTable(
* `DeltaTableIdentifier` utility is used to [create a DeltaTableIdentifier from a TableIdentifier](DeltaTableIdentifier.md#apply)
* `DeltaUnsupportedOperationsCheck` is requested to [fail](DeltaUnsupportedOperationsCheck.md#fail)

## <span id="resolveTimeTravelVersion"> resolveTimeTravelVersion
## resolveTimeTravelVersion { #resolveTimeTravelVersion }

```scala
resolveTimeTravelVersion(
@@ -91,7 +91,7 @@ resolveTimeTravelVersion(
* `DeltaLog` is requested to [create a relation (per partition filters and time travel)](DeltaLog.md#createRelation)
* `DeltaTableV2` is requested for a [Snapshot](DeltaTableV2.md#snapshot)

## <span id="splitMetadataAndDataPredicates"> splitMetadataAndDataPredicates
## splitMetadataAndDataPredicates { #splitMetadataAndDataPredicates }

```scala
splitMetadataAndDataPredicates(
@@ -108,7 +108,7 @@ splitMetadataAndDataPredicates(
* [DeleteCommand](commands/delete/DeleteCommand.md) is executed (with a delete condition)
* [UpdateCommand](commands/update/UpdateCommand.md) is executed

### <span id="isPredicateMetadataOnly"> isPredicateMetadataOnly
### isPredicateMetadataOnly { #isPredicateMetadataOnly }

```scala
isPredicateMetadataOnly(
@@ -150,3 +150,19 @@ With [spark.databricks.delta.schema.removeSparkInternalMetadata](configuration-p
* `DeltaTableV2` is requested to [tableSchema](DeltaTableV2.md#tableSchema)
* `DeltaDataSource` is requested to [sourceSchema](spark-connector/DeltaDataSource.md#sourceSchema)
* `DeltaSourceBase` is requested for the [schema](spark-connector/DeltaSourceBase.md#schema)

## getFileMetadataColumn { #getFileMetadataColumn }

```scala
getFileMetadataColumn(
df: DataFrame): Column
```

`getFileMetadataColumn` requests the given `DataFrame` for the metadata column with the `_metadata` logical column name (using the `Dataset.metadataColumn` operator).
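
A minimal sketch of the lookup (it assumes the `Dataset.metadataColumn` operator of recent Spark versions):

```scala
import org.apache.spark.sql.{Column, DataFrame}

// Look up the hidden _metadata column of a file-based DataFrame.
def getFileMetadataColumn(df: DataFrame): Column =
  df.metadataColumn("_metadata")
```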

---

`getFileMetadataColumn` is used when:

* `RowCommitVersion` is requested to [preserveRowCommitVersions](row-tracking/RowCommitVersion.md#preserveRowCommitVersions)
* `RowId` is requested to [preserveRowIds](row-tracking/RowId.md#preserveRowIds)
8 changes: 6 additions & 2 deletions docs/DeltaUnsupportedOperationsCheck.md
@@ -1,3 +1,7 @@
---
title: DeltaUnsupportedOperationsCheck
---

# DeltaUnsupportedOperationsCheck Logical Check Analysis Rule

`DeltaUnsupportedOperationsCheck` is a logical check rule that adds helpful error messages when Delta is being used with unsupported Hive operations or if an unsupported operation is executed.
7 changes: 7 additions & 0 deletions docs/PostHocResolveUpCast.md
@@ -0,0 +1,7 @@
---
title: PostHocResolveUpCast
---

# PostHocResolveUpCast Post-Hoc Resolution Rule

`PostHocResolveUpCast` is...FIXME
7 changes: 7 additions & 0 deletions docs/ResolveDeltaPathTable.md
@@ -0,0 +1,7 @@
---
title: ResolveDeltaPathTable
---

# ResolveDeltaPathTable Logical Resolution Rule

`ResolveDeltaPathTable` is...FIXME
2 changes: 2 additions & 0 deletions docs/commands/alter/RemoveColumnMappingCommand.md
@@ -40,6 +40,8 @@ run(
removeColumnMappingTableProperty: Boolean): Unit
```

`run` requests the [DeltaLog](#deltaLog) to [start a new transaction](../../DeltaLog.md#withNewTransaction).
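
A sketch of that step (the transaction body is elided as a FIXME-style comment; `deltaLog` is this command's [DeltaLog](#deltaLog)):

```scala
// Open a new OptimisticTransaction on the table's DeltaLog and
// perform the column mapping removal within it.
deltaLog.withNewTransaction { txn =>
  // ...remove the column mapping table property and rewrite files (FIXME)
}
```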

`run`...FIXME

---
9 changes: 7 additions & 2 deletions docs/commands/convert/ConvertToDeltaCommand.md
@@ -6,6 +6,11 @@ title: ConvertToDeltaCommand

`ConvertToDeltaCommand` is a [DeltaCommand](../DeltaCommand.md) that [converts a parquet table to delta format](#run).

`ConvertToDeltaCommand` represents the following high-level operators:

* [CONVERT TO DELTA](../../sql/index.md#CONVERT-TO-DELTA) SQL statement
* [DeltaTable.convertToDelta](../../DeltaTable.md#convertToDelta)
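
For example (a sketch with a made-up table path), either form below ends up executed as `ConvertToDeltaCommand`:

```scala
// SQL form (PARTITIONED BY and NO STATISTICS clauses are optional)
spark.sql("CONVERT TO DELTA parquet.`/tmp/events` PARTITIONED BY (year INT)")

// Scala API form
import io.delta.tables.DeltaTable
DeltaTable.convertToDelta(spark, "parquet.`/tmp/events`", "year INT")
```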

`ConvertToDeltaCommand` is a `LeafRunnableCommand` ([Spark SQL]({{ book.spark_sql }}/logical-operators/LeafRunnableCommand/)).

`ConvertToDeltaCommand` requires that the [partition schema](#partitionSchema) matches the partitions of the [parquet table](#tableIdentifier) ([or an AnalysisException is thrown](#createAddFile-unexpectedNumPartitionColumnsFromFileNameException)).
@@ -33,9 +38,9 @@ title: ConvertToDeltaCommand
* Always `true` for [DeltaTable.convertToDelta](../../DeltaTable.md#convertToDelta) utility
* Always `true` for [CONVERT TO DELTA](../../sql/index.md#CONVERT-TO-DELTA) statement unless `NO STATISTICS` clause is used

## <span id="run"> Executing Command
## Executing Command { #run }

??? note "RunnableCommand"

    ```scala
    run(
      sparkSession: SparkSession): Seq[Row]
    ```
7 changes: 7 additions & 0 deletions docs/deletion-vectors/PreprocessTableWithDVsStrategy.md
@@ -0,0 +1,7 @@
---
title: PreprocessTableWithDVsStrategy
---

# PreprocessTableWithDVsStrategy Execution Planning Strategy

`PreprocessTableWithDVsStrategy` is...FIXME
23 changes: 23 additions & 0 deletions docs/row-tracking/DeltaScanWithRowTrackingEnabled.md
@@ -0,0 +1,23 @@
# DeltaScanWithRowTrackingEnabled

`DeltaScanWithRowTrackingEnabled` is a Scala extractor object to [match a scan on a delta table with Row Tracking enabled](#unapply).

??? note "Scala Extractor Object"
`DeltaScanWithRowTrackingEnabled` is a Scala extractor object (with an [unapply](#unapply) method) to match a pattern and extract data values.

Learn more in the [Scala Language Specification]({{ scala.spec }}/08-pattern-matching.html#extractor-patterns) and the [Tour of Scala](https://docs.scala-lang.org/tour/extractor-objects.html).

## unapply { #unapply }

```scala
unapply(
plan: LogicalPlan): Option[LogicalRelation]
```

`unapply` returns the given `LogicalPlan` if it is a `LogicalRelation` over a `HadoopFsRelation` with a [DeltaParquetFileFormat](../DeltaParquetFileFormat.md) with [Row Tracking enabled](RowTracking.md#isEnabled). Otherwise, `unapply` returns `None` (_nothing matches_).
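
What `unapply` does could be sketched as follows (an assumed shape pieced together from the description above; the import paths and pattern are assumptions, not the verbatim source):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.{DeltaParquetFileFormat, RowTracking}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

object DeltaScanWithRowTrackingEnabled {
  def unapply(plan: LogicalPlan): Option[LogicalRelation] = plan match {
    // A LogicalRelation over a HadoopFsRelation whose file format is
    // DeltaParquetFileFormat with Row Tracking enabled matches...
    case scan @ LogicalRelation(
          HadoopFsRelation(_, _, _, _, format: DeltaParquetFileFormat, _), _, _, _)
        if RowTracking.isEnabled(format.protocol, format.metadata) =>
      Some(scan)
    // ...anything else does not.
    case _ => None
  }
}
```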

---

`unapply` is used when:

* [GenerateRowIDs](GenerateRowIDs.md) logical rule is executed
50 changes: 50 additions & 0 deletions docs/row-tracking/GenerateRowIDs.md
@@ -0,0 +1,50 @@
---
title: GenerateRowIDs
---

# GenerateRowIDs Logical Plan Normalization Rule

`GenerateRowIDs` is a logical rule (`Rule[LogicalPlan]`) that Delta Lake injects into `SparkSession` (using [DeltaSparkSessionExtension](../DeltaSparkSessionExtension.md)).

## Executing Rule { #apply }

??? note "Rule"

```scala
apply(
plan: LogicalPlan): LogicalPlan
```

`apply` is part of the `Rule` ([Spark SQL]({{ book.spark_sql }}/catalyst/Rule#apply)) abstraction.

`apply` transforms the [scans on delta tables with Row Tracking enabled](DeltaScanWithRowTrackingEnabled.md) in the given `LogicalPlan` ([Spark SQL]({{ book.spark_sql }}/logical-operators/LogicalPlan)) bottom-up.

`apply`...FIXME

### metadataWithRowTrackingColumnsProjection { #metadataWithRowTrackingColumnsProjection }

```scala
metadataWithRowTrackingColumnsProjection(
metadata: AttributeReference): NamedExpression
```

`metadataWithRowTrackingColumnsProjection`...FIXME

### rowIdExpr { #rowIdExpr }

```scala
rowIdExpr(
metadata: AttributeReference): Expression
```

`rowIdExpr` creates a `Coalesce` expression (sketched after this list) with the following expressions:

1. `row_id` (sub)attribute of the given `AttributeReference`

    ```
    _metadata.row_id
    ```

1. `Add` expression of the `base_row_id` and `row_index` attributes of the given `AttributeReference`

    ```
    _metadata.base_row_id + _metadata.row_index
    ```
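
A sketch of that expression tree (it assumes unresolved field extraction over the `_metadata` attribute; not the exact Delta source):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, Coalesce, Expression, Literal}

def rowIdExpr(metadata: AttributeReference): Expression = {
  // Extract a sub-field of the _metadata struct by name.
  def field(name: String): Expression =
    UnresolvedExtractValue(metadata, Literal(name))

  // Prefer the materialized row ID and fall back to
  // base_row_id + row_index.
  Coalesce(Seq(
    field("row_id"),
    Add(field("base_row_id"), field("row_index"))))
}
```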
3 changes: 3 additions & 0 deletions docs/row-tracking/MaterializedRowCommitVersion.md
@@ -0,0 +1,3 @@
# MaterializedRowCommitVersion

`MaterializedRowCommitVersion` is...FIXME
3 changes: 3 additions & 0 deletions docs/row-tracking/MaterializedRowTrackingColumn.md
@@ -0,0 +1,3 @@
# MaterializedRowTrackingColumn

`MaterializedRowTrackingColumn` is...FIXME
3 changes: 3 additions & 0 deletions docs/row-tracking/MetadataStructField.md
@@ -0,0 +1,3 @@
# MetadataStructField

`MetadataStructField` is...FIXME
26 changes: 26 additions & 0 deletions docs/row-tracking/RowCommitVersion.md
@@ -0,0 +1,26 @@
# RowCommitVersion

## row_commit_version { #METADATA_STRUCT_FIELD_NAME }

`RowCommitVersion` object defines `row_commit_version` for...FIXME

## \_metadata.row_commit_version { #QUALIFIED_COLUMN_NAME }

`RowCommitVersion` object defines `_metadata.row_commit_version` for...FIXME

## createMetadataStructField { #createMetadataStructField }

```scala
createMetadataStructField(
protocol: Protocol,
metadata: Metadata,
nullable: Boolean = false): Option[StructField]
```

`createMetadataStructField` creates a [MetadataStructField](MetadataStructField.md) with the [materializedColumnName](MaterializedRowCommitVersion.md#getMaterializedColumnName) (for the given [Protocol](../Protocol.md) and [Metadata](../Metadata.md)).

---

`createMetadataStructField` is used when:

* `RowTracking` is requested to [createMetadataStructFields](RowTracking.md#createMetadataStructFields)