From 44256348251342d1fddc2fc13a8f6a21cdd7c678 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 27 Sep 2024 19:38:52 +0100 Subject: [PATCH 1/3] fix(log): sendLogsV2: cancel LogSender.SendLoop when doneFunc is called --- cmd/envbuilder/main.go | 2 +- integration/integration_test.go | 5 ++++- log/coder.go | 5 ++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cmd/envbuilder/main.go b/cmd/envbuilder/main.go index 720c0c85..d1e4bbf2 100644 --- a/cmd/envbuilder/main.go +++ b/cmd/envbuilder/main.go @@ -57,8 +57,8 @@ func envbuilderCmd() serpent.Command { o.Logger = log.Wrap(o.Logger, coderLog) defer closeLogs() preExec = append(preExec, func() { - o.Logger(log.LevelInfo, "Closing logs") closeLogs() + o.Logger(log.LevelInfo, "Closed logs") }) // This adds the envbuilder subsystem. // If telemetry is enabled in a Coder deployment, diff --git a/integration/integration_test.go b/integration/integration_test.go index 79b678d5..932f6581 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -110,8 +110,9 @@ func TestLogs(t *testing.T) { "Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu), }, }) - _, err := runEnvbuilder(t, runOpts{env: []string{ + ctr, err := runEnvbuilder(t, runOpts{env: []string{ envbuilderEnv("GIT_URL", srv.URL), + envbuilderEnv("INIT_SCRIPT", "date > /.date.txt"), "CODER_AGENT_URL=" + logSrv.URL, "CODER_AGENT_TOKEN=" + token, }}) @@ -123,6 +124,8 @@ func TestLogs(t *testing.T) { t.Fatal("timed out waiting for logs") case <-logsDone: } + output := execContainer(t, ctr, "cat /date.txt") + require.NotEmpty(t, strings.TrimSpace(output)) } func TestInitScriptInitCommand(t *testing.T) { diff --git a/log/coder.go b/log/coder.go index d8b4fe0d..15b4cb15 100644 --- a/log/coder.go +++ b/log/coder.go @@ -118,9 +118,11 @@ func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Fu func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) { done := make(chan struct{}) uid := uuid.New() + sendLoopCtx, cancelSendLoop := context.WithCancel(ctx) + defer cancelSendLoop() go func() { defer close(done) - if err := ls.SendLoop(ctx, dest); err != nil { + if err := ls.SendLoop(sendLoopCtx, dest); err != nil { if !errors.Is(err, context.Canceled) { l.Warn(ctx, "failed to send logs to Coder", slog.Error(err)) } @@ -152,6 +154,7 @@ func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l } doneFunc := func() { + cancelSendLoop() <-done } From 51e89d26dd11a2b2dfe5ccbe8492a6a017858d9f Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 27 Sep 2024 19:59:53 +0100 Subject: [PATCH 2/3] fix test --- cmd/envbuilder/main.go | 1 + log/coder.go | 25 ++++++++++++++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/cmd/envbuilder/main.go b/cmd/envbuilder/main.go index d1e4bbf2..cf4e8236 100644 --- a/cmd/envbuilder/main.go +++ b/cmd/envbuilder/main.go @@ -57,6 +57,7 @@ func envbuilderCmd() serpent.Command { o.Logger = log.Wrap(o.Logger, coderLog) defer closeLogs() preExec = append(preExec, func() { + o.Logger(log.LevelInfo, "Closing logs") closeLogs() o.Logger(log.LevelInfo, "Closed logs") }) diff --git a/log/coder.go b/log/coder.go index 15b4cb15..1808fd20 100644 --- a/log/coder.go +++ b/log/coder.go @@ -105,12 +105,18 @@ func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Fu Level: codersdk.LogLevel(lvl), } if err := sendLogs(ctx, log); err != nil { - l.Warn(ctx, "failed to send logs to Coder", slog.Error(err)) + if !errors.Is(err, context.Canceled) { + l.Warn(ctx, "failed to send logs to Coder", slog.Error(err)) + } } }, func() { - if err := flushLogs(ctx); err != nil { + // Wait for up to 10 seconds for logs to finish sending. + sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod) + defer sendCancel() + if err := flushLogs(sendCtx); err != nil { l.Warn(ctx, "failed to flush logs", slog.Error(err)) } + return } } @@ -146,11 +152,16 @@ func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l }() logFunc := func(l Level, msg string, args ...any) { - ls.Enqueue(uid, agentsdk.Log{ - CreatedAt: time.Now(), - Output: fmt.Sprintf(msg, args...), - Level: codersdk.LogLevel(l), - }) + select { + case <-sendLoopCtx.Done(): + return + default: + ls.Enqueue(uid, agentsdk.Log{ + CreatedAt: time.Now(), + Output: fmt.Sprintf(msg, args...), + Level: codersdk.LogLevel(l), + }) + } } doneFunc := func() { From 06880901c788658b87e0a55eba218df5557bc2bc Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 27 Sep 2024 20:03:47 +0100 Subject: [PATCH 3/3] appease gosimple --- log/coder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/log/coder.go b/log/coder.go index 1808fd20..4e6806c3 100644 --- a/log/coder.go +++ b/log/coder.go @@ -116,7 +116,6 @@ func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Fu if err := flushLogs(sendCtx); err != nil { l.Warn(ctx, "failed to flush logs", slog.Error(err)) } - return } }