diff --git a/cmd/influxd/run/command.go b/cmd/influxd/run/command.go index cfb7e67b153..6f30536bf5a 100644 --- a/cmd/influxd/run/command.go +++ b/cmd/influxd/run/command.go @@ -153,6 +153,10 @@ func (cmd *Command) Run(args ...string) error { s.Logger = cmd.Logger s.CPUProfile = options.CPUProfile s.MemProfile = options.MemProfile + + sl := NewStartupProgressLogger(s.Logger) + s.SetStartupMetrics(sl) + if err := s.Open(); err != nil { return fmt.Errorf("open server: %s", err) } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index e8b747b4659..c0340450271 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -65,6 +65,11 @@ type BuildInfo struct { Time string } +type StartupProgress interface { + AddShard() + CompletedShard() +} + // Server represents a container for the metadata and storage data and services. // It is built using a Config and it manages the startup and shutdown of all // services in the proper order. @@ -96,6 +101,8 @@ type Server struct { Monitor *monitor.Monitor + StartupProgressMetrics StartupProgress + // Server reporting and registration reportingDisabled bool @@ -279,6 +286,10 @@ func (s *Server) SetLogOutput(w io.Writer) { s.MuxLogger = tcp.MuxLogger(w) } +func (s *Server) SetStartupMetrics(sp StartupProgress) { + s.StartupProgressMetrics = sp +} + func (s *Server) appendMonitorService() { s.Services = append(s.Services, s.Monitor) } @@ -465,6 +476,9 @@ func (s *Server) Open() error { s.MetaClient.WithLogger(s.Logger) } s.TSDBStore.WithLogger(s.Logger) + + s.TSDBStore.WithStartupMetrics(s.StartupProgressMetrics) + if s.config.Data.QueryLogEnabled { s.QueryExecutor.WithLogger(s.Logger) } else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries { diff --git a/cmd/influxd/run/startup_logger.go b/cmd/influxd/run/startup_logger.go new file mode 100644 index 00000000000..e471deff07f --- /dev/null +++ b/cmd/influxd/run/startup_logger.go @@ -0,0 +1,32 @@ +package run + +import ( + "fmt" + "sync/atomic" + + "go.uber.org/zap" +) + +type StartupProgressLogger struct { + shardsCompleted atomic.Uint64 + shardsTotal atomic.Uint64 + logger *zap.Logger +} + +func NewStartupProgressLogger(logger *zap.Logger) *StartupProgressLogger { + return &StartupProgressLogger{ + logger: logger, + } +} + +func (s *StartupProgressLogger) AddShard() { + s.shardsTotal.Add(1) +} + +func (s *StartupProgressLogger) CompletedShard() { + shardsCompleted := s.shardsCompleted.Add(1) + totalShards := s.shardsTotal.Load() + + percentShards := float64(shardsCompleted) / float64(totalShards) * 100 + s.logger.Info(fmt.Sprintf("Finished loading shard, current progress %.1f%% shards (%d / %d).", percentShards, shardsCompleted, totalShards)) +} diff --git a/tsdb/load_shards.go b/tsdb/load_shards.go new file mode 100644 index 00000000000..1f5f415b8f6 --- /dev/null +++ b/tsdb/load_shards.go @@ -0,0 +1,304 @@ +package tsdb + +import ( + "fmt" + "os" + "path/filepath" + "runtime" + "strconv" + "sync" + "time" + + "github.com/influxdata/influxdb/logger" + "github.com/influxdata/influxdb/pkg/limiter" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// res holds the result from opening each shard in a goroutine +type res struct { + s *Shard + err error +} + +func (s *Store) loadShards() error { + // Limit the number of concurrent TSM files to be opened to the number of cores. 
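The new StartupProgressLogger is essentially two atomic counters: AddShard records every shard the store discovers up front, and CompletedShard bumps the finished count and logs a running percentage. A minimal, self-contained sketch of that counting pattern (progressCounter and its output format are illustrative stand-ins, not part of the patch):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// progressCounter mirrors the AddShard/CompletedShard pattern: one atomic
// counter for the shards discovered up front and one for the shards that
// have finished opening.
type progressCounter struct {
	completed atomic.Uint64
	total     atomic.Uint64
}

func (p *progressCounter) AddShard() { p.total.Add(1) }

func (p *progressCounter) CompletedShard() {
	done := p.completed.Add(1)
	total := p.total.Load()
	fmt.Printf("loaded %d/%d shards (%.1f%%)\n", done, total, float64(done)/float64(total)*100)
}

func main() {
	p := &progressCounter{}

	// Counting pass: register every shard before any of them is opened,
	// so the percentages reported later are stable.
	const shards = 4
	for i := 0; i < shards; i++ {
		p.AddShard()
	}

	// Loading pass: completions may arrive from concurrent goroutines.
	var wg sync.WaitGroup
	for i := 0; i < shards; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.CompletedShard()
		}()
	}
	wg.Wait()
}
```

Because CompletedShard is invoked from concurrently running shard-loading goroutines, the counters need to be atomic, or mutex-guarded as the mockStartupLogger in the tests is.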
+ s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0)) + + // Setup a shared limiter for compactions + lim := s.EngineOptions.Config.MaxConcurrentCompactions + if lim == 0 { + lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions + + if lim < 1 { + lim = 1 + } + } + + // Don't allow more compactions to run than cores. + if lim > runtime.GOMAXPROCS(0) { + lim = runtime.GOMAXPROCS(0) + } + + s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim) + + compactionSettings := []zapcore.Field{zap.Int("max_concurrent_compactions", lim)} + throughput := int(s.EngineOptions.Config.CompactThroughput) + throughputBurst := int(s.EngineOptions.Config.CompactThroughputBurst) + if throughput > 0 { + if throughputBurst < throughput { + throughputBurst = throughput + } + + compactionSettings = append( + compactionSettings, + zap.Int("throughput_bytes_per_second", throughput), + zap.Int("throughput_bytes_per_second_burst", throughputBurst), + ) + s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(throughput, throughputBurst) + } else { + compactionSettings = append( + compactionSettings, + zap.String("throughput_bytes_per_second", "unlimited"), + zap.String("throughput_bytes_per_second_burst", "unlimited"), + ) + } + + s.Logger.Info("Compaction settings", compactionSettings...) + + log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open") + defer logEnd() + + shardLoaderWg := new(sync.WaitGroup) + t := limiter.NewFixed(runtime.GOMAXPROCS(0)) + resC := make(chan *res) + + // Determine how many shards we need to open by checking the store path. + dbDirs, err := os.ReadDir(s.path) + if err != nil { + return err + } + + walkShardsAndProcess := func(fn func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error) error { + for _, db := range dbDirs { + rpDirs, err := s.getRetentionPolicyDirs(db, log) + if err != nil { + return err + } else if rpDirs == nil { + continue + } + + // Load series file. + sfile, err := s.openSeriesFile(db.Name()) + if err != nil { + return err + } + + // Retrieve database index. + idx, err := s.createIndexIfNotExists(db.Name()) + if err != nil { + return err + } + + for _, rp := range rpDirs { + shardDirs, err := s.getShards(rp, db, log) + if err != nil { + return err + } else if shardDirs == nil { + continue + } + + for _, sh := range shardDirs { + // Series file should not be in a retention policy but skip just in case. 
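walkShardsAndProcess factors the database/retention-policy/shard traversal into a single closure so it can run twice with different callbacks: a first pass (just below) that only counts shards via AddShard, and a second pass that launches the actual loads. A reduced sketch of that two-pass shape, with made-up names and a slice standing in for the directory tree:

```go
package main

import "fmt"

// walkItems plays the role of walkShardsAndProcess: it owns traversal and
// filtering and hands each surviving item to a callback, so the caller can
// run it once to count and once to do the work without duplicating the walk.
func walkItems(items []string, fn func(item string) error) error {
	for _, it := range items {
		if it == "" { // stand-in for the directory and series-file filters
			continue
		}
		if err := fn(it); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	shards := []string{"1", "2", "", "3"}

	// Pass 1: establish the total so progress percentages are meaningful.
	total := 0
	if err := walkItems(shards, func(string) error { total++; return nil }); err != nil {
		panic(err)
	}
	fmt.Println("total shards:", total)

	// Pass 2: actually "open" each shard.
	if err := walkItems(shards, func(s string) error {
		fmt.Println("opening shard", s)
		return nil
	}); err != nil {
		panic(err)
	}
}
```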
+ if sh.Name() == SeriesFileDirectory { + log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name()))) + continue + } + + if err := fn(sfile, idx, sh, db, rp); err != nil { + return err + } + } + } + } + + return nil + } + + if s.startupProgressMetrics != nil { + err := walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error { + s.startupProgressMetrics.AddShard() + return nil + }) + if err != nil { + return err + } + } + + err = walkShardsAndProcess(func(sfile *SeriesFile, idx interface{}, sh os.DirEntry, db os.DirEntry, rp os.DirEntry) error { + shardLoaderWg.Add(1) + + go func(db, rp, sh string) { + defer shardLoaderWg.Done() + + t.Take() + defer t.Release() + + start := time.Now() + path := filepath.Join(s.path, db, rp, sh) + walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh) + + // Shard file names are numeric shardIDs + shardID, err := strconv.ParseUint(sh, 10, 64) + if err != nil { + log.Info("invalid shard ID found at path", zap.String("path", path)) + resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() + } + return + } + + if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) { + log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID)) + resC <- &res{} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() + } + return + } + + // Copy options and assign shared index. + opt := s.EngineOptions + opt.InmemIndex = idx + + // Provide an implementation of the ShardIDSets + opt.SeriesIDSets = shardSet{store: s, db: db} + + // Existing shards should continue to use inmem index. + if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) { + opt.IndexVersion = InmemIndexName + } + + // Open engine. + shard := NewShard(shardID, path, walPath, sfile, opt) + + // Disable compactions, writes and queries until all shards are loaded + shard.EnableOnOpen = false + shard.CompactionDisabled = s.EngineOptions.CompactionDisabled + shard.WithLogger(s.baseLogger) + + err = s.OpenShard(shard, false) + if err != nil { + log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err)) + resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)} + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() + } + return + } + + resC <- &res{s: shard} + log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start))) + if s.startupProgressMetrics != nil { + s.startupProgressMetrics.CompletedShard() + } + }(db.Name(), rp.Name(), sh.Name()) + + return nil + }) + + if err := s.enableShards(shardLoaderWg, resC); err != nil { + return err + } + + return nil +} + +func (s *Store) enableShards(wg *sync.WaitGroup, resC chan *res) error { + go func() { + wg.Wait() + close(resC) + }() + + for res := range resC { + if res.s == nil || res.err != nil { + continue + } + s.shards[res.s.id] = res.s + s.epochs[res.s.id] = newEpochTracker() + if _, ok := s.databases[res.s.database]; !ok { + s.databases[res.s.database] = new(databaseState) + } + s.databases[res.s.database].addIndexType(res.s.IndexType()) + } + + // Check if any databases are running multiple index types. 
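Each shard is opened in its own goroutine, gated by a fixed limiter, and its outcome is sent on resC; enableShards can simply range over resC because a helper goroutine closes the channel once the WaitGroup drains. A cut-down sketch of that wait-then-close shape, approximating limiter.NewFixed with a buffered channel (all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// result stands in for the *res values sent on resC.
type result struct {
	id  uint64
	err error
}

func main() {
	ids := []uint64{1, 2, 3, 4}

	// tokens approximates limiter.NewFixed: a buffered channel that bounds
	// how many shards are being opened at once.
	tokens := make(chan struct{}, 2)

	var wg sync.WaitGroup
	resC := make(chan result)

	for _, id := range ids {
		wg.Add(1)
		go func(id uint64) {
			defer wg.Done()
			tokens <- struct{}{}        // Take
			defer func() { <-tokens }() // Release

			resC <- result{id: id} // pretend the shard opened cleanly
		}(id)
	}

	// Close the results channel only after every worker is done, so the
	// consumer below can range over it instead of counting results.
	go func() {
		wg.Wait()
		close(resC)
	}()

	for r := range resC {
		if r.err != nil {
			continue
		}
		fmt.Println("registered shard", r.id)
	}
}
```

Closing resC only after wg.Wait() is what lets the consumer stop without pre-counting results, which is the main structural difference from the removed loop below that read exactly n results from the channel.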
+ for db, state := range s.databases { + if state.hasMultipleIndexTypes() { + var fields []zapcore.Field + for idx, cnt := range state.indexTypes { + fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt)) + } + s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...) + } + } + + // Enable all shards + for _, sh := range s.shards { + sh.SetEnabled(true) + if isIdle, _ := sh.IsIdle(); isIdle { + if err := sh.Free(); err != nil { + return err + } + } + } + + return nil +} + +func (s *Store) getRetentionPolicyDirs(db os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) { + dbPath := filepath.Join(s.path, db.Name()) + if !db.IsDir() { + log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) + return nil, nil + } + + if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) { + log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter")) + return nil, nil + } + + // Load each retention policy within the database directory. + rpDirs, err := os.ReadDir(dbPath) + if err != nil { + return nil, err + } + + return rpDirs, nil +} + +func (s *Store) getShards(rpDir os.DirEntry, dbDir os.DirEntry, log *zap.Logger) ([]os.DirEntry, error) { + rpPath := filepath.Join(s.path, dbDir.Name(), rpDir.Name()) + if !rpDir.IsDir() { + log.Info("Skipping retention policy dir", zap.String("name", rpDir.Name()), zap.String("reason", "not a directory")) + return nil, nil + } + + // The .series directory is not a retention policy. + if rpDir.Name() == SeriesFileDirectory { + return nil, nil + } + + if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(dbDir.Name(), rpDir.Name()) { + log.Info("Skipping retention policy dir", logger.RetentionPolicy(rpDir.Name()), zap.String("reason", "failed retention policy filter")) + return nil, nil + } + + shardDirs, err := os.ReadDir(rpPath) + if err != nil { + return nil, err + } + + return shardDirs, nil +} diff --git a/tsdb/store.go b/tsdb/store.go index 74ad52c36b7..73fc1b1cb8f 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -6,9 +6,9 @@ import ( "errors" "fmt" "io" + "maps" "os" "path/filepath" - "runtime" "sort" "strconv" "sync" @@ -135,6 +135,11 @@ type Store struct { baseLogger *zap.Logger Logger *zap.Logger + startupProgressMetrics interface { + AddShard() + CompletedShard() + } + closing chan struct{} wg sync.WaitGroup opened bool @@ -167,6 +172,13 @@ func (s *Store) WithLogger(log *zap.Logger) { } } +func (s *Store) WithStartupMetrics(sp interface { + AddShard() + CompletedShard() +}) { + s.startupProgressMetrics = sp +} + // Statistics returns statistics for period monitoring. func (s *Store) Statistics(tags map[string]string) []models.Statistic { s.mu.RLock() @@ -309,226 +321,6 @@ func (s *Store) Open() error { return nil } -func (s *Store) loadShards() error { - // res holds the result from opening each shard in a goroutine - type res struct { - s *Shard - err error - } - - // Limit the number of concurrent TSM files to be opened to the number of cores. - s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0)) - - // Setup a shared limiter for compactions - lim := s.EngineOptions.Config.MaxConcurrentCompactions - if lim == 0 { - lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions - - if lim < 1 { - lim = 1 - } - } - - // Don't allow more compactions to run than cores. 
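WithStartupMetrics accepts any value with AddShard and CompletedShard methods, so callers other than cmd/influxd/run can supply their own tracker. A minimal wiring sketch under stated assumptions: loggingMetrics and both paths are hypothetical, and a real caller would also configure EngineOptions (WAL directory and so on) before Open, as the test helper later in this diff does:

```go
package main

import (
	"log"
	"sync"

	"github.com/influxdata/influxdb/tsdb"
)

// loggingMetrics is a hypothetical tracker; anything with AddShard and
// CompletedShard satisfies the interface WithStartupMetrics accepts.
type loggingMetrics struct {
	mu          sync.Mutex
	added, done int
}

func (m *loggingMetrics) AddShard() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.added++
}

func (m *loggingMetrics) CompletedShard() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.done++
	log.Printf("opened %d of %d shards", m.done, m.added)
}

func main() {
	// Paths here are placeholders.
	store := tsdb.NewStore("/var/lib/influxdb/data")
	store.EngineOptions.Config.WALDir = "/var/lib/influxdb/wal"

	store.WithStartupMetrics(&loggingMetrics{})

	if err := store.Open(); err != nil {
		log.Fatalf("open store: %v", err)
	}
	defer store.Close()
}
```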
- if lim > runtime.GOMAXPROCS(0) { - lim = runtime.GOMAXPROCS(0) - } - - s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim) - - compactionSettings := []zapcore.Field{zap.Int("max_concurrent_compactions", lim)} - throughput := int(s.EngineOptions.Config.CompactThroughput) - throughputBurst := int(s.EngineOptions.Config.CompactThroughputBurst) - if throughput > 0 { - if throughputBurst < throughput { - throughputBurst = throughput - } - - compactionSettings = append( - compactionSettings, - zap.Int("throughput_bytes_per_second", throughput), - zap.Int("throughput_bytes_per_second_burst", throughputBurst), - ) - s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(throughput, throughputBurst) - } else { - compactionSettings = append( - compactionSettings, - zap.String("throughput_bytes_per_second", "unlimited"), - zap.String("throughput_bytes_per_second_burst", "unlimited"), - ) - } - - s.Logger.Info("Compaction settings", compactionSettings...) - - log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open") - defer logEnd() - - t := limiter.NewFixed(runtime.GOMAXPROCS(0)) - resC := make(chan *res) - var n int - - // Determine how many shards we need to open by checking the store path. - dbDirs, err := os.ReadDir(s.path) - if err != nil { - return err - } - - for _, db := range dbDirs { - dbPath := filepath.Join(s.path, db.Name()) - if !db.IsDir() { - log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) - continue - } - - if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) { - log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter")) - continue - } - - // Load series file. - sfile, err := s.openSeriesFile(db.Name()) - if err != nil { - return err - } - - // Retrieve database index. - idx, err := s.createIndexIfNotExists(db.Name()) - if err != nil { - return err - } - - // Load each retention policy within the database directory. - rpDirs, err := os.ReadDir(dbPath) - if err != nil { - return err - } - - for _, rp := range rpDirs { - rpPath := filepath.Join(s.path, db.Name(), rp.Name()) - if !rp.IsDir() { - log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory")) - continue - } - - // The .series directory is not a retention policy. - if rp.Name() == SeriesFileDirectory { - continue - } - - if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) { - log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter")) - continue - } - - shardDirs, err := os.ReadDir(rpPath) - if err != nil { - return err - } - - for _, sh := range shardDirs { - // Series file should not be in a retention policy but skip just in case. - if sh.Name() == SeriesFileDirectory { - log.Warn("Skipping series file in retention policy dir", zap.String("path", filepath.Join(s.path, db.Name(), rp.Name()))) - continue - } - - n++ - go func(db, rp, sh string) { - t.Take() - defer t.Release() - - start := time.Now() - path := filepath.Join(s.path, db, rp, sh) - walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh) - - // Shard file names are numeric shardIDs - shardID, err := strconv.ParseUint(sh, 10, 64) - if err != nil { - log.Info("invalid shard ID found at path", zap.String("path", path)) - resC <- &res{err: fmt.Errorf("%s is not a valid ID. 
Skipping shard.", sh)} - return - } - - if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) { - log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID)) - resC <- &res{} - return - } - - // Copy options and assign shared index. - opt := s.EngineOptions - opt.InmemIndex = idx - - // Provide an implementation of the ShardIDSets - opt.SeriesIDSets = shardSet{store: s, db: db} - - // Existing shards should continue to use inmem index. - if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) { - opt.IndexVersion = InmemIndexName - } - - // Open engine. - shard := NewShard(shardID, path, walPath, sfile, opt) - - // Disable compactions, writes and queries until all shards are loaded - shard.EnableOnOpen = false - shard.CompactionDisabled = s.EngineOptions.CompactionDisabled - shard.WithLogger(s.baseLogger) - - err = s.OpenShard(shard, false) - if err != nil { - log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err)) - resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)} - return - } - - resC <- &res{s: shard} - log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start))) - }(db.Name(), rp.Name(), sh.Name()) - } - } - } - - // Gather results of opening shards concurrently, keeping track of how - // many databases we are managing. - for i := 0; i < n; i++ { - res := <-resC - if res.s == nil || res.err != nil { - continue - } - s.shards[res.s.id] = res.s - s.epochs[res.s.id] = newEpochTracker() - if _, ok := s.databases[res.s.database]; !ok { - s.databases[res.s.database] = new(databaseState) - } - s.databases[res.s.database].addIndexType(res.s.IndexType()) - } - close(resC) - - // Check if any databases are running multiple index types. - for db, state := range s.databases { - if state.hasMultipleIndexTypes() { - var fields []zapcore.Field - for idx, cnt := range state.indexTypes { - fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt)) - } - s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...) - } - } - - // Enable all shards - for _, sh := range s.shards { - sh.SetEnabled(true) - if isIdle, _ := sh.IsIdle(); isIdle { - if err := sh.Free(); err != nil { - return err - } - } - } - - return nil -} - // Close closes the store and all associated shards. After calling Close accessing // shards through the Store will result in ErrStoreClosed being returned. func (s *Store) Close() error { @@ -633,6 +425,18 @@ func (s *Store) Shard(id uint64) *Shard { return sh } +func (s *Store) ClearBadShardList() map[uint64]error { + badShards := maps.Clone(s.badShards.shardErrors) + clear(s.badShards.shardErrors) + + return badShards +} + +// GetBadShardList is exposed as a method for test purposes +func (s *Store) GetBadShardList() map[uint64]error { + return s.badShards.shardErrors +} + type ErrPreviousShardFail struct { error } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 3c2010db513..75ced12b538 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -144,6 +144,54 @@ func TestStore_CreateShard(t *testing.T) { } } +// Ensure the store can create a new shard. +func TestStore_StartupShardProgress(t *testing.T) { + t.Parallel() + + test := func(index string) { + fmt.Println(index) + s := MustOpenStore(index) + defer s.Close() + + // Create a new shard and verify that it exists. 
+ require.NoError(t, s.CreateShard("db0", "rp0", 1, true)) + sh := s.Shard(1) + require.NotNil(t, sh) + + // Create another shard and verify that it exists. + require.NoError(t, s.CreateShard("db0", "rp0", 2, true)) + sh = s.Shard(2) + require.NotNil(t, sh) + + msl := &mockStartupLogger{} + + // Reopen shard and recheck. + require.NoError(t, s.ReopenWithStartupMetrics(msl)) + sh = s.Shard(1) + require.NotNil(t, sh) + + // Create another shard and verify that it exists. + require.NoError(t, s.CreateShard("db0", "rp0", 2, true)) + sh = s.Shard(2) + require.NotNil(t, sh) + + // Equality check to make sure shards are always added prior to + // completion being called. + require.Equal(t, msl.shardTracker, []string{ + "shard-add", + "shard-add", + "shard-add", + "shard-complete", + "shard-complete", + "shard-complete", + }) + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { test(index) }) + } +} + func TestStore_BadShard(t *testing.T) { const errStr = "a shard open error" indexes := tsdb.RegisteredIndexes() @@ -171,6 +219,40 @@ func TestStore_BadShard(t *testing.T) { } } +func TestStore_BadShardClear(t *testing.T) { + const errStr = "a shard open error" + indexes := tsdb.RegisteredIndexes() + for _, idx := range indexes { + func() { + s := MustOpenStore(idx) + defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx) + + sh := tsdb.NewTempShard(idx) + shId := sh.ID() + err := s.OpenShard(sh.Shard, false) + require.NoError(t, err, "opening temp shard") + defer require.NoError(t, sh.Close(), "closing temporary shard") + + expErr := errors.New(errStr) + s.SetShardOpenErrorForTest(sh.ID(), expErr) + err2 := s.OpenShard(sh.Shard, false) + require.Error(t, err2, "no error opening bad shard") + require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2) + require.EqualError(t, err2, fmt.Errorf("not attempting to open shard %d; opening shard previously failed with: %w", shId, expErr).Error()) + + require.Equal(t, 1, len(s.Store.GetBadShardList())) + + badShards := s.ClearBadShardList() + if len(badShards) != 1 { + t.Fatalf("expected 1 shard, got %d", len(badShards)) + } + + // Check that bad shard list has been cleared + require.Equal(t, 0, len(s.Store.GetBadShardList())) + }() + } +} + func TestStore_CreateMixedShards(t *testing.T) { t.Parallel() @@ -2682,6 +2764,25 @@ func (s *Store) Reopen() error { return s.Store.Open() } +// Reopen closes and reopens the store as a new store. +func (s *Store) ReopenWithStartupMetrics(msl *mockStartupLogger) error { + if err := s.Store.Close(); err != nil { + return err + } + + s.Store = tsdb.NewStore(s.Path()) + s.EngineOptions.IndexVersion = s.index + s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal") + s.EngineOptions.Config.TraceLoggingEnabled = true + + s.WithStartupMetrics(msl) + + if testing.Verbose() { + s.WithLogger(logger.New(os.Stdout)) + } + return s.Store.Open() +} + // Close closes the store and removes the underlying data. 
func (s *Store) Close() error { defer os.RemoveAll(s.Path()) @@ -2754,3 +2855,20 @@ func dirExists(path string) bool { } return !os.IsNotExist(err) } + +type mockStartupLogger struct { + shardTracker []string + mu sync.Mutex +} + +func (m *mockStartupLogger) AddShard() { + m.mu.Lock() + m.shardTracker = append(m.shardTracker, "shard-add") + m.mu.Unlock() +} + +func (m *mockStartupLogger) CompletedShard() { + m.mu.Lock() + m.shardTracker = append(m.shardTracker, "shard-complete") + m.mu.Unlock() +}
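The exact-slice assertion in TestStore_StartupShardProgress relies on loadShards finishing its counting pass before it launches any shard-loading goroutine, so every shard-add event precedes every shard-complete event. If that ordering were ever relaxed, a weaker property check such as the hypothetical orderedProgress helper below would still express the test's intent:

```go
package main

import "fmt"

// orderedProgress reports whether no "shard-add" occurs after the first
// "shard-complete" — the property the exact-slice comparison depends on.
func orderedProgress(events []string) bool {
	seenComplete := false
	for _, e := range events {
		switch e {
		case "shard-complete":
			seenComplete = true
		case "shard-add":
			if seenComplete {
				return false
			}
		}
	}
	return true
}

func main() {
	fmt.Println(orderedProgress([]string{"shard-add", "shard-add", "shard-complete", "shard-complete"})) // true
	fmt.Println(orderedProgress([]string{"shard-add", "shard-complete", "shard-add"}))                   // false
}
```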