From b5c545f72e45713c04832a249330ec2c96eef9b0 Mon Sep 17 00:00:00 2001 From: kixelated Date: Fri, 3 Nov 2023 13:19:52 +0900 Subject: [PATCH] Initial moq-transport-01 support (#73) --- lib/contribute/broadcast.ts | 12 +- lib/media/catalog/index.ts | 6 +- lib/playback/mse/index.ts | 6 +- lib/playback/webcodecs/worker.ts | 6 +- lib/transport/client.ts | 2 +- lib/transport/connection.ts | 17 +- lib/transport/control.ts | 292 ++++++++++++++++++++++++------- lib/transport/object.ts | 46 ++--- lib/transport/publisher.ts | 35 +++- lib/transport/setup.ts | 92 +++++++--- lib/transport/stream.ts | 2 + lib/transport/subscriber.ts | 46 ++++- web/src/layouts/global.astro | 5 +- 13 files changed, 423 insertions(+), 144 deletions(-) diff --git a/lib/contribute/broadcast.ts b/lib/contribute/broadcast.ts index 6bf83a1..ee33e66 100644 --- a/lib/contribute/broadcast.ts +++ b/lib/contribute/broadcast.ts @@ -134,9 +134,9 @@ export class Broadcast { await subscriber.ack() const stream = await subscriber.data({ - sequence: 0, - priority: 0, // TODO Highest priority - expires: 0, // never expires + group: 0, + object: 0, + priority: 0, }) const writer = stream.getWriter() @@ -164,7 +164,8 @@ export class Broadcast { // Create a new stream for each segment. const stream = await subscriber.data({ - sequence: 0, + group: 0, + object: 0, priority: 0, // TODO expires: 0, // Never expires }) @@ -209,7 +210,8 @@ export class Broadcast { async #serveSegment(subscriber: SubscribeRecv, segment: Segment) { // Create a new stream for each segment. const stream = await subscriber.data({ - sequence: segment.id, + group: segment.id, + object: 0, priority: 0, // TODO expires: 30, // TODO configurable }) diff --git a/lib/media/catalog/index.ts b/lib/media/catalog/index.ts index 22ebb40..b8e0ee9 100644 --- a/lib/media/catalog/index.ts +++ b/lib/media/catalog/index.ts @@ -34,7 +34,11 @@ export class Catalog { const { header, stream } = segment - if (header.sequence !== 0) { + if (header.group !== 0) { + throw new Error("TODO updates not supported") + } + + if (header.object !== 0) { throw new Error("TODO delta updates not supported") } diff --git a/lib/playback/mse/index.ts b/lib/playback/mse/index.ts index 3e3efc4..603cf4b 100644 --- a/lib/playback/mse/index.ts +++ b/lib/playback/mse/index.ts @@ -138,7 +138,11 @@ export default class Player { track = this.#audio } - const segment = new Segment(track.source, init, msg.header.sequence) + if (msg.header.object !== 0) { + throw new Error("multiple objects per group not supported") + } + + const segment = new Segment(track.source, init, msg.header.group) track.add(segment) const container = new MP4.Parser() diff --git a/lib/playback/webcodecs/worker.ts b/lib/playback/webcodecs/worker.ts index 9f7f85b..af7733a 100644 --- a/lib/playback/webcodecs/worker.ts +++ b/lib/playback/webcodecs/worker.ts @@ -71,10 +71,14 @@ class Worker { const timeline = msg.kind === "audio" ? this.#timeline.audio : this.#timeline.video + if (msg.header.object !== 0) { + throw new Error("multiple objects per group not supported") + } + // Add the segment to the timeline const segments = timeline.segments.getWriter() await segments.write({ - sequence: msg.header.sequence, + sequence: msg.header.group, frames: container.decode.readable, }) segments.releaseLock() diff --git a/lib/transport/client.ts b/lib/transport/client.ts index 99f878a..62c63cc 100644 --- a/lib/transport/client.ts +++ b/lib/transport/client.ts @@ -47,7 +47,7 @@ export class Client { const setup = new Setup.Stream(reader, writer) // Send the setup message. - await setup.send.client({ versions: [Setup.Version.KIXEL_00], role: this.config.role }) + await setup.send.client({ versions: [Setup.Version.KIXEL_01], role: this.config.role }) // Receive the setup message. // TODO verify the SETUP response. diff --git a/lib/transport/connection.ts b/lib/transport/connection.ts index cf850a0..dd3a158 100644 --- a/lib/transport/connection.ts +++ b/lib/transport/connection.ts @@ -77,19 +77,10 @@ export class Connection { } async #recv(msg: Control.Message) { - switch (msg.kind) { - case Control.Msg.Announce: - return this.#subscriber.recvAnnounce(msg) - case Control.Msg.AnnounceOk: - return this.#publisher.recvAnnounceOk(msg) - case Control.Msg.AnnounceReset: - return this.#publisher.recvAnnounceReset(msg) - case Control.Msg.Subscribe: - return this.#publisher.recvSubscribe(msg) - case Control.Msg.SubscribeOk: - return this.#subscriber.recvSubscribeOk(msg) - case Control.Msg.SubscribeReset: - return this.#subscriber.recvSubscribeReset(msg) + if (Control.isPublisher(msg)) { + await this.#subscriber.recv(msg) + } else { + await this.#publisher.recv(msg) } } diff --git a/lib/transport/control.ts b/lib/transport/control.ts index cc13f48..d832f2a 100644 --- a/lib/transport/control.ts +++ b/lib/transport/control.ts @@ -1,25 +1,45 @@ import { Reader, Writer } from "./stream" export type Message = Subscriber | Publisher -export type Subscriber = Subscribe | SubscribeEnd | AnnounceOk | AnnounceReset -export type Publisher = SubscribeOk | SubscribeReset | Announce | AnnounceEnd + +// Sent by subscriber +export type Subscriber = Subscribe | Unsubscribe | AnnounceOk | AnnounceError + +export function isSubscriber(m: Message): m is Subscriber { + return ( + m.kind == Msg.Subscribe || m.kind == Msg.Unsubscribe || m.kind == Msg.AnnounceOk || m.kind == Msg.AnnounceError + ) +} + +// Sent by publisher +export type Publisher = SubscribeOk | SubscribeReset | SubscribeError | SubscribeFin | Announce | Unannounce + +export function isPublisher(m: Message): m is Publisher { + return ( + m.kind == Msg.SubscribeOk || + m.kind == Msg.SubscribeReset || + m.kind == Msg.SubscribeError || + m.kind == Msg.SubscribeFin || + m.kind == Msg.Announce || + m.kind == Msg.Unannounce + ) +} // I wish we didn't have to split Msg and Id into separate enums. // However using the string in the message makes it easier to debug. // We'll take the tiny performance hit until I'm better at Typescript. export enum Msg { // NOTE: object and setup are in other modules - // Object = 0, - // Setup = 1, - Subscribe = "subscribe", SubscribeOk = "subscribe_ok", - SubscribeReset = "subscribe_reset", // error termination by the publisher - SubscribeEnd = "subscribe_end", // clean termination by the subscriber + SubscribeError = "subscribe_error", + SubscribeReset = "subscribe_reset", + SubscribeFin = "subscribe_fin", + Unsubscribe = "unsubscribe", Announce = "announce", AnnounceOk = "announce_ok", - AnnounceReset = "announce_reset", // error termination by the subscriber - AnnounceEnd = "announce_end", // clean termination by the publisher + AnnounceError = "announce_error", + Unannounce = "unannounce", GoAway = "go_away", } @@ -30,32 +50,43 @@ enum Id { Subscribe = 0x3, SubscribeOk = 0x4, - SubscribeReset = 0x5, // error termination by the publisher - SubscribeEnd = 0x15, // clean termination by the subscriber + SubscribeError = 0x5, + SubscribeReset = 0xc, + SubscribeFin = 0xb, + Unsubscribe = 0xa, Announce = 0x6, AnnounceOk = 0x7, - AnnounceReset = 0x8, // error termination by the subscriber - AnnounceEnd = 0x18, // clean termination by the publisher + AnnounceError = 0x8, + Unannounce = 0x9, GoAway = 0x10, } -// NOTE: These are forked from moq-transport-00. -// 1. subscribe specifies the track_id, not subscribe_ok -// 2. messages lack a specified length -// 3. optional parameters are not supported (announce, subscribe) -// 4. not allowed on undirectional streams; only after SETUP on the bidirectional stream - export interface Subscribe { kind: Msg.Subscribe id: bigint namespace: string name: string + + start_group: Location + start_object: Location + end_group: Location + end_object: Location + + params?: Parameters +} + +export interface Location { + mode: "none" | "absolute" | "latest" | "future" + value?: number // ignored for type=none, otherwise defaults to 0 } +export type Parameters = Map + export interface SubscribeOk { kind: Msg.SubscribeOk id: bigint + expires?: bigint } export interface SubscribeReset { @@ -63,16 +94,33 @@ export interface SubscribeReset { id: bigint code: bigint reason: string + final_group: number + final_object: number +} + +export interface SubscribeFin { + kind: Msg.SubscribeFin + id: bigint + final_group: number + final_object: number +} + +export interface SubscribeError { + kind: Msg.SubscribeError + id: bigint + code: bigint + reason: string } -export interface SubscribeEnd { - kind: Msg.SubscribeEnd +export interface Unsubscribe { + kind: Msg.Unsubscribe id: bigint } export interface Announce { kind: Msg.Announce namespace: string + params?: Parameters } export interface AnnounceOk { @@ -80,15 +128,15 @@ export interface AnnounceOk { namespace: string } -export interface AnnounceReset { - kind: Msg.AnnounceReset +export interface AnnounceError { + kind: Msg.AnnounceError namespace: string code: bigint reason: string } -export interface AnnounceEnd { - kind: Msg.AnnounceEnd +export interface Unannounce { + kind: Msg.Unannounce namespace: string } @@ -154,16 +202,20 @@ export class Decoder { return Msg.SubscribeOk case Id.SubscribeReset: return Msg.SubscribeReset - case Id.SubscribeEnd: - return Msg.SubscribeEnd + case Id.SubscribeFin: + return Msg.SubscribeFin + case Id.SubscribeError: + return Msg.SubscribeError + case Id.Unsubscribe: + return Msg.Unsubscribe case Id.Announce: return Msg.Announce case Id.AnnounceOk: return Msg.AnnounceOk - case Id.AnnounceReset: - return Msg.AnnounceReset - case Id.AnnounceEnd: - return Msg.AnnounceEnd + case Id.AnnounceError: + return Msg.AnnounceError + case Id.Unannounce: + return Msg.Unannounce case Id.GoAway: return Msg.GoAway } @@ -180,38 +232,80 @@ export class Decoder { return this.subscribe_ok() case Msg.SubscribeReset: return this.subscribe_reset() - case Msg.SubscribeEnd: - return this.subscribe_end() + case Msg.SubscribeError: + return this.subscribe_error() + case Msg.SubscribeFin: + return this.subscribe_fin() + case Msg.Unsubscribe: + return this.unsubscribe() case Msg.Announce: return this.announce() case Msg.AnnounceOk: return this.announce_ok() - case Msg.AnnounceReset: - return this.announce_reset() - case Msg.AnnounceEnd: - return this.announce_end() + case Msg.Unannounce: + return this.unannounce() + case Msg.AnnounceError: + return this.announce_error() case Msg.GoAway: throw new Error("TODO: implement go away") } } private async subscribe(): Promise { - const id = await this.r.u62() - const namespace = await this.r.string() - const name = await this.r.string() - return { kind: Msg.Subscribe, - id, - namespace, - name, + id: await this.r.u62(), + namespace: await this.r.string(), + name: await this.r.string(), + start_group: await this.location(), + start_object: await this.location(), + end_group: await this.location(), + end_object: await this.location(), + params: await this.parameters(), + } + } + + private async location(): Promise { + const mode = await this.r.u62() + if (mode == 0n) { + return { mode: "none", value: 0 } + } else if (mode == 1n) { + return { mode: "absolute", value: await this.r.u53() } + } else if (mode == 2n) { + return { mode: "latest", value: await this.r.u53() } + } else if (mode == 3n) { + return { mode: "future", value: await this.r.u53() } + } else { + throw new Error(`invalid location mode: ${mode}`) + } + } + + private async parameters(): Promise { + const count = await this.r.u53() + if (count == 0) return undefined + + const params = new Map() + + for (let i = 0; i < count; i++) { + const id = await this.r.u62() + const size = await this.r.u53() + const value = await this.r.readExact(size) + + if (params.has(id)) { + throw new Error(`duplicate parameter id: ${id}`) + } + + params.set(id, value) } + + return params } private async subscribe_ok(): Promise { return { kind: Msg.SubscribeOk, id: await this.r.u62(), + expires: (await this.r.u62()) || undefined, } } @@ -221,12 +315,32 @@ export class Decoder { id: await this.r.u62(), code: await this.r.u62(), reason: await this.r.string(), + final_group: await this.r.u53(), + final_object: await this.r.u53(), + } + } + + private async subscribe_fin(): Promise { + return { + kind: Msg.SubscribeFin, + id: await this.r.u62(), + final_group: await this.r.u53(), + final_object: await this.r.u53(), + } + } + + private async subscribe_error(): Promise { + return { + kind: Msg.SubscribeError, + id: await this.r.u62(), + code: await this.r.u62(), + reason: await this.r.string(), } } - private async subscribe_end(): Promise { + private async unsubscribe(): Promise { return { - kind: Msg.SubscribeEnd, + kind: Msg.Unsubscribe, id: await this.r.u62(), } } @@ -237,6 +351,7 @@ export class Decoder { return { kind: Msg.Announce, namespace, + params: await this.parameters(), } } @@ -247,18 +362,18 @@ export class Decoder { } } - private async announce_reset(): Promise { + private async announce_error(): Promise { return { - kind: Msg.AnnounceReset, + kind: Msg.AnnounceError, namespace: await this.r.string(), code: await this.r.u62(), reason: await this.r.string(), } } - private async announce_end(): Promise { + private async unannounce(): Promise { return { - kind: Msg.AnnounceEnd, + kind: Msg.Unannounce, namespace: await this.r.string(), } } @@ -279,16 +394,20 @@ export class Encoder { return this.subscribe_ok(m) case Msg.SubscribeReset: return this.subscribe_reset(m) - case Msg.SubscribeEnd: - return this.subscribe_end(m) + case Msg.SubscribeError: + return this.subscribe_error(m) + case Msg.SubscribeFin: + return this.subscribe_fin(m) + case Msg.Unsubscribe: + return this.unsubscribe(m) case Msg.Announce: return this.announce(m) case Msg.AnnounceOk: return this.announce_ok(m) - case Msg.AnnounceReset: - return this.announce_reset(m) - case Msg.AnnounceEnd: - return this.announce_end(m) + case Msg.AnnounceError: + return this.announce_error(m) + case Msg.Unannounce: + return this.unannounce(m) } } @@ -297,11 +416,46 @@ export class Encoder { await this.w.u62(s.id) await this.w.string(s.namespace) await this.w.string(s.name) + await this.location(s.start_group) + await this.location(s.start_object) + await this.location(s.end_group) + await this.location(s.end_object) + await this.parameters(s.params) + } + + private async location(l: Location) { + if (l.mode == "none") { + await this.w.u8(0) + } else if (l.mode == "absolute") { + await this.w.u8(1) + await this.w.u53(l.value ?? 0) + } else if (l.mode == "latest") { + await this.w.u8(2) + await this.w.u53(l.value ?? 0) + } else if (l.mode == "future") { + await this.w.u8(3) + await this.w.u53(l.value ?? 0) + } + } + + private async parameters(p: Parameters | undefined) { + if (!p) { + await this.w.u8(0) + return + } + + await this.w.u53(p.size) + for (const [id, value] of p) { + await this.w.u62(id) + await this.w.u53(value.length) + await this.w.write(value) + } } async subscribe_ok(s: SubscribeOk) { await this.w.u53(Id.SubscribeOk) await this.w.u62(s.id) + await this.w.u62(s.expires ?? 0n) } async subscribe_reset(s: SubscribeReset) { @@ -309,10 +463,24 @@ export class Encoder { await this.w.u62(s.id) await this.w.u62(s.code) await this.w.string(s.reason) + await this.w.u53(s.final_group) + await this.w.u53(s.final_object) + } + + async subscribe_fin(s: SubscribeFin) { + await this.w.u53(Id.SubscribeFin) + await this.w.u62(s.id) + await this.w.u53(s.final_group) + await this.w.u53(s.final_object) + } + + async subscribe_error(s: SubscribeError) { + await this.w.u53(Id.SubscribeError) + await this.w.u62(s.id) } - async subscribe_end(s: SubscribeEnd) { - await this.w.u53(Id.SubscribeEnd) + async unsubscribe(s: Unsubscribe) { + await this.w.u53(Id.Unsubscribe) await this.w.u62(s.id) } @@ -326,15 +494,15 @@ export class Encoder { await this.w.string(a.namespace) } - async announce_reset(a: AnnounceReset) { - await this.w.u53(Id.AnnounceReset) + async announce_error(a: AnnounceError) { + await this.w.u53(Id.AnnounceError) await this.w.string(a.namespace) await this.w.u62(a.code) await this.w.string(a.reason) } - async announce_end(a: AnnounceEnd) { - await this.w.u53(Id.AnnounceEnd) + async unannounce(a: Unannounce) { + await this.w.u53(Id.Unannounce) await this.w.string(a.namespace) } } diff --git a/lib/transport/object.ts b/lib/transport/object.ts index 7913d4e..c1e0924 100644 --- a/lib/transport/object.ts +++ b/lib/transport/object.ts @@ -3,16 +3,13 @@ export { Reader, Writer } // This is OBJECT but we can't use that name because it's a reserved word. -// NOTE: This is forked from moq-transport-00. -// 1. messages lack a specified length -// 2. OBJECT must be the only message on a unidirectional stream - export interface Header { track: bigint - sequence: number // To make it easier, we don't use a bigint here. - priority: number // i32 - expires: number // 0 means never - // followed by payload + group: number // The group sequence, as a number because 2^53 is enough. + object: number // The object sequence within a group, as a number because 2^53 is enough. + priority: number // VarInt with a u32 maximum value + expires?: number // optional: expiration in seconds + size?: number // optional: size of payload, otherwise it continues until end of stream } export class Objects { @@ -39,6 +36,10 @@ export class Objects { const stream = value const header = await this.#decode(stream) + if (header.size) { + throw new Error("TODO: handle OBJECT with size") + } + //console.debug("received object: ", header) return { header, stream } } @@ -47,27 +48,30 @@ export class Objects { const r = new Reader(s) const type = await r.u8() - if (type !== 0) throw new Error(`OBJECT type must be 0, got ${type}`) + if (type !== 0 && type !== 2) { + throw new Error(`invalid OBJECT type, got ${type}`) + } - const track = await r.u62() - const sequence = await r.u53() - const priority = await r.i32() - const expires = await r.u53() + const has_size = type === 2 return { - track, - sequence, - priority, - expires, + track: await r.u62(), + group: await r.u53(), + object: await r.u53(), + priority: await r.u53(), + expires: (await r.u53()) || undefined, + size: has_size ? await r.u53() : undefined, } } async #encode(s: WritableStream, h: Header) { const w = new Writer(s) - await w.u8(0) + await w.u8(h.size ? 2 : 0) await w.u62(h.track) - await w.u53(h.sequence) - await w.i32(h.priority) - await w.u53(h.expires) + await w.u53(h.group) + await w.u53(h.object) + await w.u53(h.priority) + await w.u53(h.expires ?? 0) + if (h.size) await w.u53(h.size) } } diff --git a/lib/transport/publisher.ts b/lib/transport/publisher.ts index fd0f938..cd9c911 100644 --- a/lib/transport/publisher.ts +++ b/lib/transport/publisher.ts @@ -43,6 +43,20 @@ export class Publisher { return await this.#subscribeQueue.next() } + async recv(msg: Control.Subscriber) { + if (msg.kind == Control.Msg.Subscribe) { + await this.recvSubscribe(msg) + } else if (msg.kind == Control.Msg.Unsubscribe) { + this.recvUnsubscribe(msg) + } else if (msg.kind == Control.Msg.AnnounceOk) { + this.recvAnnounceOk(msg) + } else if (msg.kind == Control.Msg.AnnounceError) { + this.recvAnnounceError(msg) + } else { + throw new Error(`unknown control message`) // impossible + } + } + recvAnnounceOk(msg: Control.AnnounceOk) { const announce = this.#announce.get(msg.namespace) if (!announce) { @@ -52,7 +66,7 @@ export class Publisher { announce.onOk() } - recvAnnounceReset(msg: Control.AnnounceReset) { + recvAnnounceError(msg: Control.AnnounceError) { const announce = this.#announce.get(msg.namespace) if (!announce) { // TODO debug this @@ -74,6 +88,10 @@ export class Publisher { await this.#control.send({ kind: Control.Msg.SubscribeOk, id: msg.id }) } + + recvUnsubscribe(_msg: Control.Unsubscribe) { + throw new Error("TODO unsubscribe") + } } export class AnnounceSend { @@ -110,9 +128,9 @@ export class AnnounceSend { } } - async close(_code = 0n, _reason = "") { + async close() { // TODO implement unsubscribe - // await this.#inner.sendReset(code, reason) + // await this.#inner.sendUnsubscribe() } closed() { @@ -166,11 +184,18 @@ export class SubscribeRecv { if (this.#state === "closed") return this.#state = "closed" - return this.#control.send({ kind: Control.Msg.SubscribeReset, id: this.#id, code, reason }) + return this.#control.send({ + kind: Control.Msg.SubscribeReset, + id: this.#id, + code, + reason, + final_group: 0, // TODO + final_object: 0, // TODO + }) } // Create a writable data stream - async data(header: { sequence: number; priority: number; expires: number }) { + async data(header: { group: number; object: number; priority: number; expires?: number }) { return this.#objects.send({ track: this.#id, ...header }) } } diff --git a/lib/transport/setup.ts b/lib/transport/setup.ts index ea85152..2a5cf89 100644 --- a/lib/transport/setup.ts +++ b/lib/transport/setup.ts @@ -6,6 +6,7 @@ export type Role = "publisher" | "subscriber" | "both" export enum Version { DRAFT_00 = 0xff00, KIXEL_00 = 0xbad00, + KIXEL_01 = 0xbad01, } // NOTE: These are forked from moq-transport-00. @@ -16,11 +17,12 @@ export enum Version { export interface Client { versions: Version[] role: Role + params?: Parameters } export interface Server { version: Version - role: Role + params?: Parameters } export class Stream { @@ -33,6 +35,8 @@ export class Stream { } } +export type Parameters = Map + export class Decoder { r: Reader @@ -42,7 +46,7 @@ export class Decoder { async client(): Promise { const type = await this.r.u53() - if (type !== 1) throw new Error(`client SETUP type must be 1, got ${type}`) + if (type !== 0x40) throw new Error(`client SETUP type must be 0x40, got ${type}`) const count = await this.r.u53() @@ -52,37 +56,63 @@ export class Decoder { versions.push(version) } - const role = await this.role() + const params = await this.parameters() + const role = this.role(params?.get(0n)) return { versions, role, + params, } } async server(): Promise { const type = await this.r.u53() - if (type !== 2) throw new Error(`server SETUP type must be 2, got ${type}`) + if (type !== 0x41) throw new Error(`server SETUP type must be 0x41, got ${type}`) const version = await this.r.u53() - const role = await this.role() + const params = await this.parameters() return { version, - role, + params, + } + } + + private async parameters(): Promise { + const count = await this.r.u53() + if (count == 0) return undefined + + const params = new Map() + + for (let i = 0; i < count; i++) { + const id = await this.r.u62() + const size = await this.r.u53() + const value = await this.r.readExact(size) + + if (params.has(id)) { + throw new Error(`duplicate parameter id: ${id}`) + } + + params.set(id, value) } + + return params } - async role(): Promise { - const v = await this.r.u53() - if (v == 0) { - return "publisher" - } else if (v == 1) { - return "subscriber" - } else if (v == 2) { - return "both" - } else { - throw new Error(`invalid role: ${v}`) + role(raw: Uint8Array | undefined): Role { + if (!raw) throw new Error("missing role parameter") + if (raw.length != 1) throw new Error("multi-byte varint not supported") + + switch (raw[0]) { + case 1: + return "publisher" + case 2: + return "subscriber" + case 3: + return "both" + default: + throw new Error(`invalid role: ${raw[0]}`) } } } @@ -95,31 +125,35 @@ export class Encoder { } async client(c: Client) { - await this.w.u53(1) // message_type = 1 + await this.w.u53(0x40) await this.w.u53(c.versions.length) for (const v of c.versions) { await this.w.u53(v) } - await this.role(c.role) + // I hate it + const params = c.params ?? new Map() + params.set(0n, new Uint8Array([c.role == "publisher" ? 1 : c.role == "subscriber" ? 2 : 3])) + await this.parameters(params) } async server(s: Server) { - await this.w.u53(2) // message_type = 2 + await this.w.u53(0x41) await this.w.u53(s.version) - await this.role(s.role) + await this.parameters(s.params) } - async role(r: Role) { - let v - if (r == "publisher") { - v = 0 - } else if (r == "subscriber") { - v = 1 - } else { - v = 2 + private async parameters(p: Parameters | undefined) { + if (!p) { + await this.w.u8(0) + return } - return this.w.u53(v) + await this.w.u53(p.size) + for (const [id, value] of p) { + await this.w.u62(id) + await this.w.u53(value.length) + await this.w.write(value) + } } } diff --git a/lib/transport/stream.ts b/lib/transport/stream.ts index 3192675..0b2c36b 100644 --- a/lib/transport/stream.ts +++ b/lib/transport/stream.ts @@ -59,6 +59,8 @@ export class Reader { reader.releaseLock() + console.log("read", dst) + return dst } diff --git a/lib/transport/subscriber.ts b/lib/transport/subscriber.ts index 92785a5..a730aba 100644 --- a/lib/transport/subscriber.ts +++ b/lib/transport/subscriber.ts @@ -27,6 +27,24 @@ export class Subscriber { return this.#announceQueue } + async recv(msg: Control.Publisher) { + if (msg.kind == Control.Msg.Announce) { + await this.recvAnnounce(msg) + } else if (msg.kind == Control.Msg.Unannounce) { + this.recvUnannounce(msg) + } else if (msg.kind == Control.Msg.SubscribeOk) { + this.recvSubscribeOk(msg) + } else if (msg.kind == Control.Msg.SubscribeReset) { + await this.recvSubscribeReset(msg) + } else if (msg.kind == Control.Msg.SubscribeError) { + await this.recvSubscribeError(msg) + } else if (msg.kind == Control.Msg.SubscribeFin) { + await this.recvSubscribeFin(msg) + } else { + throw new Error(`unknown control message`) // impossible + } + } + async recvAnnounce(msg: Control.Announce) { if (this.#announce.has(msg.namespace)) { throw new Error(`duplicate announce for namespace: ${msg.namespace}`) @@ -40,6 +58,10 @@ export class Subscriber { this.#announceQueue.update((queue) => [...queue, announce]) } + recvUnannounce(_msg: Control.Unannounce) { + throw new Error(`TODO Unannounce`) + } + async subscribe(namespace: string, track: string) { const id = this.#subscribeNext++ @@ -51,6 +73,10 @@ export class Subscriber { id, namespace, name: track, + start_group: { mode: "latest", value: 0 }, + start_object: { mode: "absolute", value: 0 }, + end_group: { mode: "none" }, + end_object: { mode: "none" }, }) return subscribe @@ -74,6 +100,24 @@ export class Subscriber { await subscribe.onError(msg.code, msg.reason) } + async recvSubscribeError(msg: Control.SubscribeError) { + const subscribe = this.#subscribe.get(msg.id) + if (!subscribe) { + throw new Error(`subscribe error for unknown id: ${msg.id}`) + } + + await subscribe.onError(msg.code, msg.reason) + } + + async recvSubscribeFin(msg: Control.SubscribeFin) { + const subscribe = this.#subscribe.get(msg.id) + if (!subscribe) { + throw new Error(`subscribe error for unknown id: ${msg.id}`) + } + + await subscribe.onError(0n, "fin") + } + async recvObject(header: Header, stream: ReadableStream) { const subscribe = this.#subscribe.get(header.track) if (!subscribe) { @@ -110,7 +154,7 @@ export class AnnounceRecv { if (this.#state === "closed") return this.#state = "closed" - return this.#control.send({ kind: Control.Msg.AnnounceReset, namespace: this.namespace, code, reason }) + return this.#control.send({ kind: Control.Msg.AnnounceError, namespace: this.namespace, code, reason }) } } diff --git a/web/src/layouts/global.astro b/web/src/layouts/global.astro index e77e7b4..859e3f8 100644 --- a/web/src/layouts/global.astro +++ b/web/src/layouts/global.astro @@ -18,10 +18,7 @@ if (frontmatter && frontmatter.title) title = frontmatter.title - +