Skip to content

Commit

Permalink
Merge pull request #3349 from telepresenceio/thallgren/unify-daemon-d…
Browse files Browse the repository at this point in the history
…iscovery

Unify daemon discovery and make quit extendable.
  • Loading branch information
thallgren authored Sep 26, 2023
2 parents cec477d + d0e04ea commit 2ed58a3
Show file tree
Hide file tree
Showing 43 changed files with 746 additions and 537 deletions.
1 change: 1 addition & 0 deletions .github/actions/install-dependencies/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ runs:
shell: bash
env:
HOMEBREW_NO_INSTALL_FROM_API: ""
HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK: "1"
run: |
brew untap homebrew/core || true
brew untap homebrew/cask || true
Expand Down
3 changes: 1 addition & 2 deletions cmd/traffic/cmd/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"google.golang.org/grpc/credentials/insecure"
empty "google.golang.org/protobuf/types/known/emptypb"

"github.com/datawire/dlib/dcontext"
"github.com/datawire/dlib/dgroup"
"github.com/datawire/dlib/dlog"
rpc "github.com/telepresenceio/telepresence/rpc/v2/manager"
Expand Down Expand Up @@ -93,7 +92,7 @@ func TalkToManager(ctx context.Context, address string, info *rpc.AgentInfo, sta
defer func() {
// The ctx might well be cancelled at this point but is used as parent during
// the timed clean-up to keep logging intact.
ctx, cancel := context.WithTimeout(dcontext.WithoutCancel(ctx), time.Second)
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second)
defer cancel()

// Reset state by processing an empty snapshot
Expand Down
34 changes: 27 additions & 7 deletions integration_test/cloud_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -46,17 +47,34 @@ func (s *notConnectedSuite) Test_CloudNeverProxy() {
require.NoError(s.TelepresenceHelmInstall(ctx, true, "--set", fmt.Sprintf("client.routing.neverProxySubnets={%s/32}", ip)))
defer s.RollbackTM(ctx)

s.Eventually(func() bool {
defer itest.TelepresenceDisconnectOk(ctx)
_, _, err = itest.Telepresence(ctx, "connect", "--namespace", s.AppNamespace(), "--manager-namespace", s.ManagerNamespace())
timeout := 20 * time.Second
if runtime.GOOS == "windows" {
timeout *= 5
}
s.Eventuallyf(func() bool {
defer func() {
stdout, stderr, err := itest.Telepresence(ctx, "quit")
dlog.Infof(ctx, "stdout: %q", stdout)
dlog.Infof(ctx, "stderr: %q", stderr)
if err != nil {
dlog.Error(ctx, err)
}
}()
stdout, stderr, err := itest.Telepresence(ctx, "connect", "--namespace", s.AppNamespace(), "--manager-namespace", s.ManagerNamespace())
dlog.Infof(ctx, "stdout: %q", stdout)
dlog.Infof(ctx, "stderr: %q", stderr)
if err != nil {
dlog.Error(ctx, err)
return false
}

// The cluster's IP address will also be never proxied, so we gotta account for that.
neverProxiedCount := len(ips) + 1
stdout, _, err := itest.Telepresence(ctx, "status")
stdout, stderr, err = itest.Telepresence(ctx, "status")
dlog.Infof(ctx, "stdout: %q", stdout)
dlog.Infof(ctx, "stderr: %q", stderr)
if err != nil {
dlog.Error(ctx, err)
return false
}
if !strings.Contains(stdout, fmt.Sprintf("Never Proxy: (%d subnets)", neverProxiedCount)) {
Expand All @@ -66,6 +84,7 @@ func (s *notConnectedSuite) Test_CloudNeverProxy() {

jsonStdout, _, err := itest.Telepresence(ctx, "config", "view", "--output", "json")
if err != nil {
dlog.Error(ctx, err)
return false
}
var view client.SessionConfig
Expand All @@ -80,8 +99,9 @@ func (s *notConnectedSuite) Test_CloudNeverProxy() {
return false
}

dlog.Infof(ctx, "Success! Never-proxied IP %s is not reachable", ip)
return true
}, 20*time.Second, 5*time.Second, "never-proxy not updated in 20 seconds")
}, timeout, 5*time.Second, "never-proxy not updated in %s", timeout)
}

func (s *notConnectedSuite) Test_RootdCloudLogLevel() {
Expand Down Expand Up @@ -131,7 +151,7 @@ func (s *notConnectedSuite) Test_RootdCloudLogLevel() {
currentLine++
}
return levelSet
}, 20*time.Second, 5*time.Second, "Root log level not updated in 20 seconds")
}, 60*time.Second, 5*time.Second, "Root log level not updated in 20 seconds")

// Make sure the log level was set back after disconnect
rootLog, err = os.Open(rootLogName)
Expand Down Expand Up @@ -217,7 +237,7 @@ func (s *notConnectedSuite) Test_UserdCloudLogLevel() {
currentLine++
}
return levelSet
}, 20*time.Second, 5*time.Second, "Connector log level not updated in 20 seconds")
}, 60*time.Second, 5*time.Second, "Connector log level not updated in 20 seconds")

