Skip to content

Commit 367e8ba

Browse files
authored
Event logging in case of error (#33)
* Notify when runlogger.handle_event() throws an exception. This can happen e.g. when the mara DB is not available for some reason.
* Make event handling robust against exceptions during handling.
* Make the command line only succeed if a RunFinished event arrived.
* Clean the message cache when a pipeline run starts and finishes. Now that we keep the event handlers around and do not recreate them on each pipeline run, we have to clean up the state. This will break the case where we have two runs next to each other *in the browser*. IMO that's not supported anyway... fixing this requires giving all events a run id...
* Use OutputEvent and remove GenericExceptionEvent
1 parent 05f1eba commit 367e8ba

File tree

5 files changed

+123
-79
lines changed

5 files changed

+123
-79
lines changed

data_integration/events.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import abc
33
import datetime
4+
import sys
45

56

67
class Event():
@@ -23,6 +24,14 @@ def handle_event(self, event: Event):
2324

2425
def notify_configured_event_handlers(event: Event):
2526
from . import config
26-
all_handlers = config.event_handlers()
27+
try:
28+
all_handlers = config.event_handlers()
29+
except BaseException as e:
30+
print(f"Exception while getting configured event handlers: {repr(e)}", file=sys.stderr)
31+
return
32+
2733
for handler in all_handlers:
28-
handler.handle_event(event)
34+
try:
35+
handler.handle_event(event)
36+
except BaseException as e:
37+
print(f"Handler {repr(handler)} could report about {repr(event)}: {repr(e)}", file=sys.stderr)

data_integration/execution.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import functools
99
import multiprocessing
1010
import os
11+
import sys
1112
import signal
1213
import time
1314
import traceback
@@ -284,13 +285,24 @@ def track_finished_pipelines():
284285

285286
runlogger = run_log.RunLogger()
286287

288+
def _notify_all(event):
289+
try:
290+
runlogger.handle_event(event)
291+
except BaseException as e:
292+
# This includes the case when the mara DB is not reachable when writing the event.
293+
# Not sure if we should just ignore that, but at least get other notifications
294+
# out in case of an error
295+
events.notify_configured_event_handlers(event)
296+
# this will notify the UI in case of a problem later on
297+
raise e
298+
events.notify_configured_event_handlers(event)
299+
287300
# process messages from forked child processes
288301
while True:
289302
try:
290303
while not event_queue.empty():
291304
event = event_queue.get(False)
292-
runlogger.handle_event(event)
293-
events.notify_configured_event_handlers(event)
305+
_notify_all(event)
294306
yield event
295307
except queues.Empty:
296308
pass
@@ -301,12 +313,35 @@ def track_finished_pipelines():
301313
# Catching GeneratorExit needs to end in a return!
302314
return
303315
except:
304-
output_event = pipeline_events.Output(node_path=pipeline.path(), message=traceback.format_exc(),
305-
format=logger.Format.ITALICS, is_error=True)
306-
runlogger.handle_event(output_event)
307-
events.notify_configured_event_handlers(output_event)
316+
def _create_exception_output_event(msg: str = ''):
317+
if msg:
318+
msg = msg + '\n'
319+
return pipeline_events.Output(node_path=pipeline.path(), message=msg + traceback.format_exc(),
320+
format=logger.Format.ITALICS, is_error=True)
321+
322+
output_event = _create_exception_output_event()
323+
exception_events = []
324+
try:
325+
_notify_all(output_event)
326+
except BaseException as e:
327+
# we are already in the generic exception handler, so we cannot do anything
328+
# if we still fail, as we have to get to the final close_open_run_after_error()
329+
# and 'return'...
330+
msg = "Could not notify about final output event"
331+
exception_events.append(_create_exception_output_event(msg))
308332
yield output_event
309-
run_log.close_open_run_after_error(runlogger.run_id)
333+
try:
334+
run_log.close_open_run_after_error(runlogger.run_id)
335+
except BaseException as e:
336+
msg = "Exception during 'close_open_run_after_error()'"
337+
exception_events.append(_create_exception_output_event(msg))
338+
339+
# At least try to notify the UI
340+
for e in exception_events:
341+
print(f"{repr(e)}", file=sys.stderr)
342+
yield e
343+
events.notify_configured_event_handlers(e)
344+
310345
return
311346
if not run_process.is_alive():
312347
break

data_integration/logging/pipeline_events.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ def __init__(self, node_path: [str]) -> None:
2121
super().__init__()
2222
self.node_path = node_path
2323

24-
def to_json(self):
25-
return json.dumps({field: value.isoformat() if isinstance(value, datetime.datetime) else value
26-
for field, value in self.__dict__.items()})
27-
2824

2925
class RunStarted(PipelineEvent):
3026
def __init__(self, node_path: [str],
@@ -131,7 +127,7 @@ def __init__(self, node_path: [str], message: str,
131127
self.timestamp = datetime.datetime.now()
132128

133129

134-
def get_user_display_name(interactively_started:bool) -> t.Optional[str]:
130+
def get_user_display_name(interactively_started: bool) -> t.Optional[str]:
135131
"""Gets the display name for the user which started a run
136132
137133
Defaults to MARA_RUN_USER_DISPLAY_NAME and falls back to the current OS-level name
Lines changed: 66 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,66 @@
1-
import abc
2-
3-
from data_integration import events
4-
from data_integration.logging import pipeline_events
5-
6-
7-
class ChatNotifier(events.EventHandler, abc.ABC):
8-
9-
def __init__(self):
10-
""" Abstract class for sending notifications to chat bots when pipeline errors occur"""
11-
12-
# keep a list of log messages and error log messsages for each node
13-
self.node_output: {tuple: {bool: [events.Event]}} = None
14-
15-
16-
def handle_event(self, event: events.Event):
17-
"""
18-
Send notifications for failed tasks and interactively started pipelines
19-
20-
Args:
21-
event: The current event of interest
22-
"""
23-
24-
if isinstance(event, pipeline_events.Output):
25-
# collect the output and error output of each node so that it can be shown if something fails
26-
key = tuple(event.node_path)
27-
28-
if not self.node_output:
29-
self.node_output = {}
30-
31-
if not key in self.node_output:
32-
self.node_output[key] = {True: [], False: []}
33-
34-
self.node_output[key][event.is_error].append(event)
35-
36-
elif isinstance(event, pipeline_events.NodeFinished):
37-
if not event.succeeded and event.is_pipeline is False:
38-
self.send_task_failed_message(event)
39-
40-
41-
elif isinstance(event, pipeline_events.RunStarted):
42-
if event.interactively_started:
43-
self.send_run_started_interactively_message(event)
44-
45-
elif isinstance(event, pipeline_events.RunFinished):
46-
if event.interactively_started:
47-
self.send_run_finished_interactively_message(event)
48-
49-
@abc.abstractmethod
50-
def send_run_started_interactively_message(self, event: pipeline_events.RunStarted):
51-
"""Send a notification that somebody manually triggered the run of a pipeline"""
52-
pass
53-
54-
@abc.abstractmethod
55-
def send_run_finished_interactively_message(self, event: pipeline_events.RunFinished):
56-
"""Send a notification that a manually triggered pipeline run finished"""
57-
pass
58-
59-
@abc.abstractmethod
60-
def send_task_failed_message(self, event: pipeline_events.NodeFinished):
61-
"""Send a notification that a task failed"""
62-
pass
1+
import abc
2+
3+
from data_integration import events
4+
from data_integration.logging import pipeline_events
5+
6+
7+
class ChatNotifier(events.EventHandler, abc.ABC):
8+
9+
def __init__(self):
10+
""" Abstract class for sending notifications to chat bots when pipeline errors occur"""
11+
12+
# keep a list of log messages and error log messages for each node
13+
self.node_output: {tuple: {bool: [events.Event]}} = None
14+
15+
16+
def handle_event(self, event: events.Event):
17+
"""
18+
Send notifications for failed tasks and interactively started pipelines
19+
20+
Args:
21+
event: The current event of interest
22+
"""
23+
24+
if isinstance(event, pipeline_events.Output):
25+
# collect the output and error output of each node so that it can be shown if something fails
26+
key = tuple(event.node_path)
27+
28+
if not self.node_output:
29+
self.node_output = {}
30+
31+
if not key in self.node_output:
32+
self.node_output[key] = {True: [], False: []}
33+
34+
self.node_output[key][event.is_error].append(event)
35+
36+
elif isinstance(event, pipeline_events.NodeFinished):
37+
if not event.succeeded and event.is_pipeline is False:
38+
self.send_task_failed_message(event)
39+
40+
41+
elif isinstance(event, pipeline_events.RunStarted):
42+
if event.interactively_started:
43+
self.send_run_started_interactively_message(event)
44+
# reset the saved outputs, just to be sure...
45+
self.node_output = None
46+
47+
elif isinstance(event, pipeline_events.RunFinished):
48+
if event.interactively_started:
49+
self.send_run_finished_interactively_message(event)
50+
# reset the saved outputs
51+
self.node_output = None
52+
53+
@abc.abstractmethod
54+
def send_run_started_interactively_message(self, event: pipeline_events.RunStarted):
55+
"""Send a notification that somebody manually triggered the run of a pipeline"""
56+
pass
57+
58+
@abc.abstractmethod
59+
def send_run_finished_interactively_message(self, event: pipeline_events.RunFinished):
60+
"""Send a notification that a manually triggered pipeline run finished"""
61+
pass
62+
63+
@abc.abstractmethod
64+
def send_task_failed_message(self, event: pipeline_events.NodeFinished):
65+
"""Send a notification that a task failed"""
66+
pass

data_integration/ui/cli.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None,
4242

4343
theme = plain if disable_colors else colorful
4444

45-
succeeded = True
45+
succeeded = False
4646
for event in execution.run_pipeline(pipeline, nodes, with_upstreams, interactively_started=interactively_started):
4747
if isinstance(event, pipeline_events.Output):
4848
print(f'{theme[PATH_COLOR]}{" / ".join(event.node_path)}{":" if event.node_path else ""}{theme[RESET_ALL]} '
4949
+ theme[event.format] + (theme[ERROR_COLOR] if event.is_error else '')
5050
+ event.message + theme[RESET_ALL])
5151
elif isinstance(event, pipeline_events.RunFinished):
52-
if not event.succeeded:
53-
succeeded = False
52+
if event.succeeded:
53+
succeeded = True
5454

5555
return succeeded
5656

0 commit comments

Comments
 (0)