diff --git a/cmd/ipfs/kubo/daemon.go b/cmd/ipfs/kubo/daemon.go index eb9fff8e448..0e5637cdb3b 100644 --- a/cmd/ipfs/kubo/daemon.go +++ b/cmd/ipfs/kubo/daemon.go @@ -586,7 +586,7 @@ take effect. prometheus.MustRegister(&corehttp.IpfsNodeCollector{Node: node}) // start MFS pinning thread - startPinMFS(daemonConfigPollInterval, cctx, &ipfsPinMFSNode{node}) + startPinMFS(cctx, daemonConfigPollInterval, &ipfsPinMFSNode{node}) // The daemon is *finally* ready. fmt.Printf("Daemon is ready\n") diff --git a/cmd/ipfs/kubo/pinmfs.go b/cmd/ipfs/kubo/pinmfs.go index 846ee8a77af..c9187145ce3 100644 --- a/cmd/ipfs/kubo/pinmfs.go +++ b/cmd/ipfs/kubo/pinmfs.go @@ -12,7 +12,7 @@ import ( pinclient "github.com/ipfs/boxo/pinning/remote/client" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" config "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/core" @@ -40,6 +40,7 @@ func init() { d, err := time.ParseDuration(pollDurStr) if err != nil { mfslog.Error("error parsing MFS_PIN_POLL_INTERVAL, using default:", err) + return } daemonConfigPollInterval = d } @@ -74,56 +75,28 @@ func (x *ipfsPinMFSNode) PeerHost() host.Host { return x.node.PeerHost } -func startPinMFS(configPollInterval time.Duration, cctx pinMFSContext, node pinMFSNode) { - errCh := make(chan error) - go pinMFSOnChange(configPollInterval, cctx, node, errCh) - go func() { - for { - select { - case err, isOpen := <-errCh: - if !isOpen { - return - } - mfslog.Errorf("%v", err) - case <-cctx.Context().Done(): - return - } - } - }() +func startPinMFS(cctx pinMFSContext, configPollInterval time.Duration, node pinMFSNode) { + go pinMFSOnChange(cctx, configPollInterval, node) } -func pinMFSOnChange(configPollInterval time.Duration, cctx pinMFSContext, node pinMFSNode, errCh chan<- error) { - defer close(errCh) - - var tmo *time.Timer - defer func() { - if tmo != nil { - tmo.Stop() - } - }() +func pinMFSOnChange(cctx pinMFSContext, configPollInterval time.Duration, node pinMFSNode) { + tmo := time.NewTimer(configPollInterval) + defer tmo.Stop() lastPins := map[string]lastPin{} for { // polling sleep - if tmo == nil { - tmo = time.NewTimer(configPollInterval) - } else { - tmo.Reset(configPollInterval) - } select { case <-cctx.Context().Done(): return case <-tmo.C: + tmo.Reset(configPollInterval) } // reread the config, which may have changed in the meantime cfg, err := cctx.GetConfig() if err != nil { - select { - case errCh <- fmt.Errorf("pinning reading config (%v)", err): - case <-cctx.Context().Done(): - return - } + mfslog.Errorf("pinning reading config (%v)", err) continue } mfslog.Debugf("pinning loop is awake, %d remote services", len(cfg.Pinning.RemoteServices)) @@ -131,30 +104,29 @@ func pinMFSOnChange(configPollInterval time.Duration, cctx pinMFSContext, node p // get the most recent MFS root cid rootNode, err := node.RootNode() if err != nil { - select { - case errCh <- fmt.Errorf("pinning reading MFS root (%v)", err): - case <-cctx.Context().Done(): - return - } + mfslog.Errorf("pinning reading MFS root (%v)", err) continue } - rootCid := rootNode.Cid() // pin to all remote services in parallel - pinAllMFS(cctx.Context(), node, cfg, rootCid, lastPins, errCh) + pinAllMFS(cctx.Context(), node, cfg, rootNode.Cid(), lastPins) } } // pinAllMFS pins on all remote services in parallel to overcome DoS attacks. -func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin, errCh chan<- error) { - ch := make(chan lastPin, len(cfg.Pinning.RemoteServices)) - for svcName_, svcConfig_ := range cfg.Pinning.RemoteServices { +func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin) { + ch := make(chan lastPin) + var started int + + for svcName, svcConfig := range cfg.Pinning.RemoteServices { + if ctx.Err() != nil { + break + } + // skip services where MFS is not enabled - svcName, svcConfig := svcName_, svcConfig_ mfslog.Debugf("pinning MFS root considering service %q", svcName) if !svcConfig.Policies.MFS.Enable { mfslog.Debugf("pinning service %q is not enabled", svcName) - ch <- lastPin{} continue } // read mfs pin interval for this service @@ -165,11 +137,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid var err error repinInterval, err = time.ParseDuration(svcConfig.Policies.MFS.RepinInterval) if err != nil { - select { - case errCh <- fmt.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err): - case <-ctx.Done(): - } - ch <- lastPin{} + mfslog.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err) continue } } @@ -182,38 +150,30 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid } else { mfslog.Debugf("pinning MFS root to %q: skipped due to MFS.RepinInterval=%s (remaining: %s)", svcName, repinInterval.String(), (repinInterval - time.Since(last.Time)).String()) } - ch <- lastPin{} continue } } mfslog.Debugf("pinning MFS root %q to %q", rootCid, svcName) - go func() { - if r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig); err != nil { - select { - case errCh <- fmt.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err): - case <-ctx.Done(): - } - ch <- lastPin{} - } else { - ch <- r + go func(svcName string, svcConfig config.RemotePinningService) { + r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig) + if err != nil { + mfslog.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err) } - }() + ch <- r + }(svcName, svcConfig) + started++ } - for i := 0; i < len(cfg.Pinning.RemoteServices); i++ { + + // Collect results from all started goroutines. + for i := 0; i < started; i++ { if x := <-ch; x.IsValid() { lastPins[x.ServiceName] = x } } } -func pinMFS( - ctx context.Context, - node pinMFSNode, - cid cid.Cid, - svcName string, - svcConfig config.RemotePinningService, -) (lastPin, error) { +func pinMFS(ctx context.Context, node pinMFSNode, cid cid.Cid, svcName string, svcConfig config.RemotePinningService) (lastPin, error) { c := pinclient.NewClient(svcConfig.API.Endpoint, svcConfig.API.Key) pinName := svcConfig.Policies.MFS.PinName @@ -243,43 +203,46 @@ func pinMFS( } for range lsPinCh { // in case the prior loop exits early } - if err := <-lsErrCh; err != nil { + err := <-lsErrCh + if err != nil { return lastPin{}, fmt.Errorf("error while listing remote pins: %v", err) } - // CID of the current MFS root is already being pinned, nothing to do - if pinning { - mfslog.Debugf("pinning MFS to %q: pin for %q exists since %s, skipping", svcName, cid, pinTime.String()) - return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil - } - - // Prepare Pin.name - addOpts := []pinclient.AddOption{pinclient.PinOpts.WithName(pinName)} + if !pinning { + // Prepare Pin.name + addOpts := []pinclient.AddOption{pinclient.PinOpts.WithName(pinName)} - // Prepare Pin.origins - // Add own multiaddrs to the 'origins' array, so Pinning Service can - // use that as a hint and connect back to us (if possible) - if node.PeerHost() != nil { - addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(node.PeerHost())) - if err != nil { - return lastPin{}, err + // Prepare Pin.origins + // Add own multiaddrs to the 'origins' array, so Pinning Service can + // use that as a hint and connect back to us (if possible) + if node.PeerHost() != nil { + addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(node.PeerHost())) + if err != nil { + return lastPin{}, err + } + addOpts = append(addOpts, pinclient.PinOpts.WithOrigins(addrs...)) } - addOpts = append(addOpts, pinclient.PinOpts.WithOrigins(addrs...)) - } - // Create or replace pin for MFS root - if existingRequestID != "" { - mfslog.Debugf("pinning to %q: replacing existing MFS root pin with %q", svcName, cid) - _, err := c.Replace(ctx, existingRequestID, cid, addOpts...) - if err != nil { - return lastPin{}, err + // Create or replace pin for MFS root + if existingRequestID != "" { + mfslog.Debugf("pinning to %q: replacing existing MFS root pin with %q", svcName, cid) + if _, err = c.Replace(ctx, existingRequestID, cid, addOpts...); err != nil { + return lastPin{}, err + } + } else { + mfslog.Debugf("pinning to %q: creating a new MFS root pin for %q", svcName, cid) + if _, err = c.Add(ctx, cid, addOpts...); err != nil { + return lastPin{}, err + } } } else { - mfslog.Debugf("pinning to %q: creating a new MFS root pin for %q", svcName, cid) - _, err := c.Add(ctx, cid, addOpts...) - if err != nil { - return lastPin{}, err - } + mfslog.Debugf("pinning MFS to %q: pin for %q exists since %s, skipping", svcName, cid, pinTime.String()) } - return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil + + return lastPin{ + Time: pinTime, + ServiceName: svcName, + ServiceConfig: svcConfig, + CID: cid, + }, nil } diff --git a/cmd/ipfs/kubo/pinmfs_test.go b/cmd/ipfs/kubo/pinmfs_test.go index da71d362cdb..750be9c987d 100644 --- a/cmd/ipfs/kubo/pinmfs_test.go +++ b/cmd/ipfs/kubo/pinmfs_test.go @@ -1,14 +1,19 @@ package kubo import ( + "bufio" "context" + "encoding/json" + "errors" "fmt" + "io" "strings" "testing" "time" merkledag "github.com/ipfs/boxo/ipld/merkledag" ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log/v2" config "github.com/ipfs/kubo/config" "github.com/libp2p/go-libp2p/core/host" peer "github.com/libp2p/go-libp2p/core/peer" @@ -60,25 +65,37 @@ func isErrorSimilar(e1, e2 error) bool { } func TestPinMFSConfigError(t *testing.T) { - ctx := &testPinMFSContext{ - ctx: context.Background(), + ctx, cancel := context.WithTimeout(context.Background(), 2*testConfigPollInterval) + defer cancel() + + cctx := &testPinMFSContext{ + ctx: ctx, cfg: nil, err: fmt.Errorf("couldn't read config"), } node := &testPinMFSNode{} - errCh := make(chan error) - go pinMFSOnChange(testConfigPollInterval, ctx, node, errCh) - if !isErrorSimilar(<-errCh, ctx.err) { - t.Errorf("error did not propagate") + + logReader := logging.NewPipeReader() + go func() { + pinMFSOnChange(cctx, testConfigPollInterval, node) + logReader.Close() + }() + + level, msg := readLogLine(t, logReader) + if level != "error" { + t.Error("expected error to be logged") } - if !isErrorSimilar(<-errCh, ctx.err) { + if !isErrorSimilar(errors.New(msg), cctx.err) { t.Errorf("error did not propagate") } } func TestPinMFSRootNodeError(t *testing.T) { - ctx := &testPinMFSContext{ - ctx: context.Background(), + ctx, cancel := context.WithTimeout(context.Background(), 2*testConfigPollInterval) + defer cancel() + + cctx := &testPinMFSContext{ + ctx: ctx, cfg: &config.Config{ Pinning: config.Pinning{}, }, @@ -87,12 +104,16 @@ func TestPinMFSRootNodeError(t *testing.T) { node := &testPinMFSNode{ err: fmt.Errorf("cannot create root node"), } - errCh := make(chan error) - go pinMFSOnChange(testConfigPollInterval, ctx, node, errCh) - if !isErrorSimilar(<-errCh, node.err) { - t.Errorf("error did not propagate") + logReader := logging.NewPipeReader() + go func() { + pinMFSOnChange(cctx, testConfigPollInterval, node) + logReader.Close() + }() + level, msg := readLogLine(t, logReader) + if level != "error" { + t.Error("expected error to be logged") } - if !isErrorSimilar(<-errCh, node.err) { + if !isErrorSimilar(errors.New(msg), node.err) { t.Errorf("error did not propagate") } } @@ -155,7 +176,8 @@ func TestPinMFSService(t *testing.T) { } func testPinMFSServiceWithError(t *testing.T, cfg *config.Config, expectedErrorPrefix string) { - goctx, cancel := context.WithCancel(context.Background()) + goctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() ctx := &testPinMFSContext{ ctx: goctx, cfg: cfg, @@ -164,16 +186,36 @@ func testPinMFSServiceWithError(t *testing.T, cfg *config.Config, expectedErrorP node := &testPinMFSNode{ err: nil, } - errCh := make(chan error) - go pinMFSOnChange(testConfigPollInterval, ctx, node, errCh) - defer cancel() - // first pass through the pinning loop - err := <-errCh - if !strings.Contains((err).Error(), expectedErrorPrefix) { - t.Errorf("expecting error containing %q", expectedErrorPrefix) + logReader := logging.NewPipeReader() + go func() { + pinMFSOnChange(ctx, testConfigPollInterval, node) + logReader.Close() + }() + level, msg := readLogLine(t, logReader) + if level != "error" { + t.Error("expected error to be logged") } - // second pass through the pinning loop - if !strings.Contains((err).Error(), expectedErrorPrefix) { + if !strings.Contains(msg, expectedErrorPrefix) { t.Errorf("expecting error containing %q", expectedErrorPrefix) } } + +func readLogLine(t *testing.T, logReader io.Reader) (string, string) { + t.Helper() + + r := bufio.NewReader(logReader) + data, err := r.ReadBytes('\n') + if err != nil { + t.Fatal(err) + } + + logInfo := struct { + Level string `json:"level"` + Msg string `json:"msg"` + }{} + err = json.Unmarshal(data, &logInfo) + if err != nil { + t.Fatal(err) + } + return logInfo.Level, logInfo.Msg +} diff --git a/core/commands/pin/remotepin.go b/core/commands/pin/remotepin.go index 132532554cc..3721913e77c 100644 --- a/core/commands/pin/remotepin.go +++ b/core/commands/pin/remotepin.go @@ -221,6 +221,8 @@ NOTE: a comma-separated notation is supported in CLI for convenience: // Block unless --background=true is passed if !req.Options[pinBackgroundOptionName].(bool) { + const pinWaitTime = 500 * time.Millisecond + var timer *time.Timer requestID := ps.GetRequestId() for { ps, err = c.GetStatusByID(ctx, requestID) @@ -237,10 +239,15 @@ NOTE: a comma-separated notation is supported in CLI for convenience: if s == pinclient.StatusFailed { return fmt.Errorf("remote service failed to pin requestid=%q", requestID) } - tmr := time.NewTimer(time.Second / 2) + if timer == nil { + timer = time.NewTimer(pinWaitTime) + } else { + timer.Reset(pinWaitTime) + } select { - case <-tmr.C: + case <-timer.C: case <-ctx.Done(): + timer.Stop() return fmt.Errorf("waiting for pin interrupted, requestid=%q remains on remote service", requestID) } }