Skip to content

Commit

Permalink
Minimal updates for draft-04 interop with moq-rs (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
englishm authored Aug 16, 2024
1 parent 3ef377d commit 0223de7
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 48 deletions.
6 changes: 5 additions & 1 deletion lib/media/catalog/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ export async function fetch(connection: Connection, namespace: string): Promise<
await segment.close()
await subscribe.close() // we done

return decode(chunk.payload)
if (chunk.payload instanceof Uint8Array) {
return decode(chunk.payload)
} else {
throw new Error("invalid catalog chunk")
}
} catch (e) {
const err = asError(e)

Expand Down
1 change: 1 addition & 0 deletions lib/playback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export class Player {
// We don't care what type of reader we get, we just want the payload.
const chunk = await init.read()
if (!chunk) throw new Error("no init chunk")
if (!(chunk.payload instanceof Uint8Array)) throw new Error("invalid init chunk")

this.#backend.init({ data: chunk.payload, name })
} finally {
Expand Down
4 changes: 4 additions & 0 deletions lib/playback/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class Worker {
break
}

if (!(chunk.payload instanceof Uint8Array)) {
throw new Error(`invalid payload: ${chunk.payload}`)
}

const frames = container.decode(chunk.payload)
for (const frame of frames) {
await segment.write(frame)
Expand Down
4 changes: 2 additions & 2 deletions lib/transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ export class Client {
const setup = new Setup.Stream(reader, writer)

// Send the setup message.
await setup.send.client({ versions: [Setup.Version.DRAFT_03], role: this.config.role })
await setup.send.client({ versions: [Setup.Version.DRAFT_04], role: this.config.role })

// Receive the setup message.
// TODO verify the SETUP response.
const server = await setup.recv.server()

if (server.version != Setup.Version.DRAFT_03) {
if (server.version != Setup.Version.DRAFT_04) {
throw new Error(`unsupported server version: ${server.version}`)
}

Expand Down
97 changes: 64 additions & 33 deletions lib/transport/control.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,33 @@ export interface Subscribe {
namespace: string
name: string

start_group: Location
start_object: Location
end_group: Location
end_object: Location
location: Location

params?: Parameters
}

export interface Location {
mode: "none" | "absolute" | "latest" | "future"
value?: number // ignored for type=none, otherwise defaults to 0
export type Location = LatestGroup | LatestObject | AbsoluteStart | AbsoluteRange

export interface LatestGroup {
mode: "latest_group"
}

export interface LatestObject {
mode: "latest_object"
}

export interface AbsoluteStart {
mode: "absolute_start"
start_group: number
start_object: number
}

export interface AbsoluteRange {
mode: "absolute_range"
start_group: number
start_object: number
end_group: number
end_object: number
}

export type Parameters = Map<bigint, Uint8Array>
Expand Down Expand Up @@ -245,26 +261,37 @@ export class Decoder {
trackId: await this.r.u62(),
namespace: await this.r.string(),
name: await this.r.string(),
start_group: await this.location(),
start_object: await this.location(),
end_group: await this.location(),
end_object: await this.location(),
location: await this.location(),
params: await this.parameters(),
}
}

private async location(): Promise<Location> {
const mode = await this.r.u62()
if (mode == 0n) {
return { mode: "none", value: 0 }
} else if (mode == 1n) {
return { mode: "absolute", value: await this.r.u53() }
if (mode == 1n) {
return {
mode: "latest_group",
}
} else if (mode == 2n) {
return { mode: "latest", value: await this.r.u53() }
return {
mode: "latest_object",
}
} else if (mode == 3n) {
return { mode: "future", value: await this.r.u53() }
return {
mode: "absolute_start",
start_group: await this.r.u53(),
start_object: await this.r.u53(),
}
} else if (mode == 4n) {
return {
mode: "absolute_range",
start_group: await this.r.u53(),
start_object: await this.r.u53(),
end_group: await this.r.u53(),
end_object: await this.r.u53(),
}
} else {
throw new Error(`invalid location mode: ${mode}`)
throw new Error(`invalid filter type: ${mode}`)
}
}

Expand Down Expand Up @@ -419,25 +446,29 @@ export class Encoder {
await this.w.u62(s.trackId)
await this.w.string(s.namespace)
await this.w.string(s.name)
await this.location(s.start_group)
await this.location(s.start_object)
await this.location(s.end_group)
await this.location(s.end_object)
await this.location(s.location)
await this.parameters(s.params)
}

private async location(l: Location) {
if (l.mode == "none") {
await this.w.u8(0)
} else if (l.mode == "absolute") {
await this.w.u8(1)
await this.w.u53(l.value ?? 0)
} else if (l.mode == "latest") {
await this.w.u8(2)
await this.w.u53(l.value ?? 0)
} else if (l.mode == "future") {
await this.w.u8(3)
await this.w.u53(l.value ?? 0)
switch (l.mode) {
case "latest_group":
await this.w.u62(1n)
break
case "latest_object":
await this.w.u62(2n)
break
case "absolute_start":
await this.w.u62(3n)
await this.w.u53(l.start_group)
await this.w.u53(l.start_object)
break
case "absolute_range":
await this.w.u62(3n)
await this.w.u53(l.start_group)
await this.w.u53(l.start_object)
await this.w.u53(l.end_group)
await this.w.u53(l.end_object)
}
}

Expand Down
50 changes: 42 additions & 8 deletions lib/transport/objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ export enum StreamType {
Group = 0x51,
}

export enum Status {
OBJECT_NULL = 1,
GROUP_NULL = 2,
GROUP_END = 3,
TRACK_END = 4,
}

export interface TrackHeader {
type: StreamType.Track
sub: bigint
Expand All @@ -17,7 +24,7 @@ export interface TrackHeader {
export interface TrackChunk {
group: number // The group sequence, as a number because 2^53 is enough.
object: number
payload: Uint8Array
payload: Uint8Array | Status
}

export interface GroupHeader {
Expand All @@ -30,7 +37,7 @@ export interface GroupHeader {

export interface GroupChunk {
object: number
payload: Uint8Array
payload: Uint8Array | Status
}

export interface ObjectHeader {
Expand All @@ -40,6 +47,7 @@ export interface ObjectHeader {
group: number
object: number
priority: number
status: number
}

export interface ObjectChunk {
Expand Down Expand Up @@ -75,6 +83,7 @@ export class Objects {
await w.u53(h.group)
await w.u53(h.object)
await w.u53(h.priority)
await w.u53(h.status)

res = new ObjectWriter(h, w) as WriterType<T>
} else if (h.type === StreamType.Group) {
Expand Down Expand Up @@ -132,6 +141,7 @@ export class Objects {
track: await r.u62(),
group: await r.u53(),
object: await r.u53(),
status: await r.u53(),
priority: await r.u53(),
}

Expand All @@ -155,8 +165,15 @@ export class TrackWriter {
async write(c: TrackChunk) {
await this.stream.u53(c.group)
await this.stream.u53(c.object)
await this.stream.u53(c.payload.byteLength)
await this.stream.write(c.payload)

if (c.payload instanceof Uint8Array) {
await this.stream.u53(c.payload.byteLength)
await this.stream.write(c.payload)
} else {
// empty payload with status
await this.stream.u53(0)
await this.stream.u53(c.payload as number)
}
}

async close() {
Expand All @@ -172,8 +189,13 @@ export class GroupWriter {

async write(c: GroupChunk) {
await this.stream.u53(c.object)
await this.stream.u53(c.payload.byteLength)
await this.stream.write(c.payload)
if (c.payload instanceof Uint8Array) {
await this.stream.u53(c.payload.byteLength)
await this.stream.write(c.payload)
} else {
await this.stream.u53(0)
await this.stream.u53(c.payload as number)
}
}

async close() {
Expand Down Expand Up @@ -210,7 +232,13 @@ export class TrackReader {
const group = await this.stream.u53()
const object = await this.stream.u53()
const size = await this.stream.u53()
const payload = await this.stream.read(size)

let payload
if (size == 0) {
payload = (await this.stream.u53()) as Status
} else {
payload = await this.stream.read(size)
}

return {
group,
Expand All @@ -237,7 +265,13 @@ export class GroupReader {

const object = await this.stream.u53()
const size = await this.stream.u53()
const payload = await this.stream.read(size)

let payload
if (size == 0) {
payload = (await this.stream.u53()) as Status
} else {
payload = await this.stream.read(size)
}

return {
object,
Expand Down
1 change: 1 addition & 0 deletions lib/transport/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ export class SubscribeRecv {
group: props.group,
object: props.object,
priority: props.priority ?? 0,
status: 0,
})
}
}
1 change: 1 addition & 0 deletions lib/transport/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum Version {
DRAFT_01 = 0xff000001,
DRAFT_02 = 0xff000002,
DRAFT_03 = 0xff000003,
DRAFT_04 = 0xff000004,
KIXEL_00 = 0xbad00,
KIXEL_01 = 0xbad01,
}
Expand Down
7 changes: 3 additions & 4 deletions lib/transport/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ export class Subscriber {
trackId: id,
namespace,
name: track,
start_group: { mode: "latest", value: 0 },
start_object: { mode: "absolute", value: 0 },
end_group: { mode: "none" },
end_object: { mode: "none" },
location: {
mode: "latest_group",
},
})

return subscribe
Expand Down

0 comments on commit 0223de7

Please sign in to comment.