Commit 484a5c0

DeltaFileFormat

jaceklaskowski committed Jul 3, 2023
1 parent 84d367b commit 484a5c0

Showing 8 changed files with 86 additions and 30 deletions.
44 changes: 19 additions & 25 deletions docs/DeltaFileFormat.md
@@ -4,43 +4,37 @@

## Contract

-### <span id="fileFormat"> FileFormat
+### SparkSession { #spark }

```scala
-fileFormat: FileFormat
+spark: SparkSession
```

-`FileFormat` ([Spark SQL]({{ book.spark_sql }}/datasources/FileFormat)) of this delta table
-
-Default: [DeltaParquetFileFormat](DeltaParquetFileFormat.md) (with the [columnMappingMode](Metadata.md#columnMappingMode) and the [schema](Metadata.md#schema) of the given [Metadata](Metadata.md))
-
-Used when:
-
-* `DeltaLog` is requested for a [relation](DeltaLog.md#createRelation) (in batch queries) and a [DataFrame](DeltaLog.md#createDataFrame)
-* `DeltaCommand` is requested for a [relation](commands/DeltaCommand.md#buildBaseRelation)
-* `MergeIntoCommand` is requested to [buildTargetPlanWithFiles](commands/merge/MergeIntoCommand.md#buildTargetPlanWithFiles)
-* `TransactionalWrite` is requested to [write data out](TransactionalWrite.md#writeFiles)
-
-### <span id="metadata"> Metadata
+Current `SparkSession` ([Spark SQL]({{ book.spark_sql }}/SparkSession))

-```scala
-metadata: Metadata
-```
+See:

-Current [Metadata](Metadata.md)
+* [DeltaLog](DeltaLog.md#spark)

-Used when:
+## Implementations

-* `DeltaFileFormat` is requested for the [FileFormat](#fileFormat)
+* [DeltaLog](DeltaLog.md)

-### <span id="spark"> SparkSession
+## FileFormat { #fileFormat }

```scala
-spark: SparkSession
+fileFormat(
+  protocol: Protocol,
+  metadata: Metadata): FileFormat
```

-Current `SparkSession` ([Spark SQL]({{ book.spark_sql }}/SparkSession))
+`fileFormat` creates a [DeltaParquetFileFormat](DeltaParquetFileFormat.md) for the given [Protocol](Protocol.md) and [Metadata](Metadata.md).

-## Implementations
+---

-* [DeltaLog](DeltaLog.md)
+Used when:
+
+* `DeltaLog` is requested to [build a HadoopFsRelation](DeltaLog.md#buildHadoopFsRelationWithFileIndex)
+* `DeltaCommand` is requested to [build a HadoopFsRelation](commands/DeltaCommand.md#buildBaseRelation)
+* `MergeIntoCommandBase` is requested to [buildTargetPlanWithIndex](commands/merge/MergeIntoCommandBase.md#buildTargetPlanWithIndex)
+* `TransactionalWrite` is requested to [write data out](TransactionalWrite.md#writeFiles)
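
Note how this commit turns the parameterless `fileFormat: FileFormat` method into one that takes the table's `Protocol` and `Metadata` explicitly. A minimal sketch of the revised contract (illustrative only, not the actual Delta Lake source; it assumes `Protocol`, `Metadata` and `DeltaParquetFileFormat` from `delta-core` on the classpath):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.execution.datasources.FileFormat

// Illustrative mirror of the contract after this commit (not the real trait)
trait DeltaFileFormat {
  // The sole abstract member: the current SparkSession
  def spark: SparkSession

  // Default FileFormat: a DeltaParquetFileFormat for the given Protocol
  // and Metadata (other constructor arguments keep their default values)
  def fileFormat(protocol: Protocol, metadata: Metadata): FileFormat =
    DeltaParquetFileFormat(protocol, metadata)
}
```
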
25 changes: 22 additions & 3 deletions docs/DeltaLog.md
@@ -311,7 +311,7 @@ Internally, `currentSnapshot`...FIXME

* `DeltaLog` is requested to [updateInternal](#updateInternal), [update](#update) and [tryUpdate](#tryUpdate)

## <span id="createRelation"> Creating Insertable HadoopFsRelation For Batch Queries
## Creating Insertable HadoopFsRelation For Batch Queries { #createRelation }

```scala
createRelation(
  ...
```

@@ -334,6 +334,8 @@ With non-empty `cdcOptions`, `createRelation` [creates a CDC-aware relation](cha

In the end, `createRelation` creates a `HadoopFsRelation` for the `TahoeLogFileIndex` and...FIXME. The `HadoopFsRelation` is also an [InsertableRelation](#createRelation-InsertableRelation).

---

`createRelation` is used when:

* `DeltaTableV2` is requested to [toBaseRelation](DeltaTableV2.md#toBaseRelation)
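
As a concrete entry point into this code path: a plain batch read of a delta table reaches `createRelation` through `DeltaTableV2#toBaseRelation`. A minimal example (assuming a delta table already exists at the given path):

```scala
// Ends up in DeltaLog.createRelation via DeltaTableV2#toBaseRelation
val events = spark.read.format("delta").load("/tmp/delta/events")
events.show()
```
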
@@ -406,11 +408,11 @@ Internally, `getChanges` requests the [LogStore](#store) for [files](storage/Log

For every delta file, `getChanges` requests the [LogStore](#store) to [read the JSON content](storage/LogStore.md#read) (every line is an [action](Action.md)), and then [deserializes it to an action](Action.md#fromJson).

## <span id="createDataFrame"> Creating DataFrame (From AddFiles)
## Creating DataFrame (From AddFiles) { #createDataFrame }

```scala
createDataFrame(
-  snapshot: Snapshot,
+  snapshot: SnapshotDescriptor,
  addFiles: Seq[AddFile],
  isStreaming: Boolean = false,
  actionTypeOpt: Option[String] = None): DataFrame
```

@@ -662,6 +664,23 @@ apply(

In the end, `apply` looks up a `DeltaLog` for the HDFS-qualified path (with the file system options) in the [deltaLogCache](#deltaLogCache) or creates (and caches) a new [DeltaLog](#creating-instance).

## buildHadoopFsRelationWithFileIndex { #buildHadoopFsRelationWithFileIndex }

```scala
buildHadoopFsRelationWithFileIndex(
  snapshot: SnapshotDescriptor,
  fileIndex: TahoeFileIndex,
  bucketSpec: Option[BucketSpec]): HadoopFsRelation
```

`buildHadoopFsRelationWithFileIndex`...FIXME

---

`buildHadoopFsRelationWithFileIndex` is used when:

* `DeltaLog` is requested to [create a DataFrame](#createDataFrame) and [create a BaseRelation](#createRelation)
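
While the description above is still a FIXME, the signature and the [fileFormat](DeltaFileFormat.md#fileFormat) contract hint at the relation being assembled. A speculative sketch only, in which every name beyond the signature is an assumption:

```scala
// Speculative sketch -- not the actual implementation
HadoopFsRelation(
  location = fileIndex,
  partitionSchema = snapshot.metadata.partitionSchema,
  dataSchema = snapshot.metadata.schema,
  bucketSpec = bucketSpec,
  fileFormat = fileFormat(snapshot.protocol, snapshot.metadata),
  options = snapshot.metadata.format.options)(spark)
```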

## Logging

Enable `ALL` logging level for `org.apache.spark.sql.delta.DeltaLog` logger to see what happens inside.
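
With Spark 3.3 and later (Log4j 2), that can be two extra lines in `conf/log4j2.properties` (a sketch; adapt to your own logging setup):

```text
logger.DeltaLog.name = org.apache.spark.sql.delta.DeltaLog
logger.DeltaLog.level = all
```
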
4 changes: 3 additions & 1 deletion docs/commands/DeltaCommand.md
@@ -85,7 +85,7 @@ removeFilesFromPaths(

* [DeleteCommand](delete/DeleteCommand.md) and [UpdateCommand](update/UpdateCommand.md) commands are executed

## <span id="buildBaseRelation"> Creating HadoopFsRelation (with TahoeBatchFileIndex)
## Creating HadoopFsRelation (with TahoeBatchFileIndex) { #buildBaseRelation }

```scala
buildBaseRelation(
  ...
```

In the end, `buildBaseRelation` creates a `HadoopFsRelation` ([Spark SQL]({{ book.spark_sql }}/HadoopFsRelation/)) with the `TahoeBatchFileIndex` (and the other properties based on the [metadata](../OptimisticTransactionImpl.md#metadata) of the given [OptimisticTransaction](../OptimisticTransaction.md)).

---

`buildBaseRelation` is used when:

* [DeleteCommand](delete/DeleteCommand.md) and [UpdateCommand](update/UpdateCommand.md) commands are executed (with `delete` and `update` action types, respectively)
3 changes: 3 additions & 0 deletions docs/commands/merge/ClassicMergeExecutor.md
@@ -0,0 +1,3 @@
# ClassicMergeExecutor

`ClassicMergeExecutor` is...FIXME
3 changes: 3 additions & 0 deletions docs/commands/merge/InsertOnlyMergeExecutor.md
@@ -0,0 +1,3 @@
# InsertOnlyMergeExecutor

`InsertOnlyMergeExecutor` is...FIXME
32 changes: 32 additions & 0 deletions docs/commands/merge/MergeIntoCommandBase.md
@@ -0,0 +1,32 @@
# MergeIntoCommandBase

## buildTargetPlanWithFiles { #buildTargetPlanWithFiles }

```scala
buildTargetPlanWithFiles(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  files: Seq[AddFile],
  columnsToDrop: Seq[String]): LogicalPlan
```

`buildTargetPlanWithFiles`...FIXME

---

`buildTargetPlanWithFiles` is used when:

* `ClassicMergeExecutor` is requested to [findTouchedFiles](ClassicMergeExecutor.md#findTouchedFiles) and [writeAllChanges](ClassicMergeExecutor.md#writeAllChanges)
* `InsertOnlyMergeExecutor` is requested to [writeOnlyInserts](InsertOnlyMergeExecutor.md#writeOnlyInserts)

### buildTargetPlanWithIndex { #buildTargetPlanWithIndex }

```scala
buildTargetPlanWithIndex(
  spark: SparkSession,
  deltaTxn: OptimisticTransaction,
  fileIndex: TahoeFileIndex,
  columnsToDrop: Seq[String]): LogicalPlan
```

`buildTargetPlanWithIndex`...FIXME
2 changes: 1 addition & 1 deletion docs/overview.md
@@ -6,7 +6,7 @@ Delta Lake allows you to store data on blob stores like HDFS, S3, Azure Data Lak

As [it was well said](https://github.com/delta-io/delta/issues/467#issuecomment-696708455): _"Delta is a storage format while Spark is an execution engine...to separate storage from compute."_ Yet, Delta Lake can run with other execution engines like [Trino](https://trino.io/docs/current/connector/delta-lake.html) or [Apache Flink](https://github.com/delta-io/connectors/tree/master/flink).

-Delta Lake {{ delta.version }} supports Apache Spark {{ spark.version }} (cf. [build.sbt]({{ delta.github }}/build.sbt#L20)).
+Delta Lake {{ delta.version }} supports Apache Spark {{ spark.version }} (cf. [build.sbt]({{ delta.github }}/build.sbt#L28)).

## Delta Tables

3 changes: 3 additions & 0 deletions mkdocs.yml
@@ -336,6 +336,7 @@ nav:
- DeltaGenerateCommand: commands/generate/DeltaGenerateCommand.md
- Merge:
- commands/merge/index.md
- ClassicMergeExecutor: commands/merge/ClassicMergeExecutor.md
- DeltaMergeAction: commands/merge/DeltaMergeAction.md
- DeltaMergeBuilder: commands/merge/DeltaMergeBuilder.md
- DeltaMergeInto: commands/merge/DeltaMergeInto.md
@@ -346,8 +347,10 @@
- DeltaMergeIntoUpdateClause: commands/merge/DeltaMergeIntoUpdateClause.md
- DeltaMergeMatchedActionBuilder: commands/merge/DeltaMergeMatchedActionBuilder.md
- DeltaMergeNotMatchedActionBuilder: commands/merge/DeltaMergeNotMatchedActionBuilder.md
- InsertOnlyMergeExecutor: commands/merge/InsertOnlyMergeExecutor.md
- JoinedRowProcessor: commands/merge/JoinedRowProcessor.md
- MergeIntoCommand: commands/merge/MergeIntoCommand.md
- MergeIntoCommandBase: commands/merge/MergeIntoCommandBase.md
- Optimize:
- commands/optimize/index.md
- InterleaveBits: commands/optimize/InterleaveBits.md
