diff --git a/cmd/src/batch_apply.go b/cmd/src/batch_apply.go index 28f715656d..0de796d3c9 100644 --- a/cmd/src/batch_apply.go +++ b/cmd/src/batch_apply.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" + "github.com/sourcegraph/src-cli/internal/batches/executor" "github.com/sourcegraph/src-cli/internal/cmderrors" ) @@ -42,9 +43,20 @@ Examples: } ctx, cancel := contextCancelOnInterrupt(context.Background()) - defer cancel() - if err = executeBatchSpec(ctx, executeBatchSpecOpts{ + defer cancel(nil) + + failFastCancel := func(error) {} + if flags.failFast { + failFastCancel = cancel + } + + cctx := executor.CancelableContext{ + Context: ctx, + Cancel: failFastCancel, + } + + if err = executeBatchSpec(cctx, executeBatchSpecOpts{ flags: flags, client: cfg.apiClient(flags.api, flagSet.Output()), file: file, diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index 57b248cf06..6d219e3da0 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -91,6 +91,7 @@ type batchExecuteFlags struct { cleanArchives bool skipErrors bool runAsRoot bool + failFast bool // EXPERIMENTAL textOnly bool @@ -162,6 +163,10 @@ func newBatchExecuteFlags(flagSet *flag.FlagSet, cacheDir, tempDir string) *batc &caf.runAsRoot, "run-as-root", false, "If true, forces all step containers to run as root.", ) + flagSet.BoolVar( + &caf.failFast, "fail-fast", false, + "If true, errors encountered while executing steps in a repository will stop the execution of the batch spec.", + ) return caf } @@ -267,7 +272,7 @@ func createDockerWatchdog(ctx context.Context, execUI ui.ExecUI) *watchdog.Watch // executeBatchSpec performs all the steps required to upload the batch spec to // Sourcegraph, including execution as needed and applying the resulting batch // spec if specified. -func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error) { +func executeBatchSpec(ctx executor.CancelableContext, opts executeBatchSpecOpts) (err error) { var execUI ui.ExecUI if opts.flags.textOnly { execUI = &ui.JSONLines{} @@ -282,6 +287,9 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error defer func() { w.Stop() if err != nil { + if cerr := context.Cause(ctx); cerr != nil { + err = cerr + } execUI.ExecutionError(err) } }() @@ -463,6 +471,11 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error taskExecUI := execUI.ExecutingTasks(*verbose, parallelism) freshSpecs, logFiles, execErr := coord.ExecuteAndBuildSpecs(ctx, batchSpec, uncachedTasks, taskExecUI) + + if len(logFiles) > 0 && opts.flags.keepLogs { + execUI.LogFilesKept(logFiles) + } + // Add external changeset specs. importedSpecs, importErr := svc.CreateImportChangesetSpecs(ctx, batchSpec) if execErr != nil { @@ -487,10 +500,6 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error } } - if len(logFiles) > 0 && opts.flags.keepLogs { - execUI.LogFilesKept(logFiles) - } - specs = append(specs, freshSpecs...) specs = append(specs, importedSpecs...) @@ -628,22 +637,22 @@ func checkExecutable(cmd string, args ...string) error { return nil } -func contextCancelOnInterrupt(parent context.Context) (context.Context, func()) { - ctx, ctxCancel := context.WithCancel(parent) +func contextCancelOnInterrupt(parent context.Context) (context.Context, func(error)) { + ctx, ctxCancel := context.WithCancelCause(parent) c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) go func() { select { case <-c: - ctxCancel() + ctxCancel(errors.New("Ctrl-C hit")) case <-ctx.Done(): } }() - return ctx, func() { + return ctx, func(err error) { signal.Stop(c) - ctxCancel() + ctxCancel(err) } } diff --git a/cmd/src/batch_exec.go b/cmd/src/batch_exec.go index 11aadf39e3..4fa28ab02e 100644 --- a/cmd/src/batch_exec.go +++ b/cmd/src/batch_exec.go @@ -101,7 +101,7 @@ Examples: } ctx, cancel := contextCancelOnInterrupt(context.Background()) - defer cancel() + defer cancel(nil) err := executeBatchSpecInWorkspaces(ctx, flags) if err != nil { diff --git a/cmd/src/batch_preview.go b/cmd/src/batch_preview.go index 8226bb5459..fc50982915 100644 --- a/cmd/src/batch_preview.go +++ b/cmd/src/batch_preview.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" + "github.com/sourcegraph/src-cli/internal/batches/executor" "github.com/sourcegraph/src-cli/internal/cmderrors" ) @@ -40,9 +41,20 @@ Examples: } ctx, cancel := contextCancelOnInterrupt(context.Background()) - defer cancel() - if err = executeBatchSpec(ctx, executeBatchSpecOpts{ + defer cancel(nil) + + failFastCancel := func(error) {} + if flags.failFast { + failFastCancel = cancel + } + + cctx := executor.CancelableContext{ + Context: ctx, + Cancel: failFastCancel, + } + + if err = executeBatchSpec(cctx, executeBatchSpecOpts{ flags: flags, client: cfg.apiClient(flags.api, flagSet.Output()), file: file, diff --git a/internal/batches/executor/coordinator.go b/internal/batches/executor/coordinator.go index 15855f5d37..322c8f004b 100644 --- a/internal/batches/executor/coordinator.go +++ b/internal/batches/executor/coordinator.go @@ -13,10 +13,15 @@ import ( ) type taskExecutor interface { - Start(context.Context, []*Task, TaskExecutionUI) + Start(CancelableContext, []*Task, TaskExecutionUI) Wait(context.Context) ([]taskResult, error) } +type CancelableContext struct { + context.Context + Cancel context.CancelCauseFunc +} + // Coordinator coordinates the execution of Tasks. It makes use of an executor, // checks the ExecutionCache whether execution is necessary, and builds // batcheslib.ChangesetSpecs out of the executionResults. @@ -177,7 +182,7 @@ func (c *Coordinator) buildSpecs(ctx context.Context, batchSpec *batcheslib.Batc // ExecuteAndBuildSpecs executes the given tasks and builds changeset specs for the results. // It calls the ui on updates. -func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, batchSpec *batcheslib.BatchSpec, tasks []*Task, ui TaskExecutionUI) ([]*batcheslib.ChangesetSpec, []string, error) { +func (c *Coordinator) ExecuteAndBuildSpecs(ctx CancelableContext, batchSpec *batcheslib.BatchSpec, tasks []*Task, ui TaskExecutionUI) ([]*batcheslib.ChangesetSpec, []string, error) { ui.Start(tasks) // Run executor. diff --git a/internal/batches/executor/coordinator_test.go b/internal/batches/executor/coordinator_test.go index 9ce252cda3..eb862b1dfd 100644 --- a/internal/batches/executor/coordinator_test.go +++ b/internal/batches/executor/coordinator_test.go @@ -269,6 +269,9 @@ func TestCoordinator_Execute(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() + cctx := CancelableContext{ + Context: ctx, + } // Set attributes on Task which would be set by the TaskBuilder for _, t := range tc.tasks { @@ -294,7 +297,7 @@ func TestCoordinator_Execute(t *testing.T) { // the batch spec. We'll run this multiple times to cover both the // cache and non-cache code paths. execute := func(t *testing.T) { - specs, _, err := coord.ExecuteAndBuildSpecs(ctx, tc.batchSpec, tc.tasks, newDummyTaskExecutionUI()) + specs, _, err := coord.ExecuteAndBuildSpecs(cctx, tc.batchSpec, tc.tasks, newDummyTaskExecutionUI()) if tc.wantErrInclude == "" { if err != nil { t.Fatalf("execution failed: %s", err) @@ -439,6 +442,10 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) { // in a new Coordinator, setting cb as the startCallback on the executor. func execAndEnsure(t *testing.T, coord *Coordinator, exec *dummyExecutor, batchSpec *batcheslib.BatchSpec, task *Task, cb startCallback) { t.Helper() + ctx := context.Background() + cctx := CancelableContext{ + Context: ctx, + } // Setup the callback exec.startCb = cb @@ -450,7 +457,7 @@ func execAndEnsure(t *testing.T, coord *Coordinator, exec *dummyExecutor, batchS } // Execute - freshSpecs, _, err := coord.ExecuteAndBuildSpecs(context.Background(), batchSpec, uncached, newDummyTaskExecutionUI()) + freshSpecs, _, err := coord.ExecuteAndBuildSpecs(cctx, batchSpec, uncached, newDummyTaskExecutionUI()) if err != nil { t.Fatalf("execution of task failed: %s", err) } @@ -554,7 +561,7 @@ type dummyExecutor struct { waitErr error } -func (d *dummyExecutor) Start(ctx context.Context, ts []*Task, ui TaskExecutionUI) { +func (d *dummyExecutor) Start(ctx CancelableContext, ts []*Task, ui TaskExecutionUI) { if d.startCb != nil { d.startCb(ctx, ts, ui) d.startCbCalled = true diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index 9ccb94a8fc..08bccddac0 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -92,12 +92,14 @@ func NewExecutor(opts NewExecutorOpts) *executor { } } +var ErrFastFail = errors.New("Execution stopped due to fast-fail mode.") + // Start starts the execution of the given Tasks in goroutines, calling the // given taskStatusHandler to update the progress of the tasks. -func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) { +func (x *executor) Start(ctx CancelableContext, tasks []*Task, ui TaskExecutionUI) { defer func() { close(x.doneEnqueuing) }() - for _, task := range tasks { + fmt.Println(task.Repository) select { case <-ctx.Done(): return @@ -115,6 +117,7 @@ func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) default: err := x.do(ctx, task, ui) if err != nil { + ctx.Cancel(ErrFastFail) x.par.Error(err) } } diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index c4accc0f87..730b0e9304 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -175,7 +175,6 @@ func TestExecutor_Integration(t *testing.T) { }, {Run: `touch output-${{ outputs.myOutput }}`}, }, - tasks: []*Task{ {Repository: testRepo1}, }, @@ -396,8 +395,15 @@ func TestExecutor_Integration(t *testing.T) { dummyUI := newDummyTaskExecutionUI() executor := NewExecutor(opts) + ctx := context.Background() + failFastCancel := func(error) {} + cctx := CancelableContext{ + Context: ctx, + Cancel: failFastCancel, + } + // Run executor - executor.Start(context.Background(), tc.tasks, dummyUI) + executor.Start(cctx, tc.tasks, dummyUI) results, err := executor.Wait(context.Background()) if tc.wantErrInclude == "" { @@ -809,8 +815,11 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive) Parallelism: runtime.GOMAXPROCS(0), Timeout: 30 * time.Second, }) - - executor.Start(context.Background(), tasks, newDummyTaskExecutionUI()) + ctx := context.Background() + cctx := CancelableContext{ + Context: ctx, + } + executor.Start(cctx, tasks, newDummyTaskExecutionUI()) return executor.Wait(context.Background()) }