Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ThreadContextElement for common #4208

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ abstract interface <#A: kotlin/Any?> kotlinx.coroutines/CompletableDeferred : ko
abstract fun completeExceptionally(kotlin/Throwable): kotlin/Boolean // kotlinx.coroutines/CompletableDeferred.completeExceptionally|completeExceptionally(kotlin.Throwable){}[0]
}

abstract interface <#A: kotlin/Any?> kotlinx.coroutines/ThreadContextElement : kotlin.coroutines/CoroutineContext.Element { // kotlinx.coroutines/ThreadContextElement|null[0]
abstract fun restoreThreadContext(kotlin.coroutines/CoroutineContext, #A) // kotlinx.coroutines/ThreadContextElement.restoreThreadContext|restoreThreadContext(kotlin.coroutines.CoroutineContext;1:0){}[0]
abstract fun updateThreadContext(kotlin.coroutines/CoroutineContext): #A // kotlinx.coroutines/ThreadContextElement.updateThreadContext|updateThreadContext(kotlin.coroutines.CoroutineContext){}[0]
}

abstract interface <#A: kotlin/Throwable & kotlinx.coroutines/CopyableThrowable<#A>> kotlinx.coroutines/CopyableThrowable { // kotlinx.coroutines/CopyableThrowable|null[0]
abstract fun createCopy(): #A? // kotlinx.coroutines/CopyableThrowable.createCopy|createCopy(){}[0]
}
Expand Down
110 changes: 108 additions & 2 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.concurrent.Volatile
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
Expand Down Expand Up @@ -205,10 +206,115 @@ private class LazyStandaloneCoroutine(
}

// Used by withContext when context changes, but dispatcher stays the same
internal expect class UndispatchedCoroutine<in T>(
internal class UndispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>
) : ScopeCoroutine<T>(if (context[UndispatchedMarker] == null) context + UndispatchedMarker else context, uCont) {

/**
* The state of [ThreadContextElement]s associated with the current undispatched coroutine.
* It is stored in a thread local because this coroutine can be used concurrently in suspend-resume race scenario.
* See the followin, boiled down example with inlined `withContinuationContext` body:
* ```
* val state = saveThreadContext(ctx)
* try {
* invokeSmthWithThisCoroutineAsCompletion() // Completion implies that 'afterResume' will be called
* // COROUTINE_SUSPENDED is returned
* } finally {
* thisCoroutine().clearThreadContext() // Concurrently the "smth" could've been already resumed on a different thread
* // and it also calls saveThreadContext and clearThreadContext
* }
* ```
*
* Usage note:
*
* This part of the code is performance-sensitive.
* It is a well-established pattern to wrap various activities into system-specific undispatched
* `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of
* undispatched coroutines.
* Each access to [CommonThreadLocal] on JVM leaves a footprint in the corresponding Thread's `ThreadLocalMap`
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected.
* When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals
* start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access.
* (You can read more about this effect as "GC nepotism").
*
* To avoid that, we attempt to narrow down the lifetime of this thread local as much as possible:
* - It's never accessed when we are sure there are no thread context elements
* - It's cleaned up via [CommonThreadLocal.remove] as soon as the coroutine is suspended or finished.
*/
private val threadStateToRecover = commonThreadLocal<Pair<CoroutineContext, Any?>?>(Symbol("UndispatchedCoroutine"))

/*
* Indicates that a coroutine has at least one thread context element associated with it
* and that 'threadStateToRecover' is going to be set in case of dispatchhing in order to preserve them.
* Better than nullable thread-local for easier debugging.
*
* It is used as a performance optimization to avoid 'threadStateToRecover' initialization
* (note: tl.get() initializes thread local),
* and is prone to false-positives as it is never reset: otherwise
* it may lead to logical data races between suspensions point where
* coroutine is yet being suspended in one thread while already being resumed
* in another.
*/
@Volatile
private var threadLocalIsSet = false

init {
/*
* This is a hack for a very specific case in #2930 unless #3253 is implemented.
* 'ThreadLocalStressTest' covers this change properly.
*
* The scenario this change covers is the following:
* 1) The coroutine is being started as plain non kotlinx.coroutines related suspend function,
* e.g. `suspend fun main` or, more importantly, Ktor `SuspendFunGun`, that is invoking
* `withContext(tlElement)` which creates `UndispatchedCoroutine`.
* 2) It (original continuation) is then not wrapped into `DispatchedContinuation` via `intercept()`
* and goes neither through `DC.run` nor through `resumeUndispatchedWith` that both
* do thread context element tracking.
* 3) So thread locals never got chance to get properly set up via `saveThreadContext`,
* but when `withContext` finishes, it attempts to recover thread locals in its `afterResume`.
*
* Here we detect precisely this situation and properly setup context to recover later.
*
*/
if (uCont.context[ContinuationInterceptor] !is CoroutineDispatcher) {
/*
* We cannot just "read" the elements as there is no such API,
* so we update-restore it immediately and use the intermediate value
* as the initial state, leveraging the fact that thread context element
* is idempotent and such situations are increasingly rare.
*/
val values = updateThreadContext(context, null)
restoreThreadContext(context, values)
saveThreadContext(context, values)
}
}

fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
threadLocalIsSet = true // Specify that thread-local is touched at all
threadStateToRecover.set(context to oldValue)
}

