From d139926b8b18580d52f480925bd83e9e9b76fae9 Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Sat, 17 Feb 2024 19:49:45 +0100 Subject: [PATCH] Serialized Task Size Threshold + Finding Preferred Locations for RDD Partition --- docs/SparkContext.md | 36 +++++++++++++++++++------------- docs/scheduler/DAGScheduler.md | 9 ++++++-- docs/scheduler/TaskSetManager.md | 33 +++++++++++++++++++++++------ 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/docs/SparkContext.md b/docs/SparkContext.md index 61f69801ab..6bf34e4a77 100644 --- a/docs/SparkContext.md +++ b/docs/SparkContext.md @@ -513,6 +513,27 @@ withScope[U]( !!! note `withScope` is used for most (if not all) `SparkContext` API operators. +## Finding Preferred Locations for RDD Partition { #getPreferredLocs } + +```scala +getPreferredLocs( + rdd: RDD[_], + partition: Int): Seq[TaskLocation] +``` + +`getPreferredLocs` requests the [DAGScheduler](#dagScheduler) for the [preferred locations](scheduler/DAGScheduler.md#getPreferredLocs) of the given `partition` (of the given [RDD](rdd/RDD.md)). + +!!! note + **Preferred locations** of an RDD partition are also referred to as _placement preferences_ or _locality preferences_. + +--- + +`getPreferredLocs` is used when: + +* `CoalescedRDDPartition` is requested to `localFraction` +* `DefaultPartitionCoalescer` is requested to `currPrefLocs` +* `PartitionerAwareUnionRDD` is requested to `currPrefLocs` + ## Logging Enable `ALL` logging level for `org.apache.spark.SparkContext` logger to see what happens inside. @@ -1236,21 +1257,6 @@ SparkContext may have a core:ContextCleaner.md[ContextCleaner] defined. `ContextCleaner` is created when `SparkContext` is created with configuration-properties.md#spark.cleaner.referenceTracking[spark.cleaner.referenceTracking] configuration property enabled.
-== [[getPreferredLocs]] Finding Preferred Locations (Placement Preferences) for RDD Partition - -[source, scala] ----- -getPreferredLocs( - rdd: RDD[_], - partition: Int): Seq[TaskLocation] ----- - -getPreferredLocs simply scheduler:DAGScheduler.md#getPreferredLocs[requests `DAGScheduler` for the preferred locations for `partition`]. - -NOTE: Preferred locations of a partition of a RDD are also called *placement preferences* or *locality preferences*. - -getPreferredLocs is used in CoalescedRDDPartition, DefaultPartitionCoalescer and PartitionerAwareUnionRDD. - == [[persistRDD]] Registering RDD in persistentRdds Internal Registry -- `persistRDD` Internal Method [source, scala] diff --git a/docs/scheduler/DAGScheduler.md b/docs/scheduler/DAGScheduler.md index ff97bab82e..98e9c20479 100644 --- a/docs/scheduler/DAGScheduler.md +++ b/docs/scheduler/DAGScheduler.md @@ -736,13 +736,18 @@ In the end, with no tasks to submit for execution, `submitMissingTasks` [submits ```scala getPreferredLocs( - rdd: RDD[_], + rdd: RDD[_], partition: Int): Seq[TaskLocation] ``` `getPreferredLocs` is simply an alias for the internal (recursive) [getPreferredLocsInternal](#getPreferredLocsInternal). -`getPreferredLocs` is used when...FIXME +--- + +`getPreferredLocs` is used when: + +* `SparkContext` is requested to [getPreferredLocs](../SparkContext.md#getPreferredLocs) +* `DAGScheduler` is requested to [submit the missing tasks of a stage](#submitMissingTasks) ## Finding BlockManagers (Executors) for Cached RDD Partitions (aka Block Location Discovery) { #getCacheLocs } diff --git a/docs/scheduler/TaskSetManager.md b/docs/scheduler/TaskSetManager.md index d70fea18a0..863688895c 100644 --- a/docs/scheduler/TaskSetManager.md +++ b/docs/scheduler/TaskSetManager.md @@ -35,7 +35,7 @@ Epoch for [taskSet]: [epoch] `TaskSetManager` [adds the tasks as pending execution](#addPendingTask) (in reverse order from the highest partition to the lowest). 
-### Number of Task Failures +### Number of Task Failures { #maxTaskFailures } `TaskSetManager` is given a `maxTaskFailures` value: the number of times a [single task can fail](#handleFailedTask) before the whole [TaskSet](#taskSet) is [aborted](#abort). @@ -88,7 +88,7 @@ In the end, `resourceOffer` returns the `TaskDescription`, `hasScheduleDelayReje * `TaskSchedulerImpl` is requested to [resourceOfferSingleTaskSet](TaskSchedulerImpl.md#resourceOfferSingleTaskSet) -## Locality Wait +## Locality Wait { #getLocalityWait } ```scala getLocalityWait( @@ -116,11 +116,11 @@ Unless the value has been determined, `getLocalityWait` defaults to `0`. * `TaskSetManager` is [created](#localityWaits) and [recomputes locality preferences](#recomputeLocality) -## spark.driver.maxResultSize +## spark.driver.maxResultSize { #maxResultSize } `TaskSetManager` uses [spark.driver.maxResultSize](../configuration-properties.md#spark.driver.maxResultSize) configuration property to [check available memory for more task results](#canFetchMoreResults). -## Recomputing Task Locality Preferences +## Recomputing Task Locality Preferences { #recomputeLocality } ```scala recomputeLocality(): Unit ``` @@ -150,7 +150,7 @@ While in zombie state, a `TaskSetManager` can launch no new tasks and responds w A `TaskSetManager` remains in the zombie state until all tasks have finished running, so that it can continue to track and account for the running tasks.
-## Computing Locality Levels (for Scheduled Tasks) +## Computing Locality Levels (for Scheduled Tasks) { #computeValidLocalityLevels } ```scala computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] ``` @@ -182,7 +182,7 @@ Valid locality levels for [taskSet]: [comma-separated levels] * `TaskSetManager` is [created](#myLocalityLevels) and requested to [recomputeLocality](#recomputeLocality) -## executorAdded +## executorAdded { #executorAdded } ```scala executorAdded(): Unit ``` @@ -222,6 +222,27 @@ prepareLaunchingTask( * `TaskSchedulerImpl` is requested to [resourceOffers](TaskSchedulerImpl.md#resourceOffers) * `TaskSetManager` is requested to [resourceOffers](#resourceOffers) +## Serialized Task Size Threshold { #TASK_SIZE_TO_WARN_KIB } + +The `TaskSetManager` object defines the `TASK_SIZE_TO_WARN_KIB` value as the threshold to warn a user when a stage contains a task with a serialized size greater than `1000` KiB. + +### DAGScheduler { #TASK_SIZE_TO_WARN_KIB-DAGScheduler } + +`DAGScheduler` can print out the following WARN message to the logs when requested to [submitMissingTasks](DAGScheduler.md#submitMissingTasks): + +```text +Broadcasting large task binary with size [taskBinaryBytes] [siByteSuffix] +``` + +### TaskSetManager { #TASK_SIZE_TO_WARN_KIB-TaskSetManager } + +`TaskSetManager` can print out the following WARN message to the logs when requested to [prepareLaunchingTask](#prepareLaunchingTask): + +```text +Stage [stageId] contains a task of very large size ([serializedTask] KiB). +The maximum recommended task size is 1000 KiB. +``` + ## Demo Enable `DEBUG` logging level for `org.apache.spark.scheduler.TaskSchedulerImpl` (or `org.apache.spark.scheduler.cluster.YarnScheduler` for YARN) and `org.apache.spark.scheduler.TaskSetManager` and execute the following two-stage job to see their low-level inner workings.
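The 1000 KiB threshold documented in this patch can be illustrated with a small stand-alone Scala sketch. This is not Spark's actual code: `TaskSizeCheck`, `serializedSizeBytes`, and `sizeWarning` are hypothetical names, and plain Java serialization stands in for Spark's own serializer; only the threshold value and the shape of the warning text mirror what `TaskSetManager` logs.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical sketch of the TASK_SIZE_TO_WARN_KIB check (not Spark's code).
object TaskSizeCheck {
  // Mirrors TaskSetManager.TASK_SIZE_TO_WARN_KIB (1000 KiB).
  val TaskSizeToWarnKiB = 1000

  // Serialize a value with plain Java serialization (a simplification;
  // Spark serializes tasks with its own SerializerInstance).
  def serializedSizeBytes(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.size()
  }

  // Returns the WARN message a scheduler could log, if the task is too large.
  def sizeWarning(stageId: Int, task: AnyRef): Option[String] = {
    val sizeKiB = serializedSizeBytes(task) / 1024
    if (sizeKiB > TaskSizeToWarnKiB)
      Some(s"Stage $stageId contains a task of very large size ($sizeKiB KiB). " +
        s"The maximum recommended task size is $TaskSizeToWarnKiB KiB.")
    else
      None
  }
}
```

A task closure that captures a large object (for example, a multi-megabyte array instead of a broadcast variable) is the typical way the serialized size crosses this threshold, which is exactly the situation both WARN messages above are meant to surface.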