Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow size- and time-based chunked #2378

Open
wants to merge 28 commits into
base: develop
Choose a base branch
from

Conversation

circusmagnus
Copy link

@circusmagnus circusmagnus commented Nov 10, 2020

Add two operators for size- and time-based chunking of Flows. Targets issues: #1290
#1302

New operators:
public fun <T> Flow<T>.chunked(maxSize: Int, minSize: Int = 1)

public fun <T> Flow<T>.chunked( chunkDuration: Duration, minSize: Int = 1, maxSize: Int = NO_MAXIMUM ): Flow<List<T>>

public fun <T> Flow<T>.chunked( chunkDurationMs: Long, minSize: Int = 1, maxSize: Int = NO_MAXIMUM ): Flow<List<T>>

Time based impl:

  • By default starts chunking values with first emission since start of collection or last chunked emission (non-continuous time windows)
  • With minSize = 0 switches to continuous time-windows: Starts chunking right away. New chunking window starts right after previous
  • Can specify max size of a chunk. If specified, upon reaching max size, it emits chunked values and resets timer.

To be added:

@circusmagnus circusmagnus changed the title Flow time based chunked Flow size- and time-based chunked Nov 10, 2020
@elizarov
Copy link
Contributor

Please, please, read contributing guidelines first: https://github.com/Kotlin/kotlinx.coroutines/blob/master/CONTRIBUTING.md

If you introduce any new public APIs:

  • All new APIs must come with documentation and tests.
  • All new APIs are initially released with @ExperimentalCoroutineApi annotation and are graduated later.
  • Update the public API dumps and commit the resulting changes as well. It will not pass the tests otherwise.
  • If you plan large API additions, then please start by submitting an issue with the proposed API design
    to gather community feedback.
  • Contact the maintainers to coordinate any big piece of work in advance.

Copy link
Contributor

@elizarov elizarov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR does not follow contributing guidelines: it has no docs and it does not have a community-discussed design.

@circusmagnus
Copy link
Author

Sure. I have initiated discussion in #1302 (comment)

Commits with lacking docs, annotations and api dump will follow shortly.

@circusmagnus circusmagnus marked this pull request as draft November 13, 2020 13:25
@circusmagnus
Copy link
Author

I am converting this PR to Draft until we reach some outcome in #1302 discussion.

@circusmagnus circusmagnus changed the base branch from master to develop April 15, 2021 21:20
@@ -7,7 +7,7 @@
[![Slack channel](https://img.shields.io/badge/chat-slack-green.svg?logo=slack)](https://kotlinlang.slack.com/messages/coroutines/)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file has been apparently changed on master but not on develop - with PR: Improve readability. #2563

@circusmagnus
Copy link
Author

I have:

  • Updated the solution
  • Added @experimental annotations
  • Updated ApiDump

@circusmagnus circusmagnus marked this pull request as ready for review April 16, 2021 04:34
@nikiJava
Copy link

nikiJava commented Dec 1, 2022

Hi :) Are there any updates? The operator looks very handy and useful.

@gregsh
Copy link

gregsh commented Jan 23, 2023

It looks to me that using onTime select clause in the provided implementation chunked(ByTime) is not efficient. It keeps executing even when the source flow is really idle.

For example, if one tries to "chunk" mouse clicks, the code in onTimeout(3.seconds) keeps running each 3 seconds regardless of whether user really clicks the mouse or just left the app running for a night.

Also, in the scenario above we probably want to buffer events while user keeps clicking and emit that buffer only when user stops. I see 2 options how to "chunk" here. We may want to a) emit something right away and cancel later as collectLatest does (responsive UI) or b) just buffer until there is duration-long silence. There are no such options in this PR.

val elements = produce<T>(capacity = maxSize) {
collect { element ->
val hasCapacity = channel.trySend(element).isSuccess
if (!hasCapacity) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think about size instead of maxSize like SizedBased? when channel reached to size it should be emit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants