Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(delta): force push EDS once CDS sent for Ads #981

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
if !found || (prevVersion != version) {
if !found || (prevVersion != version) || state.ShouldForcePushResource(name) {
filtered = append(filtered, r)
}
}
Expand All @@ -67,7 +67,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
prevVersion, found := state.GetResourceVersions()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
if prevVersion != nextVersion || state.ShouldForcePushResource(name) {
filtered = append(filtered, r)
}
nextVersionMap[name] = nextVersion
Expand Down
89 changes: 89 additions & 0 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/durationpb"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
Expand Down Expand Up @@ -296,3 +297,91 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
t.Errorf("should not return a status for unknown key: got %#v", s)
}
}

// TestSnapshotCacheDeltaWatchWithForceEDSOfRelevantEndpoints verifies that when a
// cluster changes between two snapshots, the cache answers both the cluster delta
// watch and the endpoint delta watch, and that the endpoint response only carries
// the endpoint belonging to the changed cluster (testEndpoint).
//
// NOTE(review): the assertions below treat testTypes[0] as the endpoint type and
// testTypes[1] as the cluster type, exactly as the original test did — confirm
// against the declaration of testTypes.
func TestSnapshotCacheDeltaWatchWithForceEDSOfRelevantEndpoints(t *testing.T) {
	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
	watches := make(map[string]chan cache.DeltaResponse)

	// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
	for _, typ := range testTypes {
		watches[typ] = make(chan cache.DeltaResponse, 1)
		c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
			Node: &core.Node{
				Id: "node",
			},
			TypeUrl:                typ,
			ResourceNamesSubscribe: names[typ],
		}, stream.NewStreamState(true, nil), watches[typ])
	}

	if err := c.SetSnapshot(context.Background(), key, fixture.snapshotTwoClusters()); err != nil {
		t.Fatal(err)
	}

	// Collect the version map returned for every type so the follow-up watches can
	// start from the state the first responses established.
	versionMap := make(map[string]map[string]string)
	for _, typ := range testTypes {
		t.Run(typ, func(t *testing.T) {
			select {
			case out := <-watches[typ]:
				snapshot := fixture.snapshotTwoClusters()
				assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
				vMap := out.GetNextVersionMap()
				versionMap[typ] = vMap
			case <-time.After(time.Second):
				t.Fatal("failed to receive snapshot response")
			}
		})
	}

	// On re-request we want to use non-wildcard so we can verify the logic path of not requesting
	// all resources as well as individual resource removals
	for _, typ := range testTypes {
		watches[typ] = make(chan cache.DeltaResponse, 1)
		state := stream.NewStreamState(false, versionMap[typ])
		// Named "name" rather than "resource" so the imported resource package is not shadowed.
		for name := range versionMap[typ] {
			state.GetSubscribedResourceNames()[name] = struct{}{}
		}
		c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
			Node: &core.Node{
				Id: "node",
			},
			TypeUrl:                typ,
			ResourceNamesSubscribe: names[typ],
		}, state, watches[typ])
	}

	if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
		t.Errorf("watches should be created for the latest version, saw %d watches expected %d", count, len(testTypes))
	}

	// set partially-versioned snapshot
	snapshot2 := fixture.snapshotTwoClusters()
	cluster := resource.MakeCluster(resource.Ads, clusterName)
	cluster.ConnectTimeout = durationpb.New(99 * time.Second)
	snapshot2.Resources[types.Cluster] = cache.NewResources(fixture.version2, []types.Resource{cluster, anotherTestCluster})
	if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil {
		t.Fatal(err)
	}
	// Two watches are expected to have been answered; report the same number the condition checks.
	if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes)-2 {
		t.Errorf("watches should be preserved for all but two, got: %d open watches instead of the expected %d open watches", count, len(testTypes)-2)
	}

	// Validate both responses. A single select picks one ready channel at random and
	// would leave the other response unchecked, so receive twice: once a channel has
	// been drained only the other one can still deliver.
	clusterResponses := watches[testTypes[1]]
	endpointResponses := watches[testTypes[0]]
	for received := 0; received < 2; received++ {
		select {
		case out := <-clusterResponses:
			// Only the modified cluster is expected in the cluster response.
			expected := fixture.snapshotTwoClusters()
			expected.Resources[types.Cluster] = cache.NewResources(fixture.version2, []types.Resource{cluster})
			assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), expected.GetResources(rsrc.ClusterType))
			versionMap[testTypes[1]] = out.GetNextVersionMap()
		case out := <-endpointResponses:
			pushed := out.(*cache.RawDeltaResponse).Resources
			// Guard against the loop below passing vacuously on an empty response.
			assert.NotEmpty(t, pushed, "endpoint response should carry the endpoint of the changed cluster")
			for _, res := range pushed {
				// should send only endpoint of the changed cluster
				assert.Equal(t, testEndpoint, res)
			}
			versionMap[testTypes[0]] = out.GetNextVersionMap()
		case <-time.After(time.Second):
			t.Fatal("failed to receive snapshot response")
		}
	}
}
22 changes: 22 additions & 0 deletions pkg/cache/v3/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,25 @@ func (f *fixtureGenerator) snapshot() *cache.Snapshot {

return snapshot
}

