Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(go,rs): nested async support #346

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,14 @@ wasmtime = { version = "25", default-features = false }
wasmtime-wasi = { version = "25", default-features = false }
wit-bindgen = { version = "0.32", default-features = false }
wit-bindgen-core = { version = "0.32", default-features = false }
wit-bindgen-wrpc = { version = "0.6.5", default-features = false, path = "./crates/wit-bindgen" }
wit-bindgen-wrpc = { version = "0.7", default-features = false, path = "./crates/wit-bindgen" }
wit-bindgen-wrpc-go = { version = "0.9", default-features = false, path = "./crates/wit-bindgen-go" }
wit-bindgen-wrpc-rust = { version = "0.6.5", default-features = false, path = "./crates/wit-bindgen-rust" }
wit-bindgen-wrpc-rust-macro = { version = "0.6.5", default-features = false, path = "./crates/wit-bindgen-rust-macro" }
wit-bindgen-wrpc-rust = { version = "0.7", default-features = false, path = "./crates/wit-bindgen-rust" }
wit-bindgen-wrpc-rust-macro = { version = "0.7", default-features = false, path = "./crates/wit-bindgen-rust-macro" }
wit-component = { version = "0.217", default-features = false }
wit-parser = { version = "0.217", default-features = false }
wrpc-cli = { version = "0.3", path = "./crates/cli", default-features = false }
wrpc-introspect = { version = "0.3", default-features = false, path = "./crates/introspect" }
wrpc-introspect = { version = "0.3.1", default-features = false, path = "./crates/introspect" }
wrpc-runtime-wasmtime = { version = "0.22", path = "./crates/runtime-wasmtime", default-features = false }
wrpc-transport = { version = "0.26.8", path = "./crates/transport", default-features = false }
wrpc-transport-nats = { version = "0.23.2", path = "./crates/transport-nats", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion crates/introspect/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-introspect"
version = "0.3.0"
version = "0.3.1"
description = "Component type introspection for wRPC"

authors.workspace = true
Expand Down
14 changes: 4 additions & 10 deletions crates/introspect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,12 @@ pub fn async_paths_tyid(resolve: &Resolve, id: TypeId) -> (BTreeSet<VecDeque<Opt
(paths, false)
}
TypeDefKind::Future(ty) => {
let mut paths = BTreeSet::default();
if let Some(ty) = ty {
let (nested, fut) = async_paths_ty(resolve, ty);
for mut path in nested {
path.push_front(Some(0));
paths.insert(path);
}
if fut {
paths.insert(vec![Some(0)].into());
}
let (paths, _) = async_paths_ty(resolve, ty);
(paths, true)
} else {
(BTreeSet::default(), true)
}
(paths, true)
}
TypeDefKind::Stream(Stream { element, .. }) => {
let mut paths = BTreeSet::new();
Expand Down
12 changes: 11 additions & 1 deletion crates/transport-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,20 +871,30 @@ impl wrpc_transport::Invoke for Client {
let paths = paths.as_ref();
let (result_rx, handshake_rx, nested) = try_join!(
async {
trace!(
subject = result_rx.as_str(),
"subscribing on result subject"
);
self.nats
.subscribe(result_rx.clone())
.await
.context("failed to subscribe on result subject")
},
async {
trace!(subject = rx.as_str(), "subscribing on handshake subject");
self.nats
.subscribe(rx.clone())
.await
.context("failed to subscribe on handshake subject")
},
try_join_all(paths.iter().map(|path| async {
let rx = Subject::from(subscribe_path(&result_rx, path.as_ref()));
trace!(
subject = rx.as_str(),
"subscribing on nested result subject"
);
self.nats
.subscribe(Subject::from(subscribe_path(&result_rx, path.as_ref())))
.subscribe(rx)
.await
.context("failed to subscribe on nested result subject")
}))
Expand Down
47 changes: 33 additions & 14 deletions crates/transport/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::{select, try_join};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::{Encoder as _, FramedRead};
use tokio_util::io::StreamReader;
use tracing::{instrument, trace};
use tracing::{instrument, trace, Instrument as _};
use wasm_tokio::cm::{
BoolCodec, F32Codec, F64Codec, OptionDecoder, OptionEncoder, PrimValEncoder, ResultDecoder,
ResultEncoder, S16Codec, S32Codec, S64Codec, S8Codec, TupleDecoder, TupleEncoder, U16Codec,
Expand Down Expand Up @@ -1494,7 +1494,7 @@ where
// TODO: Check if future is resolved
dst.reserve(1);
dst.put_u8(0x00);
self.deferred = Some(Box::new(|w, mut path| {
self.deferred = Some(Box::new(|w, path| {
Box::pin(async move {
let mut root = w.index(&path).map_err(std::io::Error::other)?;
let item = item.await;
Expand All @@ -1503,7 +1503,6 @@ where
enc.encode(item, &mut buf)?;
try_join!(root.write_all(&buf), async {
if let Some(f) = enc.take_deferred() {
path.push(0);
f(w, path).await
} else {
Ok(())
Expand Down Expand Up @@ -1578,9 +1577,14 @@ where
// future is pending
let (tx, rx) = oneshot::channel();
let dec = mem::take(&mut self.dec).into_inner();
self.deferred = Some(Box::new(|r, mut path| {
self.deferred = Some(Box::new(|r, path| {
Box::pin(async move {
let indexed = r.index(&path).map_err(std::io::Error::other)?;
let indexed = if path.is_empty() {
Arc::try_unwrap(r)
.map_err(|_| std::io::Error::other("cannot decode a pending future element from a non-unique reference when path is empty"))?
} else {
r.index(&path).map_err(std::io::Error::other)?
};
let mut dec = FramedRead::new(indexed, dec);
trace!("receiving future element");
let Some(item) = dec.next().await else {
Expand All @@ -1598,15 +1602,15 @@ where
},
async {
if let Some(rx) = dec.decoder_mut().take_deferred() {
path.push(0);
rx(r, path).await
let indexed = dec.into_inner();
rx(Arc::new(indexed), Vec::default()).await
} else {
Ok(())
}
}
)?;
Ok(())
})
}.in_current_span())
}));
return Ok(Some(Box::pin(async {
rx.await.expect("future I/O dropped")
Expand Down Expand Up @@ -1708,7 +1712,7 @@ where
}
}
}
})
}.in_current_span())
}));
Ok(())
}
Expand Down Expand Up @@ -1959,7 +1963,12 @@ where
std::io::Error: From<C::Error>,
{
let dec = ListDecoder::new(dec);
let indexed = r.index(&path).map_err(std::io::Error::other)?;
let indexed = if path.is_empty() {
Arc::try_unwrap(r)
.map_err(|_| std::io::Error::other("cannot decode a pending stream chunk from a non-unique reference when path is empty"))?
} else {
r.index(&path).map_err(std::io::Error::other)?
};
let mut framed = FramedRead::new(indexed, dec);
let mut tasks = JoinSet::new();
let mut i = 0_usize;
Expand Down Expand Up @@ -1988,9 +1997,9 @@ where
for (i, deferred) in zip(i.., mem::take(&mut framed.decoder_mut().deferred)) {
if let Some(deferred) = deferred {
trace!(i, "handling async read");
path.push(i);
let indexed = r.index(&path).map_err(std::io::Error::other)?;
let indexed = framed.get_ref().index(&[i]).map_err(std::io::Error::other)?;
trace!("spawning receive task");
path.push(i);
tasks.spawn(deferred(indexed.into(), path.clone()));
path.pop();
}
Expand Down Expand Up @@ -2090,7 +2099,12 @@ where
let dec = mem::take(&mut self.dec);
self.deferred = Some(Box::new(|r, path| {
Box::pin(async move {
let indexed = r.index(&path).map_err(std::io::Error::other)?;
let indexed = if path.is_empty() {
Arc::try_unwrap(r)
.map_err(|_| std::io::Error::other("cannot decode a pending stream chunk from a non-unique reference when path is empty"))?
} else {
r.index(&path).map_err(std::io::Error::other)?
};
let mut framed = FramedRead::new(indexed, dec);
trace!("receiving stream chunk");
while let Some(chunk) = framed.next().await {
Expand Down Expand Up @@ -2163,7 +2177,12 @@ where
let dec = mem::take(&mut self.dec);
self.deferred = Some(Box::new(|r, path| {
Box::pin(async move {
let indexed = r.index(&path).map_err(std::io::Error::other)?;
let indexed = if path.is_empty() {
Arc::try_unwrap(r)
.map_err(|_| std::io::Error::other("cannot decode a pending stream chunk from a non-unique reference when path is empty"))?
} else {
r.index(&path).map_err(std::io::Error::other)?
};
let mut framed = FramedRead::new(indexed, dec);
trace!("receiving stream chunk");
while let Some(chunk) = framed.next().await {
Expand Down
29 changes: 17 additions & 12 deletions crates/wit-bindgen-go/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ impl InterfaceGenerator<'_> {
case 0:
r, err := r.Index(path...)
if err != nil {{
return nil, {fmt}.Errorf("failed to index reader: %w", err)
return nil, {fmt}.Errorf("failed to index nested byte list future reader: %w", err)
}}
return {wrpc}.NewByteStreamReader(r), nil
case 1:
Expand Down Expand Up @@ -999,7 +999,7 @@ impl InterfaceGenerator<'_> {
case 0:
r, err := r.Index(path...)
if err != nil {{
return nil, {fmt}.Errorf("failed to index reader: %w", err)
return nil, {fmt}.Errorf("failed to index nested future reader: %w", err)
}}
return {wrpc}.NewDecodeReceiver(r, func(r {wrpc}.IndexReader) ("#
);
Expand Down Expand Up @@ -1068,7 +1068,7 @@ impl InterfaceGenerator<'_> {
case 0:
r, err := r.Index(path...)
if err != nil {{
return nil, {fmt}.Errorf("failed to index reader: %w", err)
return nil, {fmt}.Errorf("failed to index nested byte stream reader: %w", err)
}}
return {wrpc}.NewByteStreamReader(r), nil
case 1:
Expand Down Expand Up @@ -1121,7 +1121,7 @@ impl InterfaceGenerator<'_> {
case 0:
r, err := r.Index(path...)
if err != nil {{
return nil, {fmt}.Errorf("failed to index reader: %w", err)
return nil, {fmt}.Errorf("failed to index nested stream reader: %w", err)
}}
var total uint32
return {wrpc}.NewDecodeReceiver(r, func(r {wrpc}.IndexReader) ("#
Expand Down Expand Up @@ -1681,7 +1681,7 @@ impl InterfaceGenerator<'_> {
wg.Add(1)
w, err := w.Index(index)
if err != nil {{
return {fmt}.Errorf("failed to index writer: %w", err)
return {fmt}.Errorf("failed to index nested list writer: %w", err)
}}
write := write
go func() {{
Expand Down Expand Up @@ -1855,7 +1855,7 @@ impl InterfaceGenerator<'_> {
return func(w {wrpc}.IndexWriter) error {{
w, err := w.Index(0)
if err != nil {{
return {fmt}.Errorf("failed to index writer: %w", err)
return {fmt}.Errorf("failed to index nested tuple writer: %w", err)
}}
return write(w)
}}, nil
Expand Down Expand Up @@ -1908,7 +1908,7 @@ impl InterfaceGenerator<'_> {
wg.Add(1)
w, err := w.Index(index)
if err != nil {{
return {fmt}.Errorf("failed to index writer: %w", err)
return {fmt}.Errorf("failed to index nested tuple writer: %w", err)
}}
write := write
go func() {{
Expand Down Expand Up @@ -1952,6 +1952,7 @@ impl InterfaceGenerator<'_> {
defer func() {{
body, ok := v.({io}.Closer)
if ok {{
{slog}.Debug("closing byte list future writer")
if cErr := body.Close(); cErr != nil {{
if err == nil {{
err = {fmt}.Errorf("failed to close pending byte list future: %w", cErr)
Expand All @@ -1973,6 +1974,7 @@ impl InterfaceGenerator<'_> {
if err := {wrpc}.WriteUint32(uint32(n), w); err != nil {{
return {fmt}.Errorf("failed to write pending byte list future length of %d: %w", n, err)
}}
{slog}.Debug("writing pending byte list future contents", "buf", chunk[:n])
_, err = w.Write(chunk[:n])
if err != nil {{
return {fmt}.Errorf("failed to write pending byte list future contents: %w", err)
Expand All @@ -1999,6 +2001,7 @@ impl InterfaceGenerator<'_> {
defer func() {{
body, ok := v.({io}.Closer)
if ok {{
{slog}.Debug("closing future writer")
if cErr := body.Close(); cErr != nil {{
if err == nil {{
err = {fmt}.Errorf("failed to close pending future: %w", cErr)
Expand Down Expand Up @@ -2054,6 +2057,7 @@ impl InterfaceGenerator<'_> {
defer func() {{
body, ok := v.({io}.Closer)
if ok {{
{slog}.Debug("closing byte list stream writer")
if cErr := body.Close(); cErr != nil {{
if err == nil {{
err = {fmt}.Errorf("failed to close pending byte stream: %w", cErr)
Expand Down Expand Up @@ -2118,6 +2122,7 @@ impl InterfaceGenerator<'_> {
defer func() {{
body, ok := v.({io}.Closer)
if ok {{
{slog}.Debug("closing stream writer")
if cErr := body.Close(); cErr != nil {{
if err == nil {{
err = {fmt}.Errorf("failed to close pending stream: %w", cErr)
Expand Down Expand Up @@ -2166,7 +2171,7 @@ impl InterfaceGenerator<'_> {
wg.Add(1)
w, err := w.Index(total)
if err != nil {{
return {fmt}.Errorf("failed to index writer: %w", err)
return {fmt}.Errorf("failed to index nested stream writer: %w", err)
}}
go func() {{
defer wg.Done()
Expand Down Expand Up @@ -2599,7 +2604,7 @@ func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) {
for index, write := range writes {{
w, err := w.Index(index)
if err != nil {{
{slog}.ErrorContext(ctx, "failed to index writer", "index", index, "instance", "{instance}", "name", "{name}", "err", err)
{slog}.ErrorContext(ctx, "failed to index result writer", "index", index, "instance", "{instance}", "name", "{name}", "err", err)
return
}}
index := index
Expand Down Expand Up @@ -2820,7 +2825,7 @@ func ServeInterface(s {wrpc}.Server, h Handler) (stop func() error, err error) {
if cErr := w__.Close(); cErr != nil {{
{slog}.DebugContext(ctx__, "failed to close outgoing stream", "instance", "{instance}", "name", "{}", "err", cErr)
}}
err__ = {fmt}.Errorf("failed to index writer at index `%v`: %w", index, err)
err__ = {fmt}.Errorf("failed to index param writer at index `%v`: %w", index, err)
return
}}
write := write
Expand Down Expand Up @@ -3363,7 +3368,7 @@ func (v *{name}) WriteToIndex(w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) err
wg.Add(1)
w, err := w.Index(index)
if err != nil {{
return {fmt}.Errorf("failed to index writer: %w", err)
return {fmt}.Errorf("failed to index nested record writer: %w", err)
}}
write := write
go func() {{
Expand Down Expand Up @@ -3678,7 +3683,7 @@ func (v *{name}) WriteToIndex(w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) err
return func(w {wrpc}.IndexWriter) error {{
w, err := w.Index({i})
if err != nil {{
return {fmt}.Errorf("failed to index writer: %w", err)
return {fmt}.Errorf("failed to index nested variant writer: %w", err)
}}
return write(w)
}}, nil
Expand Down
2 changes: 1 addition & 1 deletion crates/wit-bindgen-rust-macro/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wit-bindgen-wrpc-rust-macro"
version = "0.6.5"
version = "0.7.0"
description = """
Procedural macro paired with the `wit-bindgen-wrpc` crate.
"""
Expand Down
Loading
Loading