diff --git a/Dockerfile b/Dockerfile index b62e569259..7b7224212d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -65,6 +65,7 @@ COPY ./blsSignatures ./blsSignatures COPY ./cmd/chaininfo ./cmd/chaininfo COPY ./cmd/replay ./cmd/replay COPY ./das/dastree ./das/dastree +COPY ./das/eigenda ./das/eigenda COPY ./precompiles ./precompiles COPY ./statetransfer ./statetransfer COPY ./util ./util diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 8c0fdd332a..4369582f1c 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -37,6 +37,7 @@ import ( "github.com/offchainlabs/nitro/cmd/chaininfo" "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/das" + "github.com/offchainlabs/nitro/das/eigenda" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" @@ -71,6 +72,7 @@ type BatchPoster struct { gasRefunderAddr common.Address building *buildingBatch daWriter das.DataAvailabilityServiceWriter + eigenDAWriter eigenda.EigenDAWriter dataPoster *dataposter.DataPoster redisLock *redislock.Simple firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred @@ -100,8 +102,9 @@ const ( ) type BatchPosterConfig struct { - Enable bool `koanf:"enable"` - DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"` + Enable bool `koanf:"enable"` + DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"` + DisableEigenDAFallbackStoreDataOnChain bool `koanf:"disable-eigenda-fallback-store-data-on-chain" reload:"hot"` // Max batch size. MaxSize int `koanf:"max-size" reload:"hot"` // Max batch post delay. @@ -219,15 +222,16 @@ var TestBatchPosterConfig = BatchPosterConfig{ } type BatchPosterOpts struct { - DataPosterDB ethdb.Database - L1Reader *headerreader.HeaderReader - Inbox *InboxTracker - Streamer *TransactionStreamer - SyncMonitor *SyncMonitor - Config BatchPosterConfigFetcher - DeployInfo *chaininfo.RollupAddresses - TransactOpts *bind.TransactOpts - DAWriter das.DataAvailabilityServiceWriter + DataPosterDB ethdb.Database + L1Reader *headerreader.HeaderReader + Inbox *InboxTracker + Streamer *TransactionStreamer + SyncMonitor *SyncMonitor + Config BatchPosterConfigFetcher + DeployInfo *chaininfo.RollupAddresses + TransactOpts *bind.TransactOpts + DAWriter das.DataAvailabilityServiceWriter + EigenDAWriter eigenda.EigenDAWriter } func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, error) { @@ -272,6 +276,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e gasRefunderAddr: opts.Config().gasRefunder, bridgeAddr: opts.DeployInfo.Bridge, daWriter: opts.DAWriter, + eigenDAWriter: opts.EigenDAWriter, redisLock: redisLock, } b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20) @@ -1035,6 +1040,25 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) } } + if b.daWriter == nil && b.eigenDAWriter != nil { + log.Info("Start to write data to eigenda: ", "data", hex.EncodeToString(sequencerMsg)) + daRef, err := b.eigenDAWriter.Store(ctx, sequencerMsg) + if err != nil { + if config.DisableEigenDAFallbackStoreDataOnChain { + log.Warn("Falling back to storing data on chain", "err", err) + return false, errors.New("unable to post batch to EigenDA and fallback storing data on chain is disabled") + } + } + + pointer, err := b.eigenDAWriter.Serialize(daRef) + if err != nil { + log.Warn("DaRef serialization failed", "err", err) + return false, errors.New("DaRef serialization failed") + } + log.Info("EigenDA transaction receipt(data pointer): ", "hash", hex.EncodeToString(daRef.BatchHeaderHash), "index", daRef.BlobIndex) + sequencerMsg = pointer + } + data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg) if err != nil { return false, err diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index a28eebb5dc..262d9a5eb7 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { defer cancel() exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{}) - tracker, err := NewInboxTracker(db, streamer, nil) + tracker, err := NewInboxTracker(db, streamer, nil, nil) Require(t, err) err = streamer.Start(ctx) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 51f74cbeb4..31fe69fadf 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -23,6 +23,7 @@ import ( "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcaster" m "github.com/offchainlabs/nitro/broadcaster/message" + "github.com/offchainlabs/nitro/das/eigenda" "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/containers" ) @@ -38,12 +39,13 @@ type InboxTracker struct { mutex sync.Mutex validator *staker.BlockValidator das arbstate.DataAvailabilityReader + eigenDA eigenda.EigenDAReader batchMetaMutex sync.Mutex batchMeta *containers.LruCache[uint64, BatchMetadata] } -func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader) (*InboxTracker, error) { +func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader) (*InboxTracker, error) { // We support a nil txStreamer for the pruning code if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil { return nil, errors.New("data availability service required but unconfigured") @@ -52,6 +54,7 @@ func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arb db: db, txStreamer: txStreamer, das: das, + eigenDA: eigenDAReader, batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000), } return tracker, nil @@ -603,7 +606,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L ctx: ctx, client: client, } - multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, arbstate.KeysetValidate) + multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, t.eigenDA, arbstate.KeysetValidate) batchMessageCounts := make(map[uint64]arbutil.MessageIndex) currentpos := prevbatchmeta.MessageCount + 1 for { diff --git a/arbnode/node.go b/arbnode/node.go index 45ef7206e1..1695114636 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -32,6 +32,7 @@ import ( "github.com/offchainlabs/nitro/broadcaster" "github.com/offchainlabs/nitro/cmd/chaininfo" "github.com/offchainlabs/nitro/das" + "github.com/offchainlabs/nitro/das/eigenda" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/solgen/go/bridgegen" @@ -85,6 +86,7 @@ type Config struct { Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"` SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"` DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"` + EigenDA eigenda.EigenDAConfig `koanf:"eigen-da"` SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` Dangerous DangerousConfig `koanf:"dangerous"` TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` @@ -484,6 +486,8 @@ func createNodeImpl( var daWriter das.DataAvailabilityServiceWriter var daReader das.DataAvailabilityServiceReader var dasLifecycleManager *das.LifecycleManager + var eigenDAReader eigenda.EigenDAReader + var eigenDAWriter eigenda.EigenDAWriter if config.DataAvailability.Enable { if config.BatchPoster.Enable { daWriter, daReader, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox) @@ -507,9 +511,16 @@ func createNodeImpl( } } else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee { return nil, errors.New("a data availability service is required for this chain, but it was not configured") + } else if config.EigenDA.Enable { + eigenDAService, err := eigenda.NewEigenDA(config.EigenDA.Rpc) + if err != nil { + return nil, err + } + eigenDAReader = eigenDAService + eigenDAWriter = eigenDAService } - inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader) + inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, eigenDAReader) if err != nil { return nil, err } @@ -528,6 +539,7 @@ func createNodeImpl( exec, rawdb.NewTable(arbDb, storage.BlockValidatorPrefix), daReader, + eigenDAReader, func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator }, stack, ) @@ -634,15 +646,16 @@ func createNodeImpl( return nil, errors.New("batchposter, but no TxOpts") } batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{ - DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix), - L1Reader: l1Reader, - Inbox: inboxTracker, - Streamer: txStreamer, - SyncMonitor: syncMonitor, - Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, - DeployInfo: deployInfo, - TransactOpts: txOptsBatchPoster, - DAWriter: daWriter, + DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix), + L1Reader: l1Reader, + Inbox: inboxTracker, + Streamer: txStreamer, + SyncMonitor: syncMonitor, + Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, + DeployInfo: deployInfo, + TransactOpts: txOptsBatchPoster, + DAWriter: daWriter, + EigenDAWriter: eigenDAWriter, }) if err != nil { return nil, err diff --git a/arbstate/inbox.go b/arbstate/inbox.go index 3995bcf308..d0f060b4bc 100644 --- a/arbstate/inbox.go +++ b/arbstate/inbox.go @@ -6,7 +6,9 @@ package arbstate import ( "bytes" "context" + "crypto/sha256" "encoding/binary" + "encoding/hex" "errors" "io" "math/big" @@ -21,6 +23,7 @@ import ( "github.com/offchainlabs/nitro/arbos/l1pricing" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/das/dastree" + "github.com/offchainlabs/nitro/das/eigenda" "github.com/offchainlabs/nitro/zeroheavy" ) @@ -50,7 +53,7 @@ const maxZeroheavyDecompressedLen = 101*MaxDecompressedLen/100 + 64 const MaxSegmentsPerSequencerMessage = 100 * 1024 const MinLifetimeSecondsForDataAvailabilityCert = 7 * 24 * 60 * 60 // one week -func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) { +func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) { if len(data) < 40 { return nil, errors.New("sequencer message missing L1 header") } @@ -63,6 +66,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da segments: [][]byte{}, } payload := data[40:] + log.Info("Inbox parse sequencer message: ", "payload", hex.EncodeToString(payload)) if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) { if dasReader == nil { @@ -79,6 +83,21 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da } } + if len(payload) > 0 && eigenda.IsEigenDAMessageHeaderByte(payload[0]) { + if eigenDAReader == nil { + log.Error("No EigenDA Reader configured, but sequencer message found with EigenDA header") + } else { + var err error + payload, err = RecoverPayloadFromEigenDABatch(ctx, batchNum, payload[1:], eigenDAReader, nil) + if err != nil { + return nil, err + } + if payload == nil { + return parsedMsg, nil + } + } + } + if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) { pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen))) if err != nil { @@ -124,6 +143,43 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da return parsedMsg, nil } +func RecoverPayloadFromEigenDABatch(ctx context.Context, + batchNum uint64, + sequencerMsg []byte, + daReader eigenda.EigenDAReader, + preimages map[arbutil.PreimageType]map[common.Hash][]byte, +) ([]byte, error) { + log.Info("Start recovering payload from eigenda: ", "data", hex.EncodeToString(sequencerMsg)) + var shaPreimages map[common.Hash][]byte + if preimages != nil { + if preimages[arbutil.Sha2_256PreimageType] == nil { + preimages[arbutil.Sha2_256PreimageType] = make(map[common.Hash][]byte) + } + shaPreimages = preimages[arbutil.Sha2_256PreimageType] + } + // 00000020 + // 91c127a758d669ce7c8ed915679653e87bf1dfbcf54d028c522d129c482c897d + var daRef eigenda.EigenDARef + daRef.BlobIndex = binary.BigEndian.Uint32(sequencerMsg[:4]) + daRef.BatchHeaderHash = sequencerMsg[4:] + log.Info("Data pointer: ", "info", hex.EncodeToString(daRef.BatchHeaderHash), "index", daRef.BlobIndex) + data, err := daReader.QueryBlob(ctx, &daRef) + if err != nil { + log.Error("Failed to query data from EigenDA", "err", err) + return nil, err + } + // log.Info("data: ", "info", hex.EncodeToString(data)) + // record preimage data + log.Info("Recording preimage data for EigenDA") + shaDataHash := sha256.New() + shaDataHash.Write(sequencerMsg) + dataHash := shaDataHash.Sum([]byte{}) + if shaPreimages != nil { + shaPreimages[common.BytesToHash(dataHash)] = data + } + return data, nil +} + func RecoverPayloadFromDasBatch( ctx context.Context, batchNum uint64, @@ -242,6 +298,7 @@ type inboxMultiplexer struct { backend InboxBackend delayedMessagesRead uint64 dasReader DataAvailabilityReader + eigenDAReader eigenda.EigenDAReader cachedSequencerMessage *sequencerMessage cachedSequencerMessageNum uint64 cachedSegmentNum uint64 @@ -251,11 +308,12 @@ type inboxMultiplexer struct { keysetValidationMode KeysetValidationMode } -func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer { +func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer { return &inboxMultiplexer{ backend: backend, delayedMessagesRead: delayedMessagesRead, dasReader: dasReader, + eigenDAReader: eigenDAReader, keysetValidationMode: keysetValidationMode, } } @@ -276,7 +334,7 @@ func (r *inboxMultiplexer) Pop(ctx context.Context) (*arbostypes.MessageWithMeta } r.cachedSequencerMessageNum = r.backend.GetSequencerInboxPosition() var err error - r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.keysetValidationMode) + r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.eigenDAReader, r.keysetValidationMode) if err != nil { return nil, err } diff --git a/arbstate/inbox_fuzz_test.go b/arbstate/inbox_fuzz_test.go index fcb80cbd73..2859e3280e 100644 --- a/arbstate/inbox_fuzz_test.go +++ b/arbstate/inbox_fuzz_test.go @@ -66,7 +66,7 @@ func FuzzInboxMultiplexer(f *testing.F) { delayedMessage: delayedMsg, positionWithinMessage: 0, } - multiplexer := NewInboxMultiplexer(backend, 0, nil, KeysetValidate) + multiplexer := NewInboxMultiplexer(backend, 0, nil, nil, KeysetValidate) _, err := multiplexer.Pop(context.TODO()) if err != nil { panic(err) diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 55c8d7704a..ecc8f1715d 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -213,7 +213,7 @@ func mainImpl() int { return 1 } - log.Info("Running Arbitrum nitro node", "revision", vcsRevision, "vcs.time", vcsTime) + log.Info("Running Arbitrum nitro node with eigenda integration", "revision", vcsRevision, "vcs.time", vcsTime) if nodeConfig.Node.Dangerous.NoL1Listener { nodeConfig.Node.ParentChainReader.Enable = false diff --git a/cmd/pruning/pruning.go b/cmd/pruning/pruning.go index 68d89302f0..da015ac52c 100644 --- a/cmd/pruning/pruning.go +++ b/cmd/pruning/pruning.go @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node return nil, fmt.Errorf("failed to get finalized block: %w", err) } l1BlockNum := l1Block.NumberU64() - tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil) + tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil) if err != nil { return nil, err } diff --git a/cmd/replay/main.go b/cmd/replay/main.go index 2fb13ceed8..5fcc386dad 100644 --- a/cmd/replay/main.go +++ b/cmd/replay/main.go @@ -6,6 +6,7 @@ package main import ( "bytes" "context" + "crypto/sha256" "encoding/hex" "encoding/json" "fmt" @@ -29,6 +30,7 @@ import ( "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/cmd/chaininfo" "github.com/offchainlabs/nitro/das/dastree" + "github.com/offchainlabs/nitro/das/eigenda" "github.com/offchainlabs/nitro/gethhook" "github.com/offchainlabs/nitro/wavmio" ) @@ -117,6 +119,19 @@ func (dasReader *PreimageDASReader) ExpirationPolicy(ctx context.Context) (arbst return arbstate.DiscardImmediately, nil } +type PreimageEigenDAReader struct{} + +func (dasReader *PreimageEigenDAReader) QueryBlob(ctx context.Context, ref *eigenda.EigenDARef) ([]byte, error) { + dataPointer, err := ref.Serialize() + if err != nil { + return nil, err + } + shaDataHash := sha256.New() + shaDataHash.Write(dataPointer) + dataHash := shaDataHash.Sum([]byte{}) + return wavmio.ResolveTypedPreimage(arbutil.Sha2_256PreimageType, common.BytesToHash(dataHash)) +} + // To generate: // key, _ := crypto.HexToECDSA("0000000000000000000000000000000000000000000000000000000000000001") // sig, _ := crypto.Sign(make([]byte, 32), key) @@ -171,16 +186,21 @@ func main() { if lastBlockHeader != nil { delayedMessagesRead = lastBlockHeader.Nonce.Uint64() } - var dasReader arbstate.DataAvailabilityReader + // var dasReader arbstate.DataAvailabilityReader + // if dasEnabled { + // dasReader = &PreimageDASReader{} + // } + var dasReader eigenda.EigenDAReader if dasEnabled { - dasReader = &PreimageDASReader{} + dasReader = &PreimageEigenDAReader{} } backend := WavmInbox{} - var keysetValidationMode = arbstate.KeysetPanicIfInvalid - if backend.GetPositionWithinMessage() > 0 { - keysetValidationMode = arbstate.KeysetDontValidate - } - inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, dasReader, keysetValidationMode) + // var keysetValidationMode = arbstate.KeysetPanicIfInvalid + // if backend.GetPositionWithinMessage() > 0 { + // keysetValidationMode = arbstate.KeysetDontValidate + // } + // todo + inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, nil, dasReader, arbstate.KeysetDontValidate) ctx := context.Background() message, err := inboxMultiplexer.Pop(ctx) if err != nil { @@ -232,7 +252,8 @@ func main() { } } - message := readMessage(chainConfig.ArbitrumChainParams.DataAvailabilityCommittee) + // message := readMessage(chainConfig.ArbitrumChainParams.DataAvailabilityCommittee) + message := readMessage(true) chainContext := WavmChainContext{} batchFetcher := func(batchNum uint64) ([]byte, error) { diff --git a/das/eigenda/eigenda.go b/das/eigenda/eigenda.go new file mode 100644 index 0000000000..fa5fde88b3 --- /dev/null +++ b/das/eigenda/eigenda.go @@ -0,0 +1,174 @@ +package eigenda + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/binary" + "errors" + "fmt" + "time" + + "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/ethereum/go-ethereum/log" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// EigenDAMessageHeaderFlag indicated that the message is a EigenDARef which will be used to retrieve data from EigenDA +const EigenDAMessageHeaderFlag byte = 0x0c + +func IsEigenDAMessageHeaderByte(header byte) bool { + return (EigenDAMessageHeaderFlag & header) > 0 +} + +type EigenDAWriter interface { + Store(context.Context, []byte) (*EigenDARef, error) + Serialize(eigenDARef *EigenDARef) ([]byte, error) +} + +type EigenDAReader interface { + QueryBlob(ctx context.Context, ref *EigenDARef) ([]byte, error) +} + +type EigenDAConfig struct { + Enable bool `koanf:"enable"` + Rpc string `koanf:"rpc"` +} + +func (ec *EigenDAConfig) String() { + fmt.Println(ec.Enable) + fmt.Println(ec.Rpc) + // fmt.Sprintf("enable: %b, rpc: %s", ec.Enable, ec.Rpc) +} + +type EigenDARef struct { + BatchHeaderHash []byte + BlobIndex uint32 +} + +func (b *EigenDARef) Serialize() ([]byte, error) { + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.BigEndian, b.BlobIndex) + if err != nil { + return nil, err + } + _, err = buf.Write(b.BatchHeaderHash) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (b *EigenDARef) Deserialize(data []byte) error { + buf := bytes.NewReader(data) + err := binary.Read(buf, binary.BigEndian, &b.BlobIndex) + if err != nil { + return err + } + // _, err = buf.Read(b.BatchHeaderHash) + err = binary.Read(buf, binary.BigEndian, &b.BatchHeaderHash) + if err != nil { + return err + } + return nil +} + +type EigenDA struct { + client disperser.DisperserClient +} + + +func NewEigenDA(rpc string) (*EigenDA, error) { + creds := credentials.NewTLS(&tls.Config{ + InsecureSkipVerify: true, + }) + conn, err := grpc.Dial(rpc, grpc.WithTransportCredentials(creds)) + if err != nil { + return nil, err + } + return &EigenDA{ + client: disperser.NewDisperserClient(conn), + }, nil +} + +func (e *EigenDA) QueryBlob(ctx context.Context, ref *EigenDARef) ([]byte, error) { + res, err := e.client.RetrieveBlob(ctx, &disperser.RetrieveBlobRequest{ + BatchHeaderHash: ref.BatchHeaderHash, + BlobIndex: ref.BlobIndex, + }) + if err != nil { + return nil, err + } + return res.GetData(), nil +} + +func (e *EigenDA) Store(ctx context.Context, data []byte) (*EigenDARef, error) { + disperseBlobRequest := &disperser.DisperseBlobRequest{ + Data: data, + SecurityParams: []*disperser.SecurityParams{ + {QuorumId: 0, AdversaryThreshold: 25, QuorumThreshold: 50}, + }, + } + + res, err := e.client.DisperseBlob(ctx, disperseBlobRequest) + if err != nil { + return nil, err + } + + ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() + + var ref *EigenDARef + for range ticker.C { + statusReply, err := e.GetBlobStatus(ctx, res.GetRequestId()) + if err != nil { + log.Error("[eigenda]: GetBlobStatus error: ", err.Error()) + continue + } + switch statusReply.GetStatus() { + case disperser.BlobStatus_CONFIRMED, disperser.BlobStatus_FINALIZED: + ref = &EigenDARef{ + BatchHeaderHash: statusReply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash(), + BlobIndex: statusReply.GetInfo().GetBlobVerificationProof().GetBlobIndex(), + } + return ref, nil + case disperser.BlobStatus_FAILED: + return nil, errors.New("disperser blob failed") + default: + continue + } + } + return nil, errors.New("disperser blob query status timeout") + +} + +func (e *EigenDA) GetBlobStatus(ctx context.Context, reqeustId []byte) (*disperser.BlobStatusReply, error) { + blockStatusRequest := &disperser.BlobStatusRequest{ + RequestId: reqeustId, + } + return e.client.GetBlobStatus(ctx, blockStatusRequest) +} + +// Serialize implements EigenDAWriter. +func (e *EigenDA) Serialize(eigenDARef *EigenDARef) ([]byte, error) { + eigenDARefData, err := eigenDARef.Serialize() + if err != nil { + log.Warn("eigenDARef serialize error", "err", err) + return nil, err + } + buf := new(bytes.Buffer) + err = binary.Write(buf, binary.BigEndian, EigenDAMessageHeaderFlag) + if err != nil { + log.Warn("batch type byte serialization failed", "err", err) + return nil, err + } + err = binary.Write(buf, binary.BigEndian, eigenDARefData) + + if err != nil { + log.Warn("data pointer serialization failed", "err", err) + return nil, err + } + serializedBlobPointerData := buf.Bytes() + return serializedBlobPointerData, nil +} diff --git a/go.mod b/go.mod index bdda6a61a1..7cb49a942a 100644 --- a/go.mod +++ b/go.mod @@ -307,6 +307,7 @@ require ( ) require ( + github.com/Layr-Labs/eigenda/api v0.1.1 github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect github.com/VictoriaMetrics/fastcache v1.6.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index bf8b4b826d..a3723959b5 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,8 @@ github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKz github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/Layr-Labs/eigenda/api v0.1.1 h1:vzSxUQirg/jWVSoaOhuIxcvPTh8HImGD0KnPgqU1kj4= +github.com/Layr-Labs/eigenda/api v0.1.1/go.mod h1:kVXqWM13s/1hXyv9QdHweWAbKin9MeOBbS4i8c9rLbU= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= @@ -234,6 +236,7 @@ github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 h1:ytcWPaNPhNoG github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811/go.mod h1:Nb5lgvnQ2+oGlE/EyZy4+2/CxRh9KfvCXnag1vtpxVM= github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/codeclysm/extract/v3 v3.0.2 h1:sB4LcE3Php7LkhZwN0n2p8GCwZe92PEQutdbGURf5xc= github.com/codeclysm/extract/v3 v3.0.2/go.mod h1:NKsw+hqua9H+Rlwy/w/3Qgt9jDonYEgB6wJu+25eOKw= @@ -278,6 +281,7 @@ github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6Uh github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= +github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= @@ -395,6 +399,7 @@ github.com/go-ldap/ldap v3.0.2+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -891,6 +896,7 @@ github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -1269,6 +1275,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= @@ -1410,6 +1417,7 @@ github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAv github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwbMiyQg= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk= +github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -1646,6 +1654,7 @@ github.com/wealdtech/go-merkletree v1.0.0/go.mod h1:cdil512d/8ZC7Kx3bfrDvGMQXB25 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc h1:BCPnHtcboadS0DvysUuJXZ4lWVv5Bh5i7+tbIyi+ck4= github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc/go.mod h1:r45hJU7yEoA81k6MWNhpMj/kms0n14dkzkxYHoB96UM= github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 h1:5HZfQkwe0mIfyDmc1Em5GqlNRzcdtlv4HTNmdpt7XH0= +github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11/go.mod h1:Wlo/SzPmxVp6vXpGt/zaXhHH0fn4IxgqZc82aKg6bpQ= github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa h1:EyA027ZAkuaCLoxVX4r1TZMPy1d31fM6hbfQ4OU4I5o= github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= diff --git a/staker/block_validator.go b/staker/block_validator.go index 61e5ed519b..0b59988a35 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -714,6 +714,7 @@ validationsLoop: var runs []validator.ValidationRun for _, moduleRoot := range wasmRoots { for i, spawner := range v.validationSpawners { + log.Info("block_validator", "function name", "advanceValidations") run := spawner.Launch(input, moduleRoot) log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot, "spawner", i) runs = append(runs, run) diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index acd86f8627..e1fb56c91e 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -11,6 +11,7 @@ import ( "sync" "testing" + "github.com/offchainlabs/nitro/das/eigenda" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/validator/server_api" @@ -34,11 +35,12 @@ type StatelessBlockValidator struct { recorder execution.ExecutionRecorder - inboxReader InboxReaderInterface - inboxTracker InboxTrackerInterface - streamer TransactionStreamerInterface - db ethdb.Database - daService arbstate.DataAvailabilityReader + inboxReader InboxReaderInterface + inboxTracker InboxTrackerInterface + streamer TransactionStreamerInterface + db ethdb.Database + daService arbstate.DataAvailabilityReader + eigenDAService eigenda.EigenDAReader moduleMutex sync.Mutex currentWasmModuleRoot common.Hash @@ -219,6 +221,7 @@ func NewStatelessBlockValidator( recorder execution.ExecutionRecorder, arbdb ethdb.Database, das arbstate.DataAvailabilityReader, + eigenDAService eigenda.EigenDAReader, config func() *BlockValidatorConfig, stack *node.Node, ) (*StatelessBlockValidator, error) { @@ -234,6 +237,7 @@ func NewStatelessBlockValidator( inboxTracker: inbox, streamer: streamer, db: arbdb, + eigenDAService: eigenDAService, daService: das, } return validator, nil @@ -284,17 +288,26 @@ func (v *StatelessBlockValidator) ValidationEntryRecord(ctx context.Context, e * if len(batch.Data) <= 40 { continue } - if !arbstate.IsDASMessageHeaderByte(batch.Data[40]) { - continue + if arbstate.IsDASMessageHeaderByte(batch.Data[40]) { + if v.daService == nil { + log.Warn("No DAS configured, but sequencer message found with DAS header") + } else { + _, err := arbstate.RecoverPayloadFromDasBatch( + ctx, batch.Number, batch.Data, v.daService, e.Preimages, arbstate.KeysetValidate, + ) + if err != nil { + return err + } + } } - if v.daService == nil { - log.Warn("No DAS configured, but sequencer message found with DAS header") - } else { - _, err := arbstate.RecoverPayloadFromDasBatch( - ctx, batch.Number, batch.Data, v.daService, e.Preimages, arbstate.KeysetValidate, - ) - if err != nil { - return err + if eigenda.IsEigenDAMessageHeaderByte(batch.Data[40]) { + if v.eigenDAService == nil { + log.Warn("EigenDA not configured, but sequencer message found with EigenDA header") + } else { + _, err := arbstate.RecoverPayloadFromEigenDABatch(ctx, batch.Number, batch.Data[41:], v.eigenDAService, e.Preimages) + if err != nil { + return err + } } } } @@ -397,6 +410,7 @@ func (v *StatelessBlockValidator) ValidateResult( } var runs []validator.ValidationRun for _, spawner := range spawners { + log.Info("ValidateResult: ", "input", input, "moduleRoot", moduleRoot) run := spawner.Launch(input, moduleRoot) runs = append(runs, run) } diff --git a/system_tests/state_fuzz_test.go b/system_tests/state_fuzz_test.go index b14215fbf0..0981b49442 100644 --- a/system_tests/state_fuzz_test.go +++ b/system_tests/state_fuzz_test.go @@ -41,7 +41,7 @@ func BuildBlock( if lastBlockHeader != nil { delayedMessagesRead = lastBlockHeader.Nonce.Uint64() } - inboxMultiplexer := arbstate.NewInboxMultiplexer(inbox, delayedMessagesRead, nil, arbstate.KeysetValidate) + inboxMultiplexer := arbstate.NewInboxMultiplexer(inbox, delayedMessagesRead, nil, nil, arbstate.KeysetValidate) ctx := context.Background() message, err := inboxMultiplexer.Pop(ctx) diff --git a/util/rpcclient/rpcclient.go b/util/rpcclient/rpcclient.go index dee6e9252a..f1c199f161 100644 --- a/util/rpcclient/rpcclient.go +++ b/util/rpcclient/rpcclient.go @@ -161,6 +161,7 @@ func (c *RpcClient) CallContext(ctx_in context.Context, result interface{}, meth if err != nil && err.Error() != "already known" { logger = log.Info } + logger("CallContext") logger("rpc response", "method", method, "logId", logId, "err", err, "result", limitedMarshal{limit, result}, "attempt", i, "args", limitedArgumentsMarshal{limit, args}) if err == nil { return nil diff --git a/validator/server_api/validation_client.go b/validator/server_api/validation_client.go index d6143ca917..5e67ceec0b 100644 --- a/validator/server_api/validation_client.go +++ b/validator/server_api/validation_client.go @@ -38,6 +38,7 @@ func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot c promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](c, func(ctx context.Context) (validator.GoGlobalState, error) { input := ValidationInputToJson(entry) var res validator.GoGlobalState + log.Info("validation_client", "input", input, "moduleRoot", moduleRoot) err := c.client.CallContext(ctx, &res, Namespace+"_validate", input, moduleRoot) atomic.AddInt32(&c.room, 1) return res, err