This repository was archived by the owner on Oct 16, 2024. It is now read-only.

Commit 399c477

Merge branch 'mpiano/CLUSTERMAN-666_event' into master
2 parents: 0423f85 + c1ff1f6

File tree: 18 files changed, +455 -80 lines


clusterman/autoscaler/toggle.py

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+# Copyright 2019 Yelp Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+from typing import Union
+
+import staticconf
+
+from clusterman.aws.client import dynamodb
+from clusterman.util import AUTOSCALER_PAUSED
+from clusterman.util import CLUSTERMAN_STATE_TABLE
+from clusterman.util import parse_time_string
+
+
+def disable_autoscaling(cluster: str, pool: str, scheduler: str, until: Union[str, int, float]):
+    """Disable autoscaling for a pool
+
+    :param str cluster: name of the cluster
+    :param str pool: name of the pool
+    :param str scheduler: cluster scheduler
+    :param str until: how long should it remain disabled
+    """
+    expiration = parse_time_string(until).timestamp if isinstance(until, str) else int(until)
+    state = {
+        "state": {"S": AUTOSCALER_PAUSED},
+        "entity": {"S": f"{cluster}.{pool}.{scheduler}"},
+        "timestamp": {"N": str(int(time.time()))},
+        "expiration_timestamp": {"N": str(expiration)},
+    }
+    dynamodb.put_item(
+        TableName=staticconf.read("aws.state_table", default=CLUSTERMAN_STATE_TABLE),
+        Item=state,
+    )
+
+
+def enable_autoscaling(cluster: str, pool: str, scheduler: str):
+    """Re-enable autoscaling for a pool
+
+    :param str cluster: name of the cluster
+    :param str pool: name of the pool
+    :param str scheduler: cluster scheduler
+    """
+    dynamodb.delete_item(
+        TableName=staticconf.read("aws.state_table", default=CLUSTERMAN_STATE_TABLE),
+        Key={
+            "state": {"S": AUTOSCALER_PAUSED},
+            "entity": {"S": f"{cluster}.{pool}.{scheduler}"},
+        },
+    )
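
The new module lets callers outside the CLI pause and resume autoscaling through the shared DynamoDB state table. A minimal usage sketch; the cluster, pool, and scheduler names are illustrative, not taken from this commit:

from clusterman.autoscaler.toggle import disable_autoscaling
from clusterman.autoscaler.toggle import enable_autoscaling

# Pause the pool for 30 minutes: a string is parsed with parse_time_string, while an
# int/float is used directly as the epoch expiration timestamp.
disable_autoscaling("norcal-prod", "default", "mesos", until="30m")

# Resume immediately by deleting the state-table entry.
enable_autoscaling("norcal-prod", "default", "mesos")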

clusterman/cli/toggle.py

Lines changed: 5 additions & 25 deletions
@@ -22,15 +22,15 @@
 from clusterman.args import add_pool_arg
 from clusterman.args import add_scheduler_arg
 from clusterman.args import subparser
-from clusterman.aws.client import dynamodb
+from clusterman.autoscaler.toggle import disable_autoscaling
+from clusterman.autoscaler.toggle import enable_autoscaling
 from clusterman.aws.client import sts
 from clusterman.cli.util import timeout_wrapper
 from clusterman.exceptions import AccountNumberMistmatchError
-from clusterman.util import AUTOSCALER_PAUSED
 from clusterman.util import autoscaling_is_paused
-from clusterman.util import CLUSTERMAN_STATE_TABLE
 from clusterman.util import parse_time_string
 
+
 logger = colorlog.getLogger(__name__)
 
 
@@ -48,24 +48,11 @@ def ensure_account_id(cluster) -> None:
 @timeout_wrapper
 def disable(args: argparse.Namespace) -> None:
     ensure_account_id(args.cluster)
-
-    state = {
-        "state": {"S": AUTOSCALER_PAUSED},
-        "entity": {"S": f"{args.cluster}.{args.pool}.{args.scheduler}"},
-        "timestamp": {"N": str(int(time.time()))},
-    }
-
     if not args.until:
         print("Default has changed; autoscaler will be re-enabled in 30m since no --until flag was used.")
         args.until = "30m"
 
-    if args.until:
-        state["expiration_timestamp"] = {"N": str(parse_time_string(args.until).timestamp)}
-
-    dynamodb.put_item(
-        TableName=staticconf.read("aws.state_table", default=CLUSTERMAN_STATE_TABLE),
-        Item=state,
-    )
+    disable_autoscaling(args.cluster, args.pool, args.scheduler, args.until)
 
     time.sleep(1)  # Give DynamoDB some time to settle
 
@@ -86,14 +73,7 @@ def disable(args: argparse.Namespace) -> None:
 @timeout_wrapper
 def enable(args: argparse.Namespace) -> None:
     ensure_account_id(args.cluster)
-
-    dynamodb.delete_item(
-        TableName=staticconf.read("aws.state_table", default=CLUSTERMAN_STATE_TABLE),
-        Key={
-            "state": {"S": AUTOSCALER_PAUSED},
-            "entity": {"S": f"{args.cluster}.{args.pool}.{args.scheduler}"},
-        },
-    )
+    enable_autoscaling(args.cluster, args.pool, args.scheduler)
     time.sleep(1)  # Give DynamoDB some time to settle
     now = parse_time_string("now").to("local")
     if autoscaling_is_paused(args.cluster, args.pool, args.scheduler, now):
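
The CLI handlers now delegate the DynamoDB writes to the helpers above, and disable defaults --until to "30m" instead of pausing indefinitely. A sketch of driving the handler directly, for example from a test; the attribute names mirror the handler above, and a real test would stub the sts/dynamodb calls behind ensure_account_id and the toggle helpers:

import argparse

from clusterman.cli.toggle import disable

args = argparse.Namespace(cluster="norcal-prod", pool="default", scheduler="mesos", until=None)
disable(args)  # prints the 30m-default notice, then calls disable_autoscaling(..., "30m")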

clusterman/cli/util.py

Lines changed: 19 additions & 8 deletions
@@ -1,27 +1,38 @@
+# Copyright 2019 Yelp Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 import argparse
-import signal
 import socket
+from functools import partial
 
 import colorlog
 
+from clusterman.util import limit_function_runtime
+
+
 logger = colorlog.getLogger(__name__)
 TIMEOUT_TIME_SECONDS = 10
 
 
 def timeout_wrapper(main):
     def wrapper(args: argparse.Namespace):
-
         # After 10s, prints a warning message if the command is running from the wrong place
-        def timeout_handler(signum, frame):
+        def timeout_handler():
             warning_string = "This command is taking a long time to run; are you running from the right account?"
             if "yelpcorp" in socket.getfqdn():
                 warning_string += "\nHINT: try ssh'ing to adhoc-prod or another box in the right account."
             logger.warning(warning_string)
 
-        signal.signal(signal.SIGALRM, timeout_handler)
-        signal.alarm(TIMEOUT_TIME_SECONDS)
-
-        main(args)
-        signal.alarm(0)  # Cancel the alarm if we've gotten here
+        limit_function_runtime(partial(main, args), TIMEOUT_TIME_SECONDS, timeout_handler)
 
     return wrapper
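
timeout_wrapper now delegates its timing logic to clusterman.util.limit_function_runtime, whose implementation is not part of this diff. Purely to illustrate the call signature used here, and assuming the helper keeps the previous SIGALRM behaviour of firing the callback once without interrupting the wrapped function, a minimal stand-in could look like this:

import signal
from typing import Callable


def _limit_function_runtime_sketch(fn: Callable[[], None], timeout: int, on_timeout: Callable[[], None]) -> None:
    """Hypothetical stand-in; the real helper lives in clusterman.util."""

    def _handler(signum, frame):
        on_timeout()  # react (here: just warn) once `timeout` seconds have passed

    signal.signal(signal.SIGALRM, _handler)
    signal.alarm(timeout)
    try:
        fn()
    finally:
        signal.alarm(0)  # cancel the pending alarm once fn finishes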

clusterman/kubernetes/kubernetes_cluster_connector.py

Lines changed: 30 additions & 6 deletions
@@ -74,6 +74,7 @@ class KubernetesClusterConnector(ClusterConnector):
     _unschedulable_pods: List[KubernetesPod]
     _excluded_pods_by_ip: Mapping[str, List[KubernetesPod]]
     _pods_by_ip: Mapping[str, List[KubernetesPod]]
+    _label_selectors: List[str]
 
     def __init__(self, cluster: str, pool: Optional[str], init_crd: bool = False) -> None:
         super().__init__(cluster, pool)
@@ -84,6 +85,14 @@ def __init__(self, cluster: str, pool: Optional[str], init_crd: bool = False) -> None:
         )
         self._nodes_by_ip = {}
         self._init_crd_client = init_crd
+        self._label_selectors = []
+        if self.pool:
+            # TODO(CLUSTERMAN-659): Switch to using just pool_label_key once the new node labels are applied everywhere
+            node_label_selector = self.pool_config.read_string(
+                "node_label_key",
+                default=self.pool_config.read_string("pool_label_key", default="clusterman.com/pool"),
+            )
+            self._label_selectors.append(f"{node_label_selector}={self.pool}")
 
     def reload_state(self) -> None:
         logger.info("Reloading nodes")
@@ -113,6 +122,16 @@ def reload_client(self) -> None:
             else None
         )
 
+    def set_label_selectors(self, label_selectors: List[str], add_to_existing: bool = False) -> None:
+        """Set label selectors for node listing purposes
+
+        :param List[str] label_selectors: list of selectors (joined with logic and)
+        :param bool add_to_existing: if set add to existing selectors rather than replacing
+        """
+        self._label_selectors = sorted(
+            (set(self._label_selectors) | set(label_selectors)) if add_to_existing else set(label_selectors)
+        )
+
     def get_num_removed_nodes_before_last_reload(self) -> int:
         previous_nodes = self._prev_nodes_by_ip
         current_nodes = self._nodes_by_ip
@@ -270,6 +289,15 @@ def create_node_migration_resource(
         except Exception as e:
             logger.error(f"Failed creating migration event resource: {e}")
 
+    def has_enough_capacity_for_pods(self) -> bool:
+        """Checks whether there are unschedulable pods due to insufficient resources
+
+        :return: True if no unschedulable pods are due to resource constraints
+        """
+        return not any(
+            reason == PodUnschedulableReason.InsufficientResources for _, reason in self.get_unschedulable_pods()
+        )
+
     def _evict_or_delete_pods(self, node_name: str, pods: List[KubernetesPod], disable_eviction: bool) -> bool:
         all_done = True
         action_name = "deleted" if disable_eviction else "evicted"
@@ -403,12 +431,8 @@ def _is_node_safe_to_kill(self, node_ip: str) -> bool:
         return True
 
     def _get_nodes_by_ip(self) -> Mapping[str, KubernetesNode]:
-        # TODO(CLUSTERMAN-659): Switch to using just pool_label_key once the new node labels are applied everywhere
-        node_label_selector = self.pool_config.read_string(
-            "node_label_key", default=self.pool_config.read_string("pool_label_key", default="clusterman.com/pool")
-        )
-        label_selector = f"{node_label_selector}={self.pool}"
-        pool_nodes = self._core_api.list_node(label_selector=label_selector).items
+        kwargs = {"label_selector": ",".join(self._label_selectors)} if self._label_selectors else {}
+        pool_nodes = self._core_api.list_node(**kwargs).items
         return {get_node_ip(node): node for node in pool_nodes}
 
     def _get_pods_info(
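
Node listing is now driven by the _label_selectors list built in __init__ and adjustable through set_label_selectors; the selectors are comma-joined, which the Kubernetes API treats as a logical AND. An illustrative sketch, with made-up cluster, pool, and label values:

from clusterman.kubernetes.kubernetes_cluster_connector import KubernetesClusterConnector

connector = KubernetesClusterConnector("norcal-prod", "default")

# Replace the pool-derived selector entirely...
connector.set_label_selectors(["node.kubernetes.io/instance-type=m5.4xlarge"])

# ...or AND an extra selector onto whatever is already set.
connector.set_label_selectors(["example.com/migrating=true"], add_to_existing=True)

# Subsequent reloads list nodes matching
# "example.com/migrating=true,node.kubernetes.io/instance-type=m5.4xlarge"
connector.reload_state()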

clusterman/migration/event.py

Lines changed: 17 additions & 15 deletions
@@ -22,30 +22,24 @@
 import semver
 
 from clusterman.aws.markets import EC2_INSTANCE_TYPES
+from clusterman.interfaces.types import ClusterNodeMetadata
+from clusterman.migration.event_enums import ComparableConditionTarget
+from clusterman.migration.event_enums import ComparableVersion
 from clusterman.migration.event_enums import CONDITION_OPERATOR_SUPPORT_MATRIX
 from clusterman.migration.event_enums import ConditionOperator
 from clusterman.migration.event_enums import ConditionTrait
 from clusterman.util import parse_time_interval_seconds
 
 
-ComparableVersion = Union[semver.VersionInfo, packaging.version.Version]
-ComparableConditionTarget = Union[str, int, ComparableVersion]
-
-
-def _load_version_target(target: str) -> ComparableVersion:
+def _load_lsbrelease_version_target(target: str) -> ComparableVersion:
     """Validate condition target as a version
 
     :param str target: version from user input
    :return: same value if validates as version
     """
-    try:
-        parsed = semver.parse_version_info(target)
-    except ValueError:
-        # This allows supporting non-semver version, e.g. with less
-        # then 3 numeric components, as long as they comply with PEP440.
-        parsed = packaging.version.parse(target)
-        if not isinstance(parsed, packaging.version.Version):
-            raise ValueError(f"Invalid version string: {target}")
+    parsed = packaging.version.parse(target)
+    if not isinstance(parsed, packaging.version.Version):
+        raise ValueError(f"Invalid version string: {target}")
     return parsed
 
 
@@ -71,8 +65,8 @@ def _load_instance_type_target(target: str) -> str:
 
 
 CONDITION_TARGET_LOADERS: Dict[ConditionTrait, Callable[[str], Union[str, int, List[str]]]] = {
-    ConditionTrait.KERNEL: _load_version_target,
-    ConditionTrait.LSBRELEASE: _load_version_target,
+    ConditionTrait.KERNEL: semver.VersionInfo.parse,
+    ConditionTrait.LSBRELEASE: _load_lsbrelease_version_target,
     ConditionTrait.UPTIME: load_timespan_target,
     ConditionTrait.INSTANCE_TYPE: _load_instance_type_target,
 }
@@ -116,6 +110,14 @@ def to_dict(self) -> dict:
             "target": self.stringify_target(),
         }
 
+    def matches(self, node: ClusterNodeMetadata) -> bool:
+        """Check if condition is met for a node
+
+        :param ClusterNodeMetadata node: node metadata
+        :return: true if it meets the condition
+        """
+        return self.operator.apply(self.trait.get_from(node), self.target)
+
     def __str__(self) -> str:
         return f"{self.trait.name} {self.operator.value} {self.stringify_target()}"
 
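
Splitting the loader removes the earlier try/except fallback: kernel targets must now be strict semver (parsed with semver.VersionInfo.parse), while lsbrelease targets only need to satisfy PEP 440. An illustration with example version strings:

import packaging.version
import semver

semver.VersionInfo.parse("5.4.0")    # valid kernel target: full MAJOR.MINOR.PATCH semver
packaging.version.parse("20.04")     # valid lsbrelease target: PEP 440 allows fewer components

# semver.VersionInfo.parse("20.04") would raise ValueError, which is why LSBRELEASE
# now routes through _load_lsbrelease_version_target instead.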

clusterman/migration/event_enums.py

Lines changed: 25 additions & 0 deletions
@@ -15,6 +15,16 @@
 import operator
 from typing import Any
 from typing import Collection
+from typing import Union
+
+import packaging.version
+import semver
+
+from clusterman.interfaces.types import ClusterNodeMetadata
+
+
+ComparableVersion = Union[semver.VersionInfo, packaging.version.Version]
+ComparableConditionTarget = Union[str, int, ComparableVersion]
 
 
 class MigrationStatus(enum.Enum):
@@ -30,6 +40,14 @@ class ConditionTrait(enum.Enum):
     INSTANCE_TYPE = "instance_type"
     UPTIME = "uptime"
 
+    def get_from(self, node: ClusterNodeMetadata) -> ComparableConditionTarget:
+        """Get trait value from node metadata
+
+        :param ClusterNodeMetadata node: node metadata
+        :return: value
+        """
+        return CONDITION_TRAIT_GETTERS[self](node)
+
 
 class ConditionOperator(enum.Enum):
     GT = "gt"
@@ -71,3 +89,10 @@ def apply(self, left: Any, right: Any) -> bool:
     },
     ConditionTrait.UPTIME: {ConditionOperator.GT, ConditionOperator.GE, ConditionOperator.LT, ConditionOperator.LE},
 }
+
+CONDITION_TRAIT_GETTERS = {
+    ConditionTrait.KERNEL: lambda node: semver.VersionInfo.parse(node.agent.kernel),
+    ConditionTrait.LSBRELEASE: lambda node: packaging.version.parse(node.agent.lsbrelease),
+    ConditionTrait.INSTANCE_TYPE: lambda node: node.instance.market.instance,
+    ConditionTrait.UPTIME: lambda node: node.instance.uptime.total_seconds(),
+}
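
Combined with ConditionOperator.apply, the new getters let a condition be evaluated against live node metadata; this is what the matches method added in clusterman/migration/event.py builds on. A short sketch, where the node argument stands for a ClusterNodeMetadata instance obtained from a cluster connector:

from clusterman.interfaces.types import ClusterNodeMetadata
from clusterman.migration.event_enums import ConditionOperator
from clusterman.migration.event_enums import ConditionTrait


def node_older_than_a_day(node: ClusterNodeMetadata) -> bool:
    # UPTIME resolves to node.instance.uptime.total_seconds() via CONDITION_TRAIT_GETTERS
    uptime_seconds = ConditionTrait.UPTIME.get_from(node)
    return ConditionOperator.GE.apply(uptime_seconds, 86400)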

clusterman/migration/settings.py

Lines changed: 4 additions & 1 deletion
@@ -23,7 +23,7 @@
 DEFAULT_POOL_PRESCALING = 0
 DEFAULT_NODE_BOOT_WAIT = "3m"
 DEFAULT_NODE_BOOT_TIMEOUT = "10m"
-DEFAULT_WORKER_TIMEOUT = "1d"
+DEFAULT_WORKER_TIMEOUT = "2h"
 
 
 class MigrationPrecendence(enum.Enum):
@@ -66,6 +66,9 @@ def __eq__(self, other: object) -> bool:
             raise NotImplementedError()
         return self.init_value == other.init_value
 
+    def __bool__(self) -> bool:
+        return self.value > 0
+
 
 class WorkerSetup(NamedTuple):
     rate: PoolPortion
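
The class gaining __bool__ is not named in this hunk, but the surrounding __eq__ context and the rate: PoolPortion field below suggest it is the portion type used by WorkerSetup: a zero-valued portion is now simply falsy. A hedged sketch of how calling code can rely on that:

from clusterman.migration.settings import WorkerSetup


def migration_enabled(setup: WorkerSetup) -> bool:
    # Equivalent to checking value > 0 explicitly, via the new __bool__.
    return bool(setup.rate)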
