WIP: Add mechanism that allows writing to the leader's disk in parallel with replication. #579

Draft · wants to merge 3 commits into main
Changes from 2 commits
1 change: 1 addition & 0 deletions fuzzy/.gitignore
@@ -0,0 +1 @@
data
50 changes: 50 additions & 0 deletions fuzzy/async_logs_test.go
@@ -0,0 +1,50 @@
package fuzzy

import (
"math/rand"
"testing"
"time"

"github.com/hashicorp/raft"
)

// 5 node cluster where the leader and another node get regularly partitioned off.
// Eventually all partitions heal, but the cluster uses the async log cache.
func TestRaft_AsyncLogWithPartitions(t *testing.T) {
hooks := NewPartitioner()

cluster := newRaftClusterWithFactory(t, testLogWriter, "lp", 5, hooks, newAsyncRaft)
cluster.Leader(time.Second * 10)
s := newApplySource("LeaderPartitions")
applier := s.apply(t, cluster, 5)
for i := 0; i < 10; i++ {
pg := hooks.PartitionOff(cluster.log, cluster.LeaderPlus(rand.Intn(4)))
time.Sleep(time.Second * 4)
r := rand.Intn(10)
if r < 1 {
cluster.log.Logf("Healing no partitions!")
} else if r < 4 {
hooks.HealAll(cluster.log)
} else {
hooks.Heal(cluster.log, pg)
}
time.Sleep(time.Second * 5)
}
hooks.HealAll(cluster.log)
cluster.Leader(time.Hour)
applier.stop()
cluster.Stop(t, time.Minute*10)
hooks.Report(t)
cluster.VerifyLog(t, applier.applied)
cluster.VerifyFSM(t)
}

func newAsyncRaft(conf *raft.Config, fsm raft.FSM, logs raft.LogStore, stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error) {
// Wrap the log store in an async cache
asyncLogs, err := raft.NewLogCacheAsync(128, logs)
if err != nil {
return nil, err
}

return raft.NewRaft(conf, fsm, asyncLogs, stable, snaps, trans)
}
8 changes: 6 additions & 2 deletions fuzzy/cluster.go
@@ -55,6 +55,10 @@ func (a *LoggerAdapter) Logf(s string, v ...interface{}) {
}

func newRaftCluster(t *testing.T, logWriter io.Writer, namePrefix string, n uint, transportHooks TransportHooks) *cluster {
return newRaftClusterWithFactory(t, logWriter, namePrefix, n, transportHooks, raft.NewRaft)
}

func newRaftClusterWithFactory(t *testing.T, logWriter io.Writer, namePrefix string, n uint, transportHooks TransportHooks, factory factoryFn) *cluster {
res := make([]*raftNode, 0, n)
names := make([]string, 0, n)
for i := uint(0); i < n; i++ {
@@ -67,11 +71,11 @@ func newRaftCluster(t *testing.T, logWriter io.Writer, namePrefix string, n uint
transports := newTransports(l)
for _, i := range names {

r, err := newRaftNode(hclog.New(&hclog.LoggerOptions{
r, err := newRaftNodeFromFactory(hclog.New(&hclog.LoggerOptions{
Name: i + ":",
Output: logWriter,
Level: hclog.DefaultLevel,
}), transports, transportHooks, names, i)
}), transports, transportHooks, names, i, factory)
if err != nil {
t.Fatalf("Unable to create raftNode:%v : %v", i, err)
}
2 changes: 2 additions & 0 deletions fuzzy/go.sum
@@ -95,6 +95,8 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
12 changes: 9 additions & 3 deletions fuzzy/node.go
@@ -24,6 +24,13 @@ type raftNode struct {
}

func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []string, name string) (*raftNode, error) {
return newRaftNodeFromFactory(logger, tc, h, nodes, name, raft.NewRaft)
}

// factoryFn has the same signature as raft.NewRaft.
type factoryFn func(conf *raft.Config, fsm raft.FSM, logs raft.LogStore, stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error)

func newRaftNodeFromFactory(logger hclog.Logger, tc *transports, h TransportHooks, nodes []string, name string, factory factoryFn) (*raftNode, error) {
var err error
var datadir string
datadir, err = resolveDirectory(fmt.Sprintf("data/%v", name), true)
@@ -46,8 +53,7 @@ func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []
config.ShutdownOnRemove = false
config.LocalID = raft.ServerID(name)

var store *rdb.BoltStore
store, err = rdb.NewBoltStore(filepath.Join(datadir, "store.bolt"))
store, err := rdb.NewBoltStore(filepath.Join(datadir, "store.bolt"))
if err != nil {
return nil, fmt.Errorf("unable to initialize log %v", err.Error())
}
@@ -65,7 +71,7 @@ func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []
}
fsm := &fuzzyFSM{}
var r *raft.Raft
r, err = raft.NewRaft(config, fsm, store, store, ss, transport)
r, err = factory(config, fsm, store, store, ss, transport)
if err != nil {
return nil, err
}
50 changes: 47 additions & 3 deletions inmem_store.go
@@ -6,12 +6,19 @@ package raft
import (
"errors"
"sync"
"sync/atomic"
)

// InmemStore implements the LogStore and StableStore interface.
// It should NOT EVER be used for production. It is used only for
// unit tests. Use the MDBStore implementation instead.
type InmemStore struct {
storeFail uint32 // accessed atomically as a bool 0/1

// storeSem lets the test control exactly when a StoreLog(s) call takes
// effect.
storeSem chan struct{}

l sync.RWMutex
lowIndex uint64
highIndex uint64
@@ -24,13 +31,35 @@ type InmemStore struct {
// use for production. Only for testing.
func NewInmemStore() *InmemStore {
i := &InmemStore{
logs: make(map[uint64]*Log),
kv: make(map[string][]byte),
kvInt: make(map[string]uint64),
storeSem: make(chan struct{}, 1),
logs: make(map[uint64]*Log),
kv: make(map[string][]byte),
kvInt: make(map[string]uint64),
}
return i
}

// BlockStore causes further calls to StoreLog(s) to block indefinitely
// until the returned cancel func is called. Note that if the code or test is
// buggy this could cause a deadlock.
func (i *InmemStore) BlockStore() func() {
i.storeSem <- struct{}{}
cancelled := false
return func() {
// Allow multiple calls; subsequent ones are a no-op
if !cancelled {
<-i.storeSem
cancelled = true
}
}
}

// FailNext signals that the next call to StoreLog(s) should return an error
// without modifying the log contents. Subsequent calls will succeed again.
func (i *InmemStore) FailNext() {
atomic.StoreUint32(&i.storeFail, 1)
}

// FirstIndex implements the LogStore interface.
func (i *InmemStore) FirstIndex() (uint64, error) {
i.l.RLock()
@@ -64,8 +93,23 @@ func (i *InmemStore) StoreLog(log *Log) error {

// StoreLogs implements the LogStore interface.
func (i *InmemStore) StoreLogs(logs []*Log) error {
// Block waiting for the semaphore slot if BlockStore has been called. We must
// do this before we take the lock because otherwise we'll block GetLog and
// others too by holding the lock while blocked.
i.storeSem <- struct{}{}
defer func() {
<-i.storeSem
}()

// Swap out the fail flag if it is set so we only fail once
shouldFail := atomic.SwapUint32(&i.storeFail, 0)
if shouldFail == 1 {
return errors.New("IO error")
}

i.l.Lock()
defer i.l.Unlock()

for _, l := range logs {
i.logs[l.Index] = l
if i.lowIndex == 0 {
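
For context, a test could drive the new InmemStore hooks roughly as follows. This is an illustrative sketch, not part of this diff: the test name and timings are invented, and it relies only on the BlockStore/FailNext behaviour documented above.

package raft_test

import (
	"testing"
	"time"

	"github.com/hashicorp/raft"
)

func TestInmemStore_BlockAndFail(t *testing.T) {
	store := raft.NewInmemStore()

	// FailNext makes exactly the next StoreLogs call return an error.
	store.FailNext()
	if err := store.StoreLogs([]*raft.Log{{Index: 1, Term: 1}}); err == nil {
		t.Fatal("expected injected IO error")
	}

	// BlockStore parks subsequent StoreLogs calls until cancel is called.
	cancel := store.BlockStore()
	done := make(chan error, 1)
	go func() {
		done <- store.StoreLogs([]*raft.Log{{Index: 1, Term: 1}})
	}()

	select {
	case err := <-done:
		t.Fatalf("StoreLogs should still be blocked, got %v", err)
	case <-time.After(50 * time.Millisecond):
	}

	cancel()
	if err := <-done; err != nil {
		t.Fatalf("unexpected error after unblocking: %v", err)
	}
}
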
56 changes: 48 additions & 8 deletions log.go
@@ -130,18 +130,58 @@
}

// MonotonicLogStore is an optional interface for LogStore implementations that
// cannot tolerate gaps in between the Index values of consecutive log entries. For example,
// this may allow more efficient indexing because the Index values are densely populated. If true is
// returned, Raft will avoid relying on gaps to trigger re-synching logs on followers after a
// snapshot is restored. The LogStore must have an efficient implementation of
// DeleteLogs for the case where all logs are removed, as this must be called after snapshot restore when gaps are not allowed.
// We avoid deleting all records for LogStores that do not implement MonotonicLogStore
// because although it's always correct to do so, it has a major negative performance impact on the BoltDB store that is currently
// the most widely used.
// cannot tolerate gaps in between the Index values of consecutive log entries.
// For example, this may allow more efficient indexing because the Index values
// are densely populated. If true is returned, Raft will avoid relying on gaps
// to trigger re-synching logs on followers after a snapshot is restored. The
// LogStore must have an efficient implementation of DeleteLogs for the case
// where all logs are removed, as this must be called after snapshot restore
// when gaps are not allowed. We avoid deleting all records for LogStores that
// do not implement MonotonicLogStore because although it's always correct to do
// so, it has a major negative performance impact on the BoltDB store that is
// currently the most widely used.
type MonotonicLogStore interface {
IsMonotonic() bool
}

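// LogWriteCompletion is delivered on the channel registered via EnableAsync
// once a background write completes: all logs up to PersistentIndex are then
// durably stored, or Error records why the write failed.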
type LogWriteCompletion struct {
PersistentIndex uint64
Error error
Duration time.Duration
}

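// AsyncLogStore is an optional extension of LogStore for implementations that
// can accept writes without waiting for them to reach durable storage,
// reporting durability afterwards via LogWriteCompletion events.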
type AsyncLogStore interface {
LogStore

// EnableAsync is called on the log store when a node starts the leader loop.
// A channel is passed to deliver write completion events. The implementation
// chooses how many events to buffer, but sends on the chan may block; this
// should be used as a back-pressure mechanism to slow down syncs to disk. It
// must be called serially with StoreLog* and DeleteRange (i.e. from the main
// leader/follower thread). After this returns, calls to StoreLog(s) will
// return an error and only StoreLogsAsync should be used until DisableAsync
// is called.
EnableAsync(chan<- LogWriteCompletion)

// DisableAsync is called when the leader steps down to return the LogStore to
// Sync mode, since followers currently use Sync writes. They may in the future
// use async writes too; however, explicitly switching modes makes it easier to
// reason about the behaviour of Async vs Sync storage calls, as well as
// providing the channel to deliver updates explicitly. DisableAsync will
// block until all in-flight writes are persisted (or fail).
DisableAsync()

// StoreLogsAsync may only be called after EnableAsync but before the
// corresponding DisableAsync call. It will return as soon as the logs are
// available to read from GetLog and reflected in LastIndex, though they may
// still be in-memory only. It will trigger background writing of the logs to
// disk. The background process must eventually deliver a LogWriteCompletion
// to the channel provided to the last EnableAsync call. Each
// LogWriteCompletion indicates that all logs up to the PersistentIndex are
// safely stored on durable storage, or an error has occurred.
StoreLogsAsync(logs []*Log) error
}

func oldestLog(s LogStore) (Log, error) {
var l Log

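
To make the intended call sequence concrete, here is a rough sketch of how a leader loop might drive an AsyncLogStore such as the LogCacheAsync added in this branch. The surrounding helper names (runLeader, batches, stepDown) are hypothetical and not part of this PR; only the EnableAsync / StoreLogsAsync / DisableAsync calls and the LogWriteCompletion handling mirror the interface above.

package asyncexample // hypothetical example package, not part of this PR

import (
	"log"

	"github.com/hashicorp/raft"
)

// runLeader sketches the leader-side usage pattern described in the
// AsyncLogStore comments.
func runLeader(store raft.AsyncLogStore, batches <-chan []*raft.Log, stepDown <-chan struct{}) {
	completions := make(chan raft.LogWriteCompletion, 64)
	store.EnableAsync(completions)
	// DisableAsync blocks until all in-flight writes are persisted (or fail),
	// returning the store to sync mode when leadership is lost.
	defer store.DisableAsync()

	for {
		select {
		case logs := <-batches:
			// The logs are readable via GetLog/LastIndex as soon as this
			// returns; the disk write continues in the background.
			if err := store.StoreLogsAsync(logs); err != nil {
				log.Printf("failed to enqueue logs: %v", err)
				return
			}
		case lwc := <-completions:
			if lwc.Error != nil {
				log.Printf("log persistence failed after %s: %v", lwc.Duration, lwc.Error)
				return
			}
			// Everything up to lwc.PersistentIndex is now durable on the
			// leader and can count towards the commit quorum.
			_ = lwc.PersistentIndex
		case <-stepDown:
			return
		}
	}
}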