Skip to content

Commit

Permalink
Track beacon chain analytics about attestation aggregates (#12165)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Oct 2, 2024
1 parent 95d3780 commit bcee5fb
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 6 deletions.
88 changes: 82 additions & 6 deletions cl/monitor/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package monitor

import (
"sort"
"sync"
"time"

Expand Down Expand Up @@ -39,6 +40,21 @@ var (
// Network metrics
gossipTopicsMetricCounterPrefix = "gossip_topics_seen"
gossipMetricsMap = sync.Map{}
aggregateQuality50Per = metrics.GetOrCreateGauge("aggregate_quality_50")
aggregateQuality25Per = metrics.GetOrCreateGauge("aggregate_quality_25")
aggregateQuality75Per = metrics.GetOrCreateGauge("aggregate_quality_75")
aggregateQualityMin = metrics.GetOrCreateGauge("aggregate_quality_min")
aggregateQualityMax = metrics.GetOrCreateGauge("aggregate_quality_max")

// Beacon chain metrics
committeeSize = metrics.GetOrCreateGauge("committee_size")
activeValidatorsCount = metrics.GetOrCreateGauge("active_validators_count")
currentSlot = metrics.GetOrCreateGauge("current_slot")
currentEpoch = metrics.GetOrCreateGauge("current_epoch")

// Snapshot metrics
frozenBlocks = metrics.GetOrCreateGauge("frozen_blocks")
frozenBlobs = metrics.GetOrCreateGauge("frozen_blobs")
)

type batchVerificationThroughputMetric struct {
Expand All @@ -47,19 +63,51 @@ type batchVerificationThroughputMetric struct {
mu sync.Mutex
}

var batchVerificationThroughputMetricStruct = &batchVerificationThroughputMetric{}
type aggregateQualityMetric struct {
qualities []float64
mu sync.Mutex
}

func (a *aggregateQualityMetric) observe(participationCount int, totalCount int) {
a.mu.Lock()
defer a.mu.Unlock()
newPercentage := float64(participationCount) / float64(totalCount)
a.qualities = append(a.qualities, newPercentage)
if len(a.qualities) <= 40 {
return
}
sort.Float64s(a.qualities)
aggregateQuality50Per.Set(a.qualities[len(a.qualities)/2])
aggregateQuality25Per.Set(a.qualities[len(a.qualities)/4])
aggregateQuality75Per.Set(a.qualities[(len(a.qualities)*3)/4])
aggregateQualityMin.Set(a.qualities[0])
aggregateQualityMax.Set(a.qualities[len(a.qualities)-1])

a.qualities = a.qualities[:0]

}

var (
batchVerificationThroughputMetricStruct = &batchVerificationThroughputMetric{}
aggregateQualityMetricStruct = &aggregateQualityMetric{}
)

func (b *batchVerificationThroughputMetric) observe(t time.Duration, totalSigs int) float64 {
b.mu.Lock()
defer b.mu.Unlock()
elapsedInMillisecs := float64(t.Microseconds()) / 1000
elapsedInMillisecsPerSig := float64(t.Microseconds()) / 1000 / float64(totalSigs)
if b.totalVerified == 0 {
b.currentAverageSecs = elapsedInMillisecs
b.currentAverageSecs = elapsedInMillisecsPerSig
} else {
b.currentAverageSecs = (b.currentAverageSecs*float64(b.totalVerified) + elapsedInMillisecs) / float64(b.totalVerified+uint64(totalSigs))
b.currentAverageSecs = (b.currentAverageSecs*float64(b.totalVerified) + elapsedInMillisecsPerSig) / float64(b.totalVerified+1)
}
b.totalVerified += uint64(totalSigs)
return b.currentAverageSecs
b.totalVerified++
ret := b.currentAverageSecs
if b.totalVerified > 1000 {
b.currentAverageSecs = 0
b.totalVerified = 0
}
return ret
}

func microToMilli(micros int64) float64 {
Expand Down Expand Up @@ -143,3 +191,31 @@ func ObserveGossipTopicSeen(topic string, l int) {
}
metric.Add(float64(l))
}

func ObserveAggregateQuality(participationCount int, totalCount int) {
aggregateQualityMetricStruct.observe(participationCount, totalCount)
}

func ObserveCommitteeSize(size float64) {
committeeSize.Set(size)
}

func ObserveActiveValidatorsCount(count int) {
activeValidatorsCount.Set(float64(count))
}

func ObserveCurrentSlot(slot uint64) {
currentSlot.Set(float64(slot))
}

func ObserveCurrentEpoch(epoch uint64) {
currentEpoch.Set(float64(epoch))
}

func ObserveFrozenBlocks(count int) {
frozenBlocks.Set(float64(count))
}

func ObserveFrozenBlobs(count int) {
frozenBlobs.Set(float64(count))
}
2 changes: 2 additions & 0 deletions cl/phase1/forkchoice/checkpoint_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state/shuffling"

"github.com/Giulio2002/bls"
Expand Down Expand Up @@ -111,6 +112,7 @@ func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKe
mixPosition := (epoch + beaconConfig.EpochsPerHistoricalVector - beaconConfig.MinSeedLookahead - 1) %
beaconConfig.EpochsPerHistoricalVector
activeIndicies := c.getActiveIndicies(epoch)
monitor.ObserveActiveValidatorsCount(len(activeIndicies))
c.shuffledSet = make([]uint64, len(activeIndicies))
c.shuffledSet = shuffling.ComputeShuffledIndicies(c.beaconConfig, c.randaoMixes.Get(int(mixPosition)), c.shuffledSet, activeIndicies, epoch*beaconConfig.SlotsPerEpoch)
return c
Expand Down
3 changes: 3 additions & 0 deletions cl/phase1/network/services/aggregate_and_proof_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/fork"
"github.com/erigontech/erigon/cl/merkle_tree"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/phase1/core/state/lru"
"github.com/erigontech/erigon/cl/phase1/forkchoice"
Expand Down Expand Up @@ -202,6 +203,8 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(

// further processing will be done after async signature verification
aggregateVerificationData.F = func() {
monitor.ObserveAggregateQuality(len(attestingIndicies), len(committee))
monitor.ObserveCommitteeSize(float64(len(committee)))
a.opPool.AttestationsPool.Insert(
aggregateAndProof.SignedAggregateAndProof.Message.Aggregate.Signature(),
aggregateAndProof.SignedAggregateAndProof.Message.Aggregate,
Expand Down
8 changes: 8 additions & 0 deletions cl/phase1/stages/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cl/beacon/beaconevents"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/persistence/beacon_indicies"
state_accessors "github.com/erigontech/erigon/cl/persistence/state"
"github.com/erigontech/erigon/cl/phase1/core/state"
Expand All @@ -30,6 +31,13 @@ func computeAndNotifyServicesOfNewForkChoice(ctx context.Context, logger log.Log
err = fmt.Errorf("failed to get head: %w", err)
return
}
// Observe the current slot and epoch in the monitor
monitor.ObserveCurrentSlot(headSlot)
monitor.ObserveCurrentEpoch(headSlot / cfg.beaconCfg.SlotsPerEpoch)
if cfg.sn != nil {
monitor.ObserveFrozenBlocks(int(cfg.sn.BlocksAvailable()))
monitor.ObserveFrozenBlobs(int(cfg.sn.FrozenBlobs()))
}

// Perform fork choice update if the engine is available
if cfg.forkChoice.Engine() != nil {
Expand Down
11 changes: 11 additions & 0 deletions cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ func (g *GossipManager) Close() {
func (g *GossipManager) Start(ctx context.Context) {
go func() {
checkingInterval := time.NewTicker(time.Second)
dbgLogInterval := time.NewTicker(5 * time.Second)
for {
select {
case <-ctx.Done():
Expand All @@ -508,6 +509,16 @@ func (g *GossipManager) Start(ctx context.Context) {
sub.checkIfTopicNeedsToEnabledOrDisabled()
return true
})
case <-dbgLogInterval.C:
logArgs := []interface{}{}
g.subscriptions.Range(func(key, value any) bool {
sub := value.(*GossipSubscription)
if sub.topic != nil {
logArgs = append(logArgs, sub.topic.String(), sub.subscribed.Load())
}
return true
})
log.Debug("[Gossip] Subscriptions", "subscriptions", logArgs)
}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions cmd/caplin/caplin1/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi
case <-logIntervalPeers.C:
if peerCount, err := beaconRpc.Peers(); err == nil {
logger.Info("P2P", "peers", peerCount)
} else {
logger.Error("P2P", "err", err)
}
case <-ctx.Done():
return
Expand Down

0 comments on commit bcee5fb

Please sign in to comment.