-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathexperiment.py
More file actions
192 lines (158 loc) · 6.39 KB
/
experiment.py
File metadata and controls
192 lines (158 loc) · 6.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List
from logzero import logger
from chaoslib.activity import ensure_activity_is_valid
from chaoslib.caching import lookup_activity, with_cache
from chaoslib.configuration import load_configuration
from chaoslib.control import validate_controls
from chaoslib.deprecation import (
warn_about_deprecated_features,
warn_about_moved_function,
)
from chaoslib.exceptions import InvalidActivity, InvalidExperiment
from chaoslib.extension import validate_extensions
from chaoslib.hypothesis import ensure_hypothesis_is_valid
from chaoslib.loader import load_experiment
from chaoslib.run import RunEventHandler, Runner
from chaoslib.run import apply_activities as apply_act
from chaoslib.run import apply_rollbacks as apply_roll
from chaoslib.run import initialize_run_journal as init_journal
from chaoslib.secret import load_secrets
from chaoslib.types import (
Configuration,
Dry,
Experiment,
Journal,
Run,
Schedule,
Secrets,
Settings,
Strategy,
)
__all__ = ["ensure_experiment_is_valid", "load_experiment"]
@with_cache
def ensure_experiment_is_valid(experiment: Experiment):
"""
A chaos experiment consists of a method made of activities to carry
sequentially.
There are two kinds of activities:
* probe: detecting the state of a resource in your system or external to it
There are two kinds of probes: `steady` and `close`
* action: an operation to apply against your system
Usually, an experiment is made of a set of `steady` probes that ensure the
system is sound to carry further the experiment. Then, an action before
another set of of ̀close` probes to sense the state of the system
post-action.
This function raises :exc:`InvalidExperiment`, :exc:`InvalidProbe` or
:exc:`InvalidAction` depending on where it fails.
"""
logger.info("Validating the experiment's syntax")
if not experiment:
raise InvalidExperiment("an empty experiment is not an experiment")
if not experiment.get("title"):
raise InvalidExperiment("experiment requires a title")
if not experiment.get("description"):
raise InvalidExperiment("experiment requires a description")
tags = experiment.get("tags")
if tags:
if list(filter(lambda t: t == "" or not isinstance(t, str), tags)):
raise InvalidExperiment("experiment tags must be a non-empty string")
logger.info("Pass 1")
validate_extensions(experiment)
logger.info("Pass 2")
config = load_configuration(experiment.get("configuration", {}))
logger.info("Pass 3")
load_secrets(experiment.get("secrets", {}), config)
logger.info("Pass 4")
ensure_hypothesis_is_valid(experiment)
logger.info("Pass 5")
method = experiment.get("method")
if method is None:
# we force the method key to be indicated, to make it clear
# that the SSH will still be executed before & after the method block
raise InvalidExperiment(
"an experiment requires a method, "
"which can be empty for only checking steady state hypothesis "
)
logger.info("Pass 6")
for activity in method:
ensure_activity_is_valid(activity)
logger.info("Pass 7")
# let's see if a ref is indeed found in the experiment
ref = activity.get("ref")
if ref and not lookup_activity(ref):
raise InvalidActivity(
"referenced activity '{r}' could not be "
"found in the experiment".format(r=ref)
)
logger.info("Pass 8")
rollbacks = experiment.get("rollbacks", [])
logger.info("Pass 9")
for activity in rollbacks:
ensure_activity_is_valid(activity)
logger.info("Pass 10")
warn_about_deprecated_features(experiment)
logger.info("Pass 11")
validate_controls(experiment)
logger.info("Pass 12")
logger.info("Experiment looks valid")
@with_cache
def run_experiment(
experiment: Experiment,
settings: Settings = None,
experiment_vars: Dict[str, Any] = None,
strategy: Strategy = Strategy.DEFAULT,
schedule: Schedule = None,
event_handlers: List[RunEventHandler] = None,
) -> Journal:
"""
Run the given `experiment` method step by step, in the following sequence:
steady probe, action, close probe.
Activities can be executed in background when they have the
`"background"` property set to `true`. In that case, the activity is run in
a thread. By the end of runs, those threads block until they are all
complete.
If the experiment has the `"dry"` property set to `activities`,the experiment
runs without actually executing the activities.
NOTE: Tricky to make a decision whether we should rollback when exiting
abnormally (Ctrl-C, SIGTERM...). Afterall, there is a chance we actually
cannot afford to rollback properly. Better bailing to a conservative
approach. This means we swallow :exc:`KeyboardInterrupt` and
:exc:`SystemExit` and do not bubble it back up to the caller. We when were
interrupted, we set the `interrupted` flag of the result accordingly to
notify the caller this was indeed not terminated properly.
"""
with Runner(strategy, schedule) as runner:
if event_handlers:
for h in event_handlers:
runner.register_event_handler(h)
return runner.run(experiment, settings, experiment_vars=experiment_vars)
def initialize_run_journal(experiment: Experiment) -> Journal:
warn_about_moved_function(
"The 'initialize_run_journal' function has now moved to the "
"'chaoslib.run' package"
)
return init_journal(experiment)
def apply_activities(
experiment: Experiment,
configuration: Configuration,
secrets: Secrets,
pool: ThreadPoolExecutor,
journal: Journal,
dry: Dry,
) -> List[Run]:
warn_about_moved_function(
"The 'apply_activities' function has now moved to the " "'chaoslib.run' package"
)
return apply_act(experiment, configuration, secrets, pool, journal, dry)
def apply_rollbacks(
experiment: Experiment,
configuration: Configuration,
secrets: Secrets,
pool: ThreadPoolExecutor,
dry: Dry,
) -> List[Run]:
warn_about_moved_function(
"The 'apply_rollbacks' function has now moved to the " "'chaoslib.run' package"
)
return apply_roll(experiment, configuration, secrets, pool, dry)