// snapshotTwoClusters builds a snapshot at the generator's version that holds two
// endpoints and two clusters alongside the other fixture resources.
// It panics with the error text if cache.NewSnapshot returns an error.
func (f *fixtureGenerator) snapshotTwoClusters() *cache.Snapshot {
	resourcesByType := map[rsrc.Type][]types.Resource{
		rsrc.EndpointType:        {testEndpoint, anotherTestEndpoint},
		rsrc.ClusterType:         {testCluster, anotherTestCluster},
		rsrc.RouteType:           {testRoute, testEmbeddedRoute},
		rsrc.ScopedRouteType:     {testScopedRoute},
		rsrc.VirtualHostType:     {testVirtualHost},
		rsrc.ListenerType:        {testScopedListener, testListener},
		rsrc.RuntimeType:         {testRuntime},
		rsrc.SecretType:          {testSecret[0]},
		rsrc.ExtensionConfigType: {testExtensionConfig},
	}

	built, err := cache.NewSnapshot(f.version, resourcesByType)
	if err == nil {
		return built
	}

	// Fixture construction failing is a programming error in the test setup.
	panic(err.Error())
}
3 changes: 3 additions & 0 deletions pkg/cache/v3/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

const (
clusterName = "cluster0"
anotherClusterName = "cluster1"
routeName = "route0"
embeddedRouteName = "embeddedRoute0"
scopedRouteName = "scopedRoute0"
Expand All @@ -45,7 +46,9 @@ const (

var (
testEndpoint = resource.MakeEndpoint(clusterName, 8080)
anotherTestEndpoint = resource.MakeEndpoint(anotherClusterName, 9090)
testCluster = resource.MakeCluster(resource.Ads, clusterName)
anotherTestCluster = resource.MakeCluster(resource.Ads, anotherClusterName)
testRoute = resource.MakeRouteConfig(routeName, clusterName)
testEmbeddedRoute = resource.MakeRouteConfig(embeddedRouteName, clusterName)
testScopedRoute = resource.MakeScopedRouteConfig(scopedRouteName, routeName, []string{"1.2.3.4"})
Expand Down
18 changes: 18 additions & 0 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,14 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseDeltaWatches()
forcePushResources := map[types.ResponseType][]string{}
for _, key := range info.orderedDeltaWatches {
watch := info.deltaWatches[key.ID]
responseType := GetResponseType(watch.Request.TypeUrl)
if resources, found := forcePushResources[responseType]; found {
watch.StreamState.SetForcePushResource(resources)
}

res, err := cache.respondDelta(
ctx,
snapshot,
Expand All @@ -319,6 +325,9 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
// so we don't want to respond or remove any existing resource watches
if res != nil {
delete(info.deltaWatches, key.ID)
if indexed, typ, ok := getForcePushEdsResourcesNames(responseType, res); ok {
forcePushResources[typ] = indexed
}
}
}
} else {
Expand Down Expand Up @@ -496,6 +505,15 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value
}
}

// getForcePushEdsResourcesNames decides which resource names must be force-pushed
// as a consequence of the given delta response.
//
// When typ is types.Cluster it returns the result of GetResourceNames on the
// response's resources, types.Endpoint as the type those names apply to, and true.
// For every other type it returns an empty slice, types.UnknownType and false.
func getForcePushEdsResourcesNames(typ types.ResponseType, response *RawDeltaResponse) ([]string, types.ResponseType, bool) {
	if typ != types.Cluster {
		return []string{}, types.UnknownType, false
	}

	clusterNames := GetResourceNames(response.Resources)
	return clusterNames, types.Endpoint, true
}

func createResponse(ctx context.Context, request *Request, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) Response {
filtered := make([]types.ResourceWithTTL, 0, len(resources))

Expand Down
19 changes: 19 additions & 0 deletions pkg/server/stream/v3/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ type StreamState struct { // nolint:golint,revive

// Ordered indicates whether we want an ordered ADS stream or not
ordered bool

// forcePushResource holds the names of resources that should be pushed even when their version is unchanged.
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#resource-warming
// This is required in the following situation:
// 1) A Cluster changes and the control plane responds with CDS.
// 2) Envoy keeps the cluster in the warming phase until it receives an EDS response; if the endpoints haven't changed,
// no EDS response is sent and the changes to the clusters are blocked.
forcePushResource map[string]bool
}

// NewStreamState initializes a stream state.
Expand All @@ -54,6 +62,7 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St
first: true,
knownResourceNames: map[string]map[string]struct{}{},
ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS
forcePushResource: map[string]bool{},
}

if initialResourceVersions == nil {
Expand Down Expand Up @@ -97,6 +106,12 @@ func (s *StreamState) SetWildcard(wildcard bool) {
s.wildcard = wildcard
}

// SetForcePushResource marks every name in forcePushResources as a resource that
// must be pushed regardless of its version. Names marked by earlier calls stay
// marked. Passing a nil or empty slice is a no-op.
//
// The backing map is created on demand, so the method is safe to call on a
// StreamState whose forcePushResource map was never initialized (for example a
// zero value); writing to a nil map would otherwise panic.
func (s *StreamState) SetForcePushResource(forcePushResources []string) {
	if len(forcePushResources) == 0 {
		return
	}

	if s.forcePushResource == nil {
		s.forcePushResource = make(map[string]bool, len(forcePushResources))
	}

	for _, resName := range forcePushResources {
		s.forcePushResource[resName] = true
	}
}

// GetResourceVersions returns a map of current resources grouped by type URL.
func (s *StreamState) GetResourceVersions() map[string]string {
return s.resourceVersions
Expand All @@ -119,6 +134,10 @@ func (s *StreamState) IsWildcard() bool {
return s.wildcard
}

// ShouldForcePushResource reports whether resourceName has been marked in the
// stream state's forcePushResource map. Names that were never marked yield false.
func (s *StreamState) ShouldForcePushResource(resourceName string) bool {
	forced, known := s.forcePushResource[resourceName]
	return known && forced
}

// GetKnownResourceNames returns the current known list of resources on a SOTW stream.
func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
return s.knownResourceNames[url]
Expand Down