Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(log): sendLogsV2: cancel LogSender.SendLoop when doneFunc is called #369

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/envbuilder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func envbuilderCmd() serpent.Command {
preExec = append(preExec, func() {
o.Logger(log.LevelInfo, "Closing logs")
closeLogs()
o.Logger(log.LevelInfo, "Closed logs")
})
// This adds the envbuilder subsystem.
// If telemetry is enabled in a Coder deployment,
Expand Down
5 changes: 4 additions & 1 deletion integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ func TestLogs(t *testing.T) {
"Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu),
},
})
_, err := runEnvbuilder(t, runOpts{env: []string{
ctr, err := runEnvbuilder(t, runOpts{env: []string{
envbuilderEnv("GIT_URL", srv.URL),
envbuilderEnv("INIT_SCRIPT", "date > /.date.txt"),
"CODER_AGENT_URL=" + logSrv.URL,
"CODER_AGENT_TOKEN=" + token,
}})
Expand All @@ -123,6 +124,8 @@ func TestLogs(t *testing.T) {
t.Fatal("timed out waiting for logs")
case <-logsDone:
}
output := execContainer(t, ctr, "cat /date.txt")
require.NotEmpty(t, strings.TrimSpace(output))
}

func TestInitScriptInitCommand(t *testing.T) {
Expand Down
29 changes: 21 additions & 8 deletions log/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,15 @@ func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Fu
Level: codersdk.LogLevel(lvl),
}
if err := sendLogs(ctx, log); err != nil {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
if !errors.Is(err, context.Canceled) {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}
}, func() {
if err := flushLogs(ctx); err != nil {
// Wait for up to 10 seconds for logs to finish sending.
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
defer sendCancel()
if err := flushLogs(sendCtx); err != nil {
l.Warn(ctx, "failed to flush logs", slog.Error(err))
}
}
Expand All @@ -118,9 +123,11 @@ func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Fu
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
done := make(chan struct{})
uid := uuid.New()
sendLoopCtx, cancelSendLoop := context.WithCancel(ctx)
defer cancelSendLoop()
go func() {
defer close(done)
if err := ls.SendLoop(ctx, dest); err != nil {
if err := ls.SendLoop(sendLoopCtx, dest); err != nil {
if !errors.Is(err, context.Canceled) {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
Expand All @@ -144,14 +151,20 @@ func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l
}()

logFunc := func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
select {
case <-sendLoopCtx.Done():
return
default:
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}
}

doneFunc := func() {
cancelSendLoop()
<-done
}

Expand Down
Loading