Serialized Task Size Threshold + Finding Preferred Locations for RDD Partition
jaceklaskowski committed Feb 17, 2024
1 parent 2077326 commit d139926
Showing 3 changed files with 55 additions and 23 deletions.
36 changes: 21 additions & 15 deletions docs/SparkContext.md
@@ -513,6 +513,27 @@ withScope[U](
!!! note
`withScope` is used for most (if not all) `SparkContext` API operators.

## Finding Preferred Locations for RDD Partition { #getPreferredLocs }

```scala
getPreferredLocs(
  rdd: RDD[_],
  partition: Int): Seq[TaskLocation]
```

`getPreferredLocs` requests the [DAGScheduler](#dagScheduler) for the [preferred locations](scheduler/DAGScheduler.md#getPreferredLocs) of the given `partition` (of the given [RDD](rdd/RDD.md)).

!!! note
**Preferred locations** of an RDD partition are also referred to as _placement preferences_ or _locality preferences_.

---

`getPreferredLocs` is used when:

* `CoalescedRDDPartition` is requested to `localFraction`
* `DefaultPartitionCoalescer` is requested to `currPrefLocs`
* `PartitionerAwareUnionRDD` is requested to `currPrefLocs`
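
As an illustration, here is a minimal `spark-shell` sketch of placement preferences (the host names are hypothetical): `SparkContext.makeRDD` accepts explicit location preferences per element, and the public `RDD.preferredLocations` shows, per partition, the same information that `getPreferredLocs` resolves (as `TaskLocation`s) through the `DAGScheduler`.

```scala
// spark-shell sketch with made-up host names
val rdd = sc.makeRDD(Seq(
  (1, Seq("host1")),   // element 1 prefers host1
  (2, Seq("host2"))))  // element 2 prefers host2

// Print the placement preferences of every partition
rdd.partitions.foreach { p =>
  println(s"partition ${p.index}: ${rdd.preferredLocations(p).mkString(", ")}")
}
```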

## Logging

Enable `ALL` logging level for `org.apache.spark.SparkContext` logger to see what happens inside.
@@ -1236,21 +1257,6 @@ SparkContext may have a core:ContextCleaner.md[ContextCleaner] defined.
`ContextCleaner` is created when `SparkContext` is created with configuration-properties.md#spark.cleaner.referenceTracking[spark.cleaner.referenceTracking] configuration property enabled.
== [[getPreferredLocs]] Finding Preferred Locations (Placement Preferences) for RDD Partition
[source, scala]
----
getPreferredLocs(
  rdd: RDD[_],
  partition: Int): Seq[TaskLocation]
----
getPreferredLocs simply scheduler:DAGScheduler.md#getPreferredLocs[requests `DAGScheduler` for the preferred locations for `partition`].
NOTE: Preferred locations of a partition of a RDD are also called *placement preferences* or *locality preferences*.
getPreferredLocs is used in CoalescedRDDPartition, DefaultPartitionCoalescer and PartitionerAwareUnionRDD.
== [[persistRDD]] Registering RDD in persistentRdds Internal Registry -- `persistRDD` Internal Method
[source, scala]
9 changes: 7 additions & 2 deletions docs/scheduler/DAGScheduler.md
@@ -736,13 +736,18 @@ In the end, with no tasks to submit for execution, `submitMissingTasks` [submits

```scala
getPreferredLocs(
  rdd: RDD[_],
  partition: Int): Seq[TaskLocation]
```

`getPreferredLocs` is simply an alias for the internal (recursive) [getPreferredLocsInternal](#getPreferredLocsInternal).

`getPreferredLocs` is used when...FIXME
---

`getPreferredLocs` is used when:

* `SparkContext` is requested to [getPreferredLocs](../SparkContext.md#getPreferredLocs)
* `DAGScheduler` is requested to [submit the missing tasks of a stage](#submitMissingTasks)
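
For intuition, the following is a simplified, self-contained sketch (not the actual Spark sources) of what the recursion does: locations of a cached partition win, then the RDD's own placement preferences, and otherwise the first non-empty preferences reachable through narrow dependencies (a shuffle dependency starts a new stage, so the recursion stops there). The `cachedLocs` function is a hypothetical stand-in for [getCacheLocs](#getCacheLocs) that returns host names rather than `TaskLocation`s.

```scala
import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Simplified sketch of the getPreferredLocsInternal recursion (not Spark's code)
def preferredLocsSketch(
    rdd: RDD[_],
    partition: Int,
    cachedLocs: (RDD[_], Int) => Seq[String]): Seq[String] = {
  // 1. Locations of a cached (persisted) partition win.
  val cached = cachedLocs(rdd, partition)
  if (cached.nonEmpty) return cached

  // 2. Then the RDD's own placement preferences.
  val own = rdd.preferredLocations(rdd.partitions(partition))
  if (own.nonEmpty) return own

  // 3. Otherwise, the first non-empty preferences found through narrow dependencies.
  val fromParents = for {
    narrow          <- rdd.dependencies.collect { case n: NarrowDependency[_] => n }
    parentPartition <- narrow.getParents(partition)
  } yield preferredLocsSketch(narrow.rdd, parentPartition, cachedLocs)
  fromParents.find(_.nonEmpty).getOrElse(Seq.empty)
}
```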

## Finding BlockManagers (Executors) for Cached RDD Partitions (aka Block Location Discovery) { #getCacheLocs }

33 changes: 27 additions & 6 deletions docs/scheduler/TaskSetManager.md
@@ -35,7 +35,7 @@ Epoch for [taskSet]: [epoch]

`TaskSetManager` [adds the tasks as pending execution](#addPendingTask) (in reverse order from the highest partition to the lowest).

### <span id="maxTaskFailures"> Number of Task Failures
### Number of Task Failures { #maxTaskFailures }

`TaskSetManager` is given `maxTaskFailures` value that is how many times a [single task can fail](#handleFailedTask) before the whole [TaskSet](#taskSet) is [aborted](#abort).
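
In a typical deployment this value comes from the `spark.task.maxFailures` configuration property; a hypothetical way to raise it when building a `SparkConf`:

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration: allow a single task to fail up to 8 times
// before the whole TaskSet (stage attempt) is aborted.
val conf = new SparkConf()
  .setAppName("max-task-failures-demo")
  .set("spark.task.maxFailures", "8")
```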

@@ -88,7 +88,7 @@ In the end, `resourceOffer` returns the `TaskDescription`, `hasScheduleDelayReje

* `TaskSchedulerImpl` is requested to [resourceOfferSingleTaskSet](TaskSchedulerImpl.md#resourceOfferSingleTaskSet)

## <span id="getLocalityWait"> Locality Wait
## Locality Wait { #getLocalityWait }

```scala
getLocalityWait(
@@ -116,11 +116,11 @@ Unless the value has been determined, `getLocalityWait` defaults to `0`.

* `TaskSetManager` is [created](#localityWaits) and [recomputes locality preferences](#recomputeLocality)

## <span id="maxResultSize"> spark.driver.maxResultSize
## spark.driver.maxResultSize { #maxResultSize }

`TaskSetManager` uses [spark.driver.maxResultSize](../configuration-properties.md#spark.driver.maxResultSize) configuration property to [check available memory for more task results](#canFetchMoreResults).

## <span id="recomputeLocality"> Recomputing Task Locality Preferences
## Recomputing Task Locality Preferences { #recomputeLocality }

```scala
recomputeLocality(): Unit
@@ -150,7 +150,7 @@ While in zombie state, a `TaskSetManager` can launch no new tasks and responds w

A `TaskSetManager` remains in the zombie state until all tasks have finished running, so that it can continue to track and account for the running tasks.

## <span id="computeValidLocalityLevels"> Computing Locality Levels (for Scheduled Tasks)
## Computing Locality Levels (for Scheduled Tasks) { #computeValidLocalityLevels }

```scala
computeValidLocalityLevels(): Array[TaskLocality.TaskLocality]
@@ -182,7 +182,7 @@ Valid locality levels for [taskSet]: [comma-separated levels]

* `TaskSetManager` is [created](#myLocalityLevels) and requested to [recomputeLocality](#recomputeLocality)

## <span id="executorAdded"> executorAdded
## executorAdded { #executorAdded }

```scala
executorAdded(): Unit
@@ -222,6 +222,27 @@ prepareLaunchingTask(
* `TaskSchedulerImpl` is requested to [resourceOffers](TaskSchedulerImpl.md#resourceOffers)
* `TaskSetManager` is requested to [resourceOffers](#resourceOffers)

## Serialized Task Size Threshold { #TASK_SIZE_TO_WARN_KIB }

The `TaskSetManager` object defines `TASK_SIZE_TO_WARN_KIB` (`1000`) as the threshold used to warn a user when a stage contains a task whose serialized size is greater than `1000` KiB.

### DAGScheduler { #TASK_SIZE_TO_WARN_KIB-DAGScheduler }

`DAGScheduler` can print out the following WARN message to the logs when requested to [submitMissingTasks](DAGScheduler.md#submitMissingTasks) and the size of a serialized task binary exceeds the threshold:

```text
Broadcasting large task binary with size [taskBinaryBytes] [siByteSuffix]
```

### TaskSetManager { #TASK_SIZE_TO_WARN_KIB-TaskSetManager }

`TaskSetManager` can print out the following WARN message to the logs when requested to [prepareLaunchingTask](#prepareLaunchingTask) and the serialized size of a task exceeds the threshold:

```text
Stage [stageId] contains a task of very large size ([serializedTask] KiB).
The maximum recommended task size is 1000 KiB.
```
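
A minimal sketch (not the actual Spark sources) of the kind of check this threshold guards; `serializedTaskBytes` is a hypothetical stand-in for the size of a serialized task in bytes:

```scala
// Sketch only: warn when a serialized task exceeds the 1000 KiB threshold.
val TASK_SIZE_TO_WARN_KIB = 1000

def warnIfTaskIsTooLarge(stageId: Int, serializedTaskBytes: Long): Unit = {
  val sizeKiB = serializedTaskBytes / 1024
  if (sizeKiB > TASK_SIZE_TO_WARN_KIB) {
    println(
      s"WARN Stage $stageId contains a task of very large size ($sizeKiB KiB). " +
        s"The maximum recommended task size is $TASK_SIZE_TO_WARN_KIB KiB.")
  }
}
```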

## Demo

Enable `DEBUG` logging level for `org.apache.spark.scheduler.TaskSchedulerImpl` (or `org.apache.spark.scheduler.cluster.YarnScheduler` for YARN) and `org.apache.spark.scheduler.TaskSetManager`, and execute the following two-stage job to see their low-level inner workings.
