Skip to content

Commit 0178178

Browse files
authored
Make logger/event handlers configurable (#27)
* Make event_handler a config function * Convert Slack to use event handler instead of using slack directly * Notify about regular pipeline start/end * Also add user in non-interactive runs This also makes the function patchable in case we run in a built docker container and pass in the real "starter" of a run in an env variable... * Merge PipelineStart/EndEvents into RunStarted/Finished Also move slack notification event handler to its own package and rename event_base.py to events.py
1 parent 54b74dd commit 0178178

File tree

11 files changed

+186
-115
lines changed

11 files changed

+186
-115
lines changed

data_integration/config.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import multiprocessing
55
import pathlib
66
import typing
7+
import functools
78

8-
from . import pipelines
9+
from . import pipelines, events
910

1011

1112
def root_pipeline() -> 'pipelines.Pipeline':
@@ -63,14 +64,24 @@ def base_url() -> str:
6364
return 'http://127.0.0.1:5000/data-integration'
6465

6566

66-
def slack_token() -> str:
67+
def slack_token() -> typing.Optional[str]:
6768
"""
6869
When not None, then this slack webhook is notified of failed nodes.
6970
Slack channel's token (i.e. THISIS/ASLACK/TOCKEN) can be retrieved from the
7071
channel's app "Incoming WebHooks" configuration as part of the Webhook URL
7172
"""
7273
return None
7374

75+
@functools.lru_cache(maxsize=None)
76+
def event_handlers() -> [events.EventHandler]:
77+
"""User specific event handlers (mainly to notify chat systems)"""
78+
configured_handlers = []
79+
if slack_token():
80+
from data_integration.notification import slack
81+
configured_handlers.append(slack.Slack())
82+
return configured_handlers
83+
84+
7485
def password_masks() -> typing.List[str]:
7586
"""Any passwords which should be masked in the UI or logs"""
7687

data_integration/events.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import json
2+
import abc
3+
import datetime
4+
5+
6+
class Event():
7+
def __init__(self) -> None:
8+
"""
9+
Base class for events that are emitted from mara.
10+
"""
11+
pass
12+
13+
def to_json(self):
14+
return json.dumps({field: value.isoformat() if isinstance(value, datetime.datetime) else value
15+
for field, value in self.__dict__.items()})
16+
17+
18+
class EventHandler(abc.ABC):
19+
@abc.abstractmethod
20+
def handle_event(self, event: Event):
21+
pass
22+
23+
24+
def notify_configured_event_handlers(event: Event):
25+
from . import config
26+
all_handlers = config.event_handlers()
27+
for handler in all_handlers:
28+
handler.handle_event(event)

data_integration/execution.py

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
from multiprocessing import queues
1414

1515
from . import pipelines, config
16-
from .logging import logger, events, system_statistics, run_log, node_cost, slack
16+
from .logging import logger, pipeline_events, system_statistics, run_log, node_cost
17+
from . import events
1718

1819

1920
def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None,
20-
with_upstreams: bool = False) -> [events.Event]:
21+
with_upstreams: bool = False,
22+
interactively_started: bool = False
23+
) -> [events.Event]:
2124
"""
2225
Runs a pipeline in a forked sub process. Acts as a generator that yields events from the sub process.
2326
@@ -129,17 +132,23 @@ def track_finished_pipelines():
129132
in dict(running_pipelines).items(): # type: pipelines.Pipeline
130133
if len(set(running_pipeline.nodes.values()) & processed_nodes) == len(running_pipeline.nodes):
131134
succeeded = running_pipeline not in failed_pipelines
132-
event_queue.put(events.Output(
135+
event_queue.put(pipeline_events.Output(
133136
node_path=running_pipeline.path(), format=logger.Format.ITALICS, is_error=not succeeded,
134137
message=f'{"succeeded" if succeeded else "failed"}, {logger.format_time_difference(run_start_time, datetime.datetime.now())}'))
135-
event_queue.put(events.NodeFinished(
138+
event_queue.put(pipeline_events.NodeFinished(
136139
node_path=running_pipeline.path(), start_time=start_time,
137140
end_time=datetime.datetime.now(), is_pipeline=True, succeeded=succeeded))
138141
del running_pipelines[running_pipeline]
139142
processed_nodes.add(running_pipeline)
140143

141144
# announce run start
142-
event_queue.put(events.RunStarted(node_path=pipeline.path(), start_time=run_start_time, pid=os.getpid()))
145+
event_queue.put(pipeline_events.RunStarted(node_path=pipeline.path(),
146+
start_time=run_start_time,
147+
pid=os.getpid(),
148+
interactively_started=interactively_started,
149+
node_ids=[node.id for node in (nodes or [])],
150+
is_root_pipeline=(pipeline.parent is None))
151+
)
143152

144153
# run as long
145154
# - as task processes are still running
@@ -173,8 +182,8 @@ def track_finished_pipelines():
173182
# book keeping and event emission
174183
pipeline_start_time = datetime.datetime.now()
175184
running_pipelines[next_node] = [pipeline_start_time, 0]
176-
event_queue.put(events.NodeStarted(next_node.path(), pipeline_start_time, True))
177-
event_queue.put(events.Output(
185+
event_queue.put(pipeline_events.NodeStarted(next_node.path(), pipeline_start_time, True))
186+
event_queue.put(pipeline_events.Output(
178187
node_path=next_node.path(), format=logger.Format.ITALICS,
179188
message='★ ' + node_cost.format_duration(
180189
node_durations_and_run_times.get(tuple(next_node.path()), [0, 0])[0])))
@@ -190,13 +199,13 @@ def track_finished_pipelines():
190199
queue([sub_pipeline])
191200

192201
except Exception as e:
193-
event_queue.put(events.NodeStarted(
202+
event_queue.put(pipeline_events.NodeStarted(
194203
node_path=next_node.path(), start_time=task_start_time, is_pipeline=True))
195204
logger.log(message=f'Could not launch parallel tasks', format=logger.Format.ITALICS,
196205
is_error=True)
197206
logger.log(message=traceback.format_exc(),
198-
format=events.Output.Format.VERBATIM, is_error=True)
199-
event_queue.put(events.NodeFinished(
207+
format=pipeline_events.Output.Format.VERBATIM, is_error=True)
208+
event_queue.put(pipeline_events.NodeFinished(
200209
node_path=next_node.path(), start_time=task_start_time,
201210
end_time=datetime.datetime.now(), is_pipeline=True, succeeded=False))
202211

@@ -209,8 +218,9 @@ def track_finished_pipelines():
209218
# run a task in a subprocess
210219
if next_node.parent in running_pipelines:
211220
running_pipelines[next_node.parent][1] += 1
212-
event_queue.put(events.NodeStarted(next_node.path(), datetime.datetime.now(), False))
213-
event_queue.put(events.Output(
221+
event_queue.put(
222+
pipeline_events.NodeStarted(next_node.path(), datetime.datetime.now(), False))
223+
event_queue.put(pipeline_events.Output(
214224
node_path=next_node.path(), format=logger.Format.ITALICS,
215225
message='★ ' + node_cost.format_duration(
216226
node_durations_and_run_times.get(tuple(next_node.path()), [0, 0])[0])))
@@ -238,12 +248,12 @@ def track_finished_pipelines():
238248

239249
end_time = datetime.datetime.now()
240250
event_queue.put(
241-
events.Output(task_process.task.path(),
242-
('succeeded' if succeeded else 'failed') + ', '
243-
+ logger.format_time_difference(task_process.start_time, end_time),
244-
format=logger.Format.ITALICS, is_error=not succeeded))
245-
event_queue.put(events.NodeFinished(task_process.task.path(), task_process.start_time,
246-
end_time, False, succeeded))
251+
pipeline_events.Output(task_process.task.path(),
252+
('succeeded' if succeeded else 'failed') + ', '
253+
+ logger.format_time_difference(task_process.start_time, end_time),
254+
format=logger.Format.ITALICS, is_error=not succeeded))
255+
event_queue.put(pipeline_events.NodeFinished(task_process.task.path(), task_process.start_time,
256+
end_time, False, succeeded))
247257

248258
# check if some pipelines finished
249259
track_finished_pipelines()
@@ -252,8 +262,8 @@ def track_finished_pipelines():
252262
time.sleep(0.001)
253263

254264
except:
255-
event_queue.put(events.Output(node_path=pipeline.path(), message=traceback.format_exc(),
256-
format=logger.Format.ITALICS, is_error=True))
265+
event_queue.put(pipeline_events.Output(node_path=pipeline.path(), message=traceback.format_exc(),
266+
format=logger.Format.ITALICS, is_error=True))
257267

258268
# run again because `dequeue` might have moved more nodes to `finished_nodes`
259269
track_finished_pipelines()
@@ -263,27 +273,23 @@ def track_finished_pipelines():
263273
statistics_process.join()
264274

265275
# run finished
266-
event_queue.put(events.RunFinished(node_path=pipeline.path(), end_time=datetime.datetime.now(),
267-
succeeded=not failed_pipelines))
276+
event_queue.put(pipeline_events.RunFinished(node_path=pipeline.path(), end_time=datetime.datetime.now(),
277+
succeeded=not failed_pipelines,
278+
interactively_started=interactively_started))
268279

269280
# fork the process and run `run`
270281
run_process = multiprocessing_context.Process(target=run, name='pipeline-' + '-'.join(pipeline.path()))
271282
run_process.start()
272283

273284
runlogger = run_log.RunLogger()
274-
event_handlers = [runlogger]
275-
276-
# todo: make event handlers configurable (e.g. for slack)
277-
if config.slack_token():
278-
event_handlers.append(slack.Slack())
279285

280286
# process messages from forked child processes
281287
while True:
282288
try:
283289
while not event_queue.empty():
284290
event = event_queue.get(False)
285-
for event_handler in event_handlers:
286-
event_handler.handle_event(event)
291+
runlogger.handle_event(event)
292+
events.notify_configured_event_handlers(event)
287293
yield event
288294
except queues.Empty:
289295
pass
@@ -294,10 +300,10 @@ def track_finished_pipelines():
294300
# Catching GeneratorExit needs to end in a return!
295301
return
296302
except:
297-
output_event = events.Output(node_path=pipeline.path(), message=traceback.format_exc(),
298-
format=logger.Format.ITALICS, is_error=True)
299-
for event_handler in event_handlers:
300-
event_handler.handle_event(output_event)
303+
output_event = pipeline_events.Output(node_path=pipeline.path(), message=traceback.format_exc(),
304+
format=logger.Format.ITALICS, is_error=True)
305+
runlogger.handle_event(output_event)
306+
events.notify_configured_event_handlers(output_event)
301307
yield output_event
302308
run_log.close_open_run_after_error(runlogger.run_id)
303309
return

data_integration/logging/logger.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
import sys
55
from datetime import datetime
66

7-
from ..logging import events
7+
from ..logging import pipeline_events
88
import data_integration.config
99

10-
Format = events.Output.Format
10+
Format = pipeline_events.Output.Format
1111

1212

13-
def log(message: str, format: events.Output.Format = Format.STANDARD,
13+
def log(message: str, format: pipeline_events.Output.Format = Format.STANDARD,
1414
is_error: bool = False) -> None:
1515
"""
1616
Logs text messages.
@@ -32,7 +32,7 @@ def log(message: str, format: events.Output.Format = Format.STANDARD,
3232
message = message.replace(mask, '***')
3333
if message:
3434
if _event_queue:
35-
_event_queue.put(events.Output(_node_path, message, format, is_error))
35+
_event_queue.put(pipeline_events.Output(_node_path, message, format, is_error))
3636
elif is_error:
3737
sys.stderr.write(message + '\n')
3838
else:

data_integration/logging/events.py renamed to data_integration/logging/pipeline_events.py

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,12 @@
11
"""Events that are emitted during pipeline execution"""
22

3-
import abc
43
import datetime
54
import json
65

76
import enum
7+
import typing as t
88

9-
10-
class Event():
11-
def __init__(self) -> None:
12-
"""
13-
Base class for events that are emitted from mara.
14-
"""
15-
16-
def to_json(self):
17-
return json.dumps({field: value.isoformat() if isinstance(value, datetime.datetime) else value
18-
for field, value in self.__dict__.items()})
19-
20-
21-
class EventHandler(abc.ABC):
22-
@abc.abstractmethod
23-
def handle_event(self, event: Event):
24-
pass
9+
from ..events import Event
2510

2611

2712
class PipelineEvent(Event):
@@ -42,33 +27,51 @@ def to_json(self):
4227

4328

4429
class RunStarted(PipelineEvent):
45-
def __init__(self, node_path: [str], start_time: datetime.datetime, pid: int) -> None:
30+
def __init__(self, node_path: [str],
31+
start_time: datetime.datetime,
32+
pid: int,
33+
is_root_pipeline: bool = False,
34+
node_ids: t.Optional[t.List[str]] = None,
35+
interactively_started: bool = False) -> None:
4636
"""
4737
A pipeline run started
4838
Args:
4939
node_path: The path of the pipeline that was run
5040
start_time: The time when the run started
5141
pid: The process id of the process that runs the pipeline
42+
node_ids: list of node.ids which should be run
43+
is_root_pipeline: whether this pipeline run runs the root pipeline
44+
interactively_started: whether or not the run was started interactively
5245
"""
5346
super().__init__([])
5447
self.node_path = node_path
5548
self.start_time = start_time
5649
self.pid = pid
50+
self.interactively_started = interactively_started
51+
self.is_root_pipeline = is_root_pipeline
52+
self.node_ids = node_ids or []
53+
54+
self.user: str = get_user_display_name(interactively_started)
5755

5856

5957
class RunFinished(PipelineEvent):
60-
def __init__(self, node_path: [str], end_time: datetime.datetime, succeeded: bool) -> None:
58+
def __init__(self, node_path: [str],
59+
end_time: datetime.datetime,
60+
succeeded: bool,
61+
interactively_started: bool = False) -> None:
6162
"""
6263
A pipeline run finished
6364
Args:
6465
node_path: The path of the pipeline that was run
6566
end_time: The time when the run finished
6667
succeeded: Whether the run succeeded
68+
interactively_started: whether or not the run was started interactively
6769
"""
6870
super().__init__([])
6971
self.node_path = node_path
7072
self.end_time = end_time
7173
self.succeeded = succeeded
74+
self.interactively_started = interactively_started
7275

7376

7477
class NodeStarted(PipelineEvent):
@@ -126,3 +129,21 @@ def __init__(self, node_path: [str], message: str,
126129
self.format = format
127130
self.is_error = is_error
128131
self.timestamp = datetime.datetime.now()
132+
133+
134+
def get_user_display_name(interactively_started:bool) -> t.Optional[str]:
135+
"""Gets the display name for the user which started a run
136+
137+
Defaults to MARA_RUN_USER_DISPLAY_NAME and falls back to the current OS-level name
138+
if the run was started interactively, else None.
139+
140+
:param interactively_started: True if the run was triggered interactively
141+
142+
Patch if you have more sophisticated needs.
143+
"""
144+
import os
145+
if 'MARA_RUN_USER_DISPLAY_NAME' in os.environ:
146+
return os.environ.get('MARA_RUN_USER_DISPLAY_NAME')
147+
if not interactively_started:
148+
return None
149+
return os.environ.get('SUDO_USER') or os.environ.get('USER') or os.getlogin()

0 commit comments

Comments
 (0)