diff --git a/go.mod b/go.mod index 4e7cbfa0be..202e4dd3ac 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ module sigs.k8s.io/karpenter -go 1.22.3 +go 1.22.4 require ( github.com/Pallinder/go-randomdata v1.2.0 github.com/avast/retry-go v3.0.0+incompatible - github.com/awslabs/operatorpkg v0.0.0-20240514175841-edb8fe5824b4 + github.com/awslabs/operatorpkg v0.0.0-20240605172541-88cf99023fa4 github.com/docker/docker v26.1.3+incompatible github.com/go-logr/logr v1.4.2 github.com/imdario/mergo v0.3.16 @@ -29,6 +29,7 @@ require ( k8s.io/component-base v0.30.1 k8s.io/csi-translation-lib v0.30.1 k8s.io/klog/v2 v2.120.1 + k8s.io/kubernetes v1.30.1 k8s.io/utils v0.0.0-20240102154912-e7106e64919e knative.dev/pkg v0.0.0-20230712131115-7051d301e7f4 sigs.k8s.io/controller-runtime v0.18.3 @@ -90,7 +91,7 @@ require ( golang.org/x/term v0.20.0 // indirect golang.org/x/tools v0.21.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect - google.golang.org/api v0.124.0 // indirect + google.golang.org/api v0.126.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect diff --git a/go.sum b/go.sum index 7c6a12ec9c..e1752daea6 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= -github.com/awslabs/operatorpkg v0.0.0-20240514175841-edb8fe5824b4 h1:Du6S9Xa+/VuCpoSwOW7QpPM30IEIvB91WJEIltWQwRk= -github.com/awslabs/operatorpkg v0.0.0-20240514175841-edb8fe5824b4/go.mod h1:YcidmUg8Pjk349+jd+sRCdo6h3jzxqAY1VDNgVJKbSA= +github.com/awslabs/operatorpkg v0.0.0-20240605172541-88cf99023fa4 h1:EVFVrteX0PQuofO9Ah4rf4aGyUkBM3lLuKgzwilAEAg= +github.com/awslabs/operatorpkg v0.0.0-20240605172541-88cf99023fa4/go.mod h1:OR0NDOTl6XUXKgcksUab5d7mCnpaZf7Ko4eWEbheJTY= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -550,8 +550,8 @@ google.golang.org/api v0.25.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM= google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= -google.golang.org/api v0.124.0 h1:dP6Ef1VgOGqQ8eiv4GiY8RhmeyqzovcXBYPDUYG8Syo= -google.golang.org/api v0.124.0/go.mod h1:xu2HQurE5gi/3t1aFCvhPD781p0a3p11sdunTJ2BlP4= +google.golang.org/api v0.126.0 h1:q4GJq+cAdMAC7XP7njvQ4tvohGLiSlytuL4BQxbIZ+o= +google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -671,6 +671,8 @@ k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 
v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= +k8s.io/kubernetes v1.30.1 h1:XlqS6KslLEA5mQzLK2AJrhr4Z1m8oJfkhHiWJ5lue+I= +k8s.io/kubernetes v1.30.1/go.mod h1:yPbIk3MhmhGigX62FLJm+CphNtjxqCvAIFQXup6RKS0= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= knative.dev/pkg v0.0.0-20230712131115-7051d301e7f4 h1:oO/BQJpVCFTSTMHF/S6u+nPtIvbHDTsvbPZvdCZAFjs= diff --git a/pkg/apis/crds/karpenter.sh_nodeoverlays.yaml b/pkg/apis/crds/karpenter.sh_nodeoverlays.yaml new file mode 100644 index 0000000000..016a227757 --- /dev/null +++ b/pkg/apis/crds/karpenter.sh_nodeoverlays.yaml @@ -0,0 +1,83 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.15.0 + name: nodeoverlays.karpenter.sh +spec: + group: karpenter.sh + names: + categories: + - karpenter + kind: NodeOverlay + listKind: NodeOverlayList + plural: nodeoverlays + singular: nodeoverlay + scope: Cluster + versions: + - name: v1beta1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + pricePercent: + description: PricePercent modifies the price of the simulated node + (PriceAdjustment + (Price * PricePercent / 100)). + type: integer + selector: + description: Selector matches against simulated nodes and modifies + their scheduling properties. Matches all if empty. + items: + description: |- + A node selector requirement is a selector that contains values, a key, and an operator + that relates the key and values. + properties: + key: + description: The label key that the selector applies to. + type: string + operator: + description: |- + Represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: |- + An array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator is Gt or Lt, the values + array must have a single element, which will be interpreted as an integer. + This array is replaced during a strategic merge patch. 
+ items: + type: string + type: array + x-kubernetes-list-type: atomic + required: + - key + - operator + type: object + type: array + type: object + required: + - spec + type: object + served: true + storage: true diff --git a/pkg/apis/v1beta1/nodeoverlay.go b/pkg/apis/v1beta1/nodeoverlay.go new file mode 100644 index 0000000000..0d5338cf19 --- /dev/null +++ b/pkg/apis/v1beta1/nodeoverlay.go @@ -0,0 +1,71 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +kubebuilder:object:root=true +type NodeOverlayList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []NodeOverlay `json:"items"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:resource:path=nodeoverlays,scope=Cluster,categories=karpenter +type NodeOverlay struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + // +required + Spec NodeOverlaySpec `json:"spec"` +} + +type NodeOverlaySpec struct { + // Selector matches against simulated nodes and modifies their scheduling properties. Matches all if empty. + // +optional + Selector []v1.NodeSelectorRequirement `json:"selector,omitempty"` + // PricePercent modifies the price of the simulated node (PriceAdjustment + (Price * PricePercent / 100)). + // +optional + PricePercent *int `json:"pricePercent,omitempty"` + + // + // The following fields are not yet implemented + // + + // PriceAdjustment modifies the price of the simulated node (PriceAdjustment + (Price * PricePercent / 100)). + // +optional + // PriceAdjustment *int `json:"priceAdjustment,omitempty"` + + // Requirements add additional scheduling requirements to the Node, which may be targeted by pod scheduling constraints + // This feature is not currently implemented + // +optional + // Requirements []v1.NodeSelectorRequirement `json:"requirements,omitempty"` + + // Capacity adds resource capacities to the simulated Node, but will not replace existing values. + // This feature is not currently implemented + // +optional + // Capacity v1.ResourceList + + // Overhead subtracts resources from the simulated Node + // This feature is not currently implemented + // +optional + // Overhead v1.ResourceList `json:"overhead,omitempty"` +} diff --git a/pkg/apis/v1beta1/nodeoverlay.yaml b/pkg/apis/v1beta1/nodeoverlay.yaml new file mode 100644 index 0000000000..6c8953fb7e --- /dev/null +++ b/pkg/apis/v1beta1/nodeoverlay.yaml @@ -0,0 +1,47 @@ +--- +# Reduce on-demand prices to 90% +# https://github.com/aws/karpenter-provider-aws/issues/3860 +# https://github.com/aws/karpenter-provider-aws/pull/4697 +kind: NodeOverlay +metadata: + name: discount +spec: + selector: + matchLabels: + karpenter.sh/capacity-type: on-demand + pricePercent: 90 +--- +# Support for extended resource types (e.g. 
smarter-devices/fuse) # https://github.com/kubernetes-sigs/karpenter/issues/751 +# https://github.com/kubernetes-sigs/karpenter/issues/729 +kind: NodeOverlay +metadata: + name: extended-resources +spec: + selector: + matchLabels: + karpenter.sh/capacity-type: on-demand + matchExpressions: + - key: node.kubernetes.io/instance-type + operator: In + values: + - m5.large + - m5.2xlarge + - m5.4xlarge + - m5.8xlarge + - m5.12xlarge + capacity: + smarter-devices/fuse: 1 +--- +# Add memory overhead of 10Mi to all instances with 2Gi memory +# https://github.com/aws/karpenter-provider-aws/issues/5161 +kind: NodeOverlay +metadata: + name: memory-overhead +spec: + selector: + matchLabels: + karpenter.k8s.aws/instance-memory: "2048" + overhead: + memory: 10Mi +--- \ No newline at end of file diff --git a/pkg/apis/v1beta1/zz_generated.deepcopy.go b/pkg/apis/v1beta1/zz_generated.deepcopy.go index 057186955b..f2217fb12d 100644 --- a/pkg/apis/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/v1beta1/zz_generated.deepcopy.go @@ -381,6 +381,91 @@ func (in *NodeClassReference) DeepCopy() *NodeClassReference { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeOverlay) DeepCopyInto(out *NodeOverlay) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeOverlay. +func (in *NodeOverlay) DeepCopy() *NodeOverlay { + if in == nil { + return nil + } + out := new(NodeOverlay) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeOverlay) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeOverlayList) DeepCopyInto(out *NodeOverlayList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]NodeOverlay, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeOverlayList. +func (in *NodeOverlayList) DeepCopy() *NodeOverlayList { + if in == nil { + return nil + } + out := new(NodeOverlayList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeOverlayList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeOverlaySpec) DeepCopyInto(out *NodeOverlaySpec) { + *out = *in + if in.Selector != nil { + in, out := &in.Selector, &out.Selector + *out = make([]v1.NodeSelectorRequirement, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.PricePercent != nil { + in, out := &in.PricePercent, &out.PricePercent + *out = new(int) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeOverlaySpec.
+func (in *NodeOverlaySpec) DeepCopy() *NodeOverlaySpec { + if in == nil { + return nil + } + out := new(NodeOverlaySpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NodePool) DeepCopyInto(out *NodePool) { *out = *in diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 4451582c17..77fc9e55e6 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -25,6 +25,7 @@ import ( "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/controllers/disruption" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" + "sigs.k8s.io/karpenter/pkg/controllers/instancetypes" "sigs.k8s.io/karpenter/pkg/controllers/leasegarbagecollection" metricsnode "sigs.k8s.io/karpenter/pkg/controllers/metrics/node" metricsnodepool "sigs.k8s.io/karpenter/pkg/controllers/metrics/nodepool" @@ -52,15 +53,18 @@ func NewControllers( cloudProvider cloudprovider.CloudProvider, ) []controller.Controller { - p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster) + provisioner := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster) evictionQueue := terminator.NewQueue(kubeClient, recorder) - disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p) - + disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, provisioner) + instanceTypesProvider := instancetypes.NewController(kubeClient, cloudProvider) return []controller.Controller{ - p, evictionQueue, disruptionQueue, - disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster, disruptionQueue), - provisioning.NewPodController(kubeClient, p, recorder), - provisioning.NewNodeController(kubeClient, p, recorder), + provisioner, + evictionQueue, + disruptionQueue, + instanceTypesProvider, + disruption.NewController(clock, kubeClient, provisioner, instanceTypesProvider, recorder, cluster, disruptionQueue), + provisioning.NewPodController(kubeClient, provisioner, recorder), + provisioning.NewNodeController(kubeClient, provisioner, recorder), nodepoolhash.NewController(kubeClient), informer.NewDaemonSetController(kubeClient, cluster), informer.NewNodeController(kubeClient, cluster), diff --git a/pkg/controllers/disruption/consolidation.go b/pkg/controllers/disruption/consolidation.go index 3d073d941b..1e500451f2 100644 --- a/pkg/controllers/disruption/consolidation.go +++ b/pkg/controllers/disruption/consolidation.go @@ -30,9 +30,9 @@ import ( "sigs.k8s.io/karpenter/pkg/apis/v1alpha5" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" - "sigs.k8s.io/karpenter/pkg/cloudprovider" disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" + "sigs.k8s.io/karpenter/pkg/controllers/instancetypes" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" "sigs.k8s.io/karpenter/pkg/controllers/state" @@ -56,21 +56,21 @@ type consolidation struct { cluster *state.Cluster kubeClient client.Client provisioner *provisioning.Provisioner - cloudProvider cloudprovider.CloudProvider + instanceTypesProvider instancetypes.Provider recorder events.Recorder lastConsolidationState time.Time } func MakeConsolidation(clock clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner, - cloudProvider 
cloudprovider.CloudProvider, recorder events.Recorder, queue *orchestration.Queue) consolidation { + instanceTypesProvider instancetypes.Provider, recorder events.Recorder, queue *orchestration.Queue) consolidation { return consolidation{ - queue: queue, - clock: clock, - cluster: cluster, - kubeClient: kubeClient, - provisioner: provisioner, - cloudProvider: cloudProvider, - recorder: recorder, + queue: queue, + clock: clock, + cluster: cluster, + kubeClient: kubeClient, + provisioner: provisioner, + instanceTypesProvider: instanceTypesProvider, + recorder: recorder, } } diff --git a/pkg/controllers/disruption/consolidation_test.go b/pkg/controllers/disruption/consolidation_test.go index e3143807c9..59f0b01082 100644 --- a/pkg/controllers/disruption/consolidation_test.go +++ b/pkg/controllers/disruption/consolidation_test.go @@ -582,11 +582,11 @@ var _ = Describe("Consolidation", func() { // inform cluster state about nodes and nodeclaims ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) - emptyConsolidation := disruption.NewEmptyNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue)) + emptyConsolidation := disruption.NewEmptyNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, instanceTypesProvider, recorder, queue)) budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder) Expect(err).To(Succeed()) - candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, emptyConsolidation.ShouldDisrupt, queue) + candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, instanceTypesProvider, emptyConsolidation.ShouldDisrupt, queue) Expect(err).To(Succeed()) var wg sync.WaitGroup @@ -646,11 +646,11 @@ var _ = Describe("Consolidation", func() { // inform cluster state about nodes and nodeclaims ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) - emptyConsolidation := disruption.NewEmptyNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue)) + emptyConsolidation := disruption.NewEmptyNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, instanceTypesProvider, recorder, queue)) budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder) Expect(err).To(Succeed()) - candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, emptyConsolidation.ShouldDisrupt, queue) + candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, instanceTypesProvider, emptyConsolidation.ShouldDisrupt, queue) Expect(err).To(Succeed()) var wg sync.WaitGroup @@ -673,11 +673,11 @@ var _ = Describe("Consolidation", func() { // inform cluster state about nodes and nodeclaims ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) - multiConsolidation := disruption.NewMultiNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue)) + multiConsolidation := disruption.NewMultiNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, instanceTypesProvider, recorder, queue)) budgets, 
err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder) Expect(err).To(Succeed()) - candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, multiConsolidation.ShouldDisrupt, queue) + candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, instanceTypesProvider, multiConsolidation.ShouldDisrupt, queue) Expect(err).To(Succeed()) var wg sync.WaitGroup @@ -737,11 +737,11 @@ var _ = Describe("Consolidation", func() { // inform cluster state about nodes and nodeclaims ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) - multiConsolidation := disruption.NewMultiNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue)) + multiConsolidation := disruption.NewMultiNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, instanceTypesProvider, recorder, queue)) budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder) Expect(err).To(Succeed()) - candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, multiConsolidation.ShouldDisrupt, queue) + candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, instanceTypesProvider, multiConsolidation.ShouldDisrupt, queue) Expect(err).To(Succeed()) var wg sync.WaitGroup @@ -764,11 +764,11 @@ var _ = Describe("Consolidation", func() { // inform cluster state about nodes and nodeclaims ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) - singleConsolidation := disruption.NewSingleNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue)) + singleConsolidation := disruption.NewSingleNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, instanceTypesProvider, recorder, queue)) budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder) Expect(err).To(Succeed()) - candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, singleConsolidation.ShouldDisrupt, queue) + candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, instanceTypesProvider, singleConsolidation.ShouldDisrupt, queue) Expect(err).To(Succeed()) var wg sync.WaitGroup @@ -828,11 +828,11 @@ var _ = Describe("Consolidation", func() { // inform cluster state about nodes and nodeclaims ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) - singleConsolidation := disruption.NewSingleNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue)) + singleConsolidation := disruption.NewSingleNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, instanceTypesProvider, recorder, queue)) budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder) Expect(err).To(Succeed()) - candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, singleConsolidation.ShouldDisrupt, queue) + candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, 
instanceTypesProvider, singleConsolidation.ShouldDisrupt, queue) Expect(err).To(Succeed()) var wg sync.WaitGroup diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index 44f2fb6dd5..093c4f4140 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -34,8 +34,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" - "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" + "sigs.k8s.io/karpenter/pkg/controllers/instancetypes" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" "sigs.k8s.io/karpenter/pkg/controllers/state" @@ -46,34 +46,34 @@ import ( ) type Controller struct { - queue *orchestration.Queue - kubeClient client.Client - cluster *state.Cluster - provisioner *provisioning.Provisioner - recorder events.Recorder - clock clock.Clock - cloudProvider cloudprovider.CloudProvider - methods []Method - mu sync.Mutex - lastRun map[string]time.Time + queue *orchestration.Queue + kubeClient client.Client + cluster *state.Cluster + provisioner *provisioning.Provisioner + recorder events.Recorder + clock clock.Clock + instanceTypesProvider instancetypes.Provider + methods []Method + mu sync.Mutex + lastRun map[string]time.Time } // pollingPeriod that we inspect cluster to look for opportunities to disrupt const pollingPeriod = 10 * time.Second func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provisioning.Provisioner, - cp cloudprovider.CloudProvider, recorder events.Recorder, cluster *state.Cluster, queue *orchestration.Queue, + instanceTypesProvider instancetypes.Provider, recorder events.Recorder, cluster *state.Cluster, queue *orchestration.Queue, ) *Controller { - c := MakeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder, queue) + c := MakeConsolidation(clk, cluster, kubeClient, provisioner, instanceTypesProvider, recorder, queue) return &Controller{ - queue: queue, - clock: clk, - kubeClient: kubeClient, - cluster: cluster, - provisioner: provisioner, - recorder: recorder, - cloudProvider: cp, - lastRun: map[string]time.Time{}, + queue: queue, + clock: clk, + kubeClient: kubeClient, + cluster: cluster, + provisioner: provisioner, + recorder: recorder, + instanceTypesProvider: instanceTypesProvider, + lastRun: map[string]time.Time{}, methods: []Method{ // Expire any NodeClaims that must be deleted, allowing their pods to potentially land on currently NewExpiration(clk, kubeClient, cluster, provisioner, recorder), @@ -144,7 +144,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro methodLabel: disruption.Type(), consolidationTypeLabel: disruption.ConsolidationType(), }))() - candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, disruption.ShouldDisrupt, c.queue) + candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.instanceTypesProvider, disruption.ShouldDisrupt, c.queue) if err != nil { return false, fmt.Errorf("determining candidates, %w", err) } diff --git a/pkg/controllers/disruption/emptynodeconsolidation.go b/pkg/controllers/disruption/emptynodeconsolidation.go index e5e0f12184..f1008ba9cc 100644 --- a/pkg/controllers/disruption/emptynodeconsolidation.go +++ b/pkg/controllers/disruption/emptynodeconsolidation.go @@ -86,7 +86,7 @@ func (c *EmptyNodeConsolidation) 
ComputeCommand(ctx context.Context, disruptionB case <-c.clock.After(consolidationTTL): } - v := NewValidation(c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder, c.queue) + v := NewValidation(c.clock, c.cluster, c.kubeClient, c.provisioner, c.instanceTypesProvider, c.recorder, c.queue) validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...) if err != nil { if IsValidationError(err) { diff --git a/pkg/controllers/disruption/helpers.go b/pkg/controllers/disruption/helpers.go index a734e002a3..4e409506a0 100644 --- a/pkg/controllers/disruption/helpers.go +++ b/pkg/controllers/disruption/helpers.go @@ -26,6 +26,7 @@ import ( "github.com/samber/lo" disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events" + "sigs.k8s.io/karpenter/pkg/controllers/instancetypes" nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" v1 "k8s.io/api/core/v1" @@ -197,9 +198,9 @@ func disruptionCost(ctx context.Context, pods []*v1.Pod) float64 { // GetCandidates returns nodes that appear to be currently deprovisionable based off of their nodePool func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, recorder events.Recorder, clk clock.Clock, - cloudProvider cloudprovider.CloudProvider, shouldDeprovision CandidateFilter, queue *orchestration.Queue, + instanceTypesProvider instancetypes.Provider, shouldDeprovision CandidateFilter, queue *orchestration.Queue, ) ([]*Candidate, error) { - nodePoolMap, nodePoolToInstanceTypesMap, err := BuildNodePoolMap(ctx, kubeClient, cloudProvider) + nodePoolMap, nodePoolToInstanceTypesMap, err := BuildNodePoolMap(ctx, kubeClient, instanceTypesProvider) if err != nil { return nil, err } @@ -272,7 +273,7 @@ func BuildDisruptionBudgets(ctx context.Context, cluster *state.Cluster, clk clo } // BuildNodePoolMap builds a provName -> nodePool map and a provName -> instanceName -> instance type map -func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) (map[string]*v1beta1.NodePool, map[string]map[string]*cloudprovider.InstanceType, error) { +func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, instanceTypesProvider instancetypes.Provider) (map[string]*v1beta1.NodePool, map[string]map[string]*cloudprovider.InstanceType, error) { nodePoolMap := map[string]*v1beta1.NodePool{} nodePoolList := &v1beta1.NodePoolList{} if err := kubeClient.List(ctx, nodePoolList); err != nil { @@ -283,7 +284,7 @@ func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, cloudProvid np := &nodePoolList.Items[i] nodePoolMap[np.Name] = np - nodePoolInstanceTypes, err := cloudProvider.GetInstanceTypes(ctx, np) + nodePoolInstanceTypes, err := instanceTypesProvider.Get(ctx, np) if err != nil { // don't error out on building the node pool, we just won't be able to handle any nodes that // were created by it diff --git a/pkg/controllers/disruption/multinodeconsolidation.go b/pkg/controllers/disruption/multinodeconsolidation.go index 8573d64201..7df442d804 100644 --- a/pkg/controllers/disruption/multinodeconsolidation.go +++ b/pkg/controllers/disruption/multinodeconsolidation.go @@ -89,7 +89,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB return cmd, scheduling.Results{}, nil } - if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue).IsValid(ctx, cmd, consolidationTTL); err != nil { + if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, 
m.instanceTypesProvider, m.recorder, m.queue).IsValid(ctx, cmd, consolidationTTL); err != nil { if IsValidationError(err) { log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)) return Command{}, scheduling.Results{}, nil diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go index c6c6b5fb0f..372d35f3eb 100644 --- a/pkg/controllers/disruption/singlenodeconsolidation.go +++ b/pkg/controllers/disruption/singlenodeconsolidation.go @@ -46,7 +46,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption } candidates = s.sortCandidates(candidates) - v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue) + v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.instanceTypesProvider, s.recorder, s.queue) // Set a timeout timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration) diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index be4b18e1d5..5007cac3a0 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -44,6 +44,7 @@ import ( "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" "sigs.k8s.io/karpenter/pkg/controllers/disruption" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" + "sigs.k8s.io/karpenter/pkg/controllers/instancetypes" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/controllers/state/informer" @@ -60,6 +61,7 @@ var cluster *state.Cluster var disruptionController *disruption.Controller var prov *provisioning.Provisioner var cloudProvider *fake.CloudProvider +var instanceTypesProvider instancetypes.Provider var nodeStateController *informer.NodeController var nodeClaimStateController *informer.NodeClaimController var fakeClock *clock.FakeClock @@ -90,7 +92,7 @@ var _ = BeforeSuite(func() { recorder = test.NewEventRecorder() prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster) queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov) - disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue) + disruptionController = disruption.NewController(fakeClock, env.Client, prov, instanceTypesProvider, recorder, cluster, queue) }) var _ = AfterSuite(func() { @@ -203,7 +205,7 @@ var _ = Describe("Simulate Scheduling", func() { // nodePool.Spec.Disruption.Budgets = []v1beta1.Budget{{Nodes: "100%"}} // ExpectApplied(ctx, env.Client, nodePool) - nodePoolMap, nodePoolToInstanceTypesMap, err := disruption.BuildNodePoolMap(ctx, env.Client, cloudProvider) + nodePoolMap, nodePoolToInstanceTypesMap, err := disruption.BuildNodePoolMap(ctx, env.Client, instanceTypesProvider) Expect(err).To(Succeed()) // Mark all nodeclaims as marked for deletion diff --git a/pkg/controllers/disruption/validation.go b/pkg/controllers/disruption/validation.go index 5316d10845..6776b2bd14 100644 --- a/pkg/controllers/disruption/validation.go +++ b/pkg/controllers/disruption/validation.go @@ -28,8 +28,8 @@ import ( "sigs.k8s.io/karpenter/pkg/apis/v1alpha5" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" - "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" + "sigs.k8s.io/karpenter/pkg/controllers/instancetypes" 
"sigs.k8s.io/karpenter/pkg/controllers/provisioning" "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/events" @@ -55,27 +55,27 @@ func IsValidationError(err error) bool { // of the commands passed to IsValid were constructed based off of the same consolidation state. This allows it to // skip the validation TTL for all but the first command. type Validation struct { - start time.Time - clock clock.Clock - cluster *state.Cluster - kubeClient client.Client - cloudProvider cloudprovider.CloudProvider - provisioner *provisioning.Provisioner - once sync.Once - recorder events.Recorder - queue *orchestration.Queue + start time.Time + clock clock.Clock + cluster *state.Cluster + kubeClient client.Client + instanceTypesProvider instancetypes.Provider + provisioner *provisioning.Provisioner + once sync.Once + recorder events.Recorder + queue *orchestration.Queue } func NewValidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner, - cp cloudprovider.CloudProvider, recorder events.Recorder, queue *orchestration.Queue) *Validation { + instanceTypesProvider instancetypes.Provider, recorder events.Recorder, queue *orchestration.Queue) *Validation { return &Validation{ - clock: clk, - cluster: cluster, - kubeClient: kubeClient, - provisioner: provisioner, - cloudProvider: cp, - recorder: recorder, - queue: queue, + clock: clk, + cluster: cluster, + kubeClient: kubeClient, + provisioner: provisioner, + instanceTypesProvider: instanceTypesProvider, + recorder: recorder, + queue: queue, } } @@ -117,7 +117,7 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command, validationPeriod // // If these conditions are met for all candidates, ValidateCandidates returns a slice with the updated representations. func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Candidate) ([]*Candidate, error) { - validatedCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue) + validatedCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.instanceTypesProvider, v.ShouldDisrupt, v.queue) if err != nil { return nil, fmt.Errorf("constructing validation candidates, %w", err) } diff --git a/pkg/controllers/instancetypes/controller.go b/pkg/controllers/instancetypes/controller.go new file mode 100644 index 0000000000..fcf79be819 --- /dev/null +++ b/pkg/controllers/instancetypes/controller.go @@ -0,0 +1,137 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package instancetypes + +import ( + "context" + "fmt" + "time" + + "github.com/awslabs/operatorpkg/reasonable" + "github.com/patrickmn/go-cache" + "github.com/samber/lo" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/scheduling" +) + +// Controller maintains an in-memory cache of instance types per node pool. +// Further, this controller injects configuration from NodeOverlays into the cached InstanceTypes. +type Controller struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider + cache *cache.Cache +} + +type Provider interface { + Get(context.Context, *v1beta1.NodePool) ([]*cloudprovider.InstanceType, error) +} + +var _ Provider = &Controller{} + +func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller { + return &Controller{ + kubeClient: kubeClient, + cloudProvider: cloudProvider, + cache: cache.New(10*time.Minute, 10*time.Minute), + } +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + builder := controllerruntime.NewControllerManagedBy(m). + Named("nodeoverlay"). + For(&v1beta1.NodePool{}). + WithOptions(controller.Options{RateLimiter: reasonable.RateLimiter()}) + + for _, ncGVK := range c.cloudProvider.GetSupportedNodeClasses() { + nodeclass := &unstructured.Unstructured{} + nodeclass.SetGroupVersionKind(ncGVK) + builder = builder.Watches(nodeclass, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) (requests []reconcile.Request) { + nodePoolList := &v1beta1.NodePoolList{} + if err := m.GetClient().List(ctx, nodePoolList, client.MatchingFields{ + "spec.template.spec.nodeClassRef.apiVersion": o.GetObjectKind().GroupVersionKind().GroupVersion().String(), + "spec.template.spec.nodeClassRef.kind": o.GetObjectKind().GroupVersionKind().Kind, + "spec.template.spec.nodeClassRef.name": o.GetName(), + }); err != nil { + return requests + } + return lo.Map(nodePoolList.Items, func(np v1beta1.NodePool, _ int) reconcile.Request { + return reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(&np), + } + }) + })) + } + return builder.Complete(reconcile.AsReconciler(m.GetClient(), c)) +} + +// Get is a cached and thread-safe way to retrieve instance types for a node pool +func (c *Controller) Get(ctx context.Context, nodePool *v1beta1.NodePool) ([]*cloudprovider.InstanceType, error) { + instanceTypes, ok := c.cache.Get(string(nodePool.UID)) + if !ok { + return nil, fmt.Errorf("nodepool %q missing instance types", nodePool.Name) + } + return instanceTypes.([]*cloudprovider.InstanceType), nil +} + +func (c *Controller) Reconcile(ctx context.Context, nodePool *v1beta1.NodePool) (reconcile.Result, error) { + instanceTypes, err := c.cloudProvider.GetInstanceTypes(ctx, nodePool) + if err != nil { + return reconcile.Result{}, fmt.Errorf("getting instance types, %w", err) + } + + nodeOverlayList := v1beta1.NodeOverlayList{} + if err := c.kubeClient.List(ctx, &nodeOverlayList); err != nil { + return reconcile.Result{}, fmt.Errorf("listing node overlays, %w", err) + } + + for _, instanceType := range instanceTypes { +
nodeOverlays := lo.Filter(nodeOverlayList.Items, func(nodeOverlay v1beta1.NodeOverlay, _ int) bool { + return scheduling.NewNodeSelectorRequirements(nodeOverlay.Spec.Selector...).Compatible(instanceType.Requirements) == nil + }) + if len(nodeOverlays) == 0 { + continue + } + if len(nodeOverlays) > 1 { + log.FromContext(ctx).Info("more than one overlay matched, continuing without changes", "instance-type", instanceType.Name, "node-overlays", lo.Map(nodeOverlays, func(nodeOverlay v1beta1.NodeOverlay, _ int) string { return nodeOverlay.Name })) + continue + } + c.overlayPrice(instanceType, nodeOverlays[0]) + } + + c.cache.SetDefault(string(nodePool.UID), instanceTypes) + return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil +} + +func (c *Controller) overlayPrice(instanceType *cloudprovider.InstanceType, nodeOverlay v1beta1.NodeOverlay) { + for i := range instanceType.Offerings { + if scheduling.NewNodeSelectorRequirements(nodeOverlay.Spec.Selector...).Compatible(instanceType.Offerings[i].Requirements) == nil { + if nodeOverlay.Spec.PricePercent != nil { + instanceType.Offerings[i].Price = instanceType.Offerings[i].Price * float64(lo.FromPtr(nodeOverlay.Spec.PricePercent)) / 100 + } + } + } +} diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 9f7a39c0d9..18a511412e 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -47,6 +47,7 @@ import ( "sigs.k8s.io/karpenter/pkg/utils/pretty" "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/controllers/instancetypes" scheduler "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/events" @@ -75,13 +76,14 @@ func WithReason(reason string) func(LaunchOptions) LaunchOptions { // Provisioner waits for enqueued pods, batches them, creates capacity and binds the pods to the capacity.
type Provisioner struct { - cloudProvider cloudprovider.CloudProvider - kubeClient client.Client - batcher *Batcher - volumeTopology *scheduler.VolumeTopology - cluster *state.Cluster - recorder events.Recorder - cm *pretty.ChangeMonitor + cloudProvider cloudprovider.CloudProvider + instanceTypesProvider instancetypes.Provider + kubeClient client.Client + batcher *Batcher + volumeTopology *scheduler.VolumeTopology + cluster *state.Cluster + recorder events.Recorder + cm *pretty.ChangeMonitor } func NewProvisioner(kubeClient client.Client, recorder events.Recorder, @@ -218,7 +220,8 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod domains := map[string]sets.Set[string]{} for _, nodePool := range nodePoolList.Items { // Get instance type options - instanceTypeOptions, err := p.cloudProvider.GetInstanceTypes(ctx, lo.ToPtr(nodePool)) + + instanceTypeOptions, err := p.instanceTypesProvider.Get(ctx, &nodePool) if err != nil { // we just log an error and skip the provisioner to prevent a single mis-configured provisioner from stopping // all scheduling diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index 2ea541e1ee..6931ac3548 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -37,6 +37,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" + podutilk8s "k8s.io/kubernetes/pkg/api/v1/pod" + "sigs.k8s.io/karpenter/pkg/utils/pod" ) @@ -435,5 +437,5 @@ func mapOperator(operator metav1.LabelSelectorOperator) selection.Operator { } func IgnoredForTopology(p *v1.Pod) bool { - return !pod.IsScheduled(p) || pod.IsTerminal(p) || pod.IsTerminating(p) + return !pod.IsScheduled(p) || podutilk8s.IsPodTerminal(p) || pod.IsTerminating(p) } diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index f03f4a8f40..b5d7eb3074 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -41,6 +41,9 @@ import ( "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/scheduling" + + podutilk8s "k8s.io/kubernetes/pkg/api/v1/pod" + podutils "sigs.k8s.io/karpenter/pkg/utils/pod" ) @@ -290,7 +293,7 @@ func (c *Cluster) UpdatePod(ctx context.Context, pod *v1.Pod) error { defer c.mu.Unlock() var err error - if podutils.IsTerminal(pod) { + if podutilk8s.IsPodTerminal(pod) { c.updateNodeUsageFromPodCompletion(client.ObjectKeyFromObject(pod)) } else { err = c.updateNodeUsageFromPod(ctx, pod) @@ -495,7 +498,7 @@ func (c *Cluster) populateResourceRequests(ctx context.Context, n *StateNode) er } for i := range pods.Items { pod := &pods.Items[i] - if podutils.IsTerminal(pod) { + if podutilk8s.IsPodTerminal(pod) { continue } if err := n.updateForPod(ctx, c.kubeClient, pod); err != nil { diff --git a/pkg/utils/pod/scheduling.go b/pkg/utils/pod/scheduling.go index 930a1745ed..266d2199fb 100644 --- a/pkg/utils/pod/scheduling.go +++ b/pkg/utils/pod/scheduling.go @@ -23,6 +23,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/utils/clock" + podutilk8s "k8s.io/kubernetes/pkg/api/v1/pod" + "sigs.k8s.io/karpenter/pkg/apis/v1alpha5" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "sigs.k8s.io/karpenter/pkg/scheduling" @@ -32,7 +34,7 @@ import ( // - Isn't a terminal pod (Failed or Succeeded) // - Isn't actively terminating func IsActive(pod *v1.Pod) bool { - return !IsTerminal(pod) && + return 
!podutilk8s.IsPodTerminal(pod) && !IsTerminating(pod) } @@ -65,7 +67,7 @@ func IsEvictable(pod *v1.Pod) bool { // - Doesn't tolerate the "karpenter.sh/disruption=disrupting" taint // - Isn't a mirror pod (https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/) func IsWaitingEviction(pod *v1.Pod, clk clock.Clock) bool { - return !IsTerminal(pod) && + return !podutilk8s.IsPodTerminal(pod) && !IsStuckTerminating(pod, clk) && !ToleratesDisruptionNoScheduleTaint(pod) && // Mirror pods cannot be deleted through the API server since they are created and managed by kubelet @@ -119,10 +121,6 @@ func IsPreempting(pod *v1.Pod) bool { return pod.Status.NominatedNodeName != "" } -func IsTerminal(pod *v1.Pod) bool { - return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded -} - func IsTerminating(pod *v1.Pod) bool { return pod.DeletionTimestamp != nil }
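For reference, the manifest below is a minimal sketch of a NodeOverlay that conforms to the v1beta1 schema introduced in pkg/apis/crds/karpenter.sh_nodeoverlays.yaml: spec.selector is a list of node selector requirements (key/operator/values) rather than a label selector, and pricePercent scales the price of matching offerings to the given percentage. The metadata name and label values are illustrative assumptions, not part of this change.

apiVersion: karpenter.sh/v1beta1
kind: NodeOverlay
metadata:
  name: spot-discount        # hypothetical name, for illustration only
spec:
  selector:                  # list of node selector requirements; matches all simulated nodes if empty
    - key: karpenter.sh/capacity-type
      operator: In
      values:
        - spot
  pricePercent: 90           # matching offerings are priced at 90% of the provider-reported price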