Skip to content

Commit

Permalink
Use restart_supervisor macro in op::sync module
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomasdezeeuw committed Aug 20, 2020
1 parent 7001a61 commit fa2b2aa
Showing 1 changed file with 8 additions and 47 deletions.
55 changes: 8 additions & 47 deletions src/op/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use heph::net::TcpStream;
use heph::rt::options::{ActorOptions, Priority};
use heph::rt::RuntimeAccess;
use heph::timer::Deadline;
use heph::{actor, Actor, NewActor, SupervisorStrategy};
use heph::{actor, restart_supervisor};
use log::{debug, error, warn};

use crate::buffer::Buffer;
Expand Down Expand Up @@ -296,11 +296,7 @@ impl SyncingPeer {
peer_address: SocketAddr,
) -> Option<SyncingPeer> {
let args = (db_ref.clone(), peer_address);
let supervisor = Supervisor {
db_ref,
peer_address,
restarts_left: MAX_RESTARTS,
};
let supervisor = Supervisor::new((db_ref, peer_address));
let peer_sync_actor = peer_sync_actor as fn(_, _, _) -> _;
let options = ActorOptions::default()
.with_priority(Priority::HIGH)
Expand Down Expand Up @@ -345,47 +341,12 @@ enum State {
Failed,
}

/// Supervisor for [`peer_sync_actor`].
#[derive(Debug)]
struct Supervisor {
db_ref: ActorRef<db::Message>,
peer_address: SocketAddr,
restarts_left: usize,
}

/// Maximum number of times the [`actor`] will be restarted.
const MAX_RESTARTS: usize = 5;

impl<NA, A> heph::Supervisor<NA> for Supervisor
where
NA: NewActor<Argument = (ActorRef<db::Message>, SocketAddr), Error = !, Actor = A>,
A: Actor<Error = crate::Error>,
{
fn decide(&mut self, err: crate::Error) -> SupervisorStrategy<NA::Argument> {
if self.restarts_left >= 1 {
self.restarts_left -= 1;
warn!(
"peer synchronisation actor failed, restarting it ({}/{} restarts left): {}: remote_addres={}",
self.restarts_left, MAX_RESTARTS, err, self.peer_address
);
SupervisorStrategy::Restart((self.db_ref.clone(), self.peer_address))
} else {
warn!(
"peer synchronisation actor failed, stopping it: {}: remote_address={}",
err, self.peer_address
);
SupervisorStrategy::Stop
}
}

fn decide_on_restart_error(&mut self, err: NA::Error) -> SupervisorStrategy<NA::Argument> {
err
}

fn second_restart_error(&mut self, err: NA::Error) {
err
}
}
restart_supervisor!(
Supervisor,
"peer synchronisation actor",
(ActorRef<db::Message>, SocketAddr),
5
);

/// Actor that synchronises with a single peer.
async fn peer_sync_actor<K>(
Expand Down

0 comments on commit fa2b2aa

Please sign in to comment.