diff --git a/kwok/charts/crds/karpenter.sh_nodeclaims.yaml b/kwok/charts/crds/karpenter.sh_nodeclaims.yaml index e10563ab7b..80836302fd 100644 --- a/kwok/charts/crds/karpenter.sh_nodeclaims.yaml +++ b/kwok/charts/crds/karpenter.sh_nodeclaims.yaml @@ -81,6 +81,14 @@ spec: memory leak protection, and disruption testing. pattern: ^(([0-9]+(s|m|h))+)|(Never)$ type: string + minimumPriceImprovementPercent: + description: |- + MinimumPriceImprovementPercent is the minimum price improvement necessary to disrupt this node, as an integer percentage. + The default is 0%, which maintains the existing consolidation behavior prior to this feature. + format: int32 + maximum: 100 + minimum: 0 + type: integer nodeClassRef: description: NodeClassRef is a reference to an object that defines provider specific configuration properties: diff --git a/pkg/apis/crds/karpenter.sh_nodeclaims.yaml b/pkg/apis/crds/karpenter.sh_nodeclaims.yaml index 5f5bff9ae0..7ad8fa8547 100644 --- a/pkg/apis/crds/karpenter.sh_nodeclaims.yaml +++ b/pkg/apis/crds/karpenter.sh_nodeclaims.yaml @@ -81,6 +81,14 @@ spec: memory leak protection, and disruption testing. pattern: ^(([0-9]+(s|m|h))+)|(Never)$ type: string + minimumPriceImprovementPercent: + description: |- + MinimumPriceImprovementPercent is the minimum price improvement necessary to disrupt this node, as an integer percentage. + The default is 0%, which maintains the existing consolidation behavior prior to this feature. 
+ format: int32 + maximum: 100 + minimum: 0 + type: integer nodeClassRef: description: NodeClassRef is a reference to an object that defines provider specific configuration properties: diff --git a/pkg/apis/v1/nodeclaim.go b/pkg/apis/v1/nodeclaim.go index 1c163858ba..8758a9a762 100644 --- a/pkg/apis/v1/nodeclaim.go +++ b/pkg/apis/v1/nodeclaim.go @@ -73,6 +73,12 @@ type NodeClaimSpec struct { // +kubebuilder:validation:Schemaless // +optional ExpireAfter NillableDuration `json:"expireAfter,omitempty"` + // MinimumPriceImprovementPercent is the minimum price improvement necessary to disrupt this node, as an integer percentage. + // The default is 0%, which maintains the existing consolidation behavior prior to this feature. + // +kubebuilder:validation:Minimum:=0 + // +kubebuilder:validation:Maximum:=100 + // +optional + MinimumPriceImprovementPercent *int32 `json:"minimumPriceImprovementPercent,omitempty"` } // A node selector requirement with min values is a selector that contains values, a key, an operator that relates the key and values diff --git a/pkg/apis/v1/zz_generated.deepcopy.go b/pkg/apis/v1/zz_generated.deepcopy.go index e1d62f319c..36cd164114 100644 --- a/pkg/apis/v1/zz_generated.deepcopy.go +++ b/pkg/apis/v1/zz_generated.deepcopy.go @@ -217,6 +217,11 @@ func (in *NodeClaimSpec) DeepCopyInto(out *NodeClaimSpec) { **out = **in } in.ExpireAfter.DeepCopyInto(&out.ExpireAfter) + if in.MinimumPriceImprovementPercent != nil { + in, out := &in.MinimumPriceImprovementPercent, &out.MinimumPriceImprovementPercent + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeClaimSpec. 
diff --git a/pkg/controllers/disruption/consolidation.go b/pkg/controllers/disruption/consolidation.go
index e3539be371..30ab26cad2 100644
--- a/pkg/controllers/disruption/consolidation.go
+++ b/pkg/controllers/disruption/consolidation.go
@@ -27,6 +27,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/utils/clock"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log"
 
 	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 	"sigs.k8s.io/karpenter/pkg/cloudprovider"
@@ -149,7 +150,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
 
 	// get the current node price based on the offering
 	// fallback if we can't find the specific zonal pricing data
-	candidatePrice, err := getCandidatePrices(candidates)
+	candidatePrice, err := getCandidatePrices(ctx, candidates)
 	if err != nil {
 		return Command{}, pscheduling.Results{}, fmt.Errorf("getting offering price from candidate node, %w", err)
 	}
@@ -285,14 +286,38 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
 }
 
