Merge pull request #3643 from telepresenceio/thallgren/inject-annotation-race

Race in traffic-agent injector when using inject annotation
thallgren authored Jul 9, 2024
2 parents cd4e4e5 + 0629f25 commit 6b9cb05
Showing 8 changed files with 105 additions and 64 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.yml
@@ -69,6 +69,12 @@ items:
A new Helm chart value <code>schedulerName</code> has been added. With this feature, we are
able to define some particular schedulers from Kubernetes to apply some different strategies to allocate telepresence resources,
including the Traffic Manager and hooks pods.
- type: bugfix
title: Race in traffic-agent injector when using inject annotation
body: >-
Applying multiple deployments that used the <code>telepresence.getambassador.io/inject-traffic-agent: enabled</code> annotation could cause
a race condition, resulting in a large number of pods being created and eventually deleted, or sometimes in
pods that didn't contain a traffic agent.
- type: bugfix
title: Fix configuring custom agent security context
body: >-
8 changes: 8 additions & 0 deletions cmd/traffic/cmd/manager/mutator/agent_injector.go
@@ -161,6 +161,14 @@ func (a *agentInjector) Inject(ctx context.Context, req *admission.AdmissionRequ
switch {
case err != nil:
return nil, err
case scx == nil && ia == "enabled":
// A race condition may occur when a workload with "enabled" is applied.
// The workload event handler will create the agent config, but the webhook injection call may arrive before
// that agent config has been stored.
// Returning an error here will make the webhook call again, and hopefully the agent config is ready
// by then.
dlog.Debugf(ctx, "No agent config has been generated for annotation enabled %s.%s", pod.Name, pod.Namespace)
return nil, errors.New("agent-config is not yet generated")
case scx == nil:
return nil, nil
case scx.AgentConfig().Manual:
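For context, the new case above fails the admission request when the inject annotation is enabled but no agent config has been generated yet, so the API server retries the webhook instead of admitting a pod without a traffic agent. A minimal, self-contained sketch of that decision (illustrative names, not the actual injector):

package main

import (
	"errors"
	"fmt"
)

// decideInjection is a hypothetical stand-in for the switch in agent_injector.go.
// configReady reports whether an agent config has been generated for the workload;
// annotation is the value of telepresence.getambassador.io/inject-traffic-agent.
func decideInjection(configReady bool, annotation string) (inject bool, err error) {
	switch {
	case !configReady && annotation == "enabled":
		// The workload event handler generates the config asynchronously, so the
		// webhook may fire first. Returning an error makes the API server retry
		// the admission request, by which time the config is hopefully stored.
		return false, errors.New("agent-config is not yet generated")
	case !configReady:
		// No config and no annotation: nothing to inject.
		return false, nil
	default:
		return true, nil
	}
}

func main() {
	_, err := decideInjection(false, "enabled")
	fmt.Println(err) // agent-config is not yet generated
	inject, _ := decideInjection(true, "enabled")
	fmt.Println(inject) // true
}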
17 changes: 16 additions & 1 deletion cmd/traffic/cmd/manager/mutator/watcher.go
@@ -92,7 +92,20 @@ func (e *entry) workload(ctx context.Context) (agentconfig.SidecarExt, k8sapi.Wo
// isRolloutNeeded checks if the agent's entry in telepresence-agents matches the actual state of the
// pods. If it does, then there's no reason to trigger a rollout.
func (c *configWatcher) isRolloutNeeded(ctx context.Context, wl k8sapi.Workload, ac *agentconfig.Sidecar) bool {
podLabels := wl.GetPodTemplate().GetObjectMeta().GetLabels()
podMeta := wl.GetPodTemplate().GetObjectMeta()
if wl.GetDeletionTimestamp() != nil {
return false
}
if ia, ok := podMeta.GetAnnotations()[agentconfig.InjectAnnotation]; ok {
// Annotation controls injection, so no explicit rollout is needed unless the workload was added before the traffic-manager started.
// If the annotation changes, there will be an implicit rollout anyway.
if podMeta.GetCreationTimestamp().After(c.startedAt) {
dlog.Debugf(ctx, "Rollout of %s.%s is not necessary. Pod template has inject annotation %s",
wl.GetName(), wl.GetNamespace(), ia)
return false
}
}
podLabels := podMeta.GetLabels()
if len(podLabels) == 0 {
// Have never seen this, but if it happens, then rollout only if an agent is desired
dlog.Debugf(ctx, "Rollout of %s.%s is necessary. Pod template has no pod labels",
@@ -390,6 +403,7 @@ type configWatcher struct {
rolloutLocks *xsync.MapOf[workloadKey, *sync.Mutex]
nsLocks *xsync.MapOf[string, *sync.RWMutex]
blacklistedPods *xsync.MapOf[string, time.Time]
startedAt time.Time

cms []cache.SharedIndexInformer
svs []cache.SharedIndexInformer
Expand Down Expand Up @@ -498,6 +512,7 @@ func (c *configWatcher) SetSelf(self Map) {
}

func (c *configWatcher) StartWatchers(ctx context.Context) error {
c.startedAt = time.Now()
ctx, c.cancel = context.WithCancel(ctx)
for _, si := range c.svs {
if err := c.watchServices(ctx, si); err != nil {
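The new startedAt field and the creation-timestamp guard above boil down to a small check: annotated workloads created after the traffic-manager started were already handled by the mutating webhook, so triggering a rollout for them would only create redundant pods. A minimal sketch under that assumption (illustrative names):

package main

import (
	"fmt"
	"time"
)

// needsRollout is a hypothetical reduction of the new check in isRolloutNeeded.
// Workloads that carry the inject annotation and were created after the manager
// started don't need a rollout; the webhook already injected their pods.
func needsRollout(hasInjectAnnotation bool, workloadCreated, managerStarted time.Time) bool {
	if hasInjectAnnotation && workloadCreated.After(managerStarted) {
		return false
	}
	// Older or unannotated workloads fall through to the existing
	// pod-label comparison in the real implementation.
	return true
}

func main() {
	started := time.Now().Add(-time.Hour)
	fmt.Println(needsRollout(true, time.Now(), started))                // false
	fmt.Println(needsRollout(true, started.Add(-time.Minute), started)) // true
}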
18 changes: 7 additions & 11 deletions cmd/traffic/cmd/manager/mutator/workload_watcher.go
@@ -144,18 +144,14 @@ func (c *configWatcher) updateWorkload(ctx context.Context, wl, oldWl k8sapi.Wor
if !ok {
return
}
if oldWl != nil {
diff := cmp.Diff(oldWl.GetPodTemplate(), tpl,
cmpopts.IgnoreFields(meta.ObjectMeta{}, "Namespace", "UID", "ResourceVersion", "CreationTimestamp", "DeletionTimestamp"),
cmpopts.IgnoreMapEntries(func(k, _ string) bool {
return k == AnnRestartedAt
}),
)
if diff == "" {
return
}
dlog.Debugf(ctx, "Diff: %s", diff)
if oldWl != nil && cmp.Equal(oldWl.GetPodTemplate(), tpl,
cmpopts.IgnoreFields(meta.ObjectMeta{}, "Namespace", "UID", "ResourceVersion", "CreationTimestamp", "DeletionTimestamp"),
cmpopts.IgnoreMapEntries(func(k, _ string) bool {
return k == AnnRestartedAt
})) {
return
}

switch ia {
case "enabled":
img := managerutil.GetAgentImage(ctx)
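The rewrite above replaces cmp.Diff plus an empty-string test with a single cmp.Equal call using the same ignore options. A small, self-contained sketch of that pattern with go-cmp, using plain structs instead of the Kubernetes types:

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

type Meta struct {
	Name            string
	ResourceVersion string
	Annotations     map[string]string
}

func main() {
	oldMeta := Meta{Name: "echo", ResourceVersion: "1", Annotations: map[string]string{"restartedAt": "a"}}
	newMeta := Meta{Name: "echo", ResourceVersion: "2", Annotations: map[string]string{"restartedAt": "b"}}

	// Ignore fields and map entries that change on every update but have no
	// bearing on whether the templates are effectively equal.
	equal := cmp.Equal(oldMeta, newMeta,
		cmpopts.IgnoreFields(Meta{}, "ResourceVersion"),
		cmpopts.IgnoreMapEntries(func(k, _ string) bool { return k == "restartedAt" }),
	)
	fmt.Println(equal) // true
}

One trade-off visible in the diff: the debug log of the computed diff is gone, which is reasonable once the diff is only used as an equality test.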
54 changes: 26 additions & 28 deletions integration_test/multiple_services_test.go
@@ -2,6 +2,7 @@ package integration_test

import (
"bytes"
"errors"
"fmt"
"io"
"math/rand"
@@ -31,7 +32,6 @@ func init() {
}

func (s *multipleServicesSuite) Test_LargeRequest() {
require := s.Require()
client := &http.Client{Timeout: 15 * time.Minute}
const sendSize = 1024 * 1024 * 20
const varyMax = 1 << 15 // vary last 64Ki
@@ -56,39 +56,37 @@ func (s *multipleServicesSuite) Test_LargeRequest() {
// Distribute the requests over all services
url := fmt.Sprintf("http://%s-%d.%s/put", s.Name(), x%s.ServiceCount(), s.AppNamespace())
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(b))
require.NoError(err)
if !s.NoError(err) {
return
}

resp, err := client.Do(req)
require.NoError(err)
defer resp.Body.Close()
require.Equal(resp.StatusCode, 200)

// Read start
buf := make([]byte, sendSize)
var sb []byte
b1 := buf[:1]
for {
if _, err = resp.Body.Read(b1); err != nil || b1[0] == '!' {
break
}
sb = append(sb, b1[0])
if !s.NoError(err) {
return
}
bdy := resp.Body
defer bdy.Close()
if !s.Equal(resp.StatusCode, 200) {
return
}
require.NoError(err)
b1 = buf[1:2]
_, err = resp.Body.Read(b1)
require.Equal(b1[0], byte('\n'))
require.NoError(err)

i := 2
for err == nil {

cl := sendSize + 1024
buf := make([]byte, cl)
i := 0
for i < cl && err == nil {
var j int
j, err = resp.Body.Read(buf[i:])
j, err = bdy.Read(buf[i:])
i += j
}
// Do this instead of require.Equal(b, buf) so that on failure we don't print two very large buffers to the terminal
require.Equalf(sendSize, i, "Size of response body not equal sent body. %s", string(sb))
require.Equal(true, bytes.Equal(b, buf))
require.Equal(io.EOF, err)
if errors.Is(err, io.EOF) {
err = nil
}
if s.NoError(err) {
ei := bytes.Index(buf, []byte{'!', '\n'})
s.GreaterOrEqual(ei, 0)
// Do this instead of require.Equal(b, buf[ei:i]) so that on failure we don't print two very large buffers to the terminal
s.Equal(true, bytes.Equal(b, buf[ei:i]))
}
}(i)
}
wg.Wait()
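The reworked test reads the whole response into one buffer and then locates the "!\n" marker with bytes.Index before comparing payloads, instead of scanning the prefix byte by byte. A small sketch of that read-then-index pattern on an arbitrary io.Reader (the http.Response body in the real test); the delimiter handling here is illustrative:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

// readAndSplit reads everything from r into a fixed-size buffer and returns the
// bytes that follow the "!\n" delimiter.
func readAndSplit(r io.Reader, max int) ([]byte, error) {
	buf := make([]byte, max)
	i := 0
	var err error
	for i < max && err == nil {
		var n int
		n, err = r.Read(buf[i:])
		i += n
	}
	if errors.Is(err, io.EOF) {
		err = nil
	}
	if err != nil {
		return nil, err
	}
	ei := bytes.Index(buf[:i], []byte{'!', '\n'})
	if ei < 0 {
		return nil, errors.New("delimiter not found")
	}
	return buf[ei+2 : i], nil
}

func main() {
	payload, err := readAndSplit(strings.NewReader("header!\nlarge body"), 1024)
	fmt.Println(string(payload), err) // large body <nil>
}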
58 changes: 38 additions & 20 deletions pkg/agentmap/discorvery.go
@@ -6,12 +6,12 @@ import (
"fmt"
"regexp"

appsv1 "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
apps "k8s.io/client-go/informers/apps/v1"

"github.com/datawire/dlib/dlog"
"github.com/datawire/k8sapi/pkg/k8sapi"
@@ -28,19 +28,20 @@ func FindOwnerWorkload(ctx context.Context, obj k8sapi.Object) (k8sapi.Workload,
return GetWorkload(ctx, wlName, obj.GetNamespace(), lbs[agentconfig.WorkloadKindLabel])
}
refs := obj.GetOwnerReferences()
ns := obj.GetNamespace()
for i := range refs {
if or := &refs[i]; or.Controller != nil && *or.Controller {
if or.Kind == "ReplicaSet" {
// Try the common case first. Strip the ReplicaSet's generated hash and try to
// get the Deployment. If this succeeds, we've saved ourselves a ReplicaSet
// lookup.
if m := ReplicaSetNameRx.FindStringSubmatch(or.Name); m != nil {
if wl, err := GetWorkload(ctx, m[1], obj.GetNamespace(), "Deployment"); err == nil {
if wl, err := GetWorkload(ctx, m[1], ns, "Deployment"); err == nil {
return wl, nil
}
}
}
wl, err := GetWorkload(ctx, or.Name, obj.GetNamespace(), or.Kind)
wl, err := GetWorkload(ctx, or.Name, ns, or.Kind)
if err != nil {
return nil, err
}
@@ -55,42 +56,59 @@ func FindOwnerWorkload(ctx context.Context, obj k8sapi.Object) (k8sapi.Workload,

func GetWorkload(ctx context.Context, name, namespace, workloadKind string) (obj k8sapi.Workload, err error) {
dlog.Debugf(ctx, "GetWorkload(%s,%s,%s)", name, namespace, workloadKind)
f := informer.GetFactory(ctx, namespace)
if f == nil {
dlog.Debugf(ctx, "fetching %s %s.%s using direct API call", workloadKind, name, namespace)
return k8sapi.GetWorkload(ctx, name, namespace, workloadKind)
}
return getWorkload(f.Apps().V1(), name, namespace, workloadKind)
}

func getWorkload(ai apps.Interface, name, namespace, workloadKind string) (obj k8sapi.Workload, err error) {
switch workloadKind {
case "Deployment":
obj, err = getDeployment(ctx, name, namespace)
return getDeployment(ai, name, namespace)
case "ReplicaSet":
obj, err = k8sapi.GetReplicaSet(ctx, name, namespace)
return getReplicaSet(ai, name, namespace)
case "StatefulSet":
obj, err = k8sapi.GetStatefulSet(ctx, name, namespace)
return getStatefulSet(ai, name, namespace)
case "":
for _, wk := range []string{"Deployment", "ReplicaSet", "StatefulSet"} {
if obj, err = GetWorkload(ctx, name, namespace, wk); err == nil {
if obj, err = getWorkload(ai, name, namespace, wk); err == nil {
return obj, nil
}
if !k8sErrors.IsNotFound(err) {
return nil, err
}
}
err = k8sErrors.NewNotFound(core.Resource("workload"), name+"."+namespace)
return nil, k8sErrors.NewNotFound(core.Resource("workload"), name+"."+namespace)
default:
return nil, k8sapi.UnsupportedWorkloadKindError(workloadKind)
}
return obj, err
}

func getDeployment(ctx context.Context, name, namespace string) (obj k8sapi.Workload, err error) {
if f := informer.GetFactory(ctx, namespace); f != nil {
var dep *appsv1.Deployment
dep, err = f.Apps().V1().Deployments().Lister().Deployments(namespace).Get(name)
if err == nil {
obj = k8sapi.Deployment(dep)
}
return obj, err
func getDeployment(ai apps.Interface, name, namespace string) (wl k8sapi.Workload, err error) {
dep, err := ai.Deployments().Lister().Deployments(namespace).Get(name)
if err != nil {
return nil, err
}
return k8sapi.Deployment(dep), nil
}

// This shouldn't happen really.
dlog.Debugf(ctx, "fetching deployment %s.%s using direct API call", name, namespace)
return k8sapi.GetDeployment(ctx, name, namespace)
func getReplicaSet(ai apps.Interface, name, namespace string) (k8sapi.Workload, error) {
rs, err := ai.ReplicaSets().Lister().ReplicaSets(namespace).Get(name)
if err != nil {
return nil, err
}
return k8sapi.ReplicaSet(rs), nil
}

func getStatefulSet(ai apps.Interface, name, namespace string) (k8sapi.Workload, error) {
ss, err := ai.StatefulSets().Lister().StatefulSets(namespace).Get(name)
if err != nil {
return nil, err
}
return k8sapi.StatefulSet(ss), nil
}

func findServicesForPod(ctx context.Context, pod *core.PodTemplateSpec, svcName string) ([]k8sapi.Object, error) {
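The refactor above resolves Deployments, ReplicaSets, and StatefulSets through the shared-informer listers, and only falls back to a direct API call when no informer factory is available. A minimal sketch of that lister pattern, assuming an in-cluster client and a factory that has been started and synced (names are illustrative, not the Telepresence helpers):

package main

import (
	"context"
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/informers"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// getDeploymentViaLister reads from the informer's local cache; no API round trip.
func getDeploymentViaLister(ai appsinformers.Interface, namespace, name string) (*appsv1.Deployment, error) {
	return ai.Deployments().Lister().Deployments(namespace).Get(name)
}

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	cs := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactoryWithOptions(cs, 30*time.Second, informers.WithNamespace("default"))
	ai := factory.Apps().V1()
	// Instantiate the informer before Start so its cache gets populated.
	ai.Deployments().Informer()

	ctx := context.Background()
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	dep, err := getDeploymentViaLister(ai, "default", "echo")
	fmt.Println(dep, err)
}

A lister Get returns a NotFound error when the object isn't cached, which is what lets the kind-probing loop in getWorkload move on to the next kind.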
4 changes: 2 additions & 2 deletions pkg/client/userd/trafficmgr/session.go
@@ -470,8 +470,8 @@ func (s *session) Remain(ctx context.Context) error {
defer cancel()
_, err := self.ManagerClient().Remain(ctx, self.NewRemainRequest())
if err != nil {
if status.Code(err) == codes.NotFound {
// Session has expired. We need to cancel the owner session and reconnect
if status.Code(err) == codes.NotFound || status.Code(err) == codes.Unavailable {
// The session has expired. We need to cancel the owner session and reconnect.
return ErrSessionExpired
}
dlog.Errorf(ctx, "error calling Remain: %v", client.CheckTimeout(ctx, err))
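The change above treats codes.Unavailable the same as codes.NotFound when deciding that the session has expired. A small sketch of that status-code check (the session-expired error here is a hypothetical stand-in):

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

var errSessionExpired = errors.New("session expired")

// classifyRemainError mirrors the check in session.Remain: both NotFound and
// Unavailable now cause the owner session to be cancelled and reconnected.
func classifyRemainError(err error) error {
	switch status.Code(err) {
	case codes.NotFound, codes.Unavailable:
		return errSessionExpired
	default:
		return err
	}
}

func main() {
	err := status.Error(codes.Unavailable, "traffic-manager is restarting")
	fmt.Println(classifyRemainError(err)) // session expired
}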
4 changes: 2 additions & 2 deletions pkg/dnet/kpfconn.go
@@ -88,14 +88,14 @@ func (pf *k8sPortForwardDialer) Dial(ctx context.Context, addr string) (conn net
return conn, nil
}
}
dlog.Errorf(pf.logCtx, "Error with k8sPortForwardDialer dial: %s", err)
dlog.Errorf(pf.logCtx, "Error with k8sPortForwardDialer dial %s: %s", addr, err)
return nil, err
}

func (pf *k8sPortForwardDialer) DialPod(ctx context.Context, name, namespace string, podPortNumber uint16) (net.Conn, error) {
conn, err := pf.dial(ctx, &podAddress{name: name, namespace: namespace, port: podPortNumber})
if err != nil {
dlog.Errorf(pf.logCtx, "Error with k8sPortForwardDialer dial: %s", err)
dlog.Errorf(pf.logCtx, "Error with k8sPortForwardDialer dial %s.%s:%d: %s", name, namespace, podPortNumber, err)
}
return conn, err
}
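The logging changes above add the dial target to the error messages. A related, more general pattern (illustrative, not the Telepresence dialer) is to wrap the returned error with the address as well, so callers that log later still see both the target and the underlying cause:

package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// dialWithContext wraps dial failures with the target address while keeping the
// original error available to errors.Is/errors.As via %w.
func dialWithContext(addr string) (net.Conn, error) {
	conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
	if err != nil {
		return nil, fmt.Errorf("dial %s: %w", addr, err)
	}
	return conn, nil
}

func main() {
	if _, err := dialWithContext("203.0.113.1:8443"); err != nil {
		var nerr net.Error
		fmt.Println(err, errors.As(err, &nerr)) // message includes the address; cause still unwraps
	}
}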
