Skip to content

Commit

Permalink
reapply: "fix: refactor coder logger to allow flush without deadlock (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mafredri authored Sep 30, 2024
1 parent a1e8f3c commit b3d1ec8
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 86 deletions.
15 changes: 13 additions & 2 deletions cmd/envbuilder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ func envbuilderCmd() serpent.Command {
Options: o.CLI(),
Handler: func(inv *serpent.Invocation) error {
o.SetDefaults()
var preExecs []func()
preExec := func() {
for _, fn := range preExecs {
fn()
}
preExecs = nil
}
defer preExec() // Ensure cleanup in case of error.

o.Logger = log.New(os.Stderr, o.Verbose)
if o.CoderAgentURL != "" {
if o.CoderAgentToken == "" {
Expand All @@ -49,7 +58,9 @@ func envbuilderCmd() serpent.Command {
coderLog, closeLogs, err := log.Coder(inv.Context(), u, o.CoderAgentToken)
if err == nil {
o.Logger = log.Wrap(o.Logger, coderLog)
defer closeLogs()
preExecs = append(preExecs, func() {
closeLogs()
})
// This adds the envbuilder subsystem.
// If telemetry is enabled in a Coder deployment,
// this will be reported and help us understand
Expand Down Expand Up @@ -78,7 +89,7 @@ func envbuilderCmd() serpent.Command {
return nil
}

err := envbuilder.Run(inv.Context(), o)
err := envbuilder.Run(inv.Context(), o, preExec)
if err != nil {
o.Logger(log.LevelError, "error: %s", err)
}
Expand Down
7 changes: 6 additions & 1 deletion envbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ type execArgsInfo struct {
// Logger is the logf to use for all operations.
// Filesystem is the filesystem to use for all operations.
// Defaults to the host filesystem.
func Run(ctx context.Context, opts options.Options) error {
// preExec are any functions that should be called before exec'ing the init
// command. This is useful for ensuring that defers get run.
func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
var args execArgsInfo
// Run in a separate function to ensure all defers run before we
// setuid or exec.
Expand All @@ -103,6 +105,9 @@ func Run(ctx context.Context, opts options.Options) error {
}

opts.Logger(log.LevelInfo, "=== Running the init command %s %+v as the %q user...", opts.InitCommand, args.InitArgs, args.UserInfo.user.Username)
for _, fn := range preExec {
fn()
}

err = syscall.Exec(args.InitCommand, append([]string{args.InitCommand}, args.InitArgs...), args.Environ)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/gliderlabs/ssh v0.3.7
github.com/go-git/go-billy/v5 v5.5.0
github.com/go-git/go-git/v5 v5.12.0
github.com/google/go-cmp v0.6.0
github.com/google/go-containerregistry v0.20.1
github.com/google/uuid v1.6.0
github.com/hashicorp/go-multierror v1.1.1
Expand Down Expand Up @@ -149,7 +150,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/nftables v0.2.0 // indirect
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand Down
67 changes: 67 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/envbuilder"
"github.com/coder/envbuilder/devcontainer/features"
"github.com/coder/envbuilder/internal/magicdir"
Expand Down Expand Up @@ -58,6 +60,71 @@ const (
testImageUbuntu = "localhost:5000/envbuilder-test-ubuntu:latest"
)

// TestLogs verifies that envbuilder ships its build logs to a Coder
// deployment when CODER_AGENT_URL and CODER_AGENT_TOKEN are set.
//
// A fake Coder API is served over HTTP. It reports build version v2.8.9 and
// accepts log batches on the workspace agent logs endpoint. The test passes
// once a batch containing the "Running the init command" line has been
// received, which shows the logs were flushed before envbuilder handed over to
// the init command.
func TestLogs(t *testing.T) {
	t.Parallel()

	token := uuid.NewString()
	logsDone := make(chan struct{})

	logHandler := func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		case "/api/v2/buildinfo":
			// NOTE(review): v2.8.9 is presumably chosen to be older than the
			// minimum version for the v2 Agent API so that envbuilder uses the
			// deprecated HTTP log endpoint below — confirm against log/coder.go.
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`{"version": "v2.8.9"}`))
		case "/api/v2/workspaceagents/me/logs":
			// Every log request must carry the agent token we configured.
			tokHdr := r.Header.Get(codersdk.SessionTokenHeader)
			assert.Equal(t, token, tokHdr)

			// Decode before committing to a status code. Once the header has
			// been written, http.Error can no longer report a 400.
			var req agentsdk.PatchLogs
			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			w.WriteHeader(http.StatusOK)

			for _, entry := range req.Logs {
				t.Logf("got log: %+v", entry)
				if !strings.Contains(entry.Output, "Running the init command") {
					continue
				}
				// Closing an already closed channel panics, so only close
				// when the channel is still open. This covers the marker
				// line showing up in more than one request.
				// NOTE(review): two handlers running truly concurrently could
				// still both reach close; this assumes log batches are sent
				// one at a time — confirm against the log sender.
				select {
				case <-logsDone:
				default:
					close(logsDone)
				}
				return
			}
		default:
			t.Errorf("unexpected request to %s", r.URL.Path)
			w.WriteHeader(http.StatusNotFound)
		}
	}
	logSrv := httptest.NewServer(http.HandlerFunc(logHandler))
	defer logSrv.Close()

	// A minimal repository with a devcontainer.json and a Dockerfile gives
	// envbuilder something to clone and build so that it produces logs.
	srv := gittest.CreateGitServer(t, gittest.Options{
		Files: map[string]string{
			"devcontainer.json": `{
"build": {
"dockerfile": "Dockerfile"
},
}`,
			"Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu),
		},
	})
	_, err := runEnvbuilder(t, runOpts{env: []string{
		envbuilderEnv("GIT_URL", srv.URL),
		"CODER_AGENT_URL=" + logSrv.URL,
		"CODER_AGENT_TOKEN=" + token,
	}})
	require.NoError(t, err)

	// Allow up to a minute for the final log batch to arrive.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	select {
	case <-ctx.Done():
		t.Fatal("timed out waiting for logs")
	case <-logsDone:
	}
}

func TestInitScriptInitCommand(t *testing.T) {
t.Parallel()

Expand Down
115 changes: 70 additions & 45 deletions log/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/url"
"os"
"sync"
"time"

"cdr.dev/slog"
Expand All @@ -27,13 +28,14 @@ var (
minAgentAPIV2 = "v2.9"
)

// Coder establishes a connection to the Coder instance located at
// coderURL and authenticates using token. It then establishes a
// dRPC connection to the Agent API and begins sending logs.
// If the version of Coder does not support the Agent API, it will
// fall back to using the PatchLogs endpoint.
// The returned function is used to block until all logs are sent.
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
// Coder establishes a connection to the Coder instance located at coderURL and
// authenticates using token. It then establishes a dRPC connection to the Agent
// API and begins sending logs. If the version of Coder does not support the
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
// used to close the logger and to wait at most logSendGracePeriod for logs to
// be sent. Cancelling the context will close the logs immediately without
// waiting for logs to be sent.
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
// To troubleshoot issues, we need some way of logging.
metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
defer metaLogger.Sync()
Expand All @@ -44,18 +46,39 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(),
}
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
return sendLogs, flushLogs, nil
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
return logger, closer, nil
}

// Create a new context so we can ensure the connection is torn down.
ctx, cancel := context.WithCancel(ctx)
defer func() {
if err != nil {
cancel()
}
}()
// Note that ctx passed to initRPC will be inherited by the
// underlying connection, nothing we can do about that here.
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
if err != nil {
// Logged externally
return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
}
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
return sendLogs, doneFunc, nil
logger, loggerCloser := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
var closeOnce sync.Once
closer = func() {
loggerCloser()

closeOnce.Do(func() {
// Typically cancel would be after Close, but we want to be
// sure there's nothing that might block on Close.
cancel()
_ = dac.DRPCConn().Close()
})
}
return logger, closer, nil
}

type coderLogSender interface {
Expand All @@ -74,7 +97,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
var c proto.DRPCAgentClient20
var err error
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
defer retryCancel()
attempts := 0
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
Expand All @@ -95,65 +118,67 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto

// sendLogsV1 uses the PatchLogs endpoint to send logs.
// This is deprecated, but required for backward compatibility with older versions of Coder.
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
// nolint: staticcheck // required for backwards compatibility
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
var mu sync.Mutex
return func(lvl Level, msg string, args ...any) {
log := agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(lvl),
}
if err := sendLogs(ctx, log); err != nil {
mu.Lock()
defer mu.Unlock()
if err := sendLog(ctx, log); err != nil {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}, func() {
if err := flushLogs(ctx); err != nil {
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
defer cancel()
if err := flushAndClose(ctx); err != nil {
l.Warn(ctx, "failed to flush logs", slog.Error(err))
}
}
}

// sendLogsV2 uses the v2 agent API to send logs. Only compatible with coder versions >= 2.9.
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
sendCtx, sendCancel := context.WithCancel(ctx)
done := make(chan struct{})
uid := uuid.New()
go func() {
defer close(done)
if err := ls.SendLoop(ctx, dest); err != nil {
if err := ls.SendLoop(sendCtx, dest); err != nil {
if !errors.Is(err, context.Canceled) {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}

// Wait for up to 10 seconds for logs to finish sending.
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
defer sendCancel()
// Try once more to send any pending logs
if err := ls.SendLoop(sendCtx, dest); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
}
}
ls.Flush(uid)
if err := ls.WaitUntilEmpty(sendCtx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
l.Warn(ctx, "log sender did not empty", slog.Error(err))
}
}
}()

logFunc := func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}
var closeOnce sync.Once
return func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}, func() {
closeOnce.Do(func() {
// Trigger a flush and wait for logs to be sent.
ls.Flush(uid)
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
defer cancel()
err := ls.WaitUntilEmpty(ctx)
if err != nil {
l.Warn(ctx, "log sender did not empty", slog.Error(err))
}

doneFunc := func() {
<-done
}
// Stop the send loop.
sendCancel()
})

return logFunc, doneFunc
// Wait for the send loop to finish.
<-done
}
}
Loading

0 comments on commit b3d1ec8

Please sign in to comment.