Skip to content

Commit

Permalink
sdk: add latency to RelayConnectionStats
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Aug 25, 2023
1 parent fd2dfcf commit bfecac7
Showing 1 changed file with 190 additions and 5 deletions.
195 changes: 190 additions & 5 deletions crates/nostr-sdk/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@

//! Relay

#[cfg(not(target_arch = "wasm32"))]
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet};
use std::fmt;
#[cfg(not(target_arch = "wasm32"))]
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;

#[cfg(not(target_arch = "wasm32"))]
use async_utility::futures_util::stream::AbortHandle;
use async_utility::{futures_util, thread, time};
use nostr::message::MessageHandleError;
#[cfg(feature = "nip11")]
Expand Down Expand Up @@ -120,7 +126,12 @@ pub enum RelayEvent {
SendMsg(Box<ClientMessage>),
/// Send multiple messages at once
Batch(Vec<ClientMessage>),
// Ping,
/// Ping
#[cfg(not(target_arch = "wasm32"))]
Ping {
/// Nonce
nonce: u64,
},
/// Close
Close,
/// Stop
Expand All @@ -129,6 +140,66 @@ pub enum RelayEvent {
Terminate,
}

/// Ping Stats
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone)]
pub struct PingStats {
sent_at: Arc<Mutex<Instant>>,
last_nonce: Arc<AtomicU64>,
replied: Arc<AtomicBool>,
}

#[cfg(not(target_arch = "wasm32"))]
impl Default for PingStats {
fn default() -> Self {
Self::new()
}
}

#[cfg(not(target_arch = "wasm32"))]
impl PingStats {
/// New default ping stats
pub fn new() -> Self {
Self {
sent_at: Arc::new(Mutex::new(Instant::now())),
last_nonce: Arc::new(AtomicU64::new(0)),
replied: Arc::new(AtomicBool::new(false)),
}
}

/// Get sent at
pub async fn sent_at(&self) -> Instant {
*self.sent_at.lock().await
}

/// Last nonce
pub fn last_nonce(&self) -> u64 {
self.last_nonce.load(Ordering::SeqCst)
}

/// Replied
pub fn replied(&self) -> bool {
self.replied.load(Ordering::SeqCst)
}

pub(crate) async fn just_sent(&self) {
let mut sent_at = self.sent_at.lock().await;
*sent_at = Instant::now();
}

pub(crate) fn set_last_nonce(&self, nonce: u64) {
let _ = self
.last_nonce
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(nonce));
}

pub(crate) fn set_replied(&self, replied: bool) {
let _ = self
.replied
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(replied));
}
}

/// [`Relay`] connection stats
#[derive(Debug, Clone)]
pub struct RelayConnectionStats {
Expand All @@ -137,6 +208,10 @@ pub struct RelayConnectionStats {
bytes_sent: Arc<AtomicUsize>,
bytes_received: Arc<AtomicUsize>,
connected_at: Arc<AtomicU64>,
#[cfg(not(target_arch = "wasm32"))]
latencies: Arc<Mutex<VecDeque<Duration>>>,
#[cfg(not(target_arch = "wasm32"))]
ping: PingStats,
}

impl Default for RelayConnectionStats {
Expand All @@ -154,6 +229,10 @@ impl RelayConnectionStats {
bytes_sent: Arc::new(AtomicUsize::new(0)),
bytes_received: Arc::new(AtomicUsize::new(0)),
connected_at: Arc::new(AtomicU64::new(0)),
#[cfg(not(target_arch = "wasm32"))]
latencies: Arc::new(Mutex::new(VecDeque::new())),
#[cfg(not(target_arch = "wasm32"))]
ping: PingStats::default(),
}
}

Expand Down Expand Up @@ -182,6 +261,20 @@ impl RelayConnectionStats {
Timestamp::from(self.connected_at.load(Ordering::SeqCst))
}

/// Calculate latency
#[cfg(not(target_arch = "wasm32"))]
pub async fn latency(&self) -> Option<Duration> {
let latencies = self.latencies.lock().await;
let sum: Duration = latencies.iter().sum();
sum.checked_div(latencies.len() as u32)
}

/// Calculate latency
#[cfg(all(not(target_arch = "wasm32"), feature = "blocking"))]
pub fn latency_blocking(&self) -> Option<Duration> {
RUNTIME.block_on(async { self.latency().await })
}

pub(crate) fn new_attempt(&self) {
self.attempts.fetch_add(1, Ordering::SeqCst);
}
Expand All @@ -202,6 +295,15 @@ impl RelayConnectionStats {
pub(crate) fn add_bytes_received(&self, size: usize) {
self.bytes_received.fetch_add(size, Ordering::SeqCst);
}

#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn save_latency(&self, latency: Duration) {
let mut latencies = self.latencies.lock().await;
if latencies.len() >= 5 {
latencies.pop_back();
}
latencies.push_front(latency)
}
}

