Skip to content

Commit

Permalink
added Stream.mergeStruct
Browse files Browse the repository at this point in the history
  • Loading branch information
jessekelly881 committed Sep 28, 2024
1 parent 3b5b332 commit ecad546
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 0 deletions.
20 changes: 20 additions & 0 deletions .changeset/silver-tools-collect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
"effect": patch
---

added Stream.mergeStruct

Combines a struct of streams into a single stream of tagged values where the tag is the key of the struct.

```ts
import { Stream } from "effect"

// Stream.Stream<{ _tag: "a"; value: number; } | { _tag: "b"; value: string; }>
const stream = Stream.mergeStruct(
{
a: Stream.make(0),
b: Stream.make("")
},
{ concurrency: 1 }
)
```
11 changes: 11 additions & 0 deletions packages/effect/dtslint/Stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { pipe } from "effect/Function"
import * as Predicate from "effect/Predicate"
import * as Stream from "effect/Stream"
import { Cause } from "../src/index.js"

declare const numbers: Stream.Stream<number>
declare const numbersOrStrings: Stream.Stream<number | string>
Expand Down Expand Up @@ -249,3 +250,13 @@ Stream.zipLatestAll(numbers, numbersOrStrings)

// $ExpectType Stream<[number, string | number, never], Error, never>
Stream.zipLatestAll(numbers, numbersOrStrings, Stream.fail(new Error("")))

// -------------------------------------------------------------------------------------
// merge
// -------------------------------------------------------------------------------------

// $ExpectType Stream<{ _tag: "a"; value: number; } | { _tag: "b"; value: string; }, NoSuchElementException, never>
Stream.mergeStruct({
a: Stream.make(0).pipe(Stream.tap(() => new Cause.NoSuchElementException())),
b: Stream.make("")
}, { concurrency: 1 })
34 changes: 34 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2895,6 +2895,40 @@ export const mergeAll: {
): Stream<A, E, R>
} = internal.mergeAll

/**
* Merges a struct of streams into a single stream of tagged values.
* @category combinators
* @since 3.8.5
*
* @example
* // Stream.Stream<{ _tag: "a"; value: number; } | { _tag: "b"; value: string; }>
* const res = mergeStruct({
* a: Stream.make(0),
* b: Stream.make("")
* })
*/
export const mergeStruct: {
<S extends { [k in string]: Stream<any, any, any> }>(
streams: S,
options: {
readonly concurrency: number | "unbounded"
readonly bufferSize?: number | undefined
}
): Stream<
{ [K in keyof S]: { _tag: K; value: Stream.Success<S[K]> } }[keyof S],
Stream.Error<S[keyof S]>,
Stream.Context<S[keyof S]>
>
(options: {
readonly concurrency: number | "unbounded"
readonly bufferSize?: number | undefined
}): <S extends { [k in string]: Stream<any, any, any> }>(streams: S) => Stream<
{ [K in keyof S]: { _tag: K; value: Stream.Success<S[K]> } }[keyof S],
Stream.Error<S[keyof S]>,
Stream.Context<S[keyof S]>
>
} = internal.mergeStruct

/**
* Merges this stream and the specified stream together to a common element
* type with the specified mapping functions.
Expand Down
27 changes: 27 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4107,6 +4107,33 @@ export const mergeAll = dual<
}) => Stream.Stream<A, E, R>
>((args) => Symbol.iterator in args[0], (streams, options) => flatten(fromIterable(streams), options))
/** @internal */
export const mergeStruct: {
<S extends { [k in string]: Stream.Stream<any, any, any> }>(
streams: S,
options: {
readonly concurrency: number | "unbounded"
readonly bufferSize?: number | undefined
}
): Stream.Stream<
{ [K in keyof S]: { _tag: K; value: Stream.Stream.Success<S[K]> } }[keyof S],
Stream.Stream.Error<S[keyof S]>,
Stream.Stream.Context<S[keyof S]>
>
(options: {
readonly concurrency: number | "unbounded"
readonly bufferSize?: number | undefined
}): <S extends { [k in string]: Stream.Stream<any, any, any> }>(streams: S) => Stream.Stream<
{ [K in keyof S]: { _tag: K; value: Stream.Stream.Success<S[K]> } }[keyof S],
Stream.Stream.Error<S[keyof S]>,
Stream.Stream.Context<S[keyof S]>
>
} = dual(2, (streams, options) => {
const keys = Object.keys(streams)
const values = keys.map((key) => streams[key].pipe(map((value) => ({ _tag: key, value })))) as any
return mergeAll(values, options)
})
/** @internal */
export const mergeEither = dual<
<A2, E2, R2>(
Expand Down
14 changes: 14 additions & 0 deletions packages/effect/test/Stream/merging.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ describe("Stream", () => {
assert.deepStrictEqual(Array.from(result), [1])
}))

it.effect("mergeStruct", (ctx) =>
Effect.gen(function*() {
const stream = Stream.mergeStruct({
a: Stream.make(0),
b: Stream.make("")
}, { concurrency: 1 })

const res = Chunk.toArray(yield* Stream.runCollect(stream))
ctx.expect(res).toEqual([
{ _tag: "a", value: 0 },
{ _tag: "b", value: "" }
])
}))

it.effect("mergeHaltLeft - terminates as soon as the first stream terminates", () =>
Effect.gen(function*($) {
const queue1 = yield* $(Queue.unbounded<number>())
Expand Down

0 comments on commit ecad546

Please sign in to comment.