From 9c5c8c2dbc71eb3ae2c979b3e224078f93effea1 Mon Sep 17 00:00:00 2001 From: Mike English Date: Fri, 7 Jun 2024 13:16:44 -0400 Subject: [PATCH 01/14] Start on exclusive support for draft -04 Not trying for any backwards compatibility here, will just roll forward to the latest draft. --- moq-transport/src/session/mod.rs | 8 ++++---- moq-transport/src/setup/version.rs | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index 47ddc4d7..5e46b5a4 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -77,7 +77,7 @@ impl Session { let mut sender = Writer::new(control.0); let mut recver = Reader::new(control.1); - let versions: setup::Versions = [setup::Version::DRAFT_03].into(); + let versions: setup::Versions = [setup::Version::DRAFT_04].into(); let client = setup::Client { role, @@ -126,10 +126,10 @@ impl Session { let client: setup::Client = recver.decode().await?; log::debug!("received client SETUP: {:?}", client); - if !client.versions.contains(&setup::Version::DRAFT_03) { + if !client.versions.contains(&setup::Version::DRAFT_04) { return Err(SessionError::Version( client.versions, - [setup::Version::DRAFT_03].into(), + [setup::Version::DRAFT_04].into(), )); } @@ -150,7 +150,7 @@ impl Session { let server = setup::Server { role, - version: setup::Version::DRAFT_03, + version: setup::Version::DRAFT_04, params: Default::default(), }; diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index 5d67f8c2..8ddc9913 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -18,6 +18,9 @@ impl Version { /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-03.html pub const DRAFT_03: Version = Version(0xff000003); + + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html + pub const DRAFT_04: Version = Version(0xff000004); } impl From for Version { From 487210cc477c1d4d912927e57880b17cd5f83432 Mon Sep 17 00:00:00 2001 From: Mike English Date: Mon, 17 Jun 2024 01:23:42 -0700 Subject: [PATCH 02/14] moq-transport: Add TRACK_STATUS_REQUEST message Serialization/Deserialization for this new message type in moq-transport https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#message-track-status-req Introduced in https://github.com/moq-wg/moq-transport/pull/425 Note: Does not include any handling for this message type yet --- moq-transport/src/message/mod.rs | 5 ++++ .../src/message/track_status_request.rs | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 moq-transport/src/message/track_status_request.rs diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 1812f769..6edaf9ae 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -42,6 +42,7 @@ mod subscribe_done; mod subscribe_error; mod subscribe_ok; mod subscriber; +mod track_status_request; mod unannounce; mod unsubscribe; @@ -56,6 +57,7 @@ pub use subscribe_done::*; pub use subscribe_error::*; pub use subscribe_ok::*; pub use subscriber::*; +pub use track_status_request::*; pub use unannounce::*; pub use unsubscribe::*; @@ -158,6 +160,9 @@ message_types! { AnnounceError = 0x8, AnnounceCancel = 0xc, + // TRACK_STATUS_REQUEST, sent by subscriber + TrackStatusRequest = 0xd, + // Misc GoAway = 0x10, } diff --git a/moq-transport/src/message/track_status_request.rs b/moq-transport/src/message/track_status_request.rs new file mode 100644 index 00000000..89000662 --- /dev/null +++ b/moq-transport/src/message/track_status_request.rs @@ -0,0 +1,30 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +#[derive(Clone, Debug)] +pub struct TrackStatusRequest { + /// Track Namespace + pub track_namespace: String, + /// Track Name + pub track_name: String, +} + +impl Decode for TrackStatusRequest { + fn decode(r: &mut R) -> Result { + let track_namespace = String::decode(r)?; + let track_name = String::decode(r)?; + + Ok(Self { + track_namespace, + track_name, + }) + } +} + +impl Encode for TrackStatusRequest { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_namespace.encode(w)?; + self.track_name.encode(w)?; + + Ok(()) + } +} From 56d1b2fd48245234b10227a4b7cd35426c17d18a Mon Sep 17 00:00:00 2001 From: Mike English Date: Mon, 17 Jun 2024 13:17:10 -0700 Subject: [PATCH 03/14] moq-transport: Add TRACK_STATUS message Serialization/Deserialization for this new message type in moq-transport https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#section-6.17 Introduced in https://github.com/moq-wg/moq-transport/pull/425 Note: Does not include any handling for this message type yet, nor any validation of status codes --- moq-transport/src/message/mod.rs | 5 +++ moq-transport/src/message/track_status.rs | 45 +++++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 moq-transport/src/message/track_status.rs diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 6edaf9ae..dc702e11 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -42,6 +42,7 @@ mod subscribe_done; mod subscribe_error; mod subscribe_ok; mod subscriber; +mod track_status; mod track_status_request; mod unannounce; mod unsubscribe; @@ -57,6 +58,7 @@ pub use subscribe_done::*; pub use subscribe_error::*; pub use subscribe_ok::*; pub use subscriber::*; +pub use track_status::*; pub use track_status_request::*; pub use unannounce::*; pub use unsubscribe::*; @@ -163,6 +165,9 @@ message_types! { // TRACK_STATUS_REQUEST, sent by subscriber TrackStatusRequest = 0xd, + // TRACK_STATUS, sent by publisher + TrackStatus = 0xe, + // Misc GoAway = 0x10, } diff --git a/moq-transport/src/message/track_status.rs b/moq-transport/src/message/track_status.rs new file mode 100644 index 00000000..edda8d4f --- /dev/null +++ b/moq-transport/src/message/track_status.rs @@ -0,0 +1,45 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +#[derive(Clone, Debug)] +pub struct TrackStatus { + /// Track Namespace + pub track_namespace: String, + /// Track Name + pub track_name: String, + /// Status Code + // TODO: encode/decode for values: + // 0x00: The track is in progress, and subsequent fields contain the highest group and object ID for that track. + // 0x01: The track does not exist. Subsequent fields MUST be zero, and any other value is a malformed message. + // 0x02: The track has not yet begun. Subsequent fields MUST be zero. Any other value is a malformed message. + // 0x03: The track has finished, so there is no "live edge." Subsequent fields contain the highest Group and object ID known. + // 0x04: The sender is a relay that cannot obtain the current track status from upstream. Subsequent fields contain the largest group and object ID known. + // And treat any other value as a malformed message. + pub status_code: u64, + /// Last Group ID + pub last_group_id: u64, + /// Last Object ID + pub last_object_id: u64, +} + +impl Decode for TrackStatus { + fn decode(r: &mut R) -> Result { + Ok(Self { + track_namespace: String::decode(r)?, + track_name: String::decode(r)?, + status_code: u64::decode(r)?, + last_group_id: u64::decode(r)?, + last_object_id: u64::decode(r)?, + }) + } +} + +impl Encode for TrackStatus { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_namespace.encode(w)?; + self.track_name.encode(w)?; + self.status_code.encode(w)?; + self.last_group_id.encode(w)?; + self.last_object_id.encode(w)?; + Ok(()) + } +} From 88b68166134327d4ccf43bb723c92d50b2ff68c0 Mon Sep 17 00:00:00 2001 From: Mike English Date: Mon, 17 Jun 2024 16:41:03 -0700 Subject: [PATCH 04/14] WIP on -04 Subscribe message updates --- moq-transport/src/coding/decode.rs | 3 + moq-transport/src/coding/encode.rs | 3 + moq-transport/src/message/mod.rs | 15 +++++ moq-transport/src/message/subscribe.rs | 80 +++++++++++++++++++++----- moq-transport/src/session/subscribe.rs | 11 ++-- 5 files changed, 92 insertions(+), 20 deletions(-) diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index 3e9be526..f9d4a6cf 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -34,6 +34,9 @@ pub enum DecodeError { #[error("invalid subscribe location")] InvalidSubscribeLocation, + #[error("invalid filter type")] + InvalidFilterType, + #[error("invalid value")] InvalidValue, diff --git a/moq-transport/src/coding/encode.rs b/moq-transport/src/coding/encode.rs index 04bc9a61..548dbb97 100644 --- a/moq-transport/src/coding/encode.rs +++ b/moq-transport/src/coding/encode.rs @@ -28,6 +28,9 @@ pub enum EncodeError { #[error("invalid value")] InvalidValue, + #[error("missing field")] + MissingField, + #[error("i/o error: {0}")] Io(sync::Arc), } diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index dc702e11..86d5256b 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -171,3 +171,18 @@ message_types! { // Misc GoAway = 0x10, } + +/// Track Status Codes +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-track_status +pub enum TrackStatusCode { + // 0x00: The track is in progress, and subsequent fields contain the highest group and object ID for that track. + InProgress = 0x00, + // 0x01: The track does not exist. Subsequent fields MUST be zero, and any other value is a malformed message. + DoesNotExist = 0x01, + // 0x02: The track has not yet begun. Subsequent fields MUST be zero. Any other value is a malformed message. + NotYetBegun = 0x02, + // 0x03: The track has finished, so there is no "live edge." Subsequent fields contain the highest Group and object ID known. + Finished = 0x03, + // 0x04: The sender is a relay that cannot obtain the current track status from upstream. Subsequent fields contain the largest group and object ID known. + Relay = 0x04, +} diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index 328adb8f..3ca761e6 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -1,5 +1,38 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +/// Filter Types +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-filter-types +#[derive(Clone, Debug, PartialEq)] +pub enum FilterType { + LatestGroup = 0x1, + LatestObject = 0x2, + AbsoluteStart = 0x3, + AbsoluteRange = 0x4, +} + +impl Encode for FilterType { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + Self::LatestGroup => (0x1_u64).encode(w), + Self::LatestObject => (0x2_u64).encode(w), + Self::AbsoluteStart => (0x3_u64).encode(w), + Self::AbsoluteRange => (0x4_u64).encode(w), + } + } +} + +impl Decode for FilterType { + fn decode(r: &mut R) -> Result { + match u64::decode(r)? { + 0x01 => Ok(Self::LatestGroup), + 0x02 => Ok(Self::LatestObject), + 0x03 => Ok(Self::AbsoluteStart), + 0x04 => Ok(Self::AbsoluteRange), + _ => Err(DecodeError::InvalidFilterType), + } + } +} + /// Sent by the subscriber to request all future objects for the given track. /// /// Objects will use the provided ID instead of the full track name, to save bytes. @@ -13,9 +46,12 @@ pub struct Subscribe { pub track_namespace: String, pub track_name: String, - /// The start/end group/object. - pub start: SubscribePair, - pub end: SubscribePair, + /// Filter type + pub filter_type: FilterType, + + /// The start/end group/object. (TODO: Make optional) + pub start: Option, // TODO: Make optional + pub end: Option, // TODO: Make optional /// Optional parameters pub params: Params, @@ -28,18 +64,20 @@ impl Decode for Subscribe { let track_namespace = String::decode(r)?; let track_name = String::decode(r)?; - let start = SubscribePair::decode(r)?; - let end = SubscribePair::decode(r)?; + let filter_type = FilterType::decode(r)?; - // You can't have a start object without a start group. - if start.group == SubscribeLocation::None && start.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } + let start = Some(SubscribePair::decode(r)?); + let end = Some(SubscribePair::decode(r)?); - // You can't have an end object without an end group. - if end.group == SubscribeLocation::None && end.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } + // // You can't have a start object without a start group. + // if start.group == SubscribeLocation::None && start.object != SubscribeLocation::None { + // return Err(DecodeError::InvalidSubscribeLocation); + // } + + // // You can't have an end object without an end group. + // if end.group == SubscribeLocation::None && end.object != SubscribeLocation::None { + // return Err(DecodeError::InvalidSubscribeLocation); + // } // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. @@ -50,6 +88,7 @@ impl Decode for Subscribe { track_alias, track_namespace, track_name, + filter_type, start, end, params, @@ -64,8 +103,19 @@ impl Encode for Subscribe { self.track_namespace.encode(w)?; self.track_name.encode(w)?; - self.start.encode(w)?; - self.end.encode(w)?; + self.filter_type.encode(w)?; + + if self.filter_type == FilterType::AbsoluteStart || self.filter_type == FilterType::AbsoluteRange { + if self.start.is_none() || self.end.is_none() { + return Err(EncodeError::MissingField); + } + if let Some(start) = &self.start { + start.encode(w)?; + } + if let Some(end) = &self.end { + end.encode(w)?; + } + } self.params.encode(w)?; diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 9438dc37..7735a8c6 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -2,7 +2,7 @@ use std::ops; use crate::{ data, - message::{self, SubscribeLocation, SubscribePair}, + message::{self, FilterType, SubscribeLocation, SubscribePair}, serve::{self, ServeError, TrackWriter, TrackWriterMode}, }; @@ -47,15 +47,16 @@ impl Subscribe { track_alias: id, track_namespace: track.namespace.clone(), track_name: track.name.clone(), + filter_type: FilterType::LatestGroup, // TODO add these to the publisher. - start: SubscribePair { + start: Some(SubscribePair { group: SubscribeLocation::Latest(0), object: SubscribeLocation::Absolute(0), - }, - end: SubscribePair { + }), + end: Some(SubscribePair { group: SubscribeLocation::None, object: SubscribeLocation::None, - }, + }), params: Default::default(), }); From 692b0c57248b160084c8a0db3916f0e6e0db8f24 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 18 Jun 2024 00:53:08 -0700 Subject: [PATCH 05/14] Parse start/end depending on filter type --- moq-transport/src/coding/decode.rs | 3 ++ moq-transport/src/message/subscribe.rs | 47 +++++++++++++++++++------- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index f9d4a6cf..4fc22f04 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -37,6 +37,9 @@ pub enum DecodeError { #[error("invalid filter type")] InvalidFilterType, + #[error("missing field")] + MissingField, + #[error("invalid value")] InvalidValue, diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index 3ca761e6..be16f982 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -66,18 +66,41 @@ impl Decode for Subscribe { let filter_type = FilterType::decode(r)?; - let start = Some(SubscribePair::decode(r)?); - let end = Some(SubscribePair::decode(r)?); - - // // You can't have a start object without a start group. - // if start.group == SubscribeLocation::None && start.object != SubscribeLocation::None { - // return Err(DecodeError::InvalidSubscribeLocation); - // } - - // // You can't have an end object without an end group. - // if end.group == SubscribeLocation::None && end.object != SubscribeLocation::None { - // return Err(DecodeError::InvalidSubscribeLocation); - // } + let start: Option; + let end: Option; + match filter_type { + FilterType::AbsoluteStart => { + if r.remaining() < 2 { + return Err(DecodeError::MissingField); + } + start = Some(SubscribePair::decode(r)?); + end = None; + } + FilterType::AbsoluteRange => { + if r.remaining() < 4 { + return Err(DecodeError::MissingField); + } + start = Some(SubscribePair::decode(r)?); + end = Some(SubscribePair::decode(r)?); + } + _ => { + start = None; + end = None; + } + } + + if let Some(s) = &start { + // You can't have a start object without a start group. + if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + } + if let Some(e) = &end { + // You can't have an end object without an end group. + if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + } // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. From 128219927798c075a9b9a1ec917b2344bf86dcf5 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 18 Jun 2024 13:27:04 -0700 Subject: [PATCH 06/14] Move filter types to own file --- moq-transport/src/message/filter_type.rs | 34 ++++++++++++++++++++++++ moq-transport/src/message/mod.rs | 2 ++ moq-transport/src/message/subscribe.rs | 34 +----------------------- 3 files changed, 37 insertions(+), 33 deletions(-) create mode 100644 moq-transport/src/message/filter_type.rs diff --git a/moq-transport/src/message/filter_type.rs b/moq-transport/src/message/filter_type.rs new file mode 100644 index 00000000..1edbf42f --- /dev/null +++ b/moq-transport/src/message/filter_type.rs @@ -0,0 +1,34 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +/// Filter Types +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-filter-types +#[derive(Clone, Debug, PartialEq)] +pub enum FilterType { + LatestGroup = 0x1, + LatestObject = 0x2, + AbsoluteStart = 0x3, + AbsoluteRange = 0x4, +} + +impl Encode for FilterType { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + Self::LatestGroup => (0x1_u64).encode(w), + Self::LatestObject => (0x2_u64).encode(w), + Self::AbsoluteStart => (0x3_u64).encode(w), + Self::AbsoluteRange => (0x4_u64).encode(w), + } + } +} + +impl Decode for FilterType { + fn decode(r: &mut R) -> Result { + match u64::decode(r)? { + 0x01 => Ok(Self::LatestGroup), + 0x02 => Ok(Self::LatestObject), + 0x03 => Ok(Self::AbsoluteStart), + 0x04 => Ok(Self::AbsoluteRange), + _ => Err(DecodeError::InvalidFilterType), + } + } +} diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 86d5256b..522003e0 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -35,6 +35,7 @@ mod announce; mod announce_cancel; mod announce_error; mod announce_ok; +mod filter_type; mod go_away; mod publisher; mod subscribe; @@ -51,6 +52,7 @@ pub use announce::*; pub use announce_cancel::*; pub use announce_error::*; pub use announce_ok::*; +pub use filter_type::*; pub use go_away::*; pub use publisher::*; pub use subscribe::*; diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index be16f982..b9d807ce 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -1,37 +1,5 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; - -/// Filter Types -/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-filter-types -#[derive(Clone, Debug, PartialEq)] -pub enum FilterType { - LatestGroup = 0x1, - LatestObject = 0x2, - AbsoluteStart = 0x3, - AbsoluteRange = 0x4, -} - -impl Encode for FilterType { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - match self { - Self::LatestGroup => (0x1_u64).encode(w), - Self::LatestObject => (0x2_u64).encode(w), - Self::AbsoluteStart => (0x3_u64).encode(w), - Self::AbsoluteRange => (0x4_u64).encode(w), - } - } -} - -impl Decode for FilterType { - fn decode(r: &mut R) -> Result { - match u64::decode(r)? { - 0x01 => Ok(Self::LatestGroup), - 0x02 => Ok(Self::LatestObject), - 0x03 => Ok(Self::AbsoluteStart), - 0x04 => Ok(Self::AbsoluteRange), - _ => Err(DecodeError::InvalidFilterType), - } - } -} +use crate::message::FilterType; /// Sent by the subscriber to request all future objects for the given track. /// From 3c84c1bbd7ed5e1d7aeba8c4789d05933d1a84e4 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 18 Jun 2024 14:35:10 -0700 Subject: [PATCH 07/14] Add basic parsing for object status We don't _do_ anything with this information yet, but now we can at least parse it. --- moq-transport/src/coding/decode.rs | 3 ++ moq-transport/src/data/object.rs | 39 +++++++++++++++++++++++++ moq-transport/src/serve/object.rs | 6 ++++ moq-transport/src/session/subscribed.rs | 2 ++ 4 files changed, 50 insertions(+) diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index 4fc22f04..0b68c432 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -37,6 +37,9 @@ pub enum DecodeError { #[error("invalid filter type")] InvalidFilterType, + #[error("invalid object status")] + InvalidObjectStatus, + #[error("missing field")] MissingField, diff --git a/moq-transport/src/data/object.rs b/moq-transport/src/data/object.rs index 6b601e28..e34ea617 100644 --- a/moq-transport/src/data/object.rs +++ b/moq-transport/src/data/object.rs @@ -1,5 +1,39 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ObjectStatus { + Object = 0x0, + ObjectDoesNotExist = 0x1, + GroupDoesNotExist = 0x2, + EndOfGroup = 0x3, + EndOfTrack = 0x4, +} + +impl Decode for ObjectStatus { + fn decode(r: &mut B) -> Result { + match u64::decode(r)? { + 0x0 => Ok(Self::Object), + 0x1 => Ok(Self::ObjectDoesNotExist), + 0x2 => Ok(Self::GroupDoesNotExist), + 0x3 => Ok(Self::EndOfGroup), + 0x4 => Ok(Self::EndOfTrack), + _ => Err(DecodeError::InvalidObjectStatus), + } + } +} + +impl Encode for ObjectStatus { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + Self::Object => (0x0_u64).encode(w), + Self::ObjectDoesNotExist => (0x1_u64).encode(w), + Self::GroupDoesNotExist => (0x2_u64).encode(w), + Self::EndOfGroup => (0x3_u64).encode(w), + Self::EndOfTrack => (0x4_u64).encode(w), + } + } +} + #[derive(Clone, Debug)] pub struct ObjectHeader { // The subscribe ID. @@ -16,6 +50,9 @@ pub struct ObjectHeader { // The send order, where **smaller** values are sent first. pub send_order: u64, + + // The object status + pub object_status: ObjectStatus, } impl Decode for ObjectHeader { @@ -26,6 +63,7 @@ impl Decode for ObjectHeader { group_id: u64::decode(r)?, object_id: u64::decode(r)?, send_order: u64::decode(r)?, + object_status: ObjectStatus::decode(r)?, }) } } @@ -37,6 +75,7 @@ impl Encode for ObjectHeader { self.group_id.encode(w)?; self.object_id.encode(w)?; self.send_order.encode(w)?; + self.object_status.encode(w)?; Ok(()) } diff --git a/moq-transport/src/serve/object.rs b/moq-transport/src/serve/object.rs index 3be37e9d..6b7753a1 100644 --- a/moq-transport/src/serve/object.rs +++ b/moq-transport/src/serve/object.rs @@ -14,6 +14,8 @@ use super::{ServeError, Track}; use crate::watch::State; use bytes::Bytes; +use crate::data::ObjectStatus; + pub struct Objects { pub track: Arc, } @@ -71,6 +73,7 @@ impl ObjectsWriter { group_id: object.group_id, object_id: object.object_id, priority: object.priority, + object_status: ObjectStatus::Object, }; let (writer, reader) = object.produce(); @@ -191,6 +194,9 @@ pub struct ObjectInfo { // The priority of the stream. pub priority: u64, + + // The object status + pub object_status: ObjectStatus, } impl Deref for ObjectInfo { diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 2ee37f31..f84a3e03 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -299,6 +299,8 @@ impl Subscribed { group_id: object.group_id, object_id: object.object_id, send_order: object.priority, + object_status: object.object_status, + }; let publisher = self.publisher.clone(); From 64451cef98ceb13bb2b3aab82547307fac47b3ba Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 18 Jun 2024 21:03:13 -0700 Subject: [PATCH 08/14] Wire up passthrough for object status --- moq-transport/src/data/datagram.rs | 8 ++++++++ moq-transport/src/data/group.rs | 10 +++++++++- moq-transport/src/serve/datagram.rs | 3 +++ moq-transport/src/serve/group.rs | 5 +++++ moq-transport/src/serve/object.rs | 4 ++-- moq-transport/src/serve/stream.rs | 5 +++++ moq-transport/src/session/subscribe.rs | 1 + moq-transport/src/session/subscribed.rs | 4 +++- 8 files changed, 36 insertions(+), 4 deletions(-) diff --git a/moq-transport/src/data/datagram.rs b/moq-transport/src/data/datagram.rs index 970b5220..84f96fc3 100644 --- a/moq-transport/src/data/datagram.rs +++ b/moq-transport/src/data/datagram.rs @@ -1,4 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::data::ObjectStatus; + #[derive(Clone, Debug)] pub struct Datagram { // The subscribe ID. @@ -16,6 +18,9 @@ pub struct Datagram { // The priority, where **smaller** values are sent first. pub send_order: u64, + // Object status + pub object_status: ObjectStatus, + // The payload. pub payload: bytes::Bytes, } @@ -27,6 +32,7 @@ impl Decode for Datagram { let group_id = u64::decode(r)?; let object_id = u64::decode(r)?; let send_order = u64::decode(r)?; + let object_status = ObjectStatus::decode(r)?; let payload = r.copy_to_bytes(r.remaining()); Ok(Self { @@ -35,6 +41,7 @@ impl Decode for Datagram { group_id, object_id, send_order, + object_status, payload, }) } @@ -47,6 +54,7 @@ impl Encode for Datagram { self.group_id.encode(w)?; self.object_id.encode(w)?; self.send_order.encode(w)?; + self.object_status.encode(w)?; Self::encode_remaining(w, self.payload.len())?; w.put_slice(&self.payload); diff --git a/moq-transport/src/data/group.rs b/moq-transport/src/data/group.rs index aee076a1..76464425 100644 --- a/moq-transport/src/data/group.rs +++ b/moq-transport/src/data/group.rs @@ -1,4 +1,5 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::data::ObjectStatus; #[derive(Clone, Debug)] pub struct GroupHeader { @@ -41,14 +42,20 @@ impl Encode for GroupHeader { pub struct GroupObject { pub object_id: u64, pub size: usize, + pub status: ObjectStatus, } impl Decode for GroupObject { fn decode(r: &mut R) -> Result { let object_id = u64::decode(r)?; let size = usize::decode(r)?; + let status = ObjectStatus::decode(r)?; - Ok(Self { object_id, size }) + Ok(Self { + object_id, + size, + status, + }) } } @@ -56,6 +63,7 @@ impl Encode for GroupObject { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.object_id.encode(w)?; self.size.encode(w)?; + self.status.encode(w)?; Ok(()) } diff --git a/moq-transport/src/serve/datagram.rs b/moq-transport/src/serve/datagram.rs index fc92c86f..3759dabf 100644 --- a/moq-transport/src/serve/datagram.rs +++ b/moq-transport/src/serve/datagram.rs @@ -1,6 +1,7 @@ use std::{fmt, sync::Arc}; use crate::watch::State; +use crate::data::ObjectStatus; use super::{ServeError, Track}; @@ -118,6 +119,7 @@ pub struct Datagram { pub group_id: u64, pub object_id: u64, pub priority: u64, + pub status: ObjectStatus, pub payload: bytes::Bytes, } @@ -127,6 +129,7 @@ impl fmt::Debug for Datagram { .field("object_id", &self.object_id) .field("group_id", &self.group_id) .field("priority", &self.priority) + .field("status", &self.status) .field("payload", &self.payload.len()) .finish() } diff --git a/moq-transport/src/serve/group.rs b/moq-transport/src/serve/group.rs index 78d861dd..52cadc99 100644 --- a/moq-transport/src/serve/group.rs +++ b/moq-transport/src/serve/group.rs @@ -11,6 +11,7 @@ use bytes::Bytes; use std::{cmp, ops::Deref, sync::Arc}; use crate::watch::State; +use crate::data::ObjectStatus; use super::{ServeError, Track}; @@ -270,6 +271,7 @@ impl GroupWriter { let (writer, reader) = GroupObject { group: self.info.clone(), object_id: self.next, + status: ObjectStatus::Object, size, } .produce(); @@ -396,6 +398,9 @@ pub struct GroupObject { // The size of the object. pub size: usize, + + // Object status + pub status: ObjectStatus, } impl GroupObject { diff --git a/moq-transport/src/serve/object.rs b/moq-transport/src/serve/object.rs index 6b7753a1..29293a5c 100644 --- a/moq-transport/src/serve/object.rs +++ b/moq-transport/src/serve/object.rs @@ -73,7 +73,7 @@ impl ObjectsWriter { group_id: object.group_id, object_id: object.object_id, priority: object.priority, - object_status: ObjectStatus::Object, + status: ObjectStatus::Object, }; let (writer, reader) = object.produce(); @@ -196,7 +196,7 @@ pub struct ObjectInfo { pub priority: u64, // The object status - pub object_status: ObjectStatus, + pub status: ObjectStatus, } impl Deref for ObjectInfo { diff --git a/moq-transport/src/serve/stream.rs b/moq-transport/src/serve/stream.rs index c843ba43..e4e9abc5 100644 --- a/moq-transport/src/serve/stream.rs +++ b/moq-transport/src/serve/stream.rs @@ -2,6 +2,7 @@ use bytes::Bytes; use std::{ops::Deref, sync::Arc}; use crate::watch::State; +use crate::data::ObjectStatus; use super::{ServeError, Track}; @@ -233,6 +234,7 @@ impl StreamGroupWriter { let (writer, reader) = StreamObject { group: self.info.clone(), object_id: self.next, + status: ObjectStatus::Object, size, } .produce(); @@ -325,6 +327,9 @@ pub struct StreamObject { // The size of the object. pub size: usize, + + // Object status + pub status: ObjectStatus, } impl StreamObject { diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 7735a8c6..f91a449c 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -210,6 +210,7 @@ impl SubscribeRecv { group_id: datagram.group_id, object_id: datagram.object_id, priority: datagram.send_order, + status: datagram.object_status, payload: datagram.payload, })?; diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index f84a3e03..cff9aeec 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -263,6 +263,7 @@ impl Subscribed { let header = data::GroupObject { object_id: object.object_id, size: object.size, + status: object.status, }; writer.encode(&header).await?; @@ -299,7 +300,7 @@ impl Subscribed { group_id: object.group_id, object_id: object.object_id, send_order: object.priority, - object_status: object.object_status, + object_status: object.status, }; @@ -364,6 +365,7 @@ impl Subscribed { group_id: datagram.group_id, object_id: datagram.object_id, send_order: datagram.priority, + object_status: datagram.status, payload: datagram.payload, }; From 3d288dac4d0cd08f9b66052b097a4d3c03167999 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 18 Jun 2024 21:13:20 -0700 Subject: [PATCH 09/14] cargo fmt --- moq-transport/src/serve/datagram.rs | 2 +- moq-transport/src/serve/group.rs | 2 +- moq-transport/src/serve/stream.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/moq-transport/src/serve/datagram.rs b/moq-transport/src/serve/datagram.rs index 3759dabf..9683389a 100644 --- a/moq-transport/src/serve/datagram.rs +++ b/moq-transport/src/serve/datagram.rs @@ -1,7 +1,7 @@ use std::{fmt, sync::Arc}; -use crate::watch::State; use crate::data::ObjectStatus; +use crate::watch::State; use super::{ServeError, Track}; diff --git a/moq-transport/src/serve/group.rs b/moq-transport/src/serve/group.rs index 52cadc99..d274ceba 100644 --- a/moq-transport/src/serve/group.rs +++ b/moq-transport/src/serve/group.rs @@ -10,8 +10,8 @@ use bytes::Bytes; use std::{cmp, ops::Deref, sync::Arc}; -use crate::watch::State; use crate::data::ObjectStatus; +use crate::watch::State; use super::{ServeError, Track}; diff --git a/moq-transport/src/serve/stream.rs b/moq-transport/src/serve/stream.rs index e4e9abc5..0aa210a8 100644 --- a/moq-transport/src/serve/stream.rs +++ b/moq-transport/src/serve/stream.rs @@ -1,8 +1,8 @@ use bytes::Bytes; use std::{ops::Deref, sync::Arc}; -use crate::watch::State; use crate::data::ObjectStatus; +use crate::watch::State; use super::{ServeError, Track}; From 43027d003f4022afc10b5a028bb6f368de25d7ef Mon Sep 17 00:00:00 2001 From: Mike English Date: Sun, 30 Jun 2024 04:28:43 -0400 Subject: [PATCH 10/14] Add Track Status Request Set up publisher sessions to respond to track status requests --- moq-transport/src/coding/decode.rs | 3 + moq-transport/src/message/mod.rs | 26 +++++ moq-transport/src/message/publisher.rs | 1 + moq-transport/src/message/subscriber.rs | 1 + moq-transport/src/message/track_status.rs | 12 +-- moq-transport/src/session/announce.rs | 34 +++++- moq-transport/src/session/mod.rs | 2 + moq-transport/src/session/publisher.rs | 101 ++++++++++++++---- moq-transport/src/session/subscriber.rs | 9 ++ .../src/session/track_status_requested.rs | 27 +++++ 10 files changed, 184 insertions(+), 32 deletions(-) create mode 100644 moq-transport/src/session/track_status_requested.rs diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index 0b68c432..998254d9 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -40,6 +40,9 @@ pub enum DecodeError { #[error("invalid object status")] InvalidObjectStatus, + #[error("invalid track status code")] + InvalidTrackStatusCode, + #[error("missing field")] MissingField, diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 522003e0..92083a53 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -176,6 +176,7 @@ message_types! { /// Track Status Codes /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-track_status +#[derive(Clone, Debug, PartialEq, Copy)] pub enum TrackStatusCode { // 0x00: The track is in progress, and subsequent fields contain the highest group and object ID for that track. InProgress = 0x00, @@ -188,3 +189,28 @@ pub enum TrackStatusCode { // 0x04: The sender is a relay that cannot obtain the current track status from upstream. Subsequent fields contain the largest group and object ID known. Relay = 0x04, } + +impl Decode for TrackStatusCode { + fn decode(r: &mut B) -> Result { + match u64::decode(r)? { + 0x00 => Ok(Self::InProgress), + 0x01 => Ok(Self::DoesNotExist), + 0x02 => Ok(Self::NotYetBegun), + 0x03 => Ok(Self::Finished), + 0x04 => Ok(Self::Relay), + _ => Err(DecodeError::InvalidTrackStatusCode), + } + } +} + +impl Encode for TrackStatusCode { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + Self::InProgress => (0x00_u64).encode(w), + Self::DoesNotExist => (0x01_u64).encode(w), + Self::NotYetBegun => (0x02_u64).encode(w), + Self::Finished => (0x03_u64).encode(w), + Self::Relay => (0x04_u64).encode(w), + } + } +} diff --git a/moq-transport/src/message/publisher.rs b/moq-transport/src/message/publisher.rs index 24ed96e1..703f92ca 100644 --- a/moq-transport/src/message/publisher.rs +++ b/moq-transport/src/message/publisher.rs @@ -50,4 +50,5 @@ publisher_msgs! { SubscribeOk, SubscribeError, SubscribeDone, + TrackStatus, } diff --git a/moq-transport/src/message/subscriber.rs b/moq-transport/src/message/subscriber.rs index 914ba3c9..1e56195a 100644 --- a/moq-transport/src/message/subscriber.rs +++ b/moq-transport/src/message/subscriber.rs @@ -50,4 +50,5 @@ subscriber_msgs! { AnnounceCancel, Subscribe, Unsubscribe, + TrackStatusRequest, } diff --git a/moq-transport/src/message/track_status.rs b/moq-transport/src/message/track_status.rs index edda8d4f..16e0dc04 100644 --- a/moq-transport/src/message/track_status.rs +++ b/moq-transport/src/message/track_status.rs @@ -1,3 +1,4 @@ +use super::TrackStatusCode; use crate::coding::{Decode, DecodeError, Encode, EncodeError}; #[derive(Clone, Debug)] @@ -7,14 +8,7 @@ pub struct TrackStatus { /// Track Name pub track_name: String, /// Status Code - // TODO: encode/decode for values: - // 0x00: The track is in progress, and subsequent fields contain the highest group and object ID for that track. - // 0x01: The track does not exist. Subsequent fields MUST be zero, and any other value is a malformed message. - // 0x02: The track has not yet begun. Subsequent fields MUST be zero. Any other value is a malformed message. - // 0x03: The track has finished, so there is no "live edge." Subsequent fields contain the highest Group and object ID known. - // 0x04: The sender is a relay that cannot obtain the current track status from upstream. Subsequent fields contain the largest group and object ID known. - // And treat any other value as a malformed message. - pub status_code: u64, + pub status_code: TrackStatusCode, /// Last Group ID pub last_group_id: u64, /// Last Object ID @@ -26,7 +20,7 @@ impl Decode for TrackStatus { Ok(Self { track_namespace: String::decode(r)?, track_name: String::decode(r)?, - status_code: u64::decode(r)?, + status_code: TrackStatusCode::decode(r)?, last_group_id: u64::decode(r)?, last_object_id: u64::decode(r)?, }) diff --git a/moq-transport/src/session/announce.rs b/moq-transport/src/session/announce.rs index c85a91f8..4b499c89 100644 --- a/moq-transport/src/session/announce.rs +++ b/moq-transport/src/session/announce.rs @@ -3,7 +3,7 @@ use std::{collections::VecDeque, ops}; use crate::watch::State; use crate::{message, serve::ServeError}; -use super::{Publisher, Subscribed}; +use super::{Publisher, Subscribed, TrackStatusRequested}; #[derive(Debug, Clone)] pub struct AnnounceInfo { @@ -12,6 +12,7 @@ pub struct AnnounceInfo { struct AnnounceState { subscribers: VecDeque, + track_statuses_requested: VecDeque, ok: bool, closed: Result<(), ServeError>, } @@ -20,6 +21,7 @@ impl Default for AnnounceState { fn default() -> Self { Self { subscribers: Default::default(), + track_statuses_requested: Default::default(), ok: false, closed: Ok(()), } @@ -81,7 +83,7 @@ impl Announce { } } - pub async fn subscribed(&mut self) -> Result, ServeError> { + pub async fn subscribed(&self) -> Result, ServeError> { loop { { let state = self.state.lock(); @@ -99,6 +101,28 @@ impl Announce { } } + pub async fn track_status_requested(&self) -> Result, ServeError> { + loop { + { + let state = self.state.lock(); + if !state.track_statuses_requested.is_empty() { + return Ok(state.into_mut().and_then(|mut state| state.track_statuses_requested.pop_front())); + } + + state.closed.clone()?; + match state.modified() { + Some(notified) => { + notified + }, + None => { + return Ok(None) + }, + } + } + .await; + } + } + // Wait until an OK is received pub async fn ok(&self) -> Result<(), ServeError> { loop { @@ -172,4 +196,10 @@ impl AnnounceRecv { Ok(()) } + + pub fn recv_track_status_requested(&mut self, track_status_requested: TrackStatusRequested) -> Result<(), ServeError> { + let mut state = self.state.lock_mut().ok_or(ServeError::Done)?; + state.track_statuses_requested.push_back(track_status_requested); + Ok(()) + } } diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index 5e46b5a4..4caa3837 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -7,6 +7,7 @@ mod subscribe; mod subscribed; mod subscriber; mod writer; +mod track_status_requested; pub use announce::*; pub use announced::*; @@ -15,6 +16,7 @@ pub use publisher::*; pub use subscribe::*; pub use subscribed::*; pub use subscriber::*; +pub use track_status_requested::*; use reader::*; use writer::*; diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 0335daf3..c7a031ee 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -13,7 +13,7 @@ use crate::{ use crate::watch::Queue; -use super::{Announce, AnnounceRecv, Session, SessionError, Subscribed, SubscribedRecv}; +use super::{Announce, AnnounceRecv, Session, SessionError, Subscribed, SubscribedRecv, TrackStatusRequested}; // TODO remove Clone. #[derive(Clone)] @@ -51,7 +51,7 @@ impl Publisher { /// Announce a namespace and serve tracks using the provided [serve::TracksReader]. /// The caller uses [serve::TracksWriter] for static tracks and [serve::TracksRequest] for dynamic tracks. pub async fn announce(&mut self, tracks: TracksReader) -> Result<(), SessionError> { - let mut announce = match self.announces.lock().unwrap().entry(tracks.namespace.clone()) { + let announce = match self.announces.lock().unwrap().entry(tracks.namespace.clone()) { hash_map::Entry::Occupied(_) => return Err(ServeError::Duplicate.into()), hash_map::Entry::Vacant(entry) => { let (send, recv) = Announce::new(self.clone(), tracks.namespace.clone()); @@ -60,29 +60,47 @@ impl Publisher { } }; - let mut tasks = FuturesUnordered::new(); - let mut done = None; + let mut subscribe_tasks = FuturesUnordered::new(); + let mut status_tasks = FuturesUnordered::new(); + let mut subscribe_done = false; + let mut status_done = false; loop { tokio::select! { - subscribe = announce.subscribed(), if done.is_none() => { - let subscribe = match subscribe { - Ok(Some(subscribe)) => subscribe, - Ok(None) => { done = Some(Ok(())); continue }, - Err(err) => { done = Some(Err(err)); continue }, - }; - - let tracks = tracks.clone(); - - tasks.push(async move { - let info = subscribe.info.clone(); - if let Err(err) = Self::serve_subscribe(subscribe, tracks).await { - log::warn!("failed serving subscribe: {:?}, error: {}", info, err) - } - }); + res = announce.subscribed(), if !subscribe_done => { + match res? { + Some(subscribed) => { + let tracks = tracks.clone(); + + subscribe_tasks.push(async move { + let info = subscribed.info.clone(); + if let Err(err) = Self::serve_subscribe(subscribed, tracks).await { + log::warn!("failed serving subscribe: {:?}, error: {}", info, err) + } + }); + }, + None => subscribe_done = true, + } + + }, + res = announce.track_status_requested(), if !status_done => { + match res? { + Some(status) => { + let tracks = tracks.clone(); + + status_tasks.push(async move { + let info = status.info.clone(); + if let Err(err) = Self::serve_track_status(status, tracks).await { + log::warn!("failed serving track status request: {:?}, error: {}", info, err) + } + }); + }, + None => status_done = true, + } }, - _ = tasks.next(), if !tasks.is_empty() => {}, - else => return Ok(done.unwrap()?) + Some(res) = subscribe_tasks.next() => res, + Some(res) = status_tasks.next() => res, + else => return Ok(()) } } } @@ -97,6 +115,34 @@ impl Publisher { Ok(()) } + pub async fn serve_track_status(mut track_status_request: TrackStatusRequested, mut tracks: TracksReader) -> Result<(), SessionError> { + let track = tracks.subscribe(&track_status_request.info.track.clone()).ok_or(ServeError::NotFound)?; + let response; + + if let Some((latest_group_id, latest_object_id)) = track.latest() { + response = message::TrackStatus { + track_namespace: track_status_request.info.namespace.clone(), + track_name: track_status_request.info.track.clone(), + status_code: message::TrackStatusCode::InProgress, + last_group_id: latest_group_id, + last_object_id: latest_object_id, + }; + } else { + response = message::TrackStatus { + track_namespace: track_status_request.info.namespace.clone(), + track_name: track_status_request.info.track.clone(), + status_code: message::TrackStatusCode::DoesNotExist, + last_group_id: 0, + last_object_id: 0, + }; + } + // TODO: can we know of any other statuses in this context? + + track_status_request.respond(response).await?; + + Ok(()) + } + // Returns subscriptions that do not map to an active announce. pub async fn subscribed(&mut self) -> Option { self.unknown.pop().await @@ -109,6 +155,7 @@ impl Publisher { message::Subscriber::AnnounceCancel(msg) => self.recv_announce_cancel(msg), message::Subscriber::Subscribe(msg) => self.recv_subscribe(msg), message::Subscriber::Unsubscribe(msg) => self.recv_unsubscribe(msg), + message::Subscriber::TrackStatusRequest(msg) => self.recv_track_status_request(msg), }; if let Err(err) = res { @@ -175,6 +222,18 @@ impl Publisher { Ok(()) } + fn recv_track_status_request(&mut self, msg: message::TrackStatusRequest) -> Result<(), SessionError> { + let namespace = msg.track_namespace.clone(); + + let mut announces = self.announces.lock().unwrap(); + let announce = announces.get_mut(&namespace).ok_or(SessionError::Internal)?; + + let track_status_requested = TrackStatusRequested::new(self.clone(), msg); + + announce.recv_track_status_requested(track_status_requested).map_err(Into::into) + } + + fn recv_unsubscribe(&mut self, msg: message::Unsubscribe) -> Result<(), SessionError> { if let Some(subscribed) = self.subscribed.lock().unwrap().get_mut(&msg.id) { subscribed.recv_unsubscribe()?; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 5e072fca..0b676db8 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -83,6 +83,7 @@ impl Subscriber { message::Publisher::SubscribeOk(msg) => self.recv_subscribe_ok(msg), message::Publisher::SubscribeError(msg) => self.recv_subscribe_error(msg), message::Publisher::SubscribeDone(msg) => self.recv_subscribe_done(msg), + message::Publisher::TrackStatus(msg) => self.recv_track_status(msg), }; if let Err(SessionError::Serve(err)) = res { @@ -144,6 +145,14 @@ impl Subscriber { Ok(()) } + fn recv_track_status(&mut self, _msg: &message::TrackStatus) -> Result<(), SessionError> { + + // TODO: Expose this somehow? + // TODO: Also add a way to sent a Track Status Request in the first place + + Ok(()) + } + fn drop_announce(&mut self, namespace: &str) { self.announced.lock().unwrap().remove(namespace); } diff --git a/moq-transport/src/session/track_status_requested.rs b/moq-transport/src/session/track_status_requested.rs new file mode 100644 index 00000000..2ef6fceb --- /dev/null +++ b/moq-transport/src/session/track_status_requested.rs @@ -0,0 +1,27 @@ +use super::{Publisher, SessionError}; +use crate::message; + +#[derive(Debug, Clone)] +pub struct TrackStatusRequestedInfo { + pub namespace: String, + pub track: String, +} + +pub struct TrackStatusRequested { + publisher: Publisher, + // msg: message::TrackStatusRequest, // TODO: See if we actually need this + pub info: TrackStatusRequestedInfo, +} + +impl TrackStatusRequested { + pub fn new(publisher: Publisher, msg: message::TrackStatusRequest) -> Self { + let namespace = msg.track_namespace.clone(); + let track = msg.track_name.clone(); + Self { publisher, info: TrackStatusRequestedInfo { namespace, track }} + } + + pub async fn respond(&mut self, status: message::TrackStatus) -> Result<(), SessionError> { + self.publisher.send_message(status); + Ok(()) + } +} From c9ca55f1d639bce28caf6a20d57d283db7b59398 Mon Sep 17 00:00:00 2001 From: Mike English Date: Thu, 25 Jul 2024 02:56:26 -0400 Subject: [PATCH 11/14] cargo fmt --- moq-transport/src/session/announce.rs | 17 ++++++------ moq-transport/src/session/mod.rs | 2 +- moq-transport/src/session/publisher.rs | 14 +++++++--- moq-transport/src/session/subscriber.rs | 1 - .../src/session/track_status_requested.rs | 27 ++++++++++--------- 5 files changed, 35 insertions(+), 26 deletions(-) diff --git a/moq-transport/src/session/announce.rs b/moq-transport/src/session/announce.rs index 4b499c89..706be529 100644 --- a/moq-transport/src/session/announce.rs +++ b/moq-transport/src/session/announce.rs @@ -106,17 +106,15 @@ impl Announce { { let state = self.state.lock(); if !state.track_statuses_requested.is_empty() { - return Ok(state.into_mut().and_then(|mut state| state.track_statuses_requested.pop_front())); + return Ok(state + .into_mut() + .and_then(|mut state| state.track_statuses_requested.pop_front())); } state.closed.clone()?; match state.modified() { - Some(notified) => { - notified - }, - None => { - return Ok(None) - }, + Some(notified) => notified, + None => return Ok(None), } } .await; @@ -197,7 +195,10 @@ impl AnnounceRecv { Ok(()) } - pub fn recv_track_status_requested(&mut self, track_status_requested: TrackStatusRequested) -> Result<(), ServeError> { + pub fn recv_track_status_requested( + &mut self, + track_status_requested: TrackStatusRequested, + ) -> Result<(), ServeError> { let mut state = self.state.lock_mut().ok_or(ServeError::Done)?; state.track_statuses_requested.push_back(track_status_requested); Ok(()) diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index 4caa3837..8c1c90d9 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -6,8 +6,8 @@ mod reader; mod subscribe; mod subscribed; mod subscriber; -mod writer; mod track_status_requested; +mod writer; pub use announce::*; pub use announced::*; diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index c7a031ee..643e032a 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -115,8 +115,13 @@ impl Publisher { Ok(()) } - pub async fn serve_track_status(mut track_status_request: TrackStatusRequested, mut tracks: TracksReader) -> Result<(), SessionError> { - let track = tracks.subscribe(&track_status_request.info.track.clone()).ok_or(ServeError::NotFound)?; + pub async fn serve_track_status( + mut track_status_request: TrackStatusRequested, + mut tracks: TracksReader, + ) -> Result<(), SessionError> { + let track = tracks + .subscribe(&track_status_request.info.track.clone()) + .ok_or(ServeError::NotFound)?; let response; if let Some((latest_group_id, latest_object_id)) = track.latest() { @@ -230,10 +235,11 @@ impl Publisher { let track_status_requested = TrackStatusRequested::new(self.clone(), msg); - announce.recv_track_status_requested(track_status_requested).map_err(Into::into) + announce + .recv_track_status_requested(track_status_requested) + .map_err(Into::into) } - fn recv_unsubscribe(&mut self, msg: message::Unsubscribe) -> Result<(), SessionError> { if let Some(subscribed) = self.subscribed.lock().unwrap().get_mut(&msg.id) { subscribed.recv_unsubscribe()?; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 0b676db8..2833e8aa 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -146,7 +146,6 @@ impl Subscriber { } fn recv_track_status(&mut self, _msg: &message::TrackStatus) -> Result<(), SessionError> { - // TODO: Expose this somehow? // TODO: Also add a way to sent a Track Status Request in the first place diff --git a/moq-transport/src/session/track_status_requested.rs b/moq-transport/src/session/track_status_requested.rs index 2ef6fceb..75fb1ba2 100644 --- a/moq-transport/src/session/track_status_requested.rs +++ b/moq-transport/src/session/track_status_requested.rs @@ -3,25 +3,28 @@ use crate::message; #[derive(Debug, Clone)] pub struct TrackStatusRequestedInfo { - pub namespace: String, - pub track: String, + pub namespace: String, + pub track: String, } pub struct TrackStatusRequested { publisher: Publisher, // msg: message::TrackStatusRequest, // TODO: See if we actually need this - pub info: TrackStatusRequestedInfo, + pub info: TrackStatusRequestedInfo, } impl TrackStatusRequested { - pub fn new(publisher: Publisher, msg: message::TrackStatusRequest) -> Self { - let namespace = msg.track_namespace.clone(); - let track = msg.track_name.clone(); - Self { publisher, info: TrackStatusRequestedInfo { namespace, track }} - } + pub fn new(publisher: Publisher, msg: message::TrackStatusRequest) -> Self { + let namespace = msg.track_namespace.clone(); + let track = msg.track_name.clone(); + Self { + publisher, + info: TrackStatusRequestedInfo { namespace, track }, + } + } - pub async fn respond(&mut self, status: message::TrackStatus) -> Result<(), SessionError> { - self.publisher.send_message(status); - Ok(()) - } + pub async fn respond(&mut self, status: message::TrackStatus) -> Result<(), SessionError> { + self.publisher.send_message(status); + Ok(()) + } } From 061316b919819c2f1c4ac63ad0cca8ad135de8eb Mon Sep 17 00:00:00 2001 From: Mike English Date: Thu, 25 Jul 2024 03:18:02 -0400 Subject: [PATCH 12/14] Add parsing for Subscribe Update Parsing for this mesage type, but not handling (yet) Uses code from https://github.com/moq-wg/moq-transport/pull/457 since it was missed in draft-04. --- moq-transport/src/message/mod.rs | 3 + moq-transport/src/message/subscribe_update.rs | 116 ++++++++++++++++++ moq-transport/src/message/subscriber.rs | 1 + moq-transport/src/session/publisher.rs | 6 + 4 files changed, 126 insertions(+) create mode 100644 moq-transport/src/message/subscribe_update.rs diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 92083a53..54f8bb2c 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -42,6 +42,7 @@ mod subscribe; mod subscribe_done; mod subscribe_error; mod subscribe_ok; +mod subscribe_update; mod subscriber; mod track_status; mod track_status_request; @@ -59,6 +60,7 @@ pub use subscribe::*; pub use subscribe_done::*; pub use subscribe_error::*; pub use subscribe_ok::*; +pub use subscribe_update::*; pub use subscriber::*; pub use track_status::*; pub use track_status_request::*; @@ -147,6 +149,7 @@ message_types! { // SetupServer = 0x41 // SUBSCRIBE family, sent by subscriber + SubscribeUpdate = 0x2, Subscribe = 0x3, Unsubscribe = 0xa, diff --git a/moq-transport/src/message/subscribe_update.rs b/moq-transport/src/message/subscribe_update.rs new file mode 100644 index 00000000..37ac7814 --- /dev/null +++ b/moq-transport/src/message/subscribe_update.rs @@ -0,0 +1,116 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::message::subscribe::{SubscribeLocation, SubscribePair}; +use crate::message::FilterType; + +/// Sent by the subscriber to request all future objects for the given track. +/// +/// Objects will use the provided ID instead of the full track name, to save bytes. +#[derive(Clone, Debug)] +pub struct SubscribeUpdate { + /// The subscription ID + pub id: u64, + + /// Track properties + pub track_alias: u64, // This alias is useless but part of the spec + pub track_namespace: String, + pub track_name: String, + + /// Filter type + pub filter_type: FilterType, + + /// The start/end group/object. (TODO: Make optional) + pub start: Option, // TODO: Make optional + pub end: Option, // TODO: Make optional + + /// Optional parameters + pub params: Params, +} + +impl Decode for SubscribeUpdate { + fn decode(r: &mut R) -> Result { + let id = u64::decode(r)?; + let track_alias = u64::decode(r)?; + let track_namespace = String::decode(r)?; + let track_name = String::decode(r)?; + + let filter_type = FilterType::decode(r)?; + + let start: Option; + let end: Option; + match filter_type { + FilterType::AbsoluteStart => { + if r.remaining() < 2 { + return Err(DecodeError::MissingField); + } + start = Some(SubscribePair::decode(r)?); + end = None; + } + FilterType::AbsoluteRange => { + if r.remaining() < 4 { + return Err(DecodeError::MissingField); + } + start = Some(SubscribePair::decode(r)?); + end = Some(SubscribePair::decode(r)?); + } + _ => { + start = None; + end = None; + } + } + + if let Some(s) = &start { + // You can't have a start object without a start group. + if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + } + if let Some(e) = &end { + // You can't have an end object without an end group. + if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + } + + // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. + + let params = Params::decode(r)?; + + Ok(Self { + id, + track_alias, + track_namespace, + track_name, + filter_type, + start, + end, + params, + }) + } +} + +impl Encode for SubscribeUpdate { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.id.encode(w)?; + self.track_alias.encode(w)?; + self.track_namespace.encode(w)?; + self.track_name.encode(w)?; + + self.filter_type.encode(w)?; + + if self.filter_type == FilterType::AbsoluteStart || self.filter_type == FilterType::AbsoluteRange { + if self.start.is_none() || self.end.is_none() { + return Err(EncodeError::MissingField); + } + if let Some(start) = &self.start { + start.encode(w)?; + } + if let Some(end) = &self.end { + end.encode(w)?; + } + } + + self.params.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscriber.rs b/moq-transport/src/message/subscriber.rs index 1e56195a..5b699aa2 100644 --- a/moq-transport/src/message/subscriber.rs +++ b/moq-transport/src/message/subscriber.rs @@ -50,5 +50,6 @@ subscriber_msgs! { AnnounceCancel, Subscribe, Unsubscribe, + SubscribeUpdate, TrackStatusRequest, } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 643e032a..20da5532 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -160,6 +160,7 @@ impl Publisher { message::Subscriber::AnnounceCancel(msg) => self.recv_announce_cancel(msg), message::Subscriber::Subscribe(msg) => self.recv_subscribe(msg), message::Subscriber::Unsubscribe(msg) => self.recv_unsubscribe(msg), + message::Subscriber::SubscribeUpdate(msg) => self.recv_subscribe_update(msg), message::Subscriber::TrackStatusRequest(msg) => self.recv_track_status_request(msg), }; @@ -227,6 +228,11 @@ impl Publisher { Ok(()) } + fn recv_subscribe_update(&mut self, _msg: message::SubscribeUpdate) -> Result<(), SessionError> { + // TODO: Implement updating subscriptions. + Err(SessionError::Internal) + } + fn recv_track_status_request(&mut self, msg: message::TrackStatusRequest) -> Result<(), SessionError> { let namespace = msg.track_namespace.clone(); From 09747a591f8a5dbf3ab07b3c095c7a918e036d21 Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 6 Aug 2024 01:38:04 -0400 Subject: [PATCH 13/14] Bump versions cargo semver-checks flagged moq-transport, but also bump binaries for wire image compatibility --- Cargo.lock | 14 +++++++------- moq-clock/Cargo.toml | 6 +++--- moq-dir/Cargo.toml | 6 +++--- moq-native/Cargo.toml | 4 ++-- moq-pub/Cargo.toml | 6 +++--- moq-relay/Cargo.toml | 6 +++--- moq-sub/Cargo.toml | 6 +++--- moq-transport/Cargo.toml | 2 +- 8 files changed, 25 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 186d7e08..e390f48e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1083,7 +1083,7 @@ dependencies = [ [[package]] name = "moq-clock" -version = "0.4.2" +version = "0.5.0" dependencies = [ "anyhow", "chrono", @@ -1100,7 +1100,7 @@ dependencies = [ [[package]] name = "moq-dir" -version = "0.1.2" +version = "0.2.0" dependencies = [ "anyhow", "bytes", @@ -1118,7 +1118,7 @@ dependencies = [ [[package]] name = "moq-native" -version = "0.3.0" +version = "0.4.0" dependencies = [ "anyhow", "clap", @@ -1140,7 +1140,7 @@ dependencies = [ [[package]] name = "moq-pub" -version = "0.6.1" +version = "0.7.0" dependencies = [ "anyhow", "bytes", @@ -1161,7 +1161,7 @@ dependencies = [ [[package]] name = "moq-relay" -version = "0.5.1" +version = "0.6.0" dependencies = [ "anyhow", "axum", @@ -1183,7 +1183,7 @@ dependencies = [ [[package]] name = "moq-sub" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anyhow", "clap", @@ -1200,7 +1200,7 @@ dependencies = [ [[package]] name = "moq-transport" -version = "0.5.3" +version = "0.6.0" dependencies = [ "bytes", "futures", diff --git a/moq-clock/Cargo.toml b/moq-clock/Cargo.toml index 3c75b6fd..4734b67a 100644 --- a/moq-clock/Cargo.toml +++ b/moq-clock/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.4.2" +version = "0.5.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native = { path = "../moq-native", version = "0.3" } -moq-transport = { path = "../moq-transport", version = "0.5" } +moq-native = { path = "../moq-native", version = "0.4" } +moq-transport = { path = "../moq-transport", version = "0.6" } # QUIC url = "2" diff --git a/moq-dir/Cargo.toml b/moq-dir/Cargo.toml index 2f4f36a3..32ac44dd 100644 --- a/moq-dir/Cargo.toml +++ b/moq-dir/Cargo.toml @@ -5,15 +5,15 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.1.2" +version = "0.2.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] -moq-native = { path = "../moq-native", version = "0.3" } -moq-transport = { path = "../moq-transport", version = "0.5" } +moq-native = { path = "../moq-native", version = "0.4" } +moq-transport = { path = "../moq-transport", version = "0.6" } # QUIC web-transport = { workspace = true } diff --git a/moq-native/Cargo.toml b/moq-native/Cargo.toml index 9bf126bb..e88db7d4 100644 --- a/moq-native/Cargo.toml +++ b/moq-native/Cargo.toml @@ -5,14 +5,14 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.3.0" +version = "0.4.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] -moq-transport = { path = "../moq-transport", version = "0.5" } +moq-transport = { path = "../moq-transport", version = "0.6" } web-transport = { workspace = true } web-transport-quinn = "0.3" diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index bb307c93..43a30bc3 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Mike English", "Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.6.1" +version = "0.7.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native = { path = "../moq-native", version = "0.3" } -moq-transport = { path = "../moq-transport", version = "0.5" } +moq-native = { path = "../moq-native", version = "0.4" } +moq-transport = { path = "../moq-transport", version = "0.6" } moq-catalog = { path = "../moq-catalog", version = "0.2" } url = "2" diff --git a/moq-relay/Cargo.toml b/moq-relay/Cargo.toml index 971a9c56..3bca6230 100644 --- a/moq-relay/Cargo.toml +++ b/moq-relay/Cargo.toml @@ -5,15 +5,15 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.5.1" +version = "0.6.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] -moq-transport = { path = "../moq-transport", version = "0.5" } -moq-native = { path = "../moq-native", version = "0.3" } +moq-transport = { path = "../moq-transport", version = "0.6" } +moq-native = { path = "../moq-native", version = "0.4" } moq-api = { path = "../moq-api", version = "0.2" } # QUIC diff --git a/moq-sub/Cargo.toml b/moq-sub/Cargo.toml index 62b1ca39..8d68f24f 100644 --- a/moq-sub/Cargo.toml +++ b/moq-sub/Cargo.toml @@ -5,7 +5,7 @@ authors = [] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.1.1" +version = "0.2.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-transport = { path = "../moq-transport", version = "0.5" } -moq-native = { path = "../moq-native", version = "0.3" } +moq-transport = { path = "../moq-transport", version = "0.6" } +moq-native = { path = "../moq-native", version = "0.4" } url = "2" # Async stuff diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 4eae1329..583fadb7 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.5.3" +version = "0.6.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] From f8a4ecb3afc62c1e77c81e63625e0619b4ba411d Mon Sep 17 00:00:00 2001 From: Mike English Date: Wed, 7 Aug 2024 00:30:27 -0400 Subject: [PATCH 14/14] Fix optional object status for track/group modes --- moq-transport/src/data/group.rs | 16 ++++++++++++++-- moq-transport/src/data/track.rs | 17 +++++++++++++++++ moq-transport/src/session/subscribed.rs | 1 + 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/moq-transport/src/data/group.rs b/moq-transport/src/data/group.rs index 76464425..417a3453 100644 --- a/moq-transport/src/data/group.rs +++ b/moq-transport/src/data/group.rs @@ -49,7 +49,14 @@ impl Decode for GroupObject { fn decode(r: &mut R) -> Result { let object_id = u64::decode(r)?; let size = usize::decode(r)?; - let status = ObjectStatus::decode(r)?; + + // If the size is 0, then the status is sent explicitly. + // Otherwise, the status is assumed to be 0x0 (Object). + let status = if size == 0 { + ObjectStatus::decode(r)? + } else { + ObjectStatus::Object + }; Ok(Self { object_id, @@ -63,7 +70,12 @@ impl Encode for GroupObject { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.object_id.encode(w)?; self.size.encode(w)?; - self.status.encode(w)?; + + // If the size is 0, then the status is sent explicitly. + // Otherwise, the status is assumed to be 0x0 (Object). + if self.size == 0 { + self.status.encode(w)?; + } Ok(()) } diff --git a/moq-transport/src/data/track.rs b/moq-transport/src/data/track.rs index 70ce3a06..ff0aa3c7 100644 --- a/moq-transport/src/data/track.rs +++ b/moq-transport/src/data/track.rs @@ -1,4 +1,5 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::data::ObjectStatus; #[derive(Clone, Debug)] pub struct TrackHeader { @@ -37,6 +38,7 @@ pub struct TrackObject { pub group_id: u64, pub object_id: u64, pub size: usize, + pub status: ObjectStatus, } impl Decode for TrackObject { @@ -46,10 +48,19 @@ impl Decode for TrackObject { let object_id = u64::decode(r)?; let size = usize::decode(r)?; + // If the size is 0, then the status is sent explicitly. + // Otherwise, the status is assumed to be 0x0 (Object). + let status = if size == 0 { + ObjectStatus::decode(r)? + } else { + ObjectStatus::Object + }; + Ok(Self { group_id, object_id, size, + status, }) } } @@ -60,6 +71,12 @@ impl Encode for TrackObject { self.object_id.encode(w)?; self.size.encode(w)?; + // If the size is 0, then the status is sent explicitly. + // Otherwise, the status is assumed to be 0x0 (Object). + if self.size == 0 { + self.status.encode(w)?; + } + Ok(()) } } diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index cff9aeec..1b40efa9 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -183,6 +183,7 @@ impl Subscribed { group_id: object.group_id, object_id: object.object_id, size: object.size, + status: object.status, }; self.state