From bcee5fbe3cee4de75da4c598b7f64d160f512078 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Wed, 2 Oct 2024 22:34:46 +0200 Subject: [PATCH] Track beacon chain analytics about attestation aggregates (#12165) --- cl/monitor/metrics.go | 88 +++++++++++++++++-- cl/phase1/forkchoice/checkpoint_state.go | 2 + .../services/aggregate_and_proof_service.go | 3 + cl/phase1/stages/forkchoice.go | 8 ++ cl/sentinel/gossip.go | 11 +++ cmd/caplin/caplin1/run.go | 2 + 6 files changed, 108 insertions(+), 6 deletions(-) diff --git a/cl/monitor/metrics.go b/cl/monitor/metrics.go index 3f0d3aefc1a..39f9d33f9e8 100644 --- a/cl/monitor/metrics.go +++ b/cl/monitor/metrics.go @@ -1,6 +1,7 @@ package monitor import ( + "sort" "sync" "time" @@ -39,6 +40,21 @@ var ( // Network metrics gossipTopicsMetricCounterPrefix = "gossip_topics_seen" gossipMetricsMap = sync.Map{} + aggregateQuality50Per = metrics.GetOrCreateGauge("aggregate_quality_50") + aggregateQuality25Per = metrics.GetOrCreateGauge("aggregate_quality_25") + aggregateQuality75Per = metrics.GetOrCreateGauge("aggregate_quality_75") + aggregateQualityMin = metrics.GetOrCreateGauge("aggregate_quality_min") + aggregateQualityMax = metrics.GetOrCreateGauge("aggregate_quality_max") + + // Beacon chain metrics + committeeSize = metrics.GetOrCreateGauge("committee_size") + activeValidatorsCount = metrics.GetOrCreateGauge("active_validators_count") + currentSlot = metrics.GetOrCreateGauge("current_slot") + currentEpoch = metrics.GetOrCreateGauge("current_epoch") + + // Snapshot metrics + frozenBlocks = metrics.GetOrCreateGauge("frozen_blocks") + frozenBlobs = metrics.GetOrCreateGauge("frozen_blobs") ) type batchVerificationThroughputMetric struct { @@ -47,19 +63,51 @@ type batchVerificationThroughputMetric struct { mu sync.Mutex } -var batchVerificationThroughputMetricStruct = &batchVerificationThroughputMetric{} +type aggregateQualityMetric struct { + qualities []float64 + mu sync.Mutex +} + +func (a *aggregateQualityMetric) observe(participationCount int, totalCount int) { + a.mu.Lock() + defer a.mu.Unlock() + newPercentage := float64(participationCount) / float64(totalCount) + a.qualities = append(a.qualities, newPercentage) + if len(a.qualities) <= 40 { + return + } + sort.Float64s(a.qualities) + aggregateQuality50Per.Set(a.qualities[len(a.qualities)/2]) + aggregateQuality25Per.Set(a.qualities[len(a.qualities)/4]) + aggregateQuality75Per.Set(a.qualities[(len(a.qualities)*3)/4]) + aggregateQualityMin.Set(a.qualities[0]) + aggregateQualityMax.Set(a.qualities[len(a.qualities)-1]) + + a.qualities = a.qualities[:0] + +} + +var ( + batchVerificationThroughputMetricStruct = &batchVerificationThroughputMetric{} + aggregateQualityMetricStruct = &aggregateQualityMetric{} +) func (b *batchVerificationThroughputMetric) observe(t time.Duration, totalSigs int) float64 { b.mu.Lock() defer b.mu.Unlock() - elapsedInMillisecs := float64(t.Microseconds()) / 1000 + elapsedInMillisecsPerSig := float64(t.Microseconds()) / 1000 / float64(totalSigs) if b.totalVerified == 0 { - b.currentAverageSecs = elapsedInMillisecs + b.currentAverageSecs = elapsedInMillisecsPerSig } else { - b.currentAverageSecs = (b.currentAverageSecs*float64(b.totalVerified) + elapsedInMillisecs) / float64(b.totalVerified+uint64(totalSigs)) + b.currentAverageSecs = (b.currentAverageSecs*float64(b.totalVerified) + elapsedInMillisecsPerSig) / float64(b.totalVerified+1) } - b.totalVerified += uint64(totalSigs) - return b.currentAverageSecs + b.totalVerified++ + ret := b.currentAverageSecs + if b.totalVerified > 1000 { + b.currentAverageSecs = 0 + b.totalVerified = 0 + } + return ret } func microToMilli(micros int64) float64 { @@ -143,3 +191,31 @@ func ObserveGossipTopicSeen(topic string, l int) { } metric.Add(float64(l)) } + +func ObserveAggregateQuality(participationCount int, totalCount int) { + aggregateQualityMetricStruct.observe(participationCount, totalCount) +} + +func ObserveCommitteeSize(size float64) { + committeeSize.Set(size) +} + +func ObserveActiveValidatorsCount(count int) { + activeValidatorsCount.Set(float64(count)) +} + +func ObserveCurrentSlot(slot uint64) { + currentSlot.Set(float64(slot)) +} + +func ObserveCurrentEpoch(epoch uint64) { + currentEpoch.Set(float64(epoch)) +} + +func ObserveFrozenBlocks(count int) { + frozenBlocks.Set(float64(count)) +} + +func ObserveFrozenBlobs(count int) { + frozenBlobs.Set(float64(count)) +} diff --git a/cl/phase1/forkchoice/checkpoint_state.go b/cl/phase1/forkchoice/checkpoint_state.go index f1c095d8bf0..3c56027875e 100644 --- a/cl/phase1/forkchoice/checkpoint_state.go +++ b/cl/phase1/forkchoice/checkpoint_state.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/erigontech/erigon/cl/cltypes/solid" + "github.com/erigontech/erigon/cl/monitor" "github.com/erigontech/erigon/cl/phase1/core/state/shuffling" "github.com/Giulio2002/bls" @@ -111,6 +112,7 @@ func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKe mixPosition := (epoch + beaconConfig.EpochsPerHistoricalVector - beaconConfig.MinSeedLookahead - 1) % beaconConfig.EpochsPerHistoricalVector activeIndicies := c.getActiveIndicies(epoch) + monitor.ObserveActiveValidatorsCount(len(activeIndicies)) c.shuffledSet = make([]uint64, len(activeIndicies)) c.shuffledSet = shuffling.ComputeShuffledIndicies(c.beaconConfig, c.randaoMixes.Get(int(mixPosition)), c.shuffledSet, activeIndicies, epoch*beaconConfig.SlotsPerEpoch) return c diff --git a/cl/phase1/network/services/aggregate_and_proof_service.go b/cl/phase1/network/services/aggregate_and_proof_service.go index d01355de207..e1d9f9119bf 100644 --- a/cl/phase1/network/services/aggregate_and_proof_service.go +++ b/cl/phase1/network/services/aggregate_and_proof_service.go @@ -34,6 +34,7 @@ import ( "github.com/erigontech/erigon/cl/cltypes/solid" "github.com/erigontech/erigon/cl/fork" "github.com/erigontech/erigon/cl/merkle_tree" + "github.com/erigontech/erigon/cl/monitor" "github.com/erigontech/erigon/cl/phase1/core/state" "github.com/erigontech/erigon/cl/phase1/core/state/lru" "github.com/erigontech/erigon/cl/phase1/forkchoice" @@ -202,6 +203,8 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage( // further processing will be done after async signature verification aggregateVerificationData.F = func() { + monitor.ObserveAggregateQuality(len(attestingIndicies), len(committee)) + monitor.ObserveCommitteeSize(float64(len(committee))) a.opPool.AttestationsPool.Insert( aggregateAndProof.SignedAggregateAndProof.Message.Aggregate.Signature(), aggregateAndProof.SignedAggregateAndProof.Message.Aggregate, diff --git a/cl/phase1/stages/forkchoice.go b/cl/phase1/stages/forkchoice.go index b72e9530ed7..9bc1f7bb188 100644 --- a/cl/phase1/stages/forkchoice.go +++ b/cl/phase1/stages/forkchoice.go @@ -13,6 +13,7 @@ import ( "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/cl/beacon/beaconevents" "github.com/erigontech/erigon/cl/clparams" + "github.com/erigontech/erigon/cl/monitor" "github.com/erigontech/erigon/cl/persistence/beacon_indicies" state_accessors "github.com/erigontech/erigon/cl/persistence/state" "github.com/erigontech/erigon/cl/phase1/core/state" @@ -30,6 +31,13 @@ func computeAndNotifyServicesOfNewForkChoice(ctx context.Context, logger log.Log err = fmt.Errorf("failed to get head: %w", err) return } + // Observe the current slot and epoch in the monitor + monitor.ObserveCurrentSlot(headSlot) + monitor.ObserveCurrentEpoch(headSlot / cfg.beaconCfg.SlotsPerEpoch) + if cfg.sn != nil { + monitor.ObserveFrozenBlocks(int(cfg.sn.BlocksAvailable())) + monitor.ObserveFrozenBlobs(int(cfg.sn.FrozenBlobs())) + } // Perform fork choice update if the engine is available if cfg.forkChoice.Engine() != nil { diff --git a/cl/sentinel/gossip.go b/cl/sentinel/gossip.go index 1c434f24ed3..99028e91e8b 100644 --- a/cl/sentinel/gossip.go +++ b/cl/sentinel/gossip.go @@ -498,6 +498,7 @@ func (g *GossipManager) Close() { func (g *GossipManager) Start(ctx context.Context) { go func() { checkingInterval := time.NewTicker(time.Second) + dbgLogInterval := time.NewTicker(5 * time.Second) for { select { case <-ctx.Done(): @@ -508,6 +509,16 @@ func (g *GossipManager) Start(ctx context.Context) { sub.checkIfTopicNeedsToEnabledOrDisabled() return true }) + case <-dbgLogInterval.C: + logArgs := []interface{}{} + g.subscriptions.Range(func(key, value any) bool { + sub := value.(*GossipSubscription) + if sub.topic != nil { + logArgs = append(logArgs, sub.topic.String(), sub.subscribed.Load()) + } + return true + }) + log.Debug("[Gossip] Subscriptions", "subscriptions", logArgs) } } }() diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index ee2e87e610d..8dd6b806a43 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -347,6 +347,8 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi case <-logIntervalPeers.C: if peerCount, err := beaconRpc.Peers(); err == nil { logger.Info("P2P", "peers", peerCount) + } else { + logger.Error("P2P", "err", err) } case <-ctx.Done(): return