Skip to content

Commit

Permalink
feat: support eigenda
Browse files Browse the repository at this point in the history
  • Loading branch information
renlulu committed Jan 19, 2024
1 parent a5b5de6 commit e2b9f09
Show file tree
Hide file tree
Showing 18 changed files with 375 additions and 54 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ COPY ./blsSignatures ./blsSignatures
COPY ./cmd/chaininfo ./cmd/chaininfo
COPY ./cmd/replay ./cmd/replay
COPY ./das/dastree ./das/dastree
COPY ./das/eigenda ./das/eigenda
COPY ./precompiles ./precompiles
COPY ./statetransfer ./statetransfer
COPY ./util ./util
Expand Down
46 changes: 35 additions & 11 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
Expand Down Expand Up @@ -71,6 +72,7 @@ type BatchPoster struct {
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
eigenDAWriter eigenda.EigenDAWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
Expand Down Expand Up @@ -100,8 +102,9 @@ const (
)

type BatchPosterConfig struct {
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
DisableEigenDAFallbackStoreDataOnChain bool `koanf:"disable-eigenda-fallback-store-data-on-chain" reload:"hot"`
// Max batch size.
MaxSize int `koanf:"max-size" reload:"hot"`
// Max batch post delay.
Expand Down Expand Up @@ -219,15 +222,16 @@ var TestBatchPosterConfig = BatchPosterConfig{
}

type BatchPosterOpts struct {
DataPosterDB ethdb.Database
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
DataPosterDB ethdb.Database
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
EigenDAWriter eigenda.EigenDAWriter
}

func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, error) {
Expand Down Expand Up @@ -272,6 +276,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
eigenDAWriter: opts.EigenDAWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
Expand Down Expand Up @@ -1035,6 +1040,25 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
}

if b.daWriter == nil && b.eigenDAWriter != nil {
log.Info("Start to write data to eigenda: ", "data", hex.EncodeToString(sequencerMsg))
daRef, err := b.eigenDAWriter.Store(ctx, sequencerMsg)
if err != nil {
if config.DisableEigenDAFallbackStoreDataOnChain {
log.Warn("Falling back to storing data on chain", "err", err)
return false, errors.New("unable to post batch to EigenDA and fallback storing data on chain is disabled")
}
}

pointer, err := b.eigenDAWriter.Serialize(daRef)
if err != nil {
log.Warn("DaRef serialization failed", "err", err)
return false, errors.New("DaRef serialization failed")
}
log.Info("EigenDA transaction receipt(data pointer): ", "hash", hex.EncodeToString(daRef.BatchHeaderHash), "index", daRef.BlobIndex)
sequencerMsg = pointer
}

data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg)
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil)
tracker, err := NewInboxTracker(db, streamer, nil, nil)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
7 changes: 5 additions & 2 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
)
Expand All @@ -38,12 +39,13 @@ type InboxTracker struct {
mutex sync.Mutex
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
eigenDA eigenda.EigenDAReader

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
Expand All @@ -52,6 +54,7 @@ func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arb
db: db,
txStreamer: txStreamer,
das: das,
eigenDA: eigenDAReader,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
Expand Down Expand Up @@ -603,7 +606,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
ctx: ctx,
client: client,
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, arbstate.KeysetValidate)
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, t.eigenDA, arbstate.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
Expand Down
33 changes: 23 additions & 10 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
Expand Down Expand Up @@ -85,6 +86,7 @@ type Config struct {
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
EigenDA eigenda.EigenDAConfig `koanf:"eigen-da"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Expand Down Expand Up @@ -484,6 +486,8 @@ func createNodeImpl(
var daWriter das.DataAvailabilityServiceWriter
var daReader das.DataAvailabilityServiceReader
var dasLifecycleManager *das.LifecycleManager
var eigenDAReader eigenda.EigenDAReader
var eigenDAWriter eigenda.EigenDAWriter
if config.DataAvailability.Enable {
if config.BatchPoster.Enable {
daWriter, daReader, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
Expand All @@ -507,9 +511,16 @@ func createNodeImpl(
}
} else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee {
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
} else if config.EigenDA.Enable {
eigenDAService, err := eigenda.NewEigenDA(config.EigenDA.Rpc)
if err != nil {
return nil, err
}
eigenDAReader = eigenDAService
eigenDAWriter = eigenDAService
}

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, eigenDAReader)
if err != nil {
return nil, err
}
Expand All @@ -528,6 +539,7 @@ func createNodeImpl(
exec,
rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
daReader,
eigenDAReader,
func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
stack,
)
Expand Down Expand Up @@ -634,15 +646,16 @@ func createNodeImpl(
return nil, errors.New("batchposter, but no TxOpts")
}
batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
EigenDAWriter: eigenDAWriter,
})
if err != nil {
return nil, err
Expand Down
64 changes: 61 additions & 3 deletions arbstate/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package arbstate
import (
"bytes"
"context"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"errors"
"io"
"math/big"
Expand All @@ -21,6 +23,7 @@ import (
"github.com/offchainlabs/nitro/arbos/l1pricing"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/zeroheavy"
)

Expand Down Expand Up @@ -50,7 +53,7 @@ const maxZeroheavyDecompressedLen = 101*MaxDecompressedLen/100 + 64
const MaxSegmentsPerSequencerMessage = 100 * 1024
const MinLifetimeSecondsForDataAvailabilityCert = 7 * 24 * 60 * 60 // one week

func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
if len(data) < 40 {
return nil, errors.New("sequencer message missing L1 header")
}
Expand All @@ -63,6 +66,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da
segments: [][]byte{},
}
payload := data[40:]
log.Info("Inbox parse sequencer message: ", "payload", hex.EncodeToString(payload))

if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) {
if dasReader == nil {
Expand All @@ -79,6 +83,21 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da
}
}

if len(payload) > 0 && eigenda.IsEigenDAMessageHeaderByte(payload[0]) {
if eigenDAReader == nil {
log.Error("No EigenDA Reader configured, but sequencer message found with EigenDA header")
} else {
var err error
payload, err = RecoverPayloadFromEigenDABatch(ctx, batchNum, payload[1:], eigenDAReader, nil)
if err != nil {
return nil, err
}
if payload == nil {
return parsedMsg, nil
}
}
}

if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) {
pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen)))
if err != nil {
Expand Down Expand Up @@ -124,6 +143,43 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da
return parsedMsg, nil
}

func RecoverPayloadFromEigenDABatch(ctx context.Context,
batchNum uint64,
sequencerMsg []byte,
daReader eigenda.EigenDAReader,
preimages map[arbutil.PreimageType]map[common.Hash][]byte,
) ([]byte, error) {
log.Info("Start recovering payload from eigenda: ", "data", hex.EncodeToString(sequencerMsg))
var shaPreimages map[common.Hash][]byte
if preimages != nil {
if preimages[arbutil.Sha2_256PreimageType] == nil {
preimages[arbutil.Sha2_256PreimageType] = make(map[common.Hash][]byte)
}
shaPreimages = preimages[arbutil.Sha2_256PreimageType]
}
// 00000020
// 91c127a758d669ce7c8ed915679653e87bf1dfbcf54d028c522d129c482c897d
var daRef eigenda.EigenDARef
daRef.BlobIndex = binary.BigEndian.Uint32(sequencerMsg[:4])
daRef.BatchHeaderHash = sequencerMsg[4:]
log.Info("Data pointer: ", "info", hex.EncodeToString(daRef.BatchHeaderHash), "index", daRef.BlobIndex)
data, err := daReader.QueryBlob(ctx, &daRef)
if err != nil {
log.Error("Failed to query data from EigenDA", "err", err)
return nil, err
}
// log.Info("data: ", "info", hex.EncodeToString(data))
// record preimage data
log.Info("Recording preimage data for EigenDA")
shaDataHash := sha256.New()
shaDataHash.Write(sequencerMsg)
dataHash := shaDataHash.Sum([]byte{})
if shaPreimages != nil {
shaPreimages[common.BytesToHash(dataHash)] = data
}
return data, nil
}

func RecoverPayloadFromDasBatch(
ctx context.Context,
batchNum uint64,
Expand Down Expand Up @@ -242,6 +298,7 @@ type inboxMultiplexer struct {
backend InboxBackend
delayedMessagesRead uint64
dasReader DataAvailabilityReader
eigenDAReader eigenda.EigenDAReader
cachedSequencerMessage *sequencerMessage
cachedSequencerMessageNum uint64
cachedSegmentNum uint64
Expand All @@ -251,11 +308,12 @@ type inboxMultiplexer struct {
keysetValidationMode KeysetValidationMode
}

func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
return &inboxMultiplexer{
backend: backend,
delayedMessagesRead: delayedMessagesRead,
dasReader: dasReader,
eigenDAReader: eigenDAReader,
keysetValidationMode: keysetValidationMode,
}
}
Expand All @@ -276,7 +334,7 @@ func (r *inboxMultiplexer) Pop(ctx context.Context) (*arbostypes.MessageWithMeta
}
r.cachedSequencerMessageNum = r.backend.GetSequencerInboxPosition()
var err error
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.keysetValidationMode)
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.eigenDAReader, r.keysetValidationMode)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion arbstate/inbox_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func FuzzInboxMultiplexer(f *testing.F) {
delayedMessage: delayedMsg,
positionWithinMessage: 0,
}
multiplexer := NewInboxMultiplexer(backend, 0, nil, KeysetValidate)
multiplexer := NewInboxMultiplexer(backend, 0, nil, nil, KeysetValidate)
_, err := multiplexer.Pop(context.TODO())
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func mainImpl() int {
return 1
}

log.Info("Running Arbitrum nitro node", "revision", vcsRevision, "vcs.time", vcsTime)
log.Info("Running Arbitrum nitro node with eigenda integration", "revision", vcsRevision, "vcs.time", vcsTime)

if nodeConfig.Node.Dangerous.NoL1Listener {
nodeConfig.Node.ParentChainReader.Enable = false
Expand Down
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return nil, fmt.Errorf("failed to get finalized block: %w", err)
}
l1BlockNum := l1Block.NumberU64()
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil)
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit e2b9f09

Please sign in to comment.