SparkContext.runJob and RDD.withScope
jaceklaskowski committed Dec 31, 2023
1 parent 58ae78d commit 21d8785
Showing 5 changed files with 107 additions and 25 deletions.
40 changes: 29 additions & 11 deletions docs/SparkContext.md
@@ -354,7 +354,7 @@ Multiple external cluster managers registered for the url [url]: [serviceLoaders

`getClusterManager` is used when `SparkContext` is requested for a [SchedulerBackend and TaskScheduler](#createTaskScheduler).

## <span id="runJob"> Running Job Synchronously
## Running Job (Synchronously) { #runJob }

```scala
runJob[T, U: ClassTag](
@@ -368,7 +368,7 @@ runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U: ClassTag](
runJob[T, U: ClassTag]( // (1)!
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
@@ -386,9 +386,11 @@ runJob[T, U: ClassTag](
  partitions: Seq[Int]): Array[U]
```

1. Requests the [DAGScheduler](#dagScheduler) to [run a job](scheduler/DAGScheduler.md#runJob)

![Executing action](images/spark-runjob.png)

`runJob` finds the [call site](#getCallSite) and [cleans up](#clean) the given `func` function.
`runJob` determines the [call site](#getCallSite) and [cleans up](#clean) the given `func` function.

`runJob` prints out the following INFO message to the logs:

@@ -403,19 +405,23 @@ RDD's recursive dependencies:
[toDebugString]
```

`runJob` requests the [DAGScheduler](#dagScheduler) to [run a job](scheduler/DAGScheduler.md#runJob).
`runJob` requests the [DAGScheduler](#dagScheduler) to [run a job](scheduler/DAGScheduler.md#runJob) with the following:

`runJob` requests the [ConsoleProgressBar](#progressBar) to [finishAll](ConsoleProgressBar.md#finishAll) if defined.
* The given `rdd`
* The given `func` [cleaned up](#clean)
* The given `partitions`
* The [call site](#getCallSite)
* The given `resultHandler` function (_procedure_)
* The [local properties](#localProperties)

In the end, `runJob` requests the given `RDD` to [doCheckpoint](rdd/RDD.md#doCheckpoint).
!!! note
    `runJob` is blocked until the job has finished (regardless of the result, successful or not).

`runJob` throws an `IllegalStateException` when `SparkContext` is [stopped](#stopped):
`runJob` requests the [ConsoleProgressBar](#progressBar) (if available) to [finishAll](ConsoleProgressBar.md#finishAll).

```text
SparkContext has been shutdown
```
In the end, `runJob` requests the given `RDD` to [doCheckpoint](rdd/RDD.md#doCheckpoint).

### <span id="runJob-demo"> Demo
### Demo { #runJob-demo }

`runJob` essentially executes a `func` function on all or a subset of partitions of an RDD and returns the result as an array, with one element per partition (the result of `func` for that partition).
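
A minimal, self-contained sketch of that (the values, partition indices and sizes below are illustrative):

```scala
import org.apache.spark.TaskContext

val rdd = sc.parallelize(0 to 9, 4)  // an RDD with 4 partitions

// Run a per-partition function on the first two partitions only;
// the result array has one element per requested partition.
val counts: Array[Int] = sc.runJob(
  rdd,
  (_: TaskContext, it: Iterator[Int]) => it.size,
  Seq(0, 1))
// counts: e.g. Array(2, 3) -- the exact sizes depend on how parallelize slices the range
```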

@@ -495,6 +501,18 @@ maxNumConcurrentTasks(

* `DAGScheduler` is requested to [checkBarrierStageWithNumSlots](scheduler/DAGScheduler.md#checkBarrierStageWithNumSlots)

## withScope { #withScope }

```scala
withScope[U](
  body: => U): U
```

`withScope` executes the given `body` using [RDDOperationScope.withScope](rdd/RDDOperationScope.md#withScope) with this `SparkContext`.

!!! note
    `withScope` is used for most (if not all) `SparkContext` API operators.
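
For illustration, this is roughly how a `SparkContext` operator wraps its body (a simplified sketch based on `SparkContext.parallelize`, not standalone code):

```scala
// Simplified sketch: the public operator delegates to withScope so the RDD created
// in the body is grouped under one operation scope (e.g. in the web UI's DAG visualization).
def parallelize[T: ClassTag](
  seq: Seq[T],
  numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
```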

## Logging

Enable `ALL` logging level for `org.apache.spark.SparkContext` logger to see what happens inside.
36 changes: 31 additions & 5 deletions docs/rdd/RDD.md
@@ -1,10 +1,15 @@
---
title: RDD
subtitle: Resilient Distributed Dataset
---

# RDD &mdash; Description of Distributed Computation

`RDD[T]` is an [abstraction](#contract) of [fault-tolerant resilient distributed datasets](#implementations) that are mere descriptions of computations over a distributed collection of records (of type `T`).
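
In other words, RDDs are lazy: transformations only extend the description, and nothing is computed until an action submits a Spark job. A quick illustration:

```scala
// Transformations only describe the computation; no job runs here.
val doubled = sc.parallelize(1 to 1000).map(_ * 2)

// An action turns the description into a Spark job that computes the records.
val total = doubled.reduce(_ + _)  // 1001000
```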

## Contract

### <span id="compute"> Computing Partition
### Computing Partition { #compute }

```scala
compute(
@@ -18,7 +23,7 @@ Used when:

* `RDD` is requested to [computeOrReadCheckpoint](#computeOrReadCheckpoint)

### <span id="getPartitions"> getPartitions
### getPartitions { #getPartitions }

```scala
getPartitions: Array[Partition]
@@ -75,7 +80,7 @@ isBarrier(): Boolean
* `ShuffleDependency` is requested to [canShuffleMergeBeEnabled](ShuffleDependency.md#canShuffleMergeBeEnabled)
* `DAGScheduler` is requested to [checkBarrierStageWithRDDChainPattern](../scheduler/DAGScheduler.md#checkBarrierStageWithRDDChainPattern), [checkBarrierStageWithDynamicAllocation](../scheduler/DAGScheduler.md#checkBarrierStageWithDynamicAllocation), [checkBarrierStageWithNumSlots](../scheduler/DAGScheduler.md#checkBarrierStageWithNumSlots), [handleTaskCompletion](../scheduler/DAGScheduler.md#handleTaskCompletion) (`FetchFailed` case to mark a map stage as broken)

### isBarrier_ { #isBarrier_ }
### isBarrier\_ { #isBarrier_ }

```scala
isBarrier_ : Boolean // (1)!
@@ -241,17 +246,26 @@ checkpointRDD: Option[CheckpointRDD[T]]

`checkpointRDD` returns the [CheckpointRDD](RDDCheckpointData.md#checkpointRDD) of the [RDDCheckpointData](#checkpointData) (if defined, i.e. when this `RDD` has been checkpointed).

---

`checkpointRDD` is used when:

* `RDD` is requested for the [dependencies](#dependencies), [partitions](#partitions) and [preferred locations](#preferredLocations) (all using _final_ methods!)

## <span id="doCheckpoint"> doCheckpoint
## doCheckpoint { #doCheckpoint }

```scala
doCheckpoint(): Unit
```

`doCheckpoint` executes in `checkpoint` scope.
!!! note "RDD.doCheckpoint, SparkContext.runJob and Dataset.checkpoint"
`doCheckpoint` is called every time a Spark job is submitted (using [SparkContext.runJob](../SparkContext.md#runJob)).

I found it quite interesting at the very least.

`doCheckpoint` is triggered when `Dataset.checkpoint` operator ([Spark SQL]({{ book.spark_sql }}/Dataset/#checkpoint)) is executed (with `eager` flag on) which will likely trigger one or more Spark jobs on the underlying RDD anyway.
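
A quick way to observe this (a minimal sketch; the checkpoint directory is illustrative):

```scala
sc.setCheckpointDir("/tmp/checkpoints")  // illustrative directory

val nums = sc.parallelize(1 to 5)
nums.checkpoint()  // merely marks the RDD for checkpointing; nothing is written yet

// count() submits a job through SparkContext.runJob, which in the end requests
// nums to doCheckpoint; that is when the checkpoint data is actually written out.
nums.count()
```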

`doCheckpoint` executes in [checkpoint](RDDOperationScope.md#withScope) scope.

`doCheckpoint` turns the [doCheckpointCalled](#doCheckpointCalled) flag on (to prevent multiple executions).

@@ -382,6 +396,18 @@ rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
* [RDD.sortBy](spark-rdd-transformations.md#sortBy)
* [PairRDDFunctions.combineByKey](PairRDDFunctions.md#combineByKey)

## withScope { #withScope }

```scala
withScope[U](
  body: => U): U
```

`withScope` executes the given `body` using [RDDOperationScope.withScope](RDDOperationScope.md#withScope) with this [SparkContext](#sc).

!!! note
    `withScope` is used for most (if not all) `RDD` API operators.
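
For illustration, this is roughly how an `RDD` operator wraps its body (a simplified sketch based on `RDD.map`, not standalone code):

```scala
// Simplified sketch: the operator runs its body in withScope so the resulting
// MapPartitionsRDD is attributed to the "map" operation scope.
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
```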

<!---
## Review Me
33 changes: 33 additions & 0 deletions docs/rdd/RDDOperationScope.md
@@ -0,0 +1,33 @@
# RDDOperationScope

## withScope { #withScope }

```scala
withScope[T](
  sc: SparkContext,
  name: String,
  allowNesting: Boolean,
  ignoreParent: Boolean)(
  body: => T): T
withScope[T](
  sc: SparkContext,
  allowNesting: Boolean = false)(
  body: => T): T
```

??? note "name Argument"
    Value | Caller
    ------|-------
    `checkpoint` | [RDD.doCheckpoint](RDD.md#doCheckpoint)
    _Some_ method name | Executed without `name`
    The name of a physical operator (with no `Exec` suffix) | `SparkPlan.executeQuery` ([Spark SQL]({{ book.spark_sql }}/physical-operators/SparkPlan/#executeQuery))

`withScope`...FIXME
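
Conceptually, `withScope` records the scope (named after the operator) in a `SparkContext` local property while `body` executes, so the RDDs created inside `body` can be attributed to that scope (e.g. in the web UI's RDD operation graph). The following is a conceptual sketch only (the real implementation also handles nesting, a no-override flag, and JSON-encodes the scope):

```scala
import org.apache.spark.SparkContext

// Conceptual sketch: save the current scope, set the new one for the duration
// of body, and restore the previous scope afterwards.
def withScopeSketch[T](sc: SparkContext, name: String)(body: => T): T = {
  val scopeKey = "spark.rdd.scope"              // local property consulted when RDDs are created
  val previous = sc.getLocalProperty(scopeKey)  // remember the enclosing scope, if any
  try {
    sc.setLocalProperty(scopeKey, name)         // the real code stores a JSON-encoded RDDOperationScope
    body
  } finally {
    sc.setLocalProperty(scopeKey, previous)     // restore the enclosing scope
  }
}
```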

---

`withScope` is used when:

* `RDD` is requested to [doCheckpoint](RDD.md#doCheckpoint) and [withScope](RDD.md#withScope) (for most, if not all, `RDD` API operators)
* `SparkContext` is requested to [withScope](../SparkContext.md#withScope) (for most, if not all, `SparkContext` API operators)
* `SparkPlan` ([Spark SQL]({{ book.spark_sql }}/physical-operators/SparkPlan/#executeQuery)) is requested to `executeQuery`
22 changes: 13 additions & 9 deletions docs/scheduler/DAGScheduler.md
@@ -31,25 +31,25 @@ After an [action](../rdd/spark-rdd-actions.md) has been called on an `RDD`, [Spa
* Determines the [preferred locations](#preferred-locations) to run each task on
* Handles failures due to **shuffle output files** being lost

DAGScheduler computes [a directed acyclic graph (DAG)](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs. It then submits stages to [TaskScheduler](TaskScheduler.md).
`DAGScheduler` computes [a directed acyclic graph (DAG)](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs. It then submits stages to [TaskScheduler](TaskScheduler.md).

![DAGScheduler.submitJob](../images/scheduler/dagscheduler-submitjob.png)

In addition to coming up with the execution DAG, DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes the information to [TaskScheduler](TaskScheduler.md).
In addition to coming up with the execution DAG, `DAGScheduler` also determines the preferred locations to run each task on, based on the current cache status, and passes the information to [TaskScheduler](TaskScheduler.md).

DAGScheduler tracks which rdd/spark-rdd-caching.md[RDDs are cached (or persisted)] to avoid "recomputing" them, i.e. redoing the map side of a shuffle. DAGScheduler remembers what ShuffleMapStage.md[ShuffleMapStage]s have already produced output files (that are stored in [BlockManager](../storage/BlockManager.md)s).
`DAGScheduler` tracks which [RDDs are cached (or persisted)](../rdd/spark-rdd-caching.md) to avoid "recomputing" them (re-doing the map side of a shuffle). `DAGScheduler` remembers what [ShuffleMapStage](ShuffleMapStage.md)s have already produced output files (that are stored in [BlockManager](../storage/BlockManager.md)s).

`DAGScheduler` is only interested in cache location coordinates, i.e. host and executor id, per partition of a RDD.
`DAGScheduler` is only interested in cache location coordinates (i.e. host and executor id, per partition of an RDD).

Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which will retry each task a small number of times before cancelling the whole stage.
Furthermore, `DAGScheduler` handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the `TaskScheduler` itself, which will retry each task a small number of times before cancelling the whole stage.

DAGScheduler uses an **event queue architecture** in which a thread can post `DAGSchedulerEvent` events, e.g. a new job or stage being submitted, that DAGScheduler reads and executes sequentially. See the section [Event Bus](#event-loop).
`DAGScheduler` uses an **event queue architecture** in which a thread can post `DAGSchedulerEvent` events, e.g. a new job or stage being submitted, that `DAGScheduler` reads and executes sequentially. See the section [Event Bus](#event-loop).

DAGScheduler runs stages in topological order.
`DAGScheduler` runs stages in topological order.

DAGScheduler uses [SparkContext](../SparkContext.md), [TaskScheduler](TaskScheduler.md), LiveListenerBus.md[], MapOutputTracker.md[MapOutputTracker] and storage:BlockManager.md[BlockManager] for its services. However, at the very minimum, DAGScheduler takes a `SparkContext` only (and requests `SparkContext` for the other services).
`DAGScheduler` uses [SparkContext](../SparkContext.md), [TaskScheduler](TaskScheduler.md), [LiveListenerBus](LiveListenerBus.md), [MapOutputTracker](MapOutputTracker.md) and [BlockManager](../storage/BlockManager.md) for its services. However, at the very minimum, `DAGScheduler` takes a `SparkContext` only (and requests `SparkContext` for the other services).

When DAGScheduler schedules a job as a result of rdd/index.md#actions[executing an action on a RDD] or [calling SparkContext.runJob() method directly](../SparkContext.md#runJob), it spawns parallel tasks to compute (partial) results per partition.
When `DAGScheduler` schedules a job as a result of [executing an action on an RDD](../rdd/index.md#actions) or [calling SparkContext.runJob directly](../SparkContext.md#runJob), it spawns parallel tasks to compute (partial) results per partition.
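
For example (a schematic trace, not actual source code; the input path is illustrative), a simple action travels through these layers before tasks are spawned:

```scala
// RDD.count()                                      action on the RDD
//   -> SparkContext.runJob(rdd, func, partitions)  call site + closure cleaning
//      -> DAGScheduler.runJob(...)                 builds the DAG of stages
//         -> TaskScheduler                         receives TaskSets and launches tasks
val lines = sc.textFile("README.md")  // illustrative input path
val numLines = lines.count()          // triggers one Spark job scheduled by DAGScheduler
```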

## Creating Instance

@@ -85,6 +85,8 @@ submitMapStage[K, V, C](

In the end, `submitMapStage` posts a [MapStageSubmitted](DAGSchedulerEvent.md#MapStageSubmitted) and returns the `JobWaiter`.

---

Used when:

* `SparkContext` is requested to [submit a MapStage for execution](../SparkContext.md#submitMapStage)
@@ -139,6 +141,8 @@ Job [jobId] finished: [callSite], took [time] s
Job [jobId] failed: [callSite], took [time] s
```

---

`runJob` is used when:

* `SparkContext` is requested to [run a job](../SparkContext.md#runJob)
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -524,6 +524,7 @@ nav:
- NewHadoopRDD: rdd/NewHadoopRDD.md
- ParallelCollectionRDD: rdd/ParallelCollectionRDD.md
- RDD: rdd/RDD.md
- RDDOperationScope: rdd/RDDOperationScope.md
- ReliableCheckpointRDD: rdd/ReliableCheckpointRDD.md
- ShuffleDependency: rdd/ShuffleDependency.md
- ShuffledRDD: rdd/ShuffledRDD.md
