diff --git a/src/doc_utils.rs b/src/doc_utils.rs index 6d63a63..540c5b0 100644 --- a/src/doc_utils.rs +++ b/src/doc_utils.rs @@ -1,15 +1,15 @@ -use std::{future::Future, pin::Pin}; +use std::future::Future; use crate::{next::Message, Error}; pub struct Conn; impl crate::next::Connection for Conn { - fn receive(&mut self) -> Pin> + Send + '_>> { + async fn receive(&mut self) -> Option { unimplemented!() } - fn send(&mut self, _: Message) -> Pin> + Send + '_>> { + async fn send(&mut self, _: Message) -> Result<(), Error> { unimplemented!() } } diff --git a/src/native.rs b/src/native.rs index cc7abd2..ee39aa8 100644 --- a/src/native.rs +++ b/src/native.rs @@ -1,6 +1,4 @@ -use std::{future::Future, pin::Pin}; - -use futures::{future::BoxFuture, Sink, SinkExt}; +use futures::{Sink, SinkExt}; use futures_lite::{Stream, StreamExt}; use tungstenite::{self, protocol::CloseFrame}; @@ -16,55 +14,47 @@ where + Unpin, >::Error: std::fmt::Display, { - fn receive(&mut self) -> Pin> + Send + '_>> { - Box::pin(async move { - loop { - match self.next().await? { - Ok(tungstenite::Message::Text(text)) => { - return Some(crate::next::Message::Text(text)) - } - Ok(tungstenite::Message::Ping(_)) => return Some(crate::next::Message::Ping), - Ok(tungstenite::Message::Pong(_)) => return Some(crate::next::Message::Pong), - Ok(tungstenite::Message::Close(frame)) => { - return Some(crate::next::Message::Close { - code: frame.as_ref().map(|frame| frame.code.into()), - reason: frame.map(|frame| frame.reason.to_string()), - }) - } - Ok(tungstenite::Message::Frame(_) | tungstenite::Message::Binary(_)) => { - continue - } - Err(error) => { - #[allow(unused)] - let error = error; - crate::logging::warning!("error receiving message: {error:?}"); - return None; - } + async fn receive(&mut self) -> Option { + loop { + match self.next().await? { + Ok(tungstenite::Message::Text(text)) => { + return Some(crate::next::Message::Text(text)) + } + Ok(tungstenite::Message::Ping(_)) => return Some(crate::next::Message::Ping), + Ok(tungstenite::Message::Pong(_)) => return Some(crate::next::Message::Pong), + Ok(tungstenite::Message::Close(frame)) => { + return Some(crate::next::Message::Close { + code: frame.as_ref().map(|frame| frame.code.into()), + reason: frame.map(|frame| frame.reason.to_string()), + }) + } + Ok(tungstenite::Message::Frame(_) | tungstenite::Message::Binary(_)) => continue, + Err(error) => { + #[allow(unused)] + let error = error; + crate::logging::warning!("error receiving message: {error:?}"); + return None; } } - }) + } } - fn send(&mut self, message: crate::next::Message) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - >::send( - self, - match message { - crate::next::Message::Text(text) => tungstenite::Message::Text(text), - crate::next::Message::Close { code, reason } => { - tungstenite::Message::Close(code.zip(reason).map(|(code, reason)| { - CloseFrame { - code: code.into(), - reason: reason.into(), - } - })) - } - crate::next::Message::Ping => tungstenite::Message::Ping(vec![]), - crate::next::Message::Pong => tungstenite::Message::Pong(vec![]), - }, - ) - .await - .map_err(|error| Error::Send(error.to_string())) - }) + async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> { + >::send( + self, + match message { + crate::next::Message::Text(text) => tungstenite::Message::Text(text), + crate::next::Message::Close { code, reason } => { + tungstenite::Message::Close(code.zip(reason).map(|(code, reason)| CloseFrame { + code: code.into(), + reason: reason.into(), + })) + } + crate::next::Message::Ping => tungstenite::Message::Ping(vec![]), + crate::next::Message::Pong => tungstenite::Message::Pong(vec![]), + }, + ) + .await + .map_err(|error| Error::Send(error.to_string())) } } diff --git a/src/next/actor.rs b/src/next/actor.rs index 5d2640a..e98597f 100644 --- a/src/next/actor.rs +++ b/src/next/actor.rs @@ -13,7 +13,7 @@ use crate::{ }; use super::{ - connection::{Connection, Message}, + connection::{Message, ObjectSafeConnection}, keepalive::KeepAliveSettings, ConnectionCommand, }; @@ -26,7 +26,7 @@ use super::{ /// with an async runtime. pub struct ConnectionActor { client: Option>, - connection: Box, + connection: Box, operations: HashMap>, keep_alive: KeepAliveSettings, keep_alive_actor: stream::Boxed, @@ -34,7 +34,7 @@ pub struct ConnectionActor { impl ConnectionActor { pub(super) fn new( - connection: Box, + connection: Box, client: async_channel::Receiver, keep_alive: KeepAliveSettings, ) -> Self { diff --git a/src/next/builder.rs b/src/next/builder.rs index 20d54a7..c27b5b7 100644 --- a/src/next/builder.rs +++ b/src/next/builder.rs @@ -10,7 +10,7 @@ use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error}; use super::{ actor::ConnectionActor, - connection::{Connection, Message}, + connection::{Connection, Message, ObjectSafeConnection}, keepalive::KeepAliveSettings, production_future::read_from_producer, Client, Subscription, @@ -34,7 +34,7 @@ use super::{ pub struct ClientBuilder { payload: Option, subscription_buffer_size: Option, - connection: Box, + connection: Box, keep_alive: KeepAliveSettings, } diff --git a/src/next/connection.rs b/src/next/connection.rs index 7210d8e..7ce83ac 100644 --- a/src/next/connection.rs +++ b/src/next/connection.rs @@ -10,13 +10,10 @@ use crate::Error; /// If users wish to add support for a new client they should implement this trait. pub trait Connection { /// Receive the next message on this connection. - fn receive(&mut self) -> Pin> + Send + '_>>; + fn receive(&mut self) -> impl Future> + Send; /// Send a message with on connection - fn send( - &mut self, - message: Message, - ) -> Pin> + Send + '_>>; + fn send(&mut self, message: Message) -> impl Future> + Send; } /// A websocket message @@ -69,3 +66,27 @@ impl Message { ) } } + +/// An object safe wrapper around the Connection trait, allowing us +/// to use it dynamically +pub(crate) trait ObjectSafeConnection: Send { + fn receive(&mut self) -> Pin> + Send + '_>>; + + fn send( + &mut self, + message: Message, + ) -> Pin> + Send + '_>>; +} + +impl ObjectSafeConnection for T { + fn receive(&mut self) -> Pin> + Send + '_>> { + Box::pin(Connection::receive(self)) + } + + fn send( + &mut self, + message: Message, + ) -> Pin> + Send + '_>> { + Box::pin(Connection::send(self, message)) + } +} diff --git a/src/ws_stream_wasm.rs b/src/ws_stream_wasm.rs index 2d9d956..7929133 100644 --- a/src/ws_stream_wasm.rs +++ b/src/ws_stream_wasm.rs @@ -1,4 +1,4 @@ -use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt, StreamExt}; use pharos::{Observable, ObserveConfig}; use ws_stream_wasm::{WsEvent, WsMessage, WsMeta, WsStream}; @@ -26,53 +26,47 @@ impl Connection { } impl crate::next::Connection for Connection { - fn receive(&mut self) -> BoxFuture<'_, Option> { - Box::pin(async move { - use crate::next::Message; - loop { - match self.next().await? { - EventOrMessage::Event(WsEvent::Closed(close)) => { - return Some(Message::Close { - code: Some(close.code), - reason: Some(close.reason), - }); - } - EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => { - return None; - } - EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => { - continue; - } + async fn receive(&mut self) -> Option { + use crate::next::Message; + loop { + match self.next().await? { + EventOrMessage::Event(WsEvent::Closed(close)) => { + return Some(Message::Close { + code: Some(close.code), + reason: Some(close.reason), + }); + } + EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => { + return None; + } + EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => { + continue; + } - EventOrMessage::Message(WsMessage::Text(text)) => { - return Some(Message::Text(text)) - } + EventOrMessage::Message(WsMessage::Text(text)) => return Some(Message::Text(text)), - EventOrMessage::Message(WsMessage::Binary(_)) => { - // We shouldn't receive binary messages, but ignore them if we do - continue; - } + EventOrMessage::Message(WsMessage::Binary(_)) => { + // We shouldn't receive binary messages, but ignore them if we do + continue; } } - }) + } } - fn send(&mut self, message: crate::next::Message) -> BoxFuture<'_, Result<(), Error>> { + async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> { use crate::next::Message; - Box::pin(async move { - match message { - Message::Text(text) => self.messages.send(WsMessage::Text(text)).await, - Message::Close { code, reason } => match (code, reason) { - (Some(code), Some(reason)) => self.meta.close_reason(code, reason).await, - (Some(code), _) => self.meta.close_code(code).await, - _ => self.meta.close().await, - } - .map(|_| ()), - Message::Ping | Message::Pong => return Ok(()), + match message { + Message::Text(text) => self.messages.send(WsMessage::Text(text)).await, + Message::Close { code, reason } => match (code, reason) { + (Some(code), Some(reason)) => self.meta.close_reason(code, reason).await, + (Some(code), _) => self.meta.close_code(code).await, + _ => self.meta.close().await, } - .map_err(|error| Error::Send(error.to_string())) - }) + .map(|_| ()), + Message::Ping | Message::Pong => return Ok(()), + } + .map_err(|error| Error::Send(error.to_string())) } }