fun clearThreadContext(): Boolean {
return !(threadLocalIsSet && threadStateToRecover.get() == null).also {
threadStateToRecover.remove()
}
}

override fun afterResume(state: Any?) {
if (threadLocalIsSet) {
threadStateToRecover.get()?.let { (ctx, value) ->
restoreThreadContext(ctx, value)
}
threadStateToRecover.remove()
}
// resume undispatched -- update context but stay on the same dispatcher
val result = recoverResult(state, uCont)
withContinuationContext(uCont, null) {
uCont.resumeWith(result)
}
}
}

private const val UNDECIDED = 0
private const val SUSPENDED = 1
Expand Down
79 changes: 76 additions & 3 deletions kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.coroutines.*

/**
Expand All @@ -20,8 +21,80 @@ public expect fun CoroutineContext.newCoroutineContext(addedContext: CoroutineCo
@Suppress("PropertyName")
internal expect val DefaultDelay: Delay

// countOrElement -- pre-cached value for ThreadContext.kt
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
internal expect inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T
internal expect fun Continuation<*>.toDebugString(): String
internal expect val CoroutineContext.coroutineName: String?

/**
* Executes a block using a given coroutine context.
* @param countOrElement pre-cached value for [updateThreadContext]
*/
internal inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
val oldValue = updateThreadContext(context, countOrElement)
try {
return block()
} finally {
restoreThreadContext(context, oldValue)
}
}

/**
* Executes a block using a context of a given continuation.
* @param countOrElement pre-cached value for [updateThreadContext]
*/
internal inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
val context = continuation.context
val oldValue = updateThreadContext(context, countOrElement)
val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
// Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
continuation.updateUndispatchedCompletion(context, oldValue)
} else {
null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
}
try {
return block()
} finally {
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext()) {
restoreThreadContext(context, oldValue)
}
}
}

private fun Continuation<*>.updateUndispatchedCompletion(context: CoroutineContext, oldValue: Any?): UndispatchedCoroutine<*>? {
if (this !is CoroutineStackFrame) return null
/*
* Fast-path to detect whether we have undispatched coroutine at all in our stack.
*
* Implementation note.
* If we ever find that stackwalking for thread-locals is way too slow, here is another idea:
* 1) Store undispatched coroutine right in the `UndispatchedMarker` instance
* 2) To avoid issues with cross-dispatch boundary, remove `UndispatchedMarker`
* from the context when creating dispatched coroutine in `withContext`.
* Another option is to "unmark it" instead of removing to save an allocation.
* Both options should work, but it requires more careful studying of the performance
* and, mostly, maintainability impact.
*/
val potentiallyHasUndispatchedCoroutine = context[UndispatchedMarker] !== null
if (!potentiallyHasUndispatchedCoroutine) return null
val completion = undispatchedCompletion()
completion?.saveThreadContext(context, oldValue)
return completion
}

