Skip to content

Commit

Permalink
Fix/initial payload (#2)
Browse files Browse the repository at this point in the history
* feat: add postgres listener from pool

* fix: incorrect signature

* refactor

* chore: update env

* feat: add initial payload
  • Loading branch information
ImSoZRious authored Jun 7, 2023
1 parent 0edd33c commit 1055ea2
Show file tree
Hide file tree
Showing 9 changed files with 348 additions and 124 deletions.
4 changes: 3 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
[env]
DB_CONNECTION_URI="postgres://<USER>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>"
RUST_LOG="debug"
POSTGRES_URI="postgres://<USER>:<PASSWORD>@<HOST>:<PORT>/<DATABASE>"
POSTGRES_LISTEN_CHANNELS="update"
RTSS_HOST="0.0.0.0"
RTSS_PORT="3001"
67 changes: 67 additions & 0 deletions bin/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use super::Result;

pub struct Config {
pub postgres: PostgresConfig,
pub axum: AxumConfig,
}

pub struct PostgresConfig {
pub uri: String,
pub listen_channels: Vec<String>,
}

pub struct AxumConfig {
pub port: u32,
pub host: String,
}

pub fn load_config() -> Result<Config> {
Ok(Config {
postgres: load_postgres_config()?,
axum: load_axum_config()?,
})
}

fn load_postgres_config() -> Result<PostgresConfig> {
let uri = env_string("POSTGRES_URI")?;
let listen_channels = env_string("POSTGRES_LISTEN_CHANNELS")?;
let listen_channels = listen_channels
.split(',')
.map(ToString::to_string)
.collect();

Ok(PostgresConfig {
uri,
listen_channels,
})
}

fn load_axum_config() -> Result<AxumConfig> {
let host = env_string("RTSS_HOST")?;
let port = env_u32("RTSS_PORT")?;

Ok(AxumConfig { host, port })
}

fn env_string(key: impl AsRef<str>) -> Result<String> {
let key = key.as_ref();

match std::env::var(key) {
Ok(x) => Ok(x),
Err(..) => Err(format!("Environment variable `{key}` is not set").into()),
}
}

fn env_u32(key: impl AsRef<str>) -> Result<u32> {
let key = key.as_ref();

let raw = match std::env::var(key) {
Ok(x) => x,
Err(..) => return Err(format!("Environment variable `{key}` is not set").into()),
};

match raw.parse() {
Ok(x) => Ok(x),
Err(..) => Err(format!("Unable to parse environment variable `{key}`").into()),
}
}
22 changes: 22 additions & 0 deletions bin/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::config::Config;
use sqlx::postgres::PgPool;

use super::Payload;
use super::Result;
use rusty_rtss::postgres::PgListener;

pub async fn get_pool_from_config(config: &Config) -> Result<PgPool> {
PgPool::connect(&config.postgres.uri)
.await
.map_err(Into::into)
}

pub async fn get_listener_from_pool(pool: &PgPool, config: &Config) -> Result<PgListener<Payload>> {
let channels = &config.postgres.listen_channels;

let channels = channels.iter().map(|x| x.as_str()).collect();

PgListener::from_pool(pool, channels)
.await
.map_err(|_| "unable to get listener from pool".into())
}
162 changes: 39 additions & 123 deletions bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,147 +1,63 @@
#![feature(result_option_inspect)]
use std::sync::Arc;

use futures_util::StreamExt;

use axum::{
extract::{Path, State},
response::{sse::Event, IntoResponse, Sse},
routing::get,
Router, Server,
};
use futures::channel::mpsc::unbounded;

use rusty_rtss::{
app::App,
postgres::{PgListener, PgListenerConfig},
sse::{SsePublisher, SseSubscriber},
};
use tower_http::cors::Any;

mod payload {
use axum::response::sse::Event;
use rusty_rtss::sse::Identifiable;
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgNotification;

#[derive(Debug, Deserialize, Serialize)]
pub struct Payload {
pub id: i32,
pub groups: Vec<Group>,
pub score: i32,
pub status: String,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Group {
score: f64,
full_score: f64,
submission_id: String,
group_index: i32,
run_result: Vec<RunResult>,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct RunResult {
submission_id: String,
test_index: i32,
status: String,
time_usage: f64,
memory_usage: i32,
score: f64,
message: String,
}

impl Identifiable for Payload {
type Identifier = i32;

fn id(&self) -> Self::Identifier {
self.id
}
}

impl From<Payload> for Event {
fn from(value: Payload) -> Self {
Event::default()
.json_data(value)
.expect("unable to serialize payload")
}
}
use repository::SubmisisonRepository;
use rusty_rtss::{app::App, sse::SsePublisher};

impl From<PgNotification> for Payload {
fn from(value: PgNotification) -> Self {
serde_json::from_str(value.payload()).unwrap()
}
}
}
mod config;
mod listener;
mod payload;
mod publisher;
mod repository;
mod router;

type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T, Error>;

type Identifier = i32;

type Payload = payload::Payload;

type SharedState = Arc<App<SsePublisher<Identifier, Payload>>>;

async fn handler(
Path(submission_id): Path<i32>,
State(shared_state): State<SharedState>,
) -> impl IntoResponse {
let (tx, rx) = unbounded::<Event>();

let subscriber = SseSubscriber::new(submission_id, tx);

let _ = shared_state
.add_subscriber(subscriber)
.await
.inspect_err(|err| {
log::warn!("error while adding subscriber: {err:?}");
});

let rx = rx.map(Result::<Event, Error>::Ok);

Sse::new(rx)
}

async fn healthz() -> impl IntoResponse {
"OK"
#[derive(Clone)]
pub struct SharedState {
app: Arc<App<SsePublisher<Identifier, Payload>>>,
repository: SubmisisonRepository,
}

#[tokio::main]
async fn main() {
env_logger::init();

let port = std::env::var("RTSS_PORT").expect("`RTSS_PORT` is not provided");
let config = match config::load_config() {
Ok(x) => x,
Err(e) => {
log::error!("Unable to load config: {e}");
return;
}
};

let addr = format!("0.0.0.0:{port}").parse().unwrap();
let pool = listener::get_pool_from_config(&config)
.await
.expect("Unable to create connection pool");
log::info!("Connected to database");

let listener = listener::get_listener_from_pool(&pool, &config)
.await
.expect("Unable to create listener from connection pool");
log::info!("Listened to channel");

log::info!("Connection to database");
let listener = PgListener::<Payload>::connect(PgListenerConfig {
channels: vec!["update"],
url: std::env::var("DB_CONNECTION_URI")
.expect("`DB_CONNECTION_URI` is not provided")
.as_str(),
})
.await
.expect("unable to connect to database");
let repository = repository::SubmisisonRepository::new(pool);
log::info!("Created repository");

log::info!("Creating publisher");
let publisher = SsePublisher::new();
let publisher = publisher::get_publisher();
log::info!("Created publisher");

log::info!("Creating app");
let shared_state = Arc::new(App::new(listener, publisher).expect("unable to create app"));
let app = Arc::new(App::new(listener, publisher).expect("Unable to create app"));
log::info!("Created app");

let cors = tower_http::cors::CorsLayer::new().allow_methods(Any).allow_origin(Any);
let shared_state = SharedState { app, repository };

let app = Router::new()
.route("/:submission_id", get(handler))
.route("/", get(healthz))
.layer(cors)
.with_state(shared_state);
let router = router::get_router(shared_state);

log::info!("Serving on {addr:?}");
Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
router::serve(router, &config).await.unwrap();
}
88 changes: 88 additions & 0 deletions bin/payload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use axum::response::sse::Event;
use rusty_rtss::sse::Identifiable;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sqlx::{
postgres::{PgNotification, PgRow},
types::JsonValue,
FromRow, Row,
};

#[derive(Debug, Deserialize, Serialize)]
pub struct Payload {
pub id: i32,
pub groups: Vec<Group>,
pub score: i32,
pub status: String,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Group {
score: f64,
full_score: f64,
submission_id: String,
group_index: i32,
run_result: Vec<RunResult>,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct RunResult {
submission_id: String,
test_index: i32,
status: String,
time_usage: f64,
memory_usage: i32,
score: f64,
message: String,
}

impl Identifiable for Payload {
type Identifier = i32;

fn id(&self) -> Self::Identifier {
self.id
}
}

impl From<Payload> for Event {
fn from(value: Payload) -> Self {
Event::default()
.json_data(value)
.expect("unable to serialize payload")
}
}

impl From<PgNotification> for Payload {
fn from(value: PgNotification) -> Self {
serde_json::from_str(value.payload()).unwrap()
}
}

impl FromRow<'_, PgRow> for Payload {
fn from_row(row: &PgRow) -> std::result::Result<Self, sqlx::Error> {
Ok(Payload {
id: row.try_get("id")?,
groups: row.try_get("groups").and_then(json_value_to_vec)?,
score: row.try_get("score")?,
status: row.try_get("status")?,
})
}
}

fn json_value_to_vec<T>(json: JsonValue) -> std::result::Result<Vec<T>, sqlx::Error>
where
T: DeserializeOwned,
{
let vec = match json {
JsonValue::Array(vec) => vec,
_ => {
return Err(sqlx::Error::ColumnNotFound(
"Json object is not array".to_string(),
))
}
};

vec.into_iter()
.map(serde_json::from_value)
.collect::<Result<Vec<T>, _>>()
.map_err(|_| sqlx::Error::ColumnNotFound("Unable to deserialize object".to_string()))
}
5 changes: 5 additions & 0 deletions bin/publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use rusty_rtss::sse::SsePublisher;

pub fn get_publisher() -> SsePublisher<super::Identifier, super::payload::Payload> {
SsePublisher::new()
}
Loading

0 comments on commit 1055ea2

Please sign in to comment.