Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fail-fast flag to src batch #1049

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cmd/src/batch_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"

"github.com/sourcegraph/src-cli/internal/batches/executor"
"github.com/sourcegraph/src-cli/internal/cmderrors"
)

Expand Down Expand Up @@ -42,9 +43,20 @@ Examples:
}

ctx, cancel := contextCancelOnInterrupt(context.Background())
defer cancel()

if err = executeBatchSpec(ctx, executeBatchSpecOpts{
defer cancel(nil)

failFastCancel := func(error) {}
if flags.failFast {
failFastCancel = cancel
}

cctx := executor.CancelableContext{
Context: ctx,
Cancel: failFastCancel,
}

if err = executeBatchSpec(cctx, executeBatchSpecOpts{
flags: flags,
client: cfg.apiClient(flags.api, flagSet.Output()),
file: file,
Expand Down
29 changes: 19 additions & 10 deletions cmd/src/batch_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type batchExecuteFlags struct {
cleanArchives bool
skipErrors bool
runAsRoot bool
failFast bool

// EXPERIMENTAL
textOnly bool
Expand Down Expand Up @@ -162,6 +163,10 @@ func newBatchExecuteFlags(flagSet *flag.FlagSet, cacheDir, tempDir string) *batc
&caf.runAsRoot, "run-as-root", false,
"If true, forces all step containers to run as root.",
)
flagSet.BoolVar(
&caf.failFast, "fail-fast", false,
"If true, errors encountered while executing steps in a repository will stop the execution of the batch spec.",
)

return caf
}
Expand Down Expand Up @@ -267,7 +272,7 @@ func createDockerWatchdog(ctx context.Context, execUI ui.ExecUI) *watchdog.Watch
// executeBatchSpec performs all the steps required to upload the batch spec to
// Sourcegraph, including execution as needed and applying the resulting batch
// spec if specified.
func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error) {
func executeBatchSpec(ctx executor.CancelableContext, opts executeBatchSpecOpts) (err error) {
var execUI ui.ExecUI
if opts.flags.textOnly {
execUI = &ui.JSONLines{}
Expand All @@ -282,6 +287,9 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error
defer func() {
w.Stop()
if err != nil {
if cerr := context.Cause(ctx); cerr != nil {
err = cerr
}
execUI.ExecutionError(err)
}
}()
Expand Down Expand Up @@ -463,6 +471,11 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error

taskExecUI := execUI.ExecutingTasks(*verbose, parallelism)
freshSpecs, logFiles, execErr := coord.ExecuteAndBuildSpecs(ctx, batchSpec, uncachedTasks, taskExecUI)

if len(logFiles) > 0 && opts.flags.keepLogs {
execUI.LogFilesKept(logFiles)
}

// Add external changeset specs.
importedSpecs, importErr := svc.CreateImportChangesetSpecs(ctx, batchSpec)
if execErr != nil {
Expand All @@ -487,10 +500,6 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) (err error
}
}

if len(logFiles) > 0 && opts.flags.keepLogs {
execUI.LogFilesKept(logFiles)
}

specs = append(specs, freshSpecs...)
specs = append(specs, importedSpecs...)

Expand Down Expand Up @@ -628,22 +637,22 @@ func checkExecutable(cmd string, args ...string) error {
return nil
}

func contextCancelOnInterrupt(parent context.Context) (context.Context, func()) {
ctx, ctxCancel := context.WithCancel(parent)
func contextCancelOnInterrupt(parent context.Context) (context.Context, func(error)) {
ctx, ctxCancel := context.WithCancelCause(parent)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

go func() {
select {
case <-c:
ctxCancel()
ctxCancel(errors.New("Ctrl-C hit"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is awesome. TIL I learned about the .WithCancelCause method.

case <-ctx.Done():
}
}()

return ctx, func() {
return ctx, func(err error) {
signal.Stop(c)
ctxCancel()
ctxCancel(err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/src/batch_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Examples:
}

ctx, cancel := contextCancelOnInterrupt(context.Background())
defer cancel()
defer cancel(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should be passing in nil directly here, an error could occur at any point in the execution of a batch spec.

Is there a reason why the handler function doesn't return a named return value and we can then pass that into defer cancel?

handler := func(args []string) (err error) {
	...
	ctx, cancel := contextCancelOnInterrupt(context.Background())
	defer cancel(err)
	...
}

This way we are guaranteed to always have access to whatever error occurs in the lifecycle of an execution.


err := executeBatchSpecInWorkspaces(ctx, flags)
if err != nil {
Expand Down
16 changes: 14 additions & 2 deletions cmd/src/batch_preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"

"github.com/sourcegraph/src-cli/internal/batches/executor"
"github.com/sourcegraph/src-cli/internal/cmderrors"
)

Expand Down Expand Up @@ -40,9 +41,20 @@ Examples:
}

ctx, cancel := contextCancelOnInterrupt(context.Background())
defer cancel()

if err = executeBatchSpec(ctx, executeBatchSpecOpts{
defer cancel(nil)

failFastCancel := func(error) {}
if flags.failFast {
failFastCancel = cancel
}

cctx := executor.CancelableContext{
Context: ctx,
Cancel: failFastCancel,
}
Comment on lines +52 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a need for this struct? Is there a reason we can't pass in the context and failFastCancel as separate arguments?

Or better still have the failFastCancel as one of the fields in executeBatchSpecOpts, that way we are certain ctx is always going to be of type context.Context and wouldn't be confusing to anyone in the future;


if err = executeBatchSpec(cctx, executeBatchSpecOpts{
flags: flags,
client: cfg.apiClient(flags.api, flagSet.Output()),
file: file,
Expand Down
9 changes: 7 additions & 2 deletions internal/batches/executor/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ import (
)

type taskExecutor interface {
Start(context.Context, []*Task, TaskExecutionUI)
Start(CancelableContext, []*Task, TaskExecutionUI)
Wait(context.Context) ([]taskResult, error)
}

type CancelableContext struct {
context.Context
Cancel context.CancelCauseFunc
}

// Coordinator coordinates the execution of Tasks. It makes use of an executor,
// checks the ExecutionCache whether execution is necessary, and builds
// batcheslib.ChangesetSpecs out of the executionResults.
Expand Down Expand Up @@ -177,7 +182,7 @@ func (c *Coordinator) buildSpecs(ctx context.Context, batchSpec *batcheslib.Batc

// ExecuteAndBuildSpecs executes the given tasks and builds changeset specs for the results.
// It calls the ui on updates.
func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, batchSpec *batcheslib.BatchSpec, tasks []*Task, ui TaskExecutionUI) ([]*batcheslib.ChangesetSpec, []string, error) {
func (c *Coordinator) ExecuteAndBuildSpecs(ctx CancelableContext, batchSpec *batcheslib.BatchSpec, tasks []*Task, ui TaskExecutionUI) ([]*batcheslib.ChangesetSpec, []string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the arguments to this method is getting bulky, we can move it into a struct and pass in the cancel function explicitly.

Suggested change
func (c *Coordinator) ExecuteAndBuildSpecs(ctx CancelableContext, batchSpec *batcheslib.BatchSpec, tasks []*Task, ui TaskExecutionUI) ([]*batcheslib.ChangesetSpec, []string, error) {
type ExecuteAndBuildSpecsArgs struct {
batchSpec *batcheslib.BatchSpec
tasks []*Task
ui TaskExecutionUI
cancelFunc context.CancelCauseFunc
}
func (c *Coordinator) ExecuteAndBuildSpecs(ctx context.Context, args ExecuteAndBuildSpecsArgs) ([]*batcheslib.ChangesetSpec, []string, error) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancelFunc isn't being used here, I wonder if we need to pass it in here as an argument?

ui.Start(tasks)

// Run executor.
Expand Down
13 changes: 10 additions & 3 deletions internal/batches/executor/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ func TestCoordinator_Execute(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cctx := CancelableContext{
Context: ctx,
}

// Set attributes on Task which would be set by the TaskBuilder
for _, t := range tc.tasks {
Expand All @@ -294,7 +297,7 @@ func TestCoordinator_Execute(t *testing.T) {
// the batch spec. We'll run this multiple times to cover both the
// cache and non-cache code paths.
execute := func(t *testing.T) {
specs, _, err := coord.ExecuteAndBuildSpecs(ctx, tc.batchSpec, tc.tasks, newDummyTaskExecutionUI())
specs, _, err := coord.ExecuteAndBuildSpecs(cctx, tc.batchSpec, tc.tasks, newDummyTaskExecutionUI())
if tc.wantErrInclude == "" {
if err != nil {
t.Fatalf("execution failed: %s", err)
Expand Down Expand Up @@ -439,6 +442,10 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) {
// in a new Coordinator, setting cb as the startCallback on the executor.
func execAndEnsure(t *testing.T, coord *Coordinator, exec *dummyExecutor, batchSpec *batcheslib.BatchSpec, task *Task, cb startCallback) {
t.Helper()
ctx := context.Background()
cctx := CancelableContext{
Context: ctx,
}

// Setup the callback
exec.startCb = cb
Expand All @@ -450,7 +457,7 @@ func execAndEnsure(t *testing.T, coord *Coordinator, exec *dummyExecutor, batchS
}

// Execute
freshSpecs, _, err := coord.ExecuteAndBuildSpecs(context.Background(), batchSpec, uncached, newDummyTaskExecutionUI())
freshSpecs, _, err := coord.ExecuteAndBuildSpecs(cctx, batchSpec, uncached, newDummyTaskExecutionUI())
if err != nil {
t.Fatalf("execution of task failed: %s", err)
}
Expand Down Expand Up @@ -554,7 +561,7 @@ type dummyExecutor struct {
waitErr error
}

func (d *dummyExecutor) Start(ctx context.Context, ts []*Task, ui TaskExecutionUI) {
func (d *dummyExecutor) Start(ctx CancelableContext, ts []*Task, ui TaskExecutionUI) {
if d.startCb != nil {
d.startCb(ctx, ts, ui)
d.startCbCalled = true
Expand Down
7 changes: 5 additions & 2 deletions internal/batches/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ func NewExecutor(opts NewExecutorOpts) *executor {
}
}

var ErrFastFail = errors.New("Execution stopped due to fast-fail mode.")

// Start starts the execution of the given Tasks in goroutines, calling the
// given taskStatusHandler to update the progress of the tasks.
func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI) {
func (x *executor) Start(ctx CancelableContext, tasks []*Task, ui TaskExecutionUI) {
defer func() { close(x.doneEnqueuing) }()

for _, task := range tasks {
fmt.Println(task.Repository)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left over debug statement

select {
case <-ctx.Done():
return
Expand All @@ -115,6 +117,7 @@ func (x *executor) Start(ctx context.Context, tasks []*Task, ui TaskExecutionUI)
default:
err := x.do(ctx, task, ui)
if err != nil {
ctx.Cancel(ErrFastFail)
x.par.Error(err)
}
}
Expand Down
17 changes: 13 additions & 4 deletions internal/batches/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func TestExecutor_Integration(t *testing.T) {
},
{Run: `touch output-${{ outputs.myOutput }}`},
},

tasks: []*Task{
{Repository: testRepo1},
},
Expand Down Expand Up @@ -396,8 +395,15 @@ func TestExecutor_Integration(t *testing.T) {
dummyUI := newDummyTaskExecutionUI()
executor := NewExecutor(opts)

ctx := context.Background()
failFastCancel := func(error) {}
cctx := CancelableContext{
Context: ctx,
Cancel: failFastCancel,
}

// Run executor
executor.Start(context.Background(), tc.tasks, dummyUI)
executor.Start(cctx, tc.tasks, dummyUI)

results, err := executor.Wait(context.Background())
if tc.wantErrInclude == "" {
Expand Down Expand Up @@ -809,8 +815,11 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive)
Parallelism: runtime.GOMAXPROCS(0),
Timeout: 30 * time.Second,
})

executor.Start(context.Background(), tasks, newDummyTaskExecutionUI())
ctx := context.Background()
cctx := CancelableContext{
Context: ctx,
}
executor.Start(cctx, tasks, newDummyTaskExecutionUI())
return executor.Wait(context.Background())
}

Expand Down
Loading