-// getCandidatePrices returns the sum of the prices of the given candidates
-func getCandidatePrices(candidates []*Candidate) (float64, error) {
+// getCandidatePrices returns the sum, over the given candidates, of the price of each
+// candidate's cheapest offering that is compatible with its node labels.
+// When a candidate's NodeClaim sets MinimumPriceImprovementPercent, that candidate's
+// contribution is reduced by the given percentage before it is added to the sum.
+// An error is returned as soon as a candidate has no compatible offering.
+func getCandidatePrices(ctx context.Context, candidates []*Candidate) (float64, error) {
 	var price float64
 	for _, c := range candidates {
 		compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
 		if len(compatibleOfferings) == 0 {
 			return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
 		}
-		price += compatibleOfferings.Cheapest().Price
+
+		// limit maximum candidate replacement price for consideration
+		originalCheapestPrice := compatibleOfferings.Cheapest().Price
+		cheapestConsiderablePrice := originalCheapestPrice
+		// Test the candidate being priced, not candidates[0]: the field is optional per NodeClaim,
+		// so dereferencing c's pointer after checking another candidate's can panic on nil.
+		if improvement := c.NodeClaim.Spec.MinimumPriceImprovementPercent; improvement != nil {
+			cheapestConsiderablePrice *= 1 - (float64(*improvement) / 100)
+
+			// Debug verbosity: this is emitted once per candidate on every call.
+			log.FromContext(ctx).WithValues(
+				"originalPrice", originalCheapestPrice,
+				"requiredPrice", cheapestConsiderablePrice,
+				"minimumPriceImprovementPercent", *improvement,
+			).V(1).Info("price threshold for consolidation")
+
+			// TODO: surface this threshold as a disruptionevents.PriceThreshold event; no event
+			// recorder is in scope in this package-level helper, so it is only logged for now.
+		}
+
+		price += cheapestConsiderablePrice
 	}
 	return price, nil
 }
diff --git a/pkg/controllers/disruption/events/events.go b/pkg/controllers/disruption/events/events.go
index dea770d5dd..10d017f149 100644
--- a/pkg/controllers/disruption/events/events.go
+++ b/pkg/controllers/disruption/events/events.go
@@ -90,6 +90,28 @@ func Unconsolidatable(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string)
 	}
 }
 
+// PriceThreshold is an event that notes the maximum price threshold that a NodeClaim/Node requires to allow consolidation
+func PriceThreshold(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) []events.Event {
+	return []events.Event{
+		{
+			InvolvedObject: node,
+			Type:           corev1.EventTypeNormal,
+			Reason:         "PriceThreshold",
+			Message:        reason,
+			DedupeValues:   []string{string(node.UID)},
+			DedupeTimeout:  time.Minute * 15,
+		},
+		{
+			InvolvedObject: nodeClaim,
+			Type:           corev1.EventTypeNormal,
+			Reason:         "PriceThreshold",
+			Message:        reason,
+			DedupeValues:   []string{string(nodeClaim.UID)},
+			DedupeTimeout:  time.Minute * 15,
+		},
+	}
+}
+
 // Blocked is an event that informs the user that a NodeClaim/Node combination is blocked on deprovisioning
 // due to the state of the NodeClaim/Node or due to some state of the pods that are scheduled to the NodeClaim/Node
 func Blocked(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) (evs []events.Event) {