Skip to content

Commit

Permalink
allow shutdown during network snapshot transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
mauri870 committed Mar 7, 2024
1 parent 0f873ea commit 9e01890
Showing 1 changed file with 44 additions and 27 deletions.
71 changes: 44 additions & 27 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1750,37 +1750,54 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
return
}

// Separately track the progress of streaming a snapshot over the network
// because this too can take a long time.
countingRPCReader := newCountingReader(rpc.Reader)

// Spill the remote snapshot to disk
transferMonitor := startSnapshotRestoreMonitor(r.logger, countingRPCReader, req.Size, true)
n, err := io.Copy(sink, countingRPCReader)
transferMonitor.StopAndWait()
if err != nil {
sink.Cancel()
r.logger.Error("failed to copy snapshot", "error", err)
rpcErr = err
return
}
// Spill the remote snapshot to disk Spawn a goroutine to copy the
// snapshot to disk, so we can respond to the shutdown as well.
diskCopyErrCh := make(chan error, 1)
go func() {
// Separately track the progress of streaming a snapshot over the network
// because this too can take a long time.
countingRPCReader := newCountingReader(rpc.Reader)
transferMonitor := startSnapshotRestoreMonitor(r.logger, countingRPCReader, req.Size, true)
n, err := io.Copy(sink, countingRPCReader)
transferMonitor.StopAndWait()
if err != nil {
sink.Cancel()
r.logger.Error("failed to copy snapshot", "error", err)
diskCopyErrCh <- err
return
}

// Check that we received it all
if n != req.Size {
sink.Cancel()
r.logger.Error("failed to receive whole snapshot",
"received", hclog.Fmt("%d / %d", n, req.Size))
rpcErr = fmt.Errorf("short read")
return
}
// Check that we received it all
if n != req.Size {
sink.Cancel()
r.logger.Error("failed to receive whole snapshot",
"received", hclog.Fmt("%d / %d", n, req.Size))
diskCopyErrCh <- fmt.Errorf("short read")
return
}

// Finalize the snapshot
if err := sink.Close(); err != nil {
r.logger.Error("failed to finalize snapshot", "error", err)
rpcErr = err
// Finalize the snapshot
if err := sink.Close(); err != nil {
r.logger.Error("failed to finalize snapshot", "error", err)
diskCopyErrCh <- err
return
}
r.logger.Info("copied to local snapshot", "bytes", n)
diskCopyErrCh <- nil
}()

// Wait for snapshot transfer or shutdown
select {
case err := <-diskCopyErrCh:
if err != nil {
rpcErr = err
return
}
case <-r.shutdownCh:
sink.Cancel()
rpcErr = ErrRaftShutdown
return
}
r.logger.Info("copied to local snapshot", "bytes", n)

// Restore snapshot
future := &restoreFuture{ID: sink.ID()}
Expand Down

0 comments on commit 9e01890

Please sign in to comment.