Skip to content

Commit

Permalink
Add caplin metrics for aggregation and block processing (#12134)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Sep 30, 2024
1 parent 1861dcd commit 4e24b44
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 30 deletions.
71 changes: 70 additions & 1 deletion cl/monitor/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package monitor

import "github.com/erigontech/erigon-lib/metrics"
import (
"sync"
"time"

"github.com/erigontech/erigon-lib/metrics"
)

var (
// VALIDATOR METRICS

// metricAttestHit is the number of attestations that hit for those validators we observe within current_epoch-2
metricAttestHit = metrics.GetOrCreateCounter("validator_attestation_hit")
// metricAttestMiss is the number of attestations that miss for those validators we observe within current_epoch-2
Expand All @@ -11,4 +18,66 @@ var (
metricProposerHit = metrics.GetOrCreateCounter("validator_proposal_hit")
// metricProposerMiss is the number of proposals that miss for those validators we observe in previous slot
metricProposerMiss = metrics.GetOrCreateCounter("validator_proposal_miss")

// Block processing metrics
fullBlockProcessingTime = metrics.GetOrCreateGauge("full_block_processing_time")
attestationBlockProcessingTime = metrics.GetOrCreateGauge("attestation_block_processing_time")
batchVerificationThroughput = metrics.GetOrCreateGauge("aggregation_per_signature")

// Network metrics
gossipTopicsMetricCounterPrefix = "gossip_topics_seen"
gossipMetricsMap = sync.Map{}
)

type batchVerificationThroughputMetric struct {
totalVerified uint64
currentAverageSecs float64
mu sync.Mutex
}

var batchVerificationThroughputMetricStruct = &batchVerificationThroughputMetric{}

func (b *batchVerificationThroughputMetric) observe(t time.Duration, totalSigs int) float64 {
b.mu.Lock()
defer b.mu.Unlock()
elapsedInMillisecs := float64(t.Microseconds()) / 1000
if b.totalVerified == 0 {
b.currentAverageSecs = elapsedInMillisecs
} else {
b.currentAverageSecs = (b.currentAverageSecs*float64(b.totalVerified) + elapsedInMillisecs) / float64(b.totalVerified+uint64(totalSigs))
}
b.totalVerified += uint64(totalSigs)
return b.currentAverageSecs
}

func microToMilli(micros int64) float64 {
return float64(micros) / 1000
}

// ObserveAttestHit increments the attestation hit metric
func ObserveAttestationBlockProcessingTime(startTime time.Time) {
attestationBlockProcessingTime.Set(microToMilli(time.Since(startTime).Microseconds()))
}

// ObserveFullBlockProcessingTime increments the full block processing time metric
func ObserveFullBlockProcessingTime(startTime time.Time) {
fullBlockProcessingTime.Set(microToMilli(time.Since(startTime).Microseconds()))
}

// ObserveBatchVerificationThroughput increments the batch verification throughput metric
func ObserveBatchVerificationThroughput(d time.Duration, totalSigs int) {
batchVerificationThroughput.Set(batchVerificationThroughputMetricStruct.observe(d, totalSigs))
}

// ObserveGossipTopicSeen increments the gossip topic seen metric
func ObserveGossipTopicSeen(topic string, l int) {
var metric metrics.Counter
metricI, ok := gossipMetricsMap.LoadOrStore(topic, metrics.GetOrCreateCounter(gossipTopicsMetricCounterPrefix+"_"+topic))
if ok {
metric = metricI.(metrics.Counter)
} else {
metric = metrics.GetOrCreateCounter(gossipTopicsMetricCounterPrefix + "_" + topic)
gossipMetricsMap.Store(topic, metric)
}
metric.Add(float64(l))
}
4 changes: 0 additions & 4 deletions cl/phase1/core/state/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,21 @@ import (
)

func (b *CachingBeaconState) EncodeSSZ(buf []byte) ([]byte, error) {
h := metrics.NewHistTimer("encode_ssz_beacon_state_dur")
bts, err := b.BeaconState.EncodeSSZ(buf)
if err != nil {
return nil, err
}
h.PutSince()
sz := metrics.NewHistTimer("encode_ssz_beacon_state_size")
sz.Observe(float64(len(bts)))
return bts, err
}

func (b *CachingBeaconState) DecodeSSZ(buf []byte, version int) error {
h := metrics.NewHistTimer("decode_ssz_beacon_state_dur")
if err := b.BeaconState.DecodeSSZ(buf, version); err != nil {
return err
}
sz := metrics.NewHistTimer("decode_ssz_beacon_state_size")
sz.Observe(float64(len(buf)))
h.PutSince()
return b.InitBeaconState()
}

Expand Down
7 changes: 6 additions & 1 deletion cl/phase1/forkchoice/on_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/phase1/execution_client"
"github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph"
Expand Down Expand Up @@ -145,10 +146,12 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac
}
}
log.Trace("OnBlock: engine", "elapsed", time.Since(startEngine))
startStateProcess := time.Now()
lastProcessedState, status, err := f.forkGraph.AddChainSegment(block, fullValidation)
if err != nil {
return err
}
monitor.ObserveFullBlockProcessingTime(startStateProcess)
switch status {
case fork_graph.PreValidated:
return nil
Expand Down Expand Up @@ -209,11 +212,12 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac
finalizedCheckpoint = lastProcessedState.FinalizedCheckpoint().Copy()
justificationBits = lastProcessedState.JustificationBits().Copy()
)
f.operationsPool.NotifyBlock(block.Block)

