[wip] Private Service doesn't need a cluster IP #15392

Open · wants to merge 2 commits into base: main
6 changes: 3 additions & 3 deletions pkg/activator/net/helpers.go
@@ -92,13 +92,13 @@ func endpointsToDests(endpoints *corev1.Endpoints, portName string) (ready, notR
 	return ready, notReady
 }
 
-// getServicePort takes a service and a protocol and returns the port number of
+// getTargetPort takes a service and a protocol and returns the port number of
 // the port named for that protocol. If the port is not found then ok is false.
-func getServicePort(protocol networking.ProtocolType, svc *corev1.Service) (int, bool) {
+func getTargetPort(protocol networking.ProtocolType, svc *corev1.Service) (int, bool) {
 	wantName := networking.ServicePortName(protocol)
 	for _, p := range svc.Spec.Ports {
 		if p.Name == wantName {
-			return int(p.Port), true
+			return p.TargetPort.IntValue(), true
 		}
	}
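The rename is behavioral, not cosmetic: Port is the service-side port, while TargetPort is the port the pod actually listens on. A minimal standalone sketch of the difference (not part of this PR; the 80-to-8012 mapping is an assumed example):

	package main

	import (
		"fmt"

		corev1 "k8s.io/api/core/v1"
		"k8s.io/apimachinery/pkg/util/intstr"
	)

	func main() {
		// Hypothetical Service port mapping: clients dial 80, pods listen on 8012.
		p := corev1.ServicePort{
			Name:       "http",
			Port:       80,
			TargetPort: intstr.FromInt(8012),
		}

		fmt.Println(int(p.Port))             // 80: what getServicePort returned
		fmt.Println(p.TargetPort.IntValue()) // 8012: what getTargetPort returns
	}

Returning the target port is presumably what a ClusterIP-less (headless) private service needs: its hostname resolves straight to pod IPs, and pods listen on the target port, not the service port.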
13 changes: 7 additions & 6 deletions pkg/activator/net/helpers_test.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/google/go-cmp/cmp"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"knative.dev/networking/pkg/apis/networking"
 )
@@ -153,7 +154,7 @@ func TestEndpointsToDests(t *testing.T) {
 	}
 }
 
-func TestGetServicePort(t *testing.T) {
+func TestGetTargetPort(t *testing.T) {
 	for _, tc := range []struct {
 		name     string
 		protocol networking.ProtocolType
@@ -164,17 +165,17 @@ func TestGetServicePort(t *testing.T) {
 		name:     "Single port",
 		protocol: networking.ProtocolHTTP1,
 		ports: []corev1.ServicePort{{
-			Name: "http",
-			Port: 100,
+			Name:       "http",
+			TargetPort: intstr.FromInt(100),
 		}},
 		expect:   100,
 		expectOK: true,
 	}, {
 		name:     "Missing port",
 		protocol: networking.ProtocolHTTP1,
 		ports: []corev1.ServicePort{{
-			Name: "invalid",
-			Port: 100,
+			Name:       "invalid",
+			TargetPort: intstr.FromInt(100),
 		}},
 		expect:   0,
 		expectOK: false,
@@ -186,7 +187,7 @@ func TestGetServicePort(t *testing.T) {
 		},
 	}
 
-	port, ok := getServicePort(tc.protocol, &svc)
+	port, ok := getTargetPort(tc.protocol, &svc)
 	if ok != tc.expectOK {
 		t.Errorf("Wanted ok %v, got %v", tc.expectOK, ok)
 	}
63 changes: 32 additions & 31 deletions pkg/activator/net/revision_backends.go
@@ -48,6 +48,7 @@ import (
 	"knative.dev/pkg/controller"
 	"knative.dev/pkg/logging"
 	"knative.dev/pkg/logging/logkey"
+	"knative.dev/pkg/network"
 	"knative.dev/pkg/reconciler"
 	"knative.dev/serving/pkg/apis/serving"
 	revisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision"
@@ -62,9 +63,9 @@ import (
 // ClusterIPDest will be set to non empty string and Dests will be nil. Otherwise Dests will be set
 // to a slice of healthy l4 dests for reaching the revision.
 type revisionDestsUpdate struct {
-	Rev           types.NamespacedName
-	ClusterIPDest string
-	Dests         sets.Set[string]
+	Rev            types.NamespacedName
+	PrivateService string
+	Dests          sets.Set[string]
 }
 
 type dests struct {
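Per the (not yet updated) comment above, an update carries either a service destination or a set of pod dests, never both. A hedged sketch of how a consumer might branch on the renamed field; routeViaService and addPodDest are hypothetical helpers, not activator code:

	// Sketch only: the real consumer is the activator's throttler.
	func handleUpdate(u revisionDestsUpdate) {
		if u.PrivateService != "" {
			// Pods can't be addressed directly (e.g. behind a mesh);
			// route all traffic through the private service destination.
			routeViaService(u.Rev, u.PrivateService)
			return
		}
		// Otherwise load-balance across the healthy pod dests.
		for dest := range u.Dests {
			addPodDest(u.Rev, dest)
		}
	}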
@@ -91,7 +92,7 @@ const (
 	defaultProbeFrequency time.Duration = 200 * time.Millisecond
 )
 
-// revisionWatcher watches the podIPs and ClusterIP of the service for a revision. It implements the logic
+// revisionWatcher watches the podIPs/service of a revision. It implements the logic
 // to supply revisionDestsUpdate events on updateCh
 type revisionWatcher struct {
 	stopCh <-chan struct{}
@@ -103,8 +104,9 @@ type revisionWatcher struct {
 
 	// Stores the list of pods that have been successfully probed.
 	healthyPods sets.Set[string]
-	// Stores whether the service ClusterIP has been seen as healthy.
-	clusterIPHealthy bool
+
+	// Stores whether the private k8s service has been seen as healthy.
+	privateServiceHealthy bool
 
 	transport http.RoundTripper
 	destsCh   chan dests
@@ -200,23 +202,22 @@ func (rw *revisionWatcher) probe(ctx context.Context, dest string) (pass bool, n
 	return match, notMesh, err
 }
 
-func (rw *revisionWatcher) getDest() (string, error) {
-	svc, err := rw.serviceLister.Services(rw.rev.Namespace).Get(names.PrivateService(rw.rev.Name))
+func (rw *revisionWatcher) getPrivateServiceDest() (string, error) {
+	svcName := names.PrivateService(rw.rev.Name)
+	svc, err := rw.serviceLister.Services(rw.rev.Namespace).Get(svcName)
 	if err != nil {
 		return "", err
 	}
-	if svc.Spec.ClusterIP == "" {
-		return "", fmt.Errorf("private service %s/%s clusterIP is nil, this should never happen", svc.ObjectMeta.Namespace, svc.ObjectMeta.Name)
-	}
 
-	svcPort, ok := getServicePort(rw.protocol, svc)
+	svcHostname := network.GetServiceHostname(svcName, rw.rev.Namespace)
+	svcPort, ok := getTargetPort(rw.protocol, svc)
 	if !ok {
 		return "", fmt.Errorf("unable to find port in service %s/%s", svc.Namespace, svc.Name)
 	}
-	return net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(svcPort)), nil
+	return net.JoinHostPort(svcHostname, strconv.Itoa(svcPort)), nil
 }
 
-func (rw *revisionWatcher) probeClusterIP(dest string) (bool, error) {
+func (rw *revisionWatcher) probePrivateService(dest string) (bool, error) {
 	ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
 	defer cancel()
 	match, _, err := rw.probe(ctx, dest)
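For illustration, here is what the new destination string looks like. network.GetServiceHostname builds the cluster-local DNS name; the revision name, namespace, port, and the "-private" suffix below are assumed examples of what names.PrivateService produces, not values from this PR:

	package main

	import (
		"fmt"
		"net"
		"strconv"

		"knative.dev/pkg/network"
	)

	func main() {
		// e.g. "hello-00001-private.default.svc.cluster.local"
		host := network.GetServiceHostname("hello-00001-private", "default")

		// e.g. "hello-00001-private.default.svc.cluster.local:8012"
		fmt.Println(net.JoinHostPort(host, strconv.Itoa(8012)))
	}

Unlike a ClusterIP, this hostname stays stable even if the Service object is deleted and recreated, which lines up with the "user might go rogue" caveat further down in checkDests.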
@@ -296,12 +297,12 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.Set[string]) (succee
 	return healthy, unchanged, sawNotMesh.Load(), err
 }
 
-func (rw *revisionWatcher) sendUpdate(clusterIP string, dests sets.Set[string]) {
+func (rw *revisionWatcher) sendUpdate(privateService string, dests sets.Set[string]) {
 	select {
 	case <-rw.stopCh:
 		return
 	default:
-		rw.updateCh <- revisionDestsUpdate{Rev: rw.rev, ClusterIPDest: clusterIP, Dests: dests}
+		rw.updateCh <- revisionDestsUpdate{Rev: rw.rev, PrivateService: privateService, Dests: dests}
 	}
 }
@@ -310,9 +311,9 @@ func (rw *revisionWatcher) sendUpdate(clusterIP string, dests sets.Set[string])
 func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
 	if len(curDests.ready) == 0 && len(curDests.notReady) == 0 {
 		// We must have scaled down.
-		rw.clusterIPHealthy = false
+		rw.privateServiceHealthy = false
 		rw.healthyPods = nil
-		rw.logger.Debug("ClusterIP is no longer healthy.")
+		rw.logger.Debug("Private service is no longer healthy.")
 		// Send update that we are now inactive (both params invalid).
 		rw.sendUpdate("", nil)
 		return
@@ -351,7 +352,7 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
 		// Note: it's important that this copies (via hs.Union) the healthy pods
 		// set before sending the update to avoid concurrent modifications
 		// affecting the throttler, which iterates over the set.
-		rw.sendUpdate("" /*clusterIP*/, hs.Union(nil))
+		rw.sendUpdate("", hs.Union(nil))
 		return
 	}
 	// no-op, and we have successfully probed at least one pod.
@@ -380,28 +381,28 @@ func (rw *revisionWatcher) checkDests(curDests, prevDests dests) {
 	// If we failed to probe even a single pod, check the clusterIP.
 	// NB: We can't cache the IP address, since user might go rogue
 	// and delete the K8s service. We'll fix it, but the cluster IP will be different.
-	dest, err := rw.getDest()
+	dest, err := rw.getPrivateServiceDest()
 	if err != nil {
 		rw.logger.Errorw("Failed to determine service destination", zap.Error(err))
 		return
 	}
 
-	// If cluster IP is healthy and we haven't scaled down, short circuit.
-	if rw.clusterIPHealthy {
-		rw.logger.Debugf("ClusterIP %s already probed (ready backends: %d)", dest, len(curDests.ready))
+	// If service hostname is healthy and we haven't scaled down, short circuit.
+	if rw.privateServiceHealthy {
+		rw.logger.Debugf("service hostname %s already probed (ready backends: %d)", dest, len(curDests.ready))
 		rw.sendUpdate(dest, curDests.ready)
 		return
 	}
 
-	// If clusterIP is healthy send this update and we are done.
-	if ok, err := rw.probeClusterIP(dest); err != nil {
-		rw.logger.Errorw("Failed to probe clusterIP "+dest, zap.Error(err))
+	// If service via hostname is healthy send this update and we are done.
+	if ok, err := rw.probePrivateService(dest); err != nil {
+		rw.logger.Errorw("Failed to probe private service: "+dest, zap.Error(err))
 	} else if ok {
 		// We can reach here only iff pods are not successfully individually probed
-		// but ClusterIP conversely has been successfully probed.
+		// but PrivateService conversely has been successfully probed.
 		rw.podsAddressable = false
-		rw.logger.Debugf("ClusterIP is successfully probed: %s (ready backends: %d)", dest, len(curDests.ready))
-		rw.clusterIPHealthy = true
+		rw.logger.Debugf("Private service is successfully probed: %s (ready backends: %d)", dest, len(curDests.ready))
+		rw.privateServiceHealthy = true
 		rw.healthyPods = nil
 		rw.sendUpdate(dest, curDests.ready)
 	}
@@ -421,8 +422,8 @@ func (rw *revisionWatcher) run(probeFrequency time.Duration) {
 			// then we want to probe on timer.
 			rw.logger.Debugw("Revision state", zap.Object("dests", curDests),
 				zap.Object("healthy", logging.StringSet(rw.healthyPods)),
-				zap.Bool("clusterIPHealthy", rw.clusterIPHealthy))
-			if len(curDests.ready)+len(curDests.notReady) > 0 && !(rw.clusterIPHealthy ||
+				zap.Bool("clusterHealthy", rw.privateServiceHealthy))
+			if len(curDests.ready)+len(curDests.notReady) > 0 && !(rw.privateServiceHealthy ||
 				curDests.ready.Union(curDests.notReady).Equal(rw.healthyPods)) {
 				rw.logger.Debug("Probing on timer")
 				tickCh = timer.C