diff --git a/.changeset/silver-tools-collect.md b/.changeset/silver-tools-collect.md new file mode 100644 index 0000000000..4464afd4e6 --- /dev/null +++ b/.changeset/silver-tools-collect.md @@ -0,0 +1,20 @@ +--- +"effect": patch +--- + +added Stream.mergeStruct + +Combines a struct of streams into a single stream of tagged values where the tag is the key of the struct. + +```ts +import { Stream } from "effect" + +// Stream.Stream<{ _tag: "a"; value: number; } | { _tag: "b"; value: string; }> +const stream = Stream.mergeStruct( + { + a: Stream.make(0), + b: Stream.make("") + }, + { concurrency: 1 } +) +``` diff --git a/packages/effect/dtslint/Stream.ts b/packages/effect/dtslint/Stream.ts index 92337a288f..2d2c4534ef 100644 --- a/packages/effect/dtslint/Stream.ts +++ b/packages/effect/dtslint/Stream.ts @@ -1,6 +1,7 @@ import { pipe } from "effect/Function" import * as Predicate from "effect/Predicate" import * as Stream from "effect/Stream" +import { Cause } from "../src/index.js" declare const numbers: Stream.Stream declare const numbersOrStrings: Stream.Stream @@ -249,3 +250,13 @@ Stream.zipLatestAll(numbers, numbersOrStrings) // $ExpectType Stream<[number, string | number, never], Error, never> Stream.zipLatestAll(numbers, numbersOrStrings, Stream.fail(new Error(""))) + +// ------------------------------------------------------------------------------------- +// merge +// ------------------------------------------------------------------------------------- + +// $ExpectType Stream<{ _tag: "a"; value: number; } | { _tag: "b"; value: string; }, NoSuchElementException, never> +Stream.mergeStruct({ + a: Stream.make(0).pipe(Stream.tap(() => new Cause.NoSuchElementException())), + b: Stream.make("") +}, { concurrency: 1 }) diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index adbc0c9242..1a36b76b18 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2895,6 +2895,40 @@ export const mergeAll: { ): Stream } = internal.mergeAll +/** + * Merges a struct of streams into a single stream of tagged values. + * @category combinators + * @since 3.8.5 + * + * @example + * // Stream.Stream<{ _tag: "a"; value: number; } | { _tag: "b"; value: string; }> + * const res = mergeStruct({ + * a: Stream.make(0), + * b: Stream.make("") + * }) + */ +export const mergeStruct: { + }>( + streams: S, + options: { + readonly concurrency: number | "unbounded" + readonly bufferSize?: number | undefined + } + ): Stream< + { [K in keyof S]: { _tag: K; value: Stream.Success } }[keyof S], + Stream.Error, + Stream.Context + > + (options: { + readonly concurrency: number | "unbounded" + readonly bufferSize?: number | undefined + }): }>(streams: S) => Stream< + { [K in keyof S]: { _tag: K; value: Stream.Success } }[keyof S], + Stream.Error, + Stream.Context + > +} = internal.mergeStruct + /** * Merges this stream and the specified stream together to a common element * type with the specified mapping functions. diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 3a4b6d0bb9..60707538ed 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4107,6 +4107,33 @@ export const mergeAll = dual< }) => Stream.Stream >((args) => Symbol.iterator in args[0], (streams, options) => flatten(fromIterable(streams), options)) +/** @internal */ +export const mergeStruct: { + }>( + streams: S, + options: { + readonly concurrency: number | "unbounded" + readonly bufferSize?: number | undefined + } + ): Stream.Stream< + { [K in keyof S]: { _tag: K; value: Stream.Stream.Success } }[keyof S], + Stream.Stream.Error, + Stream.Stream.Context + > + (options: { + readonly concurrency: number | "unbounded" + readonly bufferSize?: number | undefined + }): }>(streams: S) => Stream.Stream< + { [K in keyof S]: { _tag: K; value: Stream.Stream.Success } }[keyof S], + Stream.Stream.Error, + Stream.Stream.Context + > +} = dual(2, (streams, options) => { + const keys = Object.keys(streams) + const values = keys.map((key) => streams[key].pipe(map((value) => ({ _tag: key, value })))) as any + return mergeAll(values, options) +}) + /** @internal */ export const mergeEither = dual< ( diff --git a/packages/effect/test/Stream/merging.test.ts b/packages/effect/test/Stream/merging.test.ts index e517975005..a48b1cf7e4 100644 --- a/packages/effect/test/Stream/merging.test.ts +++ b/packages/effect/test/Stream/merging.test.ts @@ -38,6 +38,20 @@ describe("Stream", () => { assert.deepStrictEqual(Array.from(result), [1]) })) + it.effect("mergeStruct", (ctx) => + Effect.gen(function*() { + const stream = Stream.mergeStruct({ + a: Stream.make(0), + b: Stream.make("") + }, { concurrency: 1 }) + + const res = Chunk.toArray(yield* Stream.runCollect(stream)) + ctx.expect(res).toEqual([ + { _tag: "a", value: 0 }, + { _tag: "b", value: "" } + ]) + })) + it.effect("mergeHaltLeft - terminates as soon as the first stream terminates", () => Effect.gen(function*($) { const queue1 = yield* $(Queue.unbounded())