// Eagerly compute unrealized justification and finality
if err := statechange.ProcessJustificationBitsAndFinality(lastProcessedState, nil); err != nil {
return err
}
f.operationsPool.NotifyBlock(block.Block)
f.updateUnrealizedCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy())
// Set the changed value pre-simulation
lastProcessedState.SetPreviousJustifiedCheckpoint(previousJustifiedCheckpoint)
Expand Down Expand Up @@ -244,6 +248,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac
if f.validatorMonitor != nil {
f.validatorMonitor.OnNewBlock(lastProcessedState, block.Block)
}

log.Trace("OnBlock", "elapsed", time.Since(start))
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/gossip"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/forkchoice"
"github.com/erigontech/erigon/cl/phase1/network/services"
"github.com/erigontech/erigon/cl/utils/eth_clock"
Expand Down Expand Up @@ -136,6 +137,7 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
SubnetId: data.SubnetId,
Data: common.CopyBytes(data.Data),
}
monitor.ObserveGossipTopicSeen(data.Name, len(data.Data))

if err := g.routeAndProcess(ctx, data); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cl/phase1/network/services/batch_signature_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Giulio2002/bls"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cl/monitor"
)

const (
Expand Down Expand Up @@ -163,10 +164,12 @@ func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerification
}

func (b *BatchSignatureVerifier) runBatchVerification(signatures [][]byte, signRoots [][]byte, pks [][]byte, fns []func()) error {
start := time.Now()
valid, err := blsVerifyMultipleSignatures(signatures, signRoots, pks)
if err != nil {
return errors.New("batch signature verification failed with the error: " + err.Error())
}
monitor.ObserveBatchVerificationThroughput(time.Since(start), len(signatures))

if !valid {
return errors.New("batch invalid signature")
Expand Down
1 change: 1 addition & 0 deletions cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ func (s *GossipSubscription) run(ctx context.Context, sub *pubsub.Subscription,
if msg.ReceivedFrom == s.host {
continue
}

s.ch <- &GossipMessage{
From: msg.ReceivedFrom,
TopicName: topicName,
Expand Down
22 changes: 3 additions & 19 deletions cl/transition/impl/eth2/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
"slices"
"time"

"github.com/erigontech/erigon-lib/metrics"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/monitor"

"github.com/erigontech/erigon/cl/transition/impl/eth2/statechange"

Expand Down Expand Up @@ -536,10 +535,8 @@ func (I *impl) ProcessAttestations(
attestations *solid.ListSSZ[*solid.Attestation],
) error {
attestingIndiciesSet := make([][]uint64, attestations.Len())
h := metrics.NewHistTimer("beacon_process_attestations")
baseRewardPerIncrement := s.BaseRewardPerIncrement()

c := h.Tag("attestation_step", "process")
var err error
if err := solid.RangeErr[*solid.Attestation](attestations, func(i int, a *solid.Attestation, _ int) error {
if attestingIndiciesSet[i], err = I.processAttestation(s, a, baseRewardPerIncrement); err != nil {
Expand All @@ -553,17 +550,16 @@ func (I *impl) ProcessAttestations(
return err
}
var valid bool
c.PutSince()
if I.FullValidation {
c = h.Tag("attestation_step", "validate")
start := time.Now()
valid, err = verifyAttestations(s, attestations, attestingIndiciesSet)
if err != nil {
return err
}
if !valid {
return errors.New("ProcessAttestation: wrong bls data")
}
c.PutSince()
monitor.ObserveAttestationBlockProcessingTime(start)
}

return nil
Expand All @@ -579,9 +575,6 @@ func (I *impl) processAttestationPostAltair(
stateSlot := s.Slot()
beaconConfig := s.BeaconConfig()

h := metrics.NewHistTimer("beacon_process_attestation_post_altair")

c := h.Tag("step", "get_participation_flag")
participationFlagsIndicies, err := s.GetAttestationParticipationFlagIndicies(
data,
stateSlot-data.Slot(),
Expand All @@ -590,22 +583,16 @@ func (I *impl) processAttestationPostAltair(
if err != nil {
return nil, err
}
c.PutSince()

c = h.Tag("step", "get_attesting_indices")

attestingIndicies, err := s.GetAttestingIndicies(data, attestation.AggregationBits(), true)
if err != nil {
return nil, err
}

c.PutSince()

var proposerRewardNumerator uint64

isCurrentEpoch := data.Target().Epoch() == currentEpoch

c = h.Tag("step", "update_attestation")
for _, attesterIndex := range attestingIndicies {
val, err := s.ValidatorEffectiveBalance(int(attesterIndex))
if err != nil {
Expand All @@ -630,14 +617,11 @@ func (I *impl) processAttestationPostAltair(
proposerRewardNumerator += baseReward * weight
}
}
c.PutSince()
// Reward proposer
c = h.Tag("step", "get_proposer_index")
proposer, err := s.GetBeaconProposerIndex()
if err != nil {
return nil, err
}
c.PutSince()
proposerRewardDenominator := (beaconConfig.WeightDenominator - beaconConfig.ProposerWeight) * beaconConfig.WeightDenominator / beaconConfig.ProposerWeight
reward := proposerRewardNumerator / proposerRewardDenominator
if I.BlockRewardsCollector != nil {
Expand Down
3 changes: 0 additions & 3 deletions cl/transition/machine/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package machine
import (
"fmt"

"github.com/erigontech/erigon-lib/metrics"
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/pkg/errors"
Expand All @@ -44,7 +43,6 @@ func ProcessBlock(impl BlockProcessor, s abstract.BeaconState, block cltypes.Gen
if block.Version() != version {
return fmt.Errorf("processBlindedBlock: wrong state version for block at slot %d", block.GetSlot())
}
h := metrics.NewHistTimer("beacon_process_blinded_block")
bodyRoot, err := body.HashSSZ()
if err != nil {
return errors.WithMessagef(err, "processBlindedBlock: failed to hash block body")
Expand Down Expand Up @@ -92,7 +90,6 @@ func ProcessBlock(impl BlockProcessor, s abstract.BeaconState, block cltypes.Gen
}
}

h.PutSince()
return nil
}

Expand Down
1 change: 1 addition & 0 deletions erigon-lib/state/state_changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (d *StateChangeSet) SerializeKeys(out []byte) []byte {
ret := out
tmp := make([]byte, 4)
for i := range d.Diffs {

diffSet := d.Diffs[i].GetDiffSet()
binary.BigEndian.PutUint32(tmp, uint32(SerializeDiffSetBufLen(diffSet)))
ret = append(ret, tmp...)
Expand Down
8 changes: 6 additions & 2 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var (
mxExecGas = metrics.NewCounter(`exec_gas`)
mxExecBlocks = metrics.NewGauge("exec_blocks")

mxMgas = metrics.NewSummary(`exec_mgas`)
mxMgas = metrics.NewGauge(`exec_mgas`)
)

const (
Expand Down Expand Up @@ -203,7 +203,11 @@ func ExecV3(ctx context.Context,
chainConfig, genesis := cfg.chainConfig, cfg.genesis
totalGasUsed := uint64(0)
start := time.Now()
defer func() { mxMgas.Observe((float64(totalGasUsed) / 1e6) / time.Since(start).Seconds()) }()
defer func() {
if totalGasUsed > 0 {
mxMgas.Set((float64(totalGasUsed) / 1e6) / time.Since(start).Seconds())
}
}()

applyTx := txc.Tx
useExternalTx := applyTx != nil
Expand Down

0 comments on commit 4e24b44

Please sign in to comment.