Skip to content

Commit

Permalink
feat!: Connection functions now return impl Future
Browse files Browse the repository at this point in the history
Now that the MSRV is > 1.75 we can start making use of return position impl trait.

We need to specify send bounds on the returned futures so we return an impl
Trait, but it seems like rust is smart enough to allow us to use an actual
async function in the implementations.
  • Loading branch information
obmarg committed Jun 8, 2024
1 parent e195471 commit ab70fe4
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 101 deletions.
6 changes: 3 additions & 3 deletions src/doc_utils.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{future::Future, pin::Pin};
use std::future::Future;

use crate::{next::Message, Error};

pub struct Conn;

impl crate::next::Connection for Conn {
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>> {
async fn receive(&mut self) -> Option<Message> {
unimplemented!()
}

fn send(&mut self, _: Message) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> {
async fn send(&mut self, _: Message) -> Result<(), Error> {
unimplemented!()
}
}
Expand Down
88 changes: 39 additions & 49 deletions src/native.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::{future::Future, pin::Pin};

use futures::{future::BoxFuture, Sink, SinkExt};
use futures::{Sink, SinkExt};
use futures_lite::{Stream, StreamExt};
use tungstenite::{self, protocol::CloseFrame};

Expand All @@ -16,55 +14,47 @@ where
+ Unpin,
<T as Sink<tungstenite::Message>>::Error: std::fmt::Display,
{
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>> {
Box::pin(async move {
loop {
match self.next().await? {
Ok(tungstenite::Message::Text(text)) => {
return Some(crate::next::Message::Text(text))
}
Ok(tungstenite::Message::Ping(_)) => return Some(crate::next::Message::Ping),
Ok(tungstenite::Message::Pong(_)) => return Some(crate::next::Message::Pong),
Ok(tungstenite::Message::Close(frame)) => {
return Some(crate::next::Message::Close {
code: frame.as_ref().map(|frame| frame.code.into()),
reason: frame.map(|frame| frame.reason.to_string()),
})
}
Ok(tungstenite::Message::Frame(_) | tungstenite::Message::Binary(_)) => {
continue
}
Err(error) => {
#[allow(unused)]
let error = error;
crate::logging::warning!("error receiving message: {error:?}");
return None;
}
async fn receive(&mut self) -> Option<Message> {
loop {
match self.next().await? {
Ok(tungstenite::Message::Text(text)) => {
return Some(crate::next::Message::Text(text))
}
Ok(tungstenite::Message::Ping(_)) => return Some(crate::next::Message::Ping),
Ok(tungstenite::Message::Pong(_)) => return Some(crate::next::Message::Pong),
Ok(tungstenite::Message::Close(frame)) => {
return Some(crate::next::Message::Close {
code: frame.as_ref().map(|frame| frame.code.into()),
reason: frame.map(|frame| frame.reason.to_string()),
})
}
Ok(tungstenite::Message::Frame(_) | tungstenite::Message::Binary(_)) => continue,
Err(error) => {
#[allow(unused)]
let error = error;
crate::logging::warning!("error receiving message: {error:?}");
return None;
}
}
})
}
}

fn send(&mut self, message: crate::next::Message) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
<Self as SinkExt<tungstenite::Message>>::send(
self,
match message {
crate::next::Message::Text(text) => tungstenite::Message::Text(text),
crate::next::Message::Close { code, reason } => {
tungstenite::Message::Close(code.zip(reason).map(|(code, reason)| {
CloseFrame {
code: code.into(),
reason: reason.into(),
}
}))
}
crate::next::Message::Ping => tungstenite::Message::Ping(vec![]),
crate::next::Message::Pong => tungstenite::Message::Pong(vec![]),
},
)
.await
.map_err(|error| Error::Send(error.to_string()))
})
async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
<Self as SinkExt<tungstenite::Message>>::send(
self,
match message {
crate::next::Message::Text(text) => tungstenite::Message::Text(text),
crate::next::Message::Close { code, reason } => {
tungstenite::Message::Close(code.zip(reason).map(|(code, reason)| CloseFrame {
code: code.into(),
reason: reason.into(),
}))
}
crate::next::Message::Ping => tungstenite::Message::Ping(vec![]),
crate::next::Message::Pong => tungstenite::Message::Pong(vec![]),
},
)
.await
.map_err(|error| Error::Send(error.to_string()))
}
}
6 changes: 3 additions & 3 deletions src/next/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
};

use super::{
connection::{Connection, Message},
connection::{Message, ObjectSafeConnection},
keepalive::KeepAliveSettings,
ConnectionCommand,
};
Expand All @@ -26,15 +26,15 @@ use super::{
/// with an async runtime.
pub struct ConnectionActor {
client: Option<async_channel::Receiver<ConnectionCommand>>,
connection: Box<dyn Connection + Send>,
connection: Box<dyn ObjectSafeConnection>,
operations: HashMap<usize, async_channel::Sender<Value>>,
keep_alive: KeepAliveSettings,
keep_alive_actor: stream::Boxed<ConnectionCommand>,
}

impl ConnectionActor {
pub(super) fn new(
connection: Box<dyn Connection + Send>,
connection: Box<dyn ObjectSafeConnection>,
client: async_channel::Receiver<ConnectionCommand>,
keep_alive: KeepAliveSettings,
) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions src/next/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error};

use super::{
actor::ConnectionActor,
connection::{Connection, Message},
connection::{Connection, Message, ObjectSafeConnection},
keepalive::KeepAliveSettings,
production_future::read_from_producer,
Client, Subscription,
Expand All @@ -34,7 +34,7 @@ use super::{
pub struct ClientBuilder {
payload: Option<serde_json::Value>,
subscription_buffer_size: Option<usize>,
connection: Box<dyn Connection + Send>,
connection: Box<dyn ObjectSafeConnection>,
keep_alive: KeepAliveSettings,
}

Expand Down
31 changes: 26 additions & 5 deletions src/next/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@ use crate::Error;
/// If users wish to add support for a new client they should implement this trait.
pub trait Connection {
/// Receive the next message on this connection.
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>>;
fn receive(&mut self) -> impl Future<Output = Option<Message>> + Send;

/// Send a message with on connection
fn send(
&mut self,
message: Message,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
fn send(&mut self, message: Message) -> impl Future<Output = Result<(), Error>> + Send;
}

/// A websocket message
Expand Down Expand Up @@ -69,3 +66,27 @@ impl Message {
)
}
}

/// An object safe wrapper around the Connection trait, allowing us
/// to use it dynamically
pub(crate) trait ObjectSafeConnection: Send {
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>>;

fn send(
&mut self,
message: Message,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
}

impl<T: Connection + Sized + Send> ObjectSafeConnection for T {
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>> {
Box::pin(Connection::receive(self))
}

fn send(
&mut self,
message: Message,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> {
Box::pin(Connection::send(self, message))
}
}
72 changes: 33 additions & 39 deletions src/ws_stream_wasm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt};
use futures::{FutureExt, SinkExt, StreamExt};
use pharos::{Observable, ObserveConfig};
use ws_stream_wasm::{WsEvent, WsMessage, WsMeta, WsStream};

Expand Down Expand Up @@ -26,53 +26,47 @@ impl Connection {
}

impl crate::next::Connection for Connection {
fn receive(&mut self) -> BoxFuture<'_, Option<crate::next::Message>> {
Box::pin(async move {
use crate::next::Message;
loop {
match self.next().await? {
EventOrMessage::Event(WsEvent::Closed(close)) => {
return Some(Message::Close {
code: Some(close.code),
reason: Some(close.reason),
});
}
EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => {
return None;
}
EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => {
continue;
}
async fn receive(&mut self) -> Option<crate::next::Message> {
use crate::next::Message;
loop {
match self.next().await? {
EventOrMessage::Event(WsEvent::Closed(close)) => {
return Some(Message::Close {
code: Some(close.code),
reason: Some(close.reason),
});
}
EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => {
return None;
}
EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => {
continue;
}

EventOrMessage::Message(WsMessage::Text(text)) => {
return Some(Message::Text(text))
}
EventOrMessage::Message(WsMessage::Text(text)) => return Some(Message::Text(text)),

EventOrMessage::Message(WsMessage::Binary(_)) => {
// We shouldn't receive binary messages, but ignore them if we do
continue;
}
EventOrMessage::Message(WsMessage::Binary(_)) => {
// We shouldn't receive binary messages, but ignore them if we do
continue;
}
}
})
}
}

fn send(&mut self, message: crate::next::Message) -> BoxFuture<'_, Result<(), Error>> {
async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
use crate::next::Message;

Box::pin(async move {
match message {
Message::Text(text) => self.messages.send(WsMessage::Text(text)).await,
Message::Close { code, reason } => match (code, reason) {
(Some(code), Some(reason)) => self.meta.close_reason(code, reason).await,
(Some(code), _) => self.meta.close_code(code).await,
_ => self.meta.close().await,
}
.map(|_| ()),
Message::Ping | Message::Pong => return Ok(()),
match message {
Message::Text(text) => self.messages.send(WsMessage::Text(text)).await,
Message::Close { code, reason } => match (code, reason) {
(Some(code), Some(reason)) => self.meta.close_reason(code, reason).await,
(Some(code), _) => self.meta.close_code(code).await,
_ => self.meta.close().await,
}
.map_err(|error| Error::Send(error.to_string()))
})
.map(|_| ()),
Message::Ping | Message::Pong => return Ok(()),
}
.map_err(|error| Error::Send(error.to_string()))
}
}

Expand Down

0 comments on commit ab70fe4

Please sign in to comment.