// Make sure the log level was set back after disconnect
logF, err = os.Open(logName)
Expand Down
15 changes: 12 additions & 3 deletions integration_test/docker_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/telepresenceio/telepresence/v2/pkg/client/cli/daemon"
)

func (s *singleServiceSuite) Test_DockerRun() {
func (s *singleServiceSuite) Test_DockerRun_HostDaemon() {
if s.IsCI() && goRuntime.GOOS != "linux" {
s.T().Skip("CI can't run linux docker containers inside non-linux runners")
}
Expand All @@ -33,8 +33,11 @@ func (s *singleServiceSuite) Test_DockerRun() {

runDockerRun := func(ctx context.Context, wch chan<- struct{}) {
defer close(wch)
_, _, _ = itest.Telepresence(ctx, "intercept", "--mount", "false", svc,
_, stderr, _ := itest.Telepresence(ctx, "intercept", "--mount", "false", svc,
"--docker-run", "--port", "9070:8080", "--", "--rm", "-v", abs+":/usr/src/app", tag)
if len(stderr) > 0 {
dlog.Debugf(ctx, "stderr = %q", stderr)
}
}

assertInterceptResponse := func(ctx context.Context) {
Expand Down Expand Up @@ -77,6 +80,12 @@ func (s *singleServiceSuite) Test_DockerRun() {
go runDockerRun(soft, wch)
assertInterceptResponse(ctx)
softCancel()
select {
case <-wch:
case <-time.After(30 * time.Second):
itest.TelepresenceOk(ctx, "leave", svc)
s.Fail("interceptor did not terminate")
}
assertNotIntercepted(ctx)
})

Expand Down Expand Up @@ -144,7 +153,7 @@ func (s *dockerDaemonSuite) Test_DockerRun_DockerDaemon() {
match := regexp.MustCompile(`Connected to context ?(.+),\s*namespace (\S+)\s+\(`).FindStringSubmatch(stdout)
require.Len(match, 3)

daemonID, err := daemon.NewIdentifier("", match[1], match[2])
daemonID, err := daemon.NewIdentifier("", match[1], match[2], true)
require.NoError(err)
daemonName := daemonID.ContainerName()
tag := "telepresence/echo-test"
Expand Down
3 changes: 1 addition & 2 deletions integration_test/itest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/client-go/tools/clientcmd/api"
sigsYaml "sigs.k8s.io/yaml"

"github.com/datawire/dlib/dcontext"
"github.com/datawire/dlib/dexec"
"github.com/datawire/dlib/dhttp"
"github.com/datawire/dlib/dlog"
Expand Down Expand Up @@ -454,7 +453,7 @@ func (s *cluster) CapturePodLogs(ctx context.Context, app, container, ns string)
}

// Let command die when the pod that it logs die
ctx = dcontext.WithoutCancel(ctx)
ctx = context.WithoutCancel(ctx)

present := struct{}{}

Expand Down
18 changes: 14 additions & 4 deletions integration_test/kubeconfig_extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"k8s.io/client-go/tools/clientcmd/api"

"github.com/datawire/dlib/dlog"
"github.com/datawire/dlib/dtime"
"github.com/telepresenceio/telepresence/v2/integration_test/itest"
"github.com/telepresenceio/telepresence/v2/pkg/filelocation"
Expand Down Expand Up @@ -62,10 +63,19 @@ func (s *notConnectedSuite) Test_APIServerIsProxied() {
s.TelepresenceConnect(ctx, "--context", "extra")

expectedLen := len(ips)
expect := fmt.Sprintf("Also Proxy : (%d subnets)", expectedLen)
s.Eventually(func() bool {
stdout, _, err := itest.Telepresence(ctx, "status")
return err == nil && strings.Contains(stdout, fmt.Sprintf("Also Proxy : (%d subnets)", expectedLen))
}, 10*time.Second, 1*time.Second, fmt.Sprintf("did not find %d also-proxied subnets", expectedLen))
stdout, stderr, err := itest.Telepresence(ctx, "status")
if err == nil && strings.Contains(stdout, expect) {
return true
}
if err != nil {
dlog.Errorf(ctx, "%s: %v", stderr, err)
} else {
dlog.Infof(ctx, "%q does not contain %q", stdout, expect)
}
return false
}, 30*time.Second, 3*time.Second, fmt.Sprintf("did not find %d also-proxied subnets", expectedLen))

jsonStdout := itest.TelepresenceOk(ctx, "status", "--json")
var status statusResponse
Expand Down Expand Up @@ -177,7 +187,7 @@ func (s *notConnectedSuite) Test_ConflictingProxies() {
return true
}
return newRoute.Interface.Name != originalRoute.Interface.Name
}, 5*time.Second, 200*time.Millisecond)
}, 30*time.Second, 200*time.Millisecond)
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion integration_test/manager_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (m *managerGRPCSuite) SetupSuite() {
_, err = m.client.Version(ctx, &empty.Empty{})
m.Require().NoError(err)

daemonID, err := daemon.NewIdentifier("", k8sCluster.Context, m.AppNamespace())
daemonID, err := daemon.NewIdentifier("", k8sCluster.Context, m.AppNamespace(), false)
m.Require().NoError(err)
m.si, err = trafficmgr.LoadSessionInfoFromUserCache(ctx, daemonID)
m.Require().NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion integration_test/multi_connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *multiConnectSuite) Test_MultipleConnect() {
cfg, err := clientcmd.LoadFromFile(kc)
require.NoError(err)
ctxName := daemon.SafeContainerName(cfg.CurrentContext)
s.doubleConnectCheck(ctx, ctx2, ctxName+"-"+s.AppNamespace(), ctxName+"-"+s.appSpace2, s.AppNamespace(), s.appSpace2, "")
s.doubleConnectCheck(ctx, ctx2, ctxName+"-"+s.AppNamespace()+"-cn", ctxName+"-"+s.appSpace2+"-cn", s.AppNamespace(), s.appSpace2, "")
}