private tailrec fun CoroutineStackFrame.undispatchedCompletion(): UndispatchedCoroutine<*>? {
// Find direct completion of this continuation
val completion: CoroutineStackFrame = when (this) {
is DispatchedCoroutine<*> -> return null
else -> callerFrame ?: return null // something else -- not supported
}
if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine!
return completion.undispatchedCompletion() // walk up the call stack with tail call
}

/**
* Marker indicating that [UndispatchedCoroutine] exists somewhere up in the stack.
* Used as a performance optimization to avoid stack walking where it is not necessary.
*/
internal object UndispatchedMarker: CoroutineContext.Element, CoroutineContext.Key<UndispatchedMarker> {
override val key: CoroutineContext.Key<*>
get() = this
}
82 changes: 82 additions & 0 deletions kotlinx-coroutines-core/common/src/ThreadContextElement.common.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package kotlinx.coroutines

import kotlin.coroutines.*

/**
* Defines elements in [CoroutineContext] that are installed into thread context
* every time the coroutine with this element in the context is resumed on a thread.
*
* Implementations of this interface define a type [S] of the thread-local state that they need to store on
* resume of a coroutine and restore later on suspend. The infrastructure provides the corresponding storage.
*
* Example usage looks like this:
*
* ```
* // Appends "name" of a coroutine to a current thread name when coroutine is executed
* class CoroutineName(val name: String) : ThreadContextElement<String> {
* // declare companion object for a key of this element in coroutine context
* companion object Key : CoroutineContext.Key<CoroutineName>
*
* // provide the key of the corresponding context element
* override val key: CoroutineContext.Key<CoroutineName>
* get() = Key
*
* // this is invoked before coroutine is resumed on current thread
* override fun updateThreadContext(context: CoroutineContext): String {
* val previousName = Thread.currentThread().name
* Thread.currentThread().name = "$previousName # $name"
* return previousName
* }
*
* // this is invoked after coroutine has suspended on current thread
* override fun restoreThreadContext(context: CoroutineContext, oldState: String) {
* Thread.currentThread().name = oldState
* }
* }
*
* // Usage
* launch(Dispatchers.Main + CoroutineName("Progress bar coroutine")) { ... }
* ```
*
* Every time this coroutine is resumed on a thread, UI thread name is updated to
* "UI thread original name # Progress bar coroutine" and the thread name is restored to the original one when
* this coroutine suspends.
*
* To use [ThreadLocal] variable within the coroutine use [ThreadLocal.asContextElement][asContextElement] function.
*
* ### Reentrancy and thread-safety
*
* Correct implementations of this interface must expect that calls to [restoreThreadContext]
* may happen in parallel to the subsequent [updateThreadContext] and [restoreThreadContext] operations.
* See [CopyableThreadContextElement] for advanced interleaving details.
*
* All implementations of [ThreadContextElement] should be thread-safe and guard their internal mutable state
* within an element accordingly.
*/
public interface ThreadContextElement<S> : CoroutineContext.Element {
/**
* Updates context of the current thread.
* This function is invoked before the coroutine in the specified [context] is resumed in the current thread
* when the context of the coroutine this element.
* The result of this function is the old value of the thread-local state that will be passed to [restoreThreadContext].
* This method should handle its own exceptions and do not rethrow it. Thrown exceptions will leave coroutine which
* context is updated in an undefined state and may crash an application.
*
* @param context the coroutine context.
*/
public fun updateThreadContext(context: CoroutineContext): S

/**
* Restores context of the current thread.
* This function is invoked after the coroutine in the specified [context] is suspended in the current thread
* if [updateThreadContext] was previously invoked on resume of this coroutine.
* The value of [oldState] is the result of the previous invocation of [updateThreadContext] and it should
* be restored in the thread-local state by this function.
* This method should handle its own exceptions and do not rethrow it. Thrown exceptions will leave coroutine which
* context is updated in an undefined state and may crash an application.
*
* @param context the coroutine context.
* @param oldState the value returned by the previous invocation of [updateThreadContext].
*/
public fun restoreThreadContext(context: CoroutineContext, oldState: S)
}
Loading