diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 3252d1047f..db4c32827b 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* +import kotlin.experimental.* import kotlin.js.* import kotlin.jvm.* @@ -319,8 +320,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun notifyCancelling(list: NodeList, cause: Throwable) { // first cancel our own children onCancelling(cause) - list.close(LIST_CANCELLATION_PERMISSION) - notifyHandlers(list, cause) { it.onCancelling } + notifyHandlers(list, LIST_CANCELLATION_PERMISSION, cause) { it.onCancelling } // then cancel parent cancelParent(cause) // tentative cancellation -- does not matter if there is no parent } @@ -352,13 +352,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } private fun NodeList.notifyCompletion(cause: Throwable?) { - close(LIST_ON_COMPLETION_PERMISSION) - notifyHandlers(this, cause) { true } + notifyHandlers(this, LIST_ON_COMPLETION_PERMISSION, cause) { true } } - private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) { + private fun notifyHandlers(list: NodeList, permissionBitmask: Byte, cause: Throwable?, predicate: (JobNode) -> Boolean) { var exception: Throwable? = null - list.forEach { node -> + list.forEach(forbidBitmask = permissionBitmask) { node, _, _ -> if (node is JobNode && predicate(node)) { try { node.invoke(cause) @@ -559,10 +558,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun promoteSingleToNodeList(state: JobNode) { // try to promote it to list (SINGLE+ state) - state.addOneIfEmpty(NodeList()) - // it must be in SINGLE+ state or state has changed (node could have need removed from state) - val list = state.nextNode // either our NodeList or somebody else won the race, updated state - // just attempt converting it to list if state is still the same, then we'll continue lock-free loop + val list = NodeList() + val address = list.addLastWithoutModifying(state, permissionsBitmask = 0) + assert { address == 0L } _state.compareAndSet(state, list) } @@ -626,7 +624,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } is Incomplete -> { // may have a list of completion handlers // remove node from the list if there is a list - if (state.list != null) node.remove() + state.list?.remove(node) return } else -> return // it is complete and does not have any completion handlers @@ -929,18 +927,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!) notifyRootCause?.let { notifyCancelling(list, it) } // now wait for children - // we can't close the list yet: while there are active children, adding new ones is still allowed. - val child = list.nextChild() - if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) - return COMPLETING_WAITING_CHILDREN - // turns out, there are no children to await, so we close the list. - list.close(LIST_CHILD_PERMISSION) - // some children could have sneaked into the list, so we try waiting for them again. - // it would be more correct to re-open the list (otherwise, we get non-linearizable behavior), - // but it's too difficult with the current lock-free list implementation. - val anotherChild = list.nextChild() - if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate)) - return COMPLETING_WAITING_CHILDREN + if (shouldWaitForChildren(finishing, proposedUpdate)) return COMPLETING_WAITING_CHILDREN // otherwise -- we have not children left (all were already cancelled?) return finalizeFinishingState(finishing, proposedUpdate) } @@ -948,60 +935,62 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private val Any?.exceptionOrNull: Throwable? get() = (this as? CompletedExceptionally)?.cause - // return false when there is no more incomplete children to wait - // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. - private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { - val handle = child.childJob.invokeOnCompletion( - invokeImmediately = false, - handler = ChildCompletion(this, state, child, proposedUpdate) - ) - if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it - val nextChild = child.nextChild() ?: return false - return tryWaitForChild(state, nextChild, proposedUpdate) + private fun shouldWaitForChildren( + state: Finishing, + proposedUpdate: Any?, + suggestedStartSegment: LockFreeLinkedListSegment? = null, + suggestedStartIndex: Int? = null + ): Boolean { + val list = state.list + fun tryFindChildren( + closeList: Boolean, + suggestedStartSegment: LockFreeLinkedListSegment? = null, + suggestedStartIndex: Int? = null, + ): Boolean { + var startSegment = suggestedStartSegment + var startIndex = suggestedStartIndex + while (true) { + val child = run { + list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startInSegment = startSegment, startAfterIndex = startIndex) { node, segment, indexInSegment -> + if (node is ChildHandleNode) { + startSegment = segment + startIndex = indexInSegment + return@run node + } + } + null + } ?: break + val handle = child.childJob.invokeOnCompletion( + invokeImmediately = false, + handler = ChildCompletion(this, state, startSegment!!, startIndex!!, proposedUpdate) + ) + if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it + } + return false + } + // Look for children that are currently in the list after the suggested start node. + if (tryFindChildren(suggestedStartSegment = suggestedStartSegment, suggestedStartIndex = suggestedStartIndex, closeList = false)) return true + // We didn't find anyone in the list after the suggested start node. Let's check the beginning now. + if (suggestedStartSegment != null && tryFindChildren(closeList = false)) return true + // Now we know that, at the moment this function started, there were no more children. + // We can close the list for the new children, and if we still don't find any, we can be sure there are none. + return tryFindChildren(closeList = true) } // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. - private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) { + private fun continueCompleting(state: Finishing, proposedUpdate: Any?, lastSegment: LockFreeLinkedListSegment, lastIndexInSegment: Int) { assert { this.state === state } // consistency check -- it cannot change while we are waiting for children - // figure out if we need to wait for the next child - val waitChild = lastChild.nextChild() - // try to wait for the next child - if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child - // no more children to await, so *maybe* we can complete the job; for that, we stop accepting new children. - // potentially, the list can be closed for children more than once: if we detect that there are no more - // children, attempt to close the list, and then new children sneak in, this whole logic will be - // repeated, including closing the list. - state.list.close(LIST_CHILD_PERMISSION) - // did any new children sneak in? - val waitChildAgain = lastChild.nextChild() - if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) { - // yes, so now we have to wait for them! - // ideally, we should re-open the list, - // but it's too difficult with the current lock-free list implementation, - // so we'll live with non-linearizable behavior for now. - return - } + if (shouldWaitForChildren(state, proposedUpdate, suggestedStartSegment = lastSegment, suggestedStartIndex = lastIndexInSegment)) return // waiting for the next child // no more children, now we are sure; try to update the state val finalState = finalizeFinishingState(state, proposedUpdate) afterCompletion(finalState) } - private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? { - var cur = this - while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head) - while (true) { - cur = cur.nextNode - if (cur.isRemoved) continue - if (cur is ChildHandleNode) return cur - if (cur is NodeList) return null // checked all -- no more children - } - } - public final override val children: Sequence get() = sequence { when (val state = this@JobSupport.state) { is ChildHandleNode -> yield(state.childJob) is Incomplete -> state.list?.let { list -> - list.forEach { if (it is ChildHandleNode) yield(it.childJob) } + list.forEach { it, _, _ -> if (it is ChildHandleNode) yield(it.childJob) } } } } @@ -1059,7 +1048,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * as this child didn't make it before [notifyCancelling] and won't be notified that it should be * cancelled. * - * And if the parent wasn't cancelled and the previous [LockFreeLinkedListNode.addLast] failed because + * And if the parent wasn't cancelled and the previous [LockFreeLinkedListHead.addLast] failed because * the job is in its final state already, we won't be able to attach anyway, so we must just invoke * the handler and return. */ @@ -1259,11 +1248,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private class ChildCompletion( private val parent: JobSupport, private val state: Finishing, - private val child: ChildHandleNode, + private val segment: LockFreeLinkedListSegment, + private val indexInSegment: Int, private val proposedUpdate: Any? ) : JobNode() { override fun invoke(cause: Throwable?) { - parent.continueCompleting(state, child, proposedUpdate) + parent.continueCompleting(state, proposedUpdate, lastSegment = segment, lastIndexInSegment = indexInSegment) } override val onCancelling: Boolean get() = false } @@ -1410,9 +1400,9 @@ private val EMPTY_NEW = Empty(false) private val EMPTY_ACTIVE = Empty(true) // bit mask -private const val LIST_ON_COMPLETION_PERMISSION = 1 -private const val LIST_CHILD_PERMISSION = 2 -private const val LIST_CANCELLATION_PERMISSION = 4 +private const val LIST_ON_COMPLETION_PERMISSION = 1.toByte() +private const val LIST_CHILD_PERMISSION = 2.toByte() +private const val LIST_CANCELLATION_PERMISSION = 4.toByte() private class Empty(override val isActive: Boolean) : Incomplete { override val list: NodeList? get() = null @@ -1504,7 +1494,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete { append(state) append("}[") var first = true - this@NodeList.forEach { node -> + this@NodeList.forEach { node, _, _ -> if (node is JobNode) { if (first) first = false else append(", ") append(node) diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 32209fc0ac..b9b54f10f6 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -2,24 +2,185 @@ package kotlinx.coroutines.internal -/** @suppress **This is unstable API and it is subject to change.** */ -public expect open class LockFreeLinkedListNode() { - public val isRemoved: Boolean - public val nextNode: LockFreeLinkedListNode - public val prevNode: LockFreeLinkedListNode - public fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Int): Boolean - public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean - public open fun remove(): Boolean +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlin.coroutines.* +import kotlin.experimental.* +import kotlin.jvm.* +/** @suppress **This is unstable API and it is subject to change.** */ +internal open class LockFreeLinkedListNode { /** - * Closes the list for anything that requests the permission [forbiddenElementsBit]. - * Only a single permission can be forbidden at a time, but this isn't checked. + * The default value of 0 means that either the node is not in any list or [LockFreeLinkedListHead.addLast] wasn't + * yet called on it. */ - public fun close(forbiddenElementsBit: Int) + var address: Long = 0 } /** @suppress **This is unstable API and it is subject to change.** */ -public expect open class LockFreeLinkedListHead() : LockFreeLinkedListNode { - public inline fun forEach(block: (LockFreeLinkedListNode) -> Unit) - public final override fun remove(): Nothing +internal open class LockFreeLinkedListHead: LockFreeLinkedListSegment( + id = 0, + prev = null, + pointers = 2, +) { + private val tail = atomic(this) + private val nextElement = atomic(0L) + + /** + * The list of bits that are forbidden from entering the list. + * + * TODO: we can store this in `cleanedAndPointers`, there's enough space for that there. + */ + private val forbiddenBits: AtomicInt = atomic(0) + + /** + * Iterates over all non-removed elements in this list, skipping every node until (and including) [startAfterIndex]. + */ + inline fun forEach( + forbidBitmask: Byte = 0, + startInSegment: LockFreeLinkedListSegment? = null, + startAfterIndex: Int? = null, + block: (LockFreeLinkedListNode, LockFreeLinkedListSegment, Int) -> Unit + ) { + forbiddenBits.update { it or forbidBitmask.toInt() } + var segment: LockFreeLinkedListSegment? = startInSegment ?: this + var startIndex: Int = startAfterIndex?.let { it + 1 } ?: 0 + while (segment != null) { + segment.forEach(forbidBitmask = forbidBitmask, startIndex = startIndex, block = block) + segment = segment.next + startIndex = 0 + } + } + + /** + * Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list, + * and then sets the [node]'s address to the new address. + */ + fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Boolean { + node.address = addLastWithoutModifying(node, permissionsBitmask) ?: return false + return true + } + + /** + * Adds the [node] to the end of the list if every bit in [permissionsBitmask] is still allowed in the list. + * As opposed to [addLast], doesn't modify the [node]'s address. + */ + fun addLastWithoutModifying(node: LockFreeLinkedListNode, permissionsBitmask: Byte): Long? { + /** First, avoid modifying the list at all if it was already closed for elements like ours. */ + if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null + /** Obtain the place from which the desired segment will certainly be reachable. */ + val curTail = tail.value + /** Allocate a place for our element. */ + val index = nextElement.getAndIncrement() + /** Find or create a segment where the node can be stored. */ + val createNewSegment = ::createSegment // can't just pass the function, as the compiler crashes (KT-67332) + val segmentId = index / SEGMENT_SIZE + val segment = tail.findSegmentAndMoveForward(id = segmentId, curTail, createNewSegment).segment + assert { segment.id == segmentId } + val indexInSegment = (index % SEGMENT_SIZE).toInt() + /** Double-check that it's still not forbidden for the node to enter the list. */ + if (permissionsBitmask and forbiddenBits.value.toByte() != 0.toByte()) return null + /** Now we know that the list was still not closed at some point *even after the segment* was created. + * Because [forbiddenBits] is set before [forEach] traverses the list, this means that [forEach] is guaranteed + * to observe the new segment and either break the cell where [node] wants to arrive or process the [node]. + * In any case, we have linearizable behavior. */ + return if (segment.tryAdd(node, permissionsBitmask = permissionsBitmask, indexInSegment = indexInSegment)) { + index + } else { + null + } + } + + fun remove(node: LockFreeLinkedListNode) { + val address = node.address + val id = address / SEGMENT_SIZE + var segment: LockFreeLinkedListSegment = this + while (segment.id < id) { segment = segment.next!! } + if (segment.id == id) { + segment.clearSlot((address % SEGMENT_SIZE).toInt(), node) + } + } } + +internal open class LockFreeLinkedListSegment( + id: Long, + prev: LockFreeLinkedListSegment?, + pointers: Int, +) : Segment(id = id, prev = prev, pointers = pointers) +{ + /** Each cell is a [LockFreeLinkedListNode], a [BrokenForSomeElements], or `null`. */ + private val cells = atomicArrayOfNulls(SEGMENT_SIZE) + + override val numberOfSlots: Int get() = SEGMENT_SIZE + + fun clearSlot(index: Int, node: LockFreeLinkedListNode) { + if (cells[index].compareAndSet(node, null)) + onSlotCleaned() + } + + inline fun forEach(forbidBitmask: Byte, startIndex: Int, block: (LockFreeLinkedListNode, LockFreeLinkedListSegment, Int) -> Unit) { + for (i in startIndex until SEGMENT_SIZE) { + val node = breakCellOrGetValue(forbidBitmask, i) + if (node != null) block(node, this, i) + } + } + + private fun breakCellOrGetValue(forbidBitmask: Byte, index: Int): LockFreeLinkedListNode? { + while (true) { + val value = cells[index].value + if (value is BrokenForSomeElements?) { + val newForbiddenBits = value.forbiddenBits or forbidBitmask + if (newForbiddenBits == value.forbiddenBits + || cells[index].compareAndSet(value, BrokenForSomeElements.fromBitmask(newForbiddenBits))) + return null + } else { + return value as LockFreeLinkedListNode + } + } + } + + /** + * Adds the [node] to the array of cells if the slot wasn't broken. + */ + fun tryAdd(node: LockFreeLinkedListNode, permissionsBitmask: Byte, indexInSegment: Int): Boolean { + if (cells[indexInSegment].compareAndSet(null, node)) return true + cells[indexInSegment].loop { value -> + // This means that some elements are forbidden from entering the list. + value as BrokenForSomeElements + // Are *we* forbidden from entering the list? + if (value.forbiddenBits and permissionsBitmask != 0.toByte()) { + cells[indexInSegment].value = BrokenForSomeElements.FULLY_BROKEN + onSlotCleaned() + return false + } + // We aren't forbidden. Let's try entering it. + if (cells[indexInSegment].compareAndSet(value, node)) return true + } + } + + override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { + throw UnsupportedOperationException("Cancellation is not supported on LockFreeLinkedList") + } +} + +private fun createSegment(id: Long, prev: LockFreeLinkedListSegment): LockFreeLinkedListSegment = + LockFreeLinkedListSegment( + id = id, + prev = prev, + pointers = 0, + ) + +private const val SEGMENT_SIZE = 8 + +private class BrokenForSomeElements private constructor(val forbiddenBits: Byte) { + companion object { + fun fromBitmask(forbiddenBits: Byte): BrokenForSomeElements? = when (forbiddenBits) { + 0.toByte() -> null // no one is forbidden + else -> BrokenForSomeElements(forbiddenBits) + } + + val FULLY_BROKEN = BrokenForSomeElements(255.toByte()) + } +} + +private val BrokenForSomeElements?.forbiddenBits get() = this?.forbiddenBits ?: 0 diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt deleted file mode 100644 index a172e66c8b..0000000000 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ /dev/null @@ -1,289 +0,0 @@ -@file:Suppress("NO_EXPLICIT_VISIBILITY_IN_API_MODE") - -package kotlinx.coroutines.internal - -import kotlinx.atomicfu.* -import kotlinx.coroutines.* -import kotlin.jvm.* - -private typealias Node = LockFreeLinkedListNode - -/** - * Doubly-linked concurrent list node with remove support. - * Based on paper - * ["Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap"](https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.140.4693&rep=rep1&type=pdf) - * by Sundell and Tsigas with considerable changes. - * - * The core idea of the algorithm is to maintain a doubly-linked list with an ever-present sentinel node (it is - * never removed) that serves both as a list head and tail and to linearize all operations (both insert and remove) on - * the update of the next pointer. Removed nodes have their next pointer marked with [Removed] class. - * - * Important notes: - * - There are no operations to add items to left side of the list, only to the end (right side), because we cannot - * efficiently linearize them with atomic multi-step head-removal operations. In short, - * support for [describeRemoveFirst] operation precludes ability to add items at the beginning. - * - Previous pointers are not marked for removal. We don't support linearizable backwards traversal. - * - Remove-helping logic is simplified and consolidated in [correctPrev] method. - * - * @suppress **This is unstable API and it is subject to change.** - */ -@Suppress("LeakingThis") -@InternalCoroutinesApi -public actual open class LockFreeLinkedListNode { - private val _next = atomic(this) // Node | Removed | OpDescriptor - private val _prev = atomic(this) // Node to the left (cannot be marked as removed) - private val _removedRef = atomic(null) // lazily cached removed ref to this - - private fun removed(): Removed = - _removedRef.value ?: Removed(this).also { _removedRef.lazySet(it) } - - public actual open val isRemoved: Boolean get() = next is Removed - - // LINEARIZABLE. Returns Node | Removed - public val next: Any get() = _next.value - - // LINEARIZABLE. Returns next non-removed Node - public actual val nextNode: Node get() = - next.let { (it as? Removed)?.ref ?: it as Node } // unwraps the `next` node - - // LINEARIZABLE WHEN THIS NODE IS NOT REMOVED: - // Returns prev non-removed Node, makes sure prev is correct (prev.next === this) - // NOTE: if this node is removed, then returns non-removed previous node without applying - // prev.next correction, which does not provide linearizable backwards iteration, but can be used to - // resume forward iteration when current node was removed. - public actual val prevNode: Node - get() = correctPrev() ?: findPrevNonRemoved(_prev.value) - - private tailrec fun findPrevNonRemoved(current: Node): Node { - if (!current.isRemoved) return current - return findPrevNonRemoved(current._prev.value) - } - - // ------ addOneIfEmpty ------ - - public actual fun addOneIfEmpty(node: Node): Boolean { - node._prev.lazySet(this) - node._next.lazySet(this) - while (true) { - val next = next - if (next !== this) return false // this is not an empty list! - if (_next.compareAndSet(this, node)) { - // added successfully (linearized add) -- fixup the list - node.finishAdd(this) - return true - } - } - } - - // ------ addLastXXX ------ - - /** - * Adds last item to this list. Returns `false` if the list is closed. - */ - public actual fun addLast(node: Node, permissionsBitmask: Int): Boolean { - while (true) { // lock-free loop on prev.next - val currentPrev = prevNode - return when { - currentPrev is ListClosed -> - currentPrev.forbiddenElementsBitmask and permissionsBitmask == 0 && - currentPrev.addLast(node, permissionsBitmask) - currentPrev.addNext(node, this) -> true - else -> continue - } - } - } - - /** - * Forbids adding new items to this list. - */ - public actual fun close(forbiddenElementsBit: Int) { - addLast(ListClosed(forbiddenElementsBit), forbiddenElementsBit) - } - - /** - * Given: - * ``` - * +-----------------------+ - * this | node V next - * +---+---+ +---+---+ +---+---+ - * ... <-- | P | N | | P | N | | P | N | --> .... - * +---+---+ +---+---+ +---+---+ - * ^ | - * +-----------------------+ - * ``` - * Produces: - * ``` - * this node next - * +---+---+ +---+---+ +---+---+ - * ... <-- | P | N | ==> | P | N | --> | P | N | --> .... - * +---+---+ +---+---+ +---+---+ - * ^ | ^ | - * +---------+ +---------+ - * ``` - * Where `==>` denotes linearization point. - * Returns `false` if `next` was not following `this` node. - */ - @PublishedApi - internal fun addNext(node: Node, next: Node): Boolean { - node._prev.lazySet(this) - node._next.lazySet(next) - if (!_next.compareAndSet(next, node)) return false - // added successfully (linearized add) -- fixup the list - node.finishAdd(next) - return true - } - - // ------ removeXXX ------ - - /** - * Removes this node from the list. Returns `true` when removed successfully, or `false` if the node was already - * removed or if it was not added to any list in the first place. - * - * **Note**: Invocation of this operation does not guarantee that remove was actually complete if result was `false`. - * In particular, invoking [nextNode].[prevNode] might still return this node even though it is "already removed". - */ - public actual open fun remove(): Boolean = - removeOrNext() == null - - // returns null if removed successfully or next node if this node is already removed - @PublishedApi - internal fun removeOrNext(): Node? { - while (true) { // lock-free loop on next - val next = this.next - if (next is Removed) return next.ref // was already removed -- don't try to help (original thread will take care) - if (next === this) return next // was not even added - val removed = (next as Node).removed() - if (_next.compareAndSet(next, removed)) { - // was removed successfully (linearized remove) -- fixup the list - next.correctPrev() - return null - } - } - } - - // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation - // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise) - - - // ------ other helpers ------ - - /** - * Given: - * ``` - * - * prev this next - * +---+---+ +---+---+ +---+---+ - * ... <-- | P | N | --> | P | N | --> | P | N | --> .... - * +---+---+ +---+---+ +---+---+ - * ^ ^ | | - * | +---------+ | - * +-------------------------+ - * ``` - * Produces: - * ``` - * prev this next - * +---+---+ +---+---+ +---+---+ - * ... <-- | P | N | --> | P | N | --> | P | N | --> .... - * +---+---+ +---+---+ +---+---+ - * ^ | ^ | - * +---------+ +---------+ - * ``` - */ - private fun finishAdd(next: Node) { - next._prev.loop { nextPrev -> - if (this.next !== next) return // this or next was removed or another node added, remover/adder fixes up links - if (next._prev.compareAndSet(nextPrev, this)) { - // This newly added node could have been removed, and the above CAS would have added it physically again. - // Let us double-check for this situation and correct if needed - if (isRemoved) next.correctPrev() - return - } - } - } - - /** - * Returns the corrected value of the previous node while also correcting the `prev` pointer - * (so that `this.prev.next === this`) and helps complete node removals to the left ot this node. - * - * It returns `null` in two special cases: - * - * - When this node is removed. In this case there is no need to waste time on corrections, because - * remover of this node will ultimately call [correctPrev] on the next node and that will fix all - * the links from this node, too. - */ - private tailrec fun correctPrev(): Node? { - val oldPrev = _prev.value - var prev: Node = oldPrev - var last: Node? = null // will be set so that last.next === prev - while (true) { // move the left until first non-removed node - val prevNext: Any = prev._next.value - when { - // fast path to find quickly find prev node when everything is properly linked - prevNext === this -> { - if (oldPrev === prev) return prev // nothing to update -- all is fine, prev found - // otherwise need to update prev - if (!this._prev.compareAndSet(oldPrev, prev)) { - // Note: retry from scratch on failure to update prev - return correctPrev() - } - return prev // return the correct prev - } - // slow path when we need to help remove operations - this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time - prevNext is Removed -> { - if (last !== null) { - // newly added (prev) node is already removed, correct last.next around it - if (!last._next.compareAndSet(prev, prevNext.ref)) { - return correctPrev() // retry from scratch on failure to update next - } - prev = last - last = null - } else { - prev = prev._prev.value - } - } - else -> { // prevNext is a regular node, but not this -- help delete - last = prev - prev = prevNext as Node - } - } - } - } - - internal fun validateNode(prev: Node, next: Node) { - assert { prev === this._prev.value } - assert { next === this._next.value } - } - - override fun toString(): String = "${this::classSimpleName}@${this.hexAddress}" -} - -private class Removed(@JvmField val ref: Node) { - override fun toString(): String = "Removed[$ref]" -} - -/** - * Head (sentinel) item of the linked list that is never removed. - * - * @suppress **This is unstable API and it is subject to change.** - */ -public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { - /** - * Iterates over all elements in this list of a specified type. - */ - public actual inline fun forEach(block: (Node) -> Unit) { - var cur: Node = next as Node - while (cur != this) { - block(cur) - cur = cur.nextNode - } - } - - // just a defensive programming -- makes sure that list head sentinel is never removed - public actual final override fun remove(): Nothing = error("head cannot be removed") - - // optimization: because head is never removed, we don't have to read _next.value to check these: - override val isRemoved: Boolean get() = false -} - -private class ListClosed(@JvmField val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt deleted file mode 100644 index 6810d614d1..0000000000 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt +++ /dev/null @@ -1,73 +0,0 @@ -@file:Suppress("unused", "NO_EXPLICIT_RETURN_TYPE_IN_API_MODE", "NO_EXPLICIT_VISIBILITY_IN_API_MODE") - -package kotlinx.coroutines.internal - -private typealias Node = LockFreeLinkedListNode - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual open class LockFreeLinkedListNode { - @PublishedApi internal var _next = this - @PublishedApi internal var _prev = this - @PublishedApi internal var _removed: Boolean = false - - public actual inline val nextNode get() = _next - inline actual val prevNode get() = _prev - inline actual val isRemoved get() = _removed - - public actual fun addLast(node: Node, permissionsBitmask: Int): Boolean = when (val prev = this._prev) { - is ListClosed -> - prev.forbiddenElementsBitmask and permissionsBitmask == 0 && prev.addLast(node, permissionsBitmask) - else -> { - node._next = this - node._prev = prev - prev._next = node - this._prev = node - true - } - } - - public actual fun close(forbiddenElementsBit: Int) { - addLast(ListClosed(forbiddenElementsBit), forbiddenElementsBit) - } - - /* - * Remove that is invoked as a virtual function with a - * potentially augmented behaviour. - * I.g. `LockFreeLinkedListHead` throws, while `SendElementWithUndeliveredHandler` - * invokes handler on remove - */ - public actual open fun remove(): Boolean { - if (_removed) return false - val prev = this._prev - val next = this._next - prev._next = next - next._prev = prev - _removed = true - return true - } - - public actual fun addOneIfEmpty(node: Node): Boolean { - if (_next !== this) return false - addLast(node, Int.MIN_VALUE) - return true - } -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual open class LockFreeLinkedListHead : Node() { - /** - * Iterates over all elements in this list of a specified type. - */ - public actual inline fun forEach(block: (Node) -> Unit) { - var cur: Node = _next - while (cur != this) { - block(cur) - cur = cur._next - } - } - - // just a defensive programming -- makes sure that list head sentinel is never removed - public actual final override fun remove(): Nothing = throw UnsupportedOperationException() -} - -private class ListClosed(val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt b/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt deleted file mode 100644 index 305484f741..0000000000 --- a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt +++ /dev/null @@ -1,42 +0,0 @@ -package kotlinx.coroutines.internal - -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -class LinkedListTest { - data class IntNode(val i: Int) : LockFreeLinkedListNode() - - @Test - fun testSimpleAddLastRemove() { - val list = LockFreeLinkedListHead() - assertContents(list) - val n1 = IntNode(1).apply { list.addLast(this, Int.MAX_VALUE) } - assertContents(list, 1) - val n2 = IntNode(2).apply { list.addLast(this, Int.MAX_VALUE) } - assertContents(list, 1, 2) - val n3 = IntNode(3).apply { list.addLast(this, Int.MAX_VALUE) } - assertContents(list, 1, 2, 3) - val n4 = IntNode(4).apply { list.addLast(this, Int.MAX_VALUE) } - assertContents(list, 1, 2, 3, 4) - assertTrue(n1.remove()) - assertContents(list, 2, 3, 4) - assertTrue(n3.remove()) - assertContents(list, 2, 4) - assertTrue(n4.remove()) - assertContents(list, 2) - assertTrue(n2.remove()) - assertFalse(n2.remove()) - assertContents(list) - } - - private fun assertContents(list: LockFreeLinkedListHead, vararg expected: Int) { - val n = expected.size - val actual = IntArray(n) - var index = 0 - list.forEach { if (it is IntNode) actual[index++] = it.i } - assertEquals(n, index) - for (i in 0 until n) assertEquals(expected[i], actual[i], "item i") - } -} diff --git a/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt b/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt index 7a3b69fda6..a499365095 100644 --- a/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt +++ b/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt @@ -13,10 +13,10 @@ class MemoryFootprintTest : TestBase(true) { @Test fun testJobSize() { - assertTotalSize(jobWithChildren(1), 112) - assertTotalSize(jobWithChildren(2), 192) // + 80 - assertTotalSize(jobWithChildren(3), 248) // + 56 - assertTotalSize(jobWithChildren(4), 304) // + 56 + assertTotalSize(jobWithChildren(1), 112) // original: 112 + assertTotalSize(jobWithChildren(2), 288) // original: 192 + assertTotalSize(jobWithChildren(3), 344) // original: 248 + assertTotalSize(jobWithChildren(4), 400) // original: 304 } private fun jobWithChildren(numberOfChildren: Int): Job { diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt deleted file mode 100644 index 95be1cbe62..0000000000 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt +++ /dev/null @@ -1,81 +0,0 @@ -package kotlinx.coroutines.internal - -import kotlinx.coroutines.testing.TestBase -import org.junit.Test -import java.util.* -import java.util.concurrent.atomic.AtomicInteger -import kotlin.concurrent.thread - -/** - * This stress test has 2 threads adding on one side on list, 2 more threads adding on the other, - * and 6 threads iterating and concurrently removing items. The resulting list that is being - * stressed is long. - */ -class LockFreeLinkedListLongStressTest : TestBase() { - data class IntNode(val i: Int) : LockFreeLinkedListNode() - val list = LockFreeLinkedListHead() - - val threads = mutableListOf() - private val nAdded = 10_000_000 // should not stress more, because that'll run out of memory - private val nAddThreads = 4 // must be power of 2 (!!!) - private val nRemoveThreads = 6 - private val removeProbability = 0.2 - private val workingAdders = AtomicInteger(nAddThreads) - - private fun shallRemove(i: Int) = i and 63 != 42 - - @Test - fun testStress() { - println("--- LockFreeLinkedListLongStressTest") - for (j in 0 until nAddThreads) - threads += thread(start = false, name = "adder-$j") { - for (i in j until nAdded step nAddThreads) { - list.addLast(IntNode(i), Int.MAX_VALUE) - } - println("${Thread.currentThread().name} completed") - workingAdders.decrementAndGet() - } - for (j in 0 until nRemoveThreads) - threads += thread(start = false, name = "remover-$j") { - val rnd = Random() - do { - val lastTurn = workingAdders.get() == 0 - list.forEach { node -> - if (node is IntNode && shallRemove(node.i) && (lastTurn || rnd.nextDouble() < removeProbability)) - node.remove() - } - } while (!lastTurn) - println("${Thread.currentThread().name} completed") - } - println("Starting ${threads.size} threads") - for (thread in threads) - thread.start() - println("Joining threads") - for (thread in threads) - thread.join() - // verification - println("Verify result") - list.validate() - val expected = iterator { - for (i in 0 until nAdded) - if (!shallRemove(i)) - yield(i) - } - list.forEach { node -> - require(node !is IntNode || node.i == expected.next()) - } - require(!expected.hasNext()) - } - - private fun LockFreeLinkedListHead.validate() { - var prev: LockFreeLinkedListNode = this - var cur: LockFreeLinkedListNode = next as LockFreeLinkedListNode - while (cur != this) { - val next = cur.nextNode - cur.validateNode(prev, next) - prev = cur - cur = next - } - validateNode(prev, next as LockFreeLinkedListNode) - } -}