koord-scheduler: move nodes to GroupQuotaManager (#1595)
Signed-off-by: chuanyun.lcy <[email protected]>
Co-authored-by: chuanyun.lcy <[email protected]>
shaloulcy and chuanyun.lcy committed Aug 30, 2023
1 parent 89a0fb0 commit b40396d
Showing 5 changed files with 134 additions and 64 deletions.
83 changes: 82 additions & 1 deletion pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go
@@ -22,7 +22,9 @@ import (
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/v1/resource"
"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
@@ -50,9 +52,16 @@ type GroupQuotaManager struct {
// scaleMinQuotaManager is used when overRootResource
scaleMinQuotaManager *ScaleMinQuotaManager
once sync.Once

// nodeResourceMapLock guards access to nodeResourceMap.
nodeResourceMapLock sync.Mutex
// nodeResourceMap stores the names of the nodes already accounted for by this manager.
nodeResourceMap map[string]struct{}
// nodeLister is used to list nodes during ResyncNodes.
nodeLister listerv1.NodeLister
}

func NewGroupQuotaManager(systemGroupMax, defaultGroupMax v1.ResourceList) *GroupQuotaManager {
func NewGroupQuotaManager(systemGroupMax, defaultGroupMax v1.ResourceList, nodeLister listerv1.NodeLister) *GroupQuotaManager {
quotaManager := &GroupQuotaManager{
totalResourceExceptSystemAndDefaultUsed: v1.ResourceList{},
totalResource: v1.ResourceList{},
@@ -61,6 +70,8 @@ func NewGroupQuotaManager(systemGroupMax, defaultGroupMax v1.ResourceList) *Grou
runtimeQuotaCalculatorMap: make(map[string]*RuntimeQuotaCalculator),
quotaTopoNodeMap: make(map[string]*QuotaTopoNode),
scaleMinQuotaManager: NewScaleMinQuotaManager(),
nodeResourceMap: make(map[string]struct{}),
nodeLister: nodeLister,
}
quotaManager.quotaInfoMap[extension.SystemQuotaName] = NewQuotaInfo(false, true, extension.SystemQuotaName, extension.RootQuotaName)
quotaManager.quotaInfoMap[extension.SystemQuotaName].setMaxQuotaNoLock(systemGroupMax)
@@ -710,3 +721,73 @@ func getPodName(oldPod, newPod *v1.Pod) string {
}
return ""
}

func (gqm *GroupQuotaManager) OnNodeAdd(node *v1.Node) {
gqm.nodeResourceMapLock.Lock()
defer gqm.nodeResourceMapLock.Unlock()

if _, ok := gqm.nodeResourceMap[node.Name]; ok {
return
}
gqm.nodeResourceMap[node.Name] = struct{}{}
gqm.UpdateClusterTotalResource(node.Status.Allocatable)
klog.V(5).Infof("OnNodeAddFunc success %v", node.Name)
}

func (gqm *GroupQuotaManager) OnNodeUpdate(oldNode, newNode *v1.Node) {
gqm.nodeResourceMapLock.Lock()
defer gqm.nodeResourceMapLock.Unlock()

if _, exist := gqm.nodeResourceMap[newNode.Name]; !exist {
gqm.nodeResourceMap[newNode.Name] = struct{}{}
gqm.UpdateClusterTotalResource(newNode.Status.Allocatable)
return
}

oldNodeAllocatable := oldNode.Status.Allocatable
newNodeAllocatable := newNode.Status.Allocatable
if quotav1.Equals(oldNodeAllocatable, newNodeAllocatable) {
return
}

deltaNodeAllocatable := quotav1.Subtract(newNodeAllocatable, oldNodeAllocatable)
gqm.UpdateClusterTotalResource(deltaNodeAllocatable)
klog.V(5).Infof("OnNodeUpdateFunc success:%v [%v]", newNode.Name, newNodeAllocatable)
}

func (gqm *GroupQuotaManager) OnNodeDelete(node *v1.Node) {
gqm.nodeResourceMapLock.Lock()
defer gqm.nodeResourceMapLock.Unlock()

if _, exist := gqm.nodeResourceMap[node.Name]; !exist {
return
}

delta := quotav1.Subtract(v1.ResourceList{}, node.Status.Allocatable)
gqm.UpdateClusterTotalResource(delta)
delete(gqm.nodeResourceMap, node.Name)
klog.V(5).Infof("OnNodeDeleteFunc success:%v [%v]", node.Name, delta)
}

func (gqm *GroupQuotaManager) ResyncNodes() {
nodes, err := gqm.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list nodes for ResyncNodes")
return
}

for _, node := range nodes {
func() {
gqm.nodeResourceMapLock.Lock()
defer gqm.nodeResourceMapLock.Unlock()
if _, exist := gqm.nodeResourceMap[node.Name]; exist {
return
}

gqm.nodeResourceMap[node.Name] = struct{}{}
gqm.UpdateClusterTotalResource(node.Status.Allocatable)
}()
}

// TODO: remove nodes that no longer exist
}
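All three node hooks above reduce to handing UpdateClusterTotalResource a signed resource delta. The standalone sketch below (the concrete allocatable values are invented for illustration) shows how quotav1.Subtract produces that delta for an update and for a delete; because the delta is signed, add, update, and delete can all be applied the same way.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	quotav1 "k8s.io/apiserver/pkg/quota/v1"
)

func main() {
	oldAllocatable := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("4"),
		v1.ResourceMemory: resource.MustParse("16Gi"),
	}
	newAllocatable := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("8"),
		v1.ResourceMemory: resource.MustParse("16Gi"),
	}

	// OnNodeUpdate: only the difference between new and old allocatable is
	// passed on, so unchanged resources contribute a zero delta.
	update := quotav1.Subtract(newAllocatable, oldAllocatable)
	cpuDelta := update[v1.ResourceCPU]
	fmt.Printf("update delta: cpu=%s\n", cpuDelta.String()) // cpu=4

	// OnNodeDelete: subtracting the allocatable from an empty list yields a
	// negative delta, which removes the node's capacity from the cluster total.
	removal := quotav1.Subtract(v1.ResourceList{}, newAllocatable)
	cpuRemoved := removal[v1.ResourceCPU]
	fmt.Printf("delete delta: cpu=%s\n", cpuRemoved.String()) // cpu=-8
}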
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/elasticquota/core/group_quota_manager_test.go
@@ -1255,7 +1255,7 @@ func TestGroupQuotaManager_OnPodUpdateAfterReserve(t *testing.T) {
}

func TestNewGroupQuotaManager(t *testing.T) {
gqm := NewGroupQuotaManager(createResourceList(100, 100), createResourceList(300, 300))
gqm := NewGroupQuotaManager(createResourceList(100, 100), createResourceList(300, 300), nil)
assert.Equal(t, createResourceList(100, 100), gqm.GetQuotaInfoByName(extension.SystemQuotaName).getMax())
assert.Equal(t, createResourceList(300, 300), gqm.GetQuotaInfoByName(extension.DefaultQuotaName).getMax())
assert.True(t, gqm.scaleMinQuotaEnabled)
46 changes: 7 additions & 39 deletions pkg/scheduler/plugins/elasticquota/node_handler.go
@@ -18,7 +18,6 @@ package elasticquota

import (
corev1 "k8s.io/api/core/v1"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"
)

@@ -27,53 +26,28 @@ func (g *Plugin) OnNodeAdd(obj interface{}) {
if !ok {
return
}

if node.DeletionTimestamp != nil {
klog.Errorf("node is deleting:%v", node.Name)
klog.V(5).Infof("OnNodeAddFunc add:%v delete:%v", node.Name, node.DeletionTimestamp)
return
}

g.nodeResourceMapLock.Lock()
defer g.nodeResourceMapLock.Unlock()

if _, ok := g.nodeResourceMap[node.Name]; ok {
return
}
g.nodeResourceMap[node.Name] = struct{}{}
g.groupQuotaManager.UpdateClusterTotalResource(node.Status.Allocatable)
klog.V(5).Infof("OnNodeAddFunc success %v", node.Name)
g.groupQuotaManager.OnNodeAdd(node)
}

func (g *Plugin) OnNodeUpdate(oldObj, newObj interface{}) {
newNode := newObj.(*corev1.Node)
oldNode := oldObj.(*corev1.Node)

g.nodeResourceMapLock.Lock()
defer g.nodeResourceMapLock.Unlock()

if _, exist := g.nodeResourceMap[newNode.Name]; !exist {
return
}

if newNode.ResourceVersion == oldNode.ResourceVersion {
klog.Warningf("update node warning, update version for the same, nodeName:%v", newNode.Name)
return
}

if newNode.DeletionTimestamp != nil {
klog.V(5).Infof("OnNodeUpdateFunc update:%v delete:%v", newNode.Name, newNode.DeletionTimestamp)
return
}

oldNodeAllocatable := oldNode.Status.Allocatable
newNodeAllocatable := newNode.Status.Allocatable
if quotav1.Equals(oldNodeAllocatable, newNodeAllocatable) {
return
}

deltaNodeAllocatable := quotav1.Subtract(newNodeAllocatable, oldNodeAllocatable)
g.groupQuotaManager.UpdateClusterTotalResource(deltaNodeAllocatable)
klog.V(5).Infof("OnNodeUpdateFunc success:%v [%v]", newNode.Name, newNodeAllocatable)
g.groupQuotaManager.OnNodeUpdate(oldNode, newNode)
}

func (g *Plugin) OnNodeDelete(obj interface{}) {
@@ -83,15 +57,9 @@ func (g *Plugin) OnNodeDelete(obj interface{}) {
return
}

g.nodeResourceMapLock.Lock()
defer g.nodeResourceMapLock.Unlock()

if _, exist := g.nodeResourceMap[node.Name]; !exist {
return
}
g.groupQuotaManager.OnNodeDelete(node)
}

delta := quotav1.Subtract(corev1.ResourceList{}, node.Status.Allocatable)
g.groupQuotaManager.UpdateClusterTotalResource(delta)
delete(g.nodeResourceMap, node.Name)
klog.V(5).Infof("OnNodeDeleteFunc success:%v [%v]", node.Name, delta)
func (g *Plugin) ResyncNodes() {
g.groupQuotaManager.ResyncNodes()
}
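With the bookkeeping moved into GroupQuotaManager, the plugin handlers above become thin wrappers that keep the informer-facing signatures. Below is a minimal sketch of how such handlers are typically attached to the shared node informer via client-go; registerNodeHandlers is a hypothetical helper, and the actual registration site is not part of this diff.

package elasticquota

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// registerNodeHandlers is a hypothetical helper illustrating how the wrappers
// above would be wired to the shared node informer.
func registerNodeHandlers(g *Plugin, handle framework.Handle) {
	handle.SharedInformerFactory().Core().V1().Nodes().Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    g.OnNodeAdd,    // forwards to GroupQuotaManager.OnNodeAdd
			UpdateFunc: g.OnNodeUpdate, // forwards to GroupQuotaManager.OnNodeUpdate
			DeleteFunc: g.OnNodeDelete, // forwards to GroupQuotaManager.OnNodeDelete
		})
}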
37 changes: 16 additions & 21 deletions pkg/scheduler/plugins/elasticquota/plugin.go
@@ -19,7 +19,6 @@ package elasticquota
import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
@@ -68,17 +67,14 @@ func (p *PostFilterState) Clone() framework.StateData {
}

type Plugin struct {
handle framework.Handle
client versioned.Interface
pluginArgs *config.ElasticQuotaArgs
quotaLister v1alpha1.ElasticQuotaLister
podLister v1.PodLister
pdbLister policylisters.PodDisruptionBudgetLister
nodeLister v1.NodeLister
// only used in OnNodeAdd,in case Recover and normal Watch double call OnNodeAdd
nodeResourceMapLock sync.Mutex
nodeResourceMap map[string]struct{}
groupQuotaManager *core.GroupQuotaManager
handle framework.Handle
client versioned.Interface
pluginArgs *config.ElasticQuotaArgs
quotaLister v1alpha1.ElasticQuotaLister
podLister v1.PodLister
pdbLister policylisters.PodDisruptionBudgetLister
nodeLister v1.NodeLister
groupQuotaManager *core.GroupQuotaManager
}

var (
@@ -108,16 +104,15 @@ func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error)
elasticQuotaInformer := scheSharedInformerFactory.Scheduling().V1alpha1().ElasticQuotas()

elasticQuota := &Plugin{
handle: handle,
client: client,
pluginArgs: pluginArgs,
podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
quotaLister: elasticQuotaInformer.Lister(),
pdbLister: getPDBLister(handle),
nodeLister: handle.SharedInformerFactory().Core().V1().Nodes().Lister(),
groupQuotaManager: core.NewGroupQuotaManager(pluginArgs.SystemQuotaGroupMax, pluginArgs.DefaultQuotaGroupMax),
nodeResourceMap: make(map[string]struct{}),
handle: handle,
client: client,
pluginArgs: pluginArgs,
podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
quotaLister: elasticQuotaInformer.Lister(),
pdbLister: getPDBLister(handle),
nodeLister: handle.SharedInformerFactory().Core().V1().Nodes().Lister(),
}
elasticQuota.groupQuotaManager = core.NewGroupQuotaManager(pluginArgs.SystemQuotaGroupMax, pluginArgs.DefaultQuotaGroupMax, elasticQuota.nodeLister)

ctx := context.TODO()

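This diff does not show when Plugin.ResyncNodes runs. Purely as an assumption for illustration (startNodeResync and the one-minute period are hypothetical, not the plugin's actual wiring), a periodic loop could drive it like this:

package elasticquota

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// startNodeResync is a hypothetical helper: it periodically reconciles the
// GroupQuotaManager's node set against the node lister until stopCh is closed.
func startNodeResync(p *Plugin, stopCh <-chan struct{}) {
	go wait.Until(p.ResyncNodes, time.Minute, stopCh)
}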
30 changes: 28 additions & 2 deletions pkg/scheduler/plugins/elasticquota/plugin_test.go
@@ -510,7 +510,7 @@ func TestPlugin_OnNodeUpdate(t *testing.T) {
totalRes: createResourceList(200, 2000),
},
{
name: "node not exist",
name: "node not exist. we should add node",
nodes: []*corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
@@ -529,7 +529,7 @@
},
},
},
totalRes: createResourceList(300, 3000),
totalRes: createResourceList(400, 4000),
},
}

@@ -549,6 +549,32 @@
}
}

func TestPlugin_ResyncNodes(t *testing.T) {

suit := newPluginTestSuit(t, nil)
p, _ := suit.proxyNew(suit.elasticQuotaArgs, suit.Handle)
plugin := p.(*Plugin)

// add node
nodes := []*corev1.Node{defaultCreateNodeWithResourceVersion("1"), defaultCreateNodeWithResourceVersion("2"),
defaultCreateNodeWithResourceVersion("3")}
for _, node := range nodes {
plugin.handle.ClientSet().CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
}
time.Sleep(100 * time.Millisecond)
assert.Equal(t, plugin.groupQuotaManager.GetClusterTotalResource(), createResourceList(300, 3000))

// delete node in cache
plugin.OnNodeDelete(nodes[1])
time.Sleep(100 * time.Millisecond)
assert.Equal(t, plugin.groupQuotaManager.GetClusterTotalResource(), createResourceList(200, 2000))

// resync nodes
plugin.ResyncNodes()
time.Sleep(100 * time.Millisecond)
assert.Equal(t, plugin.groupQuotaManager.GetClusterTotalResource(), createResourceList(300, 3000))
}

func defaultCreateNodeWithResourceVersion(nodeName string) *corev1.Node {
node := defaultCreateNode(nodeName)
node.ResourceVersion = "3"