/// Internal Subscription ID
Expand Down Expand Up @@ -499,6 +601,11 @@ impl Relay {
relay.relay_sender.capacity()
);

#[cfg(not(target_arch = "wasm32"))]
if let Some(latency) = relay.stats.latency().await {
tracing::info!("{} latency: {} ms", relay.url, latency.as_millis());
}

// Schedule relay for termination
// Needed to terminate the auto reconnect loop, also if the relay is not connected yet.
if relay.is_scheduled_for_stop() {
Expand Down Expand Up @@ -578,6 +685,37 @@ impl Relay {

self.stats.new_success();

#[cfg(not(target_arch = "wasm32"))]
let ping_abort_handle: AbortHandle = {
let relay = self.clone();
thread::abortable(async move {
tracing::debug!("Relay Ping Thread Started");

loop {
if relay.stats.ping.last_nonce() != 0 && !relay.stats.ping.replied() {
tracing::warn!("{} not replied to ping", relay.url);
break;
}

let nonce: u64 = nostr::secp256k1::rand::random();
relay.stats.ping.set_last_nonce(nonce);
relay.stats.ping.set_replied(false);
if let Err(e) = relay.send_relay_event(RelayEvent::Ping { nonce }, None)
{
tracing::error!("Impossible to ping {}: {e}", relay.url);
break;
};
thread::sleep(Duration::from_secs(60)).await;
}

tracing::debug!("Exited from Ping Thread of {}", relay.url);

if let Err(err) = relay.disconnect().await {
tracing::error!("Impossible to disconnect {}: {}", relay.url, err);
}
})
};

let relay = self.clone();
thread::spawn(async move {
tracing::debug!("Relay Event Thread Started");
Expand Down Expand Up @@ -663,6 +801,25 @@ impl Relay {
}
}
}
#[cfg(not(target_arch = "wasm32"))]
RelayEvent::Ping { nonce } => {
match ws_tx
.send(WsMessage::Ping(nonce.to_string().as_bytes().to_vec()))
.await
{
Ok(_) => {
relay.stats.ping.just_sent().await;
tracing::debug!("Ping {} (nonce {})", relay.url, nonce);
}
Err(e) => {
tracing::error!(
"Impossible to ping {}: {}",
relay.url(),
e.to_string()
);
}
}
}
RelayEvent::Close => {
let _ = ws_tx.close().await;
relay.set_status(RelayStatus::Disconnected).await;
Expand All @@ -689,7 +846,11 @@ impl Relay {
}
}
}

tracing::debug!("Exited from Relay Event Thread");

#[cfg(not(target_arch = "wasm32"))]
ping_abort_handle.abort();
});

let relay = self.clone();
Expand Down Expand Up @@ -733,10 +894,34 @@ impl Relay {
#[cfg(not(target_arch = "wasm32"))]
while let Some(msg_res) = ws_rx.next().await {
if let Ok(msg) = msg_res {
let data: Vec<u8> = msg.into_data();
let exit: bool = func(&relay, data).await;
if exit {
break;
match msg {
WsMessage::Pong(bytes) => match String::from_utf8(bytes) {
Ok(nonce) => match nonce.parse::<u64>() {
Ok(nonce) => {
if relay.stats.ping.last_nonce() == nonce {
tracing::debug!(
"Pong from {} match nonce: {}",
relay.url,
nonce
);
relay.stats.ping.set_replied(true);
let sent_at = relay.stats.ping.sent_at().await;
relay.stats.save_latency(sent_at.elapsed()).await;
} else {
tracing::error!("Pong nonce not match: received={nonce}, expected={}", relay.stats.ping.last_nonce());
}
}
Err(e) => tracing::error!("{e}"),
},
Err(e) => tracing::error!("{e}"),
},
_ => {
let data: Vec<u8> = msg.into_data();
let exit: bool = func(&relay, data).await;
if exit {
break;
}
}
}
}
}
Expand Down

0 comments on commit bfecac7

Please sign in to comment.