66
77from kubernetes import client , config , watch
88
9- config .load_kube_config ()
9+ config .load_incluster_config ()
1010v1 = client .CoreV1Api ()
1111
12- scheduler_name = "foobar"
12+ scheduler_name = "simple-python-scheduler"
13+ NAMESPACE = "test-ns"
1314
1415
1516def nodes_available ():
@@ -21,33 +22,56 @@ def nodes_available():
2122 return ready_nodes
2223
2324
24- def scheduler (name , node , namespace = "default" ):
25- body = client .V1Binding ()
25+ def get_running_pods ():
26+ pods = []
27+ for pod in v1 .list_namespaced_pod (namespace = NAMESPACE ).items :
28+ if pod .status .phase == "Running" :
29+ pods .append (pod )
30+ return pods
2631
27- target = client .V1ObjectReference ()
28- target .kind = "Node"
29- target .apiVersion = "v1"
30- target .name = node
3132
32- meta = client .V1ObjectMeta ()
33- meta .name = name
33+ def preemption (priority ):
34+ for pod in get_running_pods ():
35+ try :
36+ pod_priority = pod .metadata .annotations ["priority" ]
37+ except :
38+ print (f"Preempting pod { pod .metadata .name } " )
39+ return v1 .delete_namespaced_pod (name = pod .metadata .name , namespace = NAMESPACE )
3440
35- body .target = target
36- body .metadata = meta
41+ if int (pod_priority ) > int (priority ):
42+ print (f"Preempting pod { pod .metadata .name } " )
43+ return v1 .delete_namespaced_pod (name = pod .metadata .name , namespace = NAMESPACE )
44+ return None
3745
38- return v1 .create_namespaced_binding_binding (name , namespace , body )
46+
47+ def scheduler (name , priority , node , namespace = NAMESPACE ):
48+ while len (v1 .list_node ().items ) <= len (get_running_pods ()):
49+ preemption (priority )
50+
51+ target = client .V1ObjectReference (kind = "Node" , api_version = "v1" , name = node )
52+ meta = client .V1ObjectMeta (name = name )
53+ body = client .V1Binding (target = target , metadata = meta )
54+ return v1 .create_namespaced_binding (
55+ namespace = namespace , body = body , _preload_content = False
56+ )
3957
4058
4159def main ():
4260 w = watch .Watch ()
43- for event in w .stream (v1 .list_namespaced_pod , "default" ):
61+ for event in w .stream (v1 .list_namespaced_pod , NAMESPACE ):
4462 if (
4563 event ["object" ].status .phase == "Pending"
4664 and event ["object" ].spec .scheduler_name == scheduler_name
4765 ):
4866 try :
67+ try :
68+ priority = event ["object" ].metadata .annotations ["priority" ]
69+ except TypeError :
70+ priority = 1000
4971 res = scheduler (
50- event ["object" ].metadata .name , random .choice (nodes_available ())
72+ name = event ["object" ].metadata .name ,
73+ priority = priority ,
74+ node = random .choice (nodes_available ()),
5175 )
5276 except client .rest .ApiException as e :
5377 print (json .loads (e .body )["message" ])
0 commit comments