Skip to content

Commit

Permalink
Refactor/auto reconnect (#4)
Browse files Browse the repository at this point in the history
* fix: incorrect hierachy

* chore: remove time stamp in logging

systemd already provide time format

* refactor: use inner pattern

* refactor: use builder pattern

* chore: use task for each listener

* refactor: use connector instead of listener

fix: update main

chore: improve logging

fix: test

* fix: incorrect reconnect logic
  • Loading branch information
ImSoZRious authored Jul 4, 2023
1 parent ae69a73 commit 302107d
Show file tree
Hide file tree
Showing 13 changed files with 422 additions and 131 deletions.
26 changes: 26 additions & 0 deletions bin/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::config::Config;
use rusty_rtss::postgres::PgConnector;
use sqlx::postgres::PgPool;

use super::Payload;
use super::Result;

pub async fn get_pool_from_config(config: &Config) -> Result<PgPool> {
PgPool::connect(&config.postgres.uri)
.await
.map_err(Into::into)
}

pub async fn get_connector_from_pool(
pool: &PgPool,
config: &Config,
) -> Result<PgConnector<Payload>> {
let channels = config.postgres.listen_channels.clone();

PgConnector::builder()
.with_pool(pool)
.add_channels(channels)
.build()
.await
.map_err(|_| "Unable to get listener from pool".into())
}
22 changes: 0 additions & 22 deletions bin/listener.rs

This file was deleted.

24 changes: 16 additions & 8 deletions bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#![feature(result_option_inspect)]
use std::sync::Arc;

use repository::SubmisisonRepository;
use rusty_rtss::{app::App, sse::SsePublisher};
use rusty_rtss::{app::App, postgres::PgConnector, sse::SsePublisher};

mod config;
mod listener;
mod connector;
mod payload;
mod publisher;
mod repository;
Expand All @@ -20,13 +19,13 @@ type Payload = payload::Payload;

#[derive(Clone)]
pub struct SharedState {
app: Arc<App<SsePublisher<Identifier, Payload>>>,
app: App<PgConnector<Payload>, SsePublisher<Identifier, Payload>>,
repository: SubmisisonRepository,
}

#[tokio::main]
async fn main() {
env_logger::init();
env_logger::builder().format_timestamp(None).init();

let config = match config::load_config() {
Ok(x) => x,
Expand All @@ -36,12 +35,12 @@ async fn main() {
}
};

let pool = listener::get_pool_from_config(&config)
let pool = connector::get_pool_from_config(&config)
.await
.expect("Unable to create connection pool");
log::info!("Connected to database");

let listener = listener::get_listener_from_pool(&pool, &config)
let connector = connector::get_connector_from_pool(&pool, &config)
.await
.expect("Unable to create listener from connection pool");
log::info!("Listened to channel");
Expand All @@ -52,9 +51,18 @@ async fn main() {
let publisher = publisher::get_publisher();
log::info!("Created publisher");

let app = Arc::new(App::new(listener, publisher).expect("Unable to create app"));
let app = App::new(connector, publisher)
.await
.expect("Unable to create app");
log::info!("Created app");

let _app = app.clone();
tokio::spawn(async move {
if let Err(e) = _app.handle_connection().await {
log::error!("Handle connection join error: {e:?}");
}
});

let shared_state = SharedState { app, repository };

let router = router::get_router(shared_state);
Expand Down
2 changes: 1 addition & 1 deletion bin/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use rusty_rtss::sse::SsePublisher;

pub fn get_publisher() -> SsePublisher<super::Identifier, super::payload::Payload> {
pub fn get_publisher() -> SsePublisher<super::Identifier, super::Payload> {
SsePublisher::new()
}
143 changes: 113 additions & 30 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,138 @@
use crate::listener::Connector;

use super::{listener::Listener, publisher::Publisher};

use futures_util::StreamExt;
use futures_util::{Stream, StreamExt};
use std::sync::Arc;
use tokio::task::JoinHandle;

pub struct App<P> {
_handle: JoinHandle<()>,
publisher: Arc<P>,
struct Inner<C, P> {
connector: C,
publisher: P,
}

/// Wrapper around [Inner](Inner)
/// All of the logic should be perform in [Inner](Inner)
pub struct App<C, P> {
inner: Arc<Inner<C, P>>,
}

impl<P> App<P> {
pub fn new<L, T>(listener: L, publisher: P) -> Result<Self, Box<dyn std::error::Error>>
impl<C, P> App<C, P> {
pub async fn new<T>(connector: C, publisher: P) -> Result<Self, Box<dyn std::error::Error>>
where
L: Listener<Data = T> + 'static,
P: Publisher<PublishData = T> + 'static,
T: Send + Sync + 'static,
C: Connector + 'static,
<C as Connector>::Listener: Listener<Data = T> + 'static,
{
let publisher = Arc::new(publisher);
let inner = Arc::new(Inner {
connector,
publisher,
});

let cloned_publisher = Arc::clone(&publisher);
let handle = tokio::spawn(async move {
let cloned_publisher = cloned_publisher;
let stream = listener.into_stream();
let app = App { inner };

stream
.for_each_concurrent(10, move |payload| {
let cloned_publisher = Arc::clone(&cloned_publisher);
Ok(app)
}

async move {
let cloned_publisher = cloned_publisher;
pub async fn add_subscriber<S>(&self, subscriber: S) -> Result<(), Box<dyn std::error::Error>>
where
P: Publisher<Subscriber = S> + 'static,
{
let inner = Arc::clone(&self.inner);

cloned_publisher.publish(payload).await;
}
})
.await
});
if let Err(e) = inner.add_subscriber(subscriber).await {
log::warn!("Unable to add subscriber: {e:?}");
};

Ok(App {
_handle: handle,
publisher,
})
tokio::task::yield_now().await;

Ok(())
}

pub async fn add_subscriber<S>(&self, subscriber: S) -> Result<(), Box<dyn std::error::Error>>
pub fn add_stream<S, T>(&self, stream: S) -> tokio::task::JoinHandle<()>
where
S: Stream<Item = T> + Send + 'static,
P: Publisher<PublishData = T> + 'static,
T: Send + Sync + 'static,
C: Send + Sync + 'static,
{
Arc::clone(&self.inner).add_stream(stream)
}

pub async fn handle_connection<T>(&self) -> Result<(), tokio::task::JoinError>
where
C: Connector + 'static,
<C as Connector>::Listener: Listener<Data = T>,
P: Publisher<PublishData = T> + 'static,
T: Send + Sync + 'static,
{
Arc::clone(&self.inner).handle_connection().await
}
}

impl<C, P> Clone for App<C, P> {
fn clone(&self) -> Self {
App {
inner: Arc::clone(&self.inner),
}
}
}

impl<C, P> Inner<C, P> {
pub async fn add_subscriber<S>(
self: Arc<Self>,
subscriber: S,
) -> Result<(), Box<dyn std::error::Error>>
where
P: Publisher<Subscriber = S> + 'static,
{
self.publisher.add_subscriber(subscriber);

tokio::task::yield_now().await;

Ok(())
}

pub fn add_stream<S, T>(self: Arc<Self>, stream: S) -> tokio::task::JoinHandle<()>
where
S: Stream<Item = T> + Send + 'static,
P: Publisher<PublishData = T> + 'static,
T: Send + Sync + 'static,
C: Send + Sync + 'static,
{
tokio::spawn(async move {
stream
.for_each_concurrent(10, move |payload| Arc::clone(&self).handle_payload(payload))
.await;

log::info!("Stream end");
})
}

async fn handle_payload<T>(self: Arc<Self>, payload: T)
where
P: Publisher<PublishData = T> + 'static,
T: Send + Sync + 'static,
{
self.publisher.publish(payload).await
}

/// Future become ready after connector give up on connection
async fn handle_connection<T>(self: Arc<Self>) -> Result<(), tokio::task::JoinError>
where
C: Connector + 'static,
<C as Connector>::Listener: Listener<Data = T>,
P: Publisher<PublishData = T> + 'static,
T: Send + Sync + 'static,
{
tokio::spawn(async move {
while let Some(listener) = self.connector.connect().await {
let stream_handle = Arc::clone(&self).add_stream(listener.into_stream());

stream_handle.await?
}
log::info!("Connector give up");
Ok(())
})
.await
.flatten()
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![feature(result_flattening)]

pub mod app;
pub mod listener;
pub mod postgres;
Expand Down
11 changes: 11 additions & 0 deletions src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,14 @@ pub trait Listener: Send + Sync {

fn into_stream(self) -> Self::S;
}

#[async_trait::async_trait]
pub trait Connector: Send + Sync {
type Listener: Listener;

/// `None` indicates that there will be no connection continue
/// the default implementation is also `None`
async fn connect(&self) -> Option<Self::Listener> {
None
}
}
63 changes: 0 additions & 63 deletions src/postgres.rs

This file was deleted.

Loading

0 comments on commit 302107d

Please sign in to comment.