func (s *multiConnectSuite) Test_MultipleConnect_named() {
Expand Down
25 changes: 9 additions & 16 deletions pkg/client/cli/cmd/quit.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package cmd

import (
"fmt"
"os"

"github.com/spf13/cobra"
"google.golang.org/protobuf/types/known/emptypb"

daemon2 "github.com/telepresenceio/telepresence/rpc/v2/daemon"
"github.com/telepresenceio/telepresence/v2/pkg/client/cli/ann"
"github.com/telepresenceio/telepresence/v2/pkg/client/cli/connect"
"github.com/telepresenceio/telepresence/v2/pkg/client/cli/daemon"
"github.com/telepresenceio/telepresence/v2/pkg/client/socket"
"github.com/telepresenceio/telepresence/v2/pkg/ioutil"
)

func quit() *cobra.Command {
Expand All @@ -29,26 +25,23 @@ func quit() *cobra.Command {
return err
}
if quitUserDaemon {
fmt.Fprintln(os.Stderr, "--user-daemon (-u) is deprecated, please use --stop-daemons (-s)")
ioutil.Println(os.Stderr, "--user-daemon (-u) is deprecated, please use --stop-daemons (-s)")
quitDaemons = true
}
if quitRootDaemon {
fmt.Fprintln(os.Stderr, "--root-daemon (-r) is deprecated, please use --stop-daemons (-s)")
ioutil.Println(os.Stderr, "--root-daemon (-r) is deprecated, please use --stop-daemons (-s)")
quitDaemons = true
}
ctx := cmd.Context()
if quitDaemons && daemon.GetUserClient(ctx) == nil {
// User daemon isn't running. If the root daemon is running, we must
// kill it from here.
if conn, err := socket.Dial(ctx, socket.RootDaemonPath(ctx)); err == nil {
_, _ = daemon2.NewDaemonClient(conn).Quit(ctx, &emptypb.Empty{})
}
if quitDaemons {
connect.Quit(cmd.Context())
} else {
connect.Disconnect(cmd.Context())
}
return connect.Disconnect(cmd.Context(), quitDaemons)
return nil
},
}
flags := cmd.Flags()
flags.BoolVarP(&quitDaemons, "stop-daemons", "s", false, "stop the traffic-manager and network daemons")
flags.BoolVarP(&quitDaemons, "stop-daemons", "s", false, "stop all local telepresence daemons")
flags.BoolVarP(&quitRootDaemon, "root-daemon", "r", false, "stop daemons")
flags.BoolVarP(&quitUserDaemon, "user-daemon", "u", false, "stop daemons")

Expand Down
4 changes: 2 additions & 2 deletions pkg/client/cli/cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func BasicGetStatusInfo(ctx context.Context) (ioutil.WriterTos, error) {
}, nil
}
var wt ioutil.WriterTos
if userD.Remote() {
if userD.Containerized() {
sie := StatusInfoEmbedded{
RootDaemon: &rs,
UserDaemon: &us,
Expand All @@ -153,7 +153,7 @@ func BasicGetStatusInfo(ctx context.Context) (ioutil.WriterTos, error) {
}
us.Name = version.Name
if us.Name == "" {
if userD.Remote() {
if userD.Containerized() {
us.Name = "Daemon"
} else {
us.Name = "User Daemon"
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/cli/cmd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func printVersion(cmd *cobra.Command, _ []string) error {
remote := false
userD := daemon.GetUserClient(ctx)
if userD != nil {
remote = userD.Remote()
remote = userD.Containerized()
}

if !remote {
Expand Down
Loading

0 comments on commit 2ed58a3

Please sign in to comment.