koord-manager: cleanup node.alloc when device CR has been deleted
Signed-off-by: lucming <[email protected]>
Signed-off-by: liuming6 <[email protected]>
liuming6 committed Sep 1, 2023
1 parent 77aa7e2 commit 7754a35
Showing 2 changed files with 201 additions and 1 deletion.
45 changes: 44 additions & 1 deletion pkg/slo-controller/noderesource/device_resource_calculator.go
@@ -41,7 +41,7 @@ func (r *NodeResourceReconciler) updateDeviceResources(node *corev1.Node) error
        if !errors.IsNotFound(err) {
            return fmt.Errorf("failed to get device resources: %w", err)
        }
-       return nil
+       return r.cleanupGPUNodeResource(node)
    }

    // update device resources
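For readers of just this hunk, the enclosing control flow (a paraphrased reconstruction from the visible lines and hunk headers, not the verbatim source) is roughly:

// Paraphrased sketch of updateDeviceResources around this hunk; the error
// handling and the cleanup branch are from the diff, the Device fetch and
// the trailing update call are reconstructed from context.
func (r *NodeResourceReconciler) updateDeviceResources(node *corev1.Node) error {
    device := &schedulingv1alpha1.Device{}
    if err := r.Client.Get(context.TODO(), types.NamespacedName{Name: node.Name}, device); err != nil {
        if !errors.IsNotFound(err) {
            return fmt.Errorf("failed to get device resources: %w", err)
        }
        // The Device CR is gone: strip the GPU resources it once reported.
        return r.cleanupGPUNodeResource(node)
    }
    // ... otherwise sync device resources onto the node as before.
    return r.updateGPUNodeResource(node, device)
}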
@@ -136,6 +136,49 @@ func (r *NodeResourceReconciler) updateGPUNodeResource(node *corev1.Node, device
    return err
}

func (r *NodeResourceReconciler) cleanupGPUNodeResource(node *corev1.Node) error {
    deletedKeys := []corev1.ResourceName{
        extension.ResourceGPU,
        extension.ResourceGPUCore,
        extension.ResourceGPUMemory,
        extension.ResourceGPUMemoryRatio,
    }
    // Only issue an update if at least one device resource is still present on the node.
    needUpdate := false
    for _, key := range deletedKeys {
        if _, ok := node.Status.Allocatable[key]; ok {
            needUpdate = true
            break
        }
    }
    if !needUpdate {
        return nil
    }

    err := util.RetryOnConflictOrTooManyRequests(func() error {
        updateNode := &corev1.Node{}
        if err := r.Client.Get(context.TODO(), types.NamespacedName{Name: node.Name}, updateNode); err != nil {
            klog.Errorf("failed to get node %v, error: %v", node.Name, err)
            if errors.IsNotFound(err) {
                return nil
            }
            return err
        }

        updateNode = updateNode.DeepCopy() // avoid overwriting the cache
        for _, resourceName := range deletedKeys {
            delete(updateNode.Status.Allocatable, resourceName)
            delete(updateNode.Status.Capacity, resourceName)
        }

        return r.Client.Status().Update(context.TODO(), updateNode)
    })
    if err != nil {
        klog.ErrorS(err, "failed to cleanup device resource on node", "node", klog.KObj(node))
    }

    return err
}
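For context, util.RetryOnConflictOrTooManyRequests is not part of this diff. A minimal sketch of a helper with that shape, assuming it wraps client-go's retry.OnError (the real helper lives in koordinator's pkg/util and its backoff settings may differ):

package util

import (
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/client-go/util/retry"
)

// RetryOnConflictOrTooManyRequests retries fn on transient apiserver
// rejections: optimistic-concurrency conflicts (409) and rate limiting (429).
func RetryOnConflictOrTooManyRequests(fn func() error) error {
    return retry.OnError(retry.DefaultBackoff, func(err error) bool {
        return apierrors.IsConflict(err) || apierrors.IsTooManyRequests(err)
    }, fn)
}

Re-fetching the Node inside the retried closure matters: each attempt starts from the latest resourceVersion, so a 409 conflict can actually succeed on retry instead of replaying a stale object.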

func (r *NodeResourceReconciler) updateGPUDriverAndModel(node *corev1.Node, device *schedulingv1alpha1.Device) error {
    // TODO: currently update the device resources barely. move to device plugins or implement a standard plugin later
    if device == nil || device.Labels == nil {
157 changes: 157 additions & 0 deletions pkg/slo-controller/noderesource/device_resource_calculator_test.go
@@ -18,6 +18,7 @@ package noderesource

import (
    "context"
    "fmt"
    "testing"
    "time"

@@ -28,13 +29,16 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/koordinator-sh/koordinator/apis/configuration"
"github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
schedulingfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/config"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/framework"
)

@@ -326,3 +330,156 @@ func Test_isGPULabelNeedSync(t *testing.T) {
        assert.Equal(t, tt.expected, actual)
    }
}

func TestNodeResourceReconciler_cleanupGPUNodeResource(t *testing.T) {
    testNodeWithoutDevice := corev1.Node{
        ObjectMeta: metav1.ObjectMeta{
            Name: "test-node0",
        },
        Status: corev1.NodeStatus{
            Allocatable: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("20"),
                corev1.ResourceMemory: resource.MustParse("40G"),
            },
            Capacity: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("20"),
                corev1.ResourceMemory: resource.MustParse("40G"),
            },
        },
    }
    testNodeWithGPU := corev1.Node{
        ObjectMeta: metav1.ObjectMeta{
            Name: "test-node0",
        },
        Status: corev1.NodeStatus{
            Allocatable: corev1.ResourceList{
                corev1.ResourceCPU:               resource.MustParse("20"),
                corev1.ResourceMemory:            resource.MustParse("40G"),
                extension.ResourceGPUCore:        resource.MustParse("20"),
                extension.ResourceGPUMemory:      resource.MustParse("40G"),
                extension.ResourceGPUMemoryRatio: resource.MustParse("20"),
            },
            Capacity: corev1.ResourceList{
                corev1.ResourceCPU:               resource.MustParse("20"),
                corev1.ResourceMemory:            resource.MustParse("40G"),
                extension.ResourceGPUCore:        resource.MustParse("20"),
                extension.ResourceGPUMemory:      resource.MustParse("40G"),
                extension.ResourceGPUMemoryRatio: resource.MustParse("20"),
            },
        },
    }
    expected := corev1.Node{
        ObjectMeta: metav1.ObjectMeta{
            Name: "test-node0",
        },
        Status: corev1.NodeStatus{
            Allocatable: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("20"),
                corev1.ResourceMemory: resource.MustParse("40G"),
            },
            Capacity: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("20"),
                corev1.ResourceMemory: resource.MustParse("40G"),
            },
        },
    }

    type fields struct {
        Client          client.Client
        Recorder        record.EventRecorder
        Scheme          *runtime.Scheme
        Clock           clock.Clock
        NodeSyncContext *framework.SyncContext
        GPUSyncContext  *framework.SyncContext
        cfgCache        config.ColocationCfgCache
    }
    type args struct {
        oldNode *corev1.Node
    }

    tests := []struct {
        name    string
        fields  fields
        args    args
        want    *corev1.Node
        wantErr assert.ErrorAssertionFunc
    }{
        {
            name: "no need to cleanup, do nothing",
            fields: fields{
                Client: fake.NewClientBuilder().WithRuntimeObjects(&testNodeWithoutDevice).Build(),
            },
            args: args{
                oldNode: &testNodeWithoutDevice,
            },
            want:    &expected,
            wantErr: assert.NoError,
        },
        {
            name: "cleanup gpu resource successfully",
            fields: fields{
                Client: fake.NewClientBuilder().WithRuntimeObjects(&testNodeWithGPU).Build(),
            },
            args: args{
                oldNode: &testNodeWithGPU,
            },
            want:    &expected,
            wantErr: assert.NoError,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            r := &NodeResourceReconciler{
                Client:          tt.fields.Client,
                Recorder:        tt.fields.Recorder,
                Scheme:          tt.fields.Scheme,
                Clock:           tt.fields.Clock,
                NodeSyncContext: tt.fields.NodeSyncContext,
                GPUSyncContext:  tt.fields.GPUSyncContext,
                cfgCache:        tt.fields.cfgCache,
            }
            tt.wantErr(t, r.cleanupGPUNodeResource(tt.args.oldNode), fmt.Sprintf("cleanupGPUNodeResource(%v)", tt.args.oldNode))

            gotNode := &corev1.Node{}
            err := r.Client.Get(context.TODO(), types.NamespacedName{Name: tt.args.oldNode.Name}, gotNode)
            assert.NoError(t, err)
            assert.Equal(t, tt.want.Status, gotNode.Status)
        })
    }

    failedCases := []struct {
        name    string
        fields  fields
        args    args
        want    *corev1.Node
        wantErr assert.ErrorAssertionFunc
    }{
        {
            name: "failed to cleanup gpu resource for node not found",
            fields: fields{
                Client: fake.NewClientBuilder().WithScheme(runtime.NewScheme()).Build(),
            },
            args: args{
                oldNode: &testNodeWithGPU,
            },
            want:    &testNodeWithGPU,
            wantErr: assert.Error,
        },
    }

    for _, tt := range failedCases {
        t.Run(tt.name, func(t *testing.T) {
            r := &NodeResourceReconciler{
                Client:          tt.fields.Client,
                Recorder:        tt.fields.Recorder,
                Scheme:          tt.fields.Scheme,
                Clock:           tt.fields.Clock,
                NodeSyncContext: tt.fields.NodeSyncContext,
                GPUSyncContext:  tt.fields.GPUSyncContext,
                cfgCache:        tt.fields.cfgCache,
            }
            tt.wantErr(t, r.cleanupGPUNodeResource(tt.args.oldNode), fmt.Sprintf("cleanupGPUNodeResource(%v)", tt.args.oldNode))
        })
    }
}
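A note on the failure case: controller-runtime's fake client builder falls back to client-go's global scheme when WithScheme is not called, which is why the success cases can Get a corev1.Node without registering anything, while the empty runtime.NewScheme() makes Get fail with a non-NotFound error. A minimal illustration (the test name is hypothetical, not part of this commit; assumes apierrors = "k8s.io/apimachinery/pkg/api/errors" in the imports):

// Sketch only: contrasts the builder's default scheme with an empty one.
func TestFakeClientSchemeSketch(t *testing.T) {
    // Default scheme: corev1.Node is registered, so a missing object
    // yields a clean NotFound error.
    withDefault := fake.NewClientBuilder().Build()
    err := withDefault.Get(context.TODO(), types.NamespacedName{Name: "absent"}, &corev1.Node{})
    assert.True(t, apierrors.IsNotFound(err))

    // Empty scheme: Get fails before any lookup because the Node kind is
    // unknown, which is exactly the error path the failure case exercises.
    withEmpty := fake.NewClientBuilder().WithScheme(runtime.NewScheme()).Build()
    err = withEmpty.Get(context.TODO(), types.NamespacedName{Name: "absent"}, &corev1.Node{})
    assert.Error(t, err)
    assert.False(t, apierrors.IsNotFound(err))
}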
