Skip to content

Commit 395b23c

Browse files
committed
add scaffolding
Signed-off-by: Massy Bourennani <[email protected]>
1 parent f26d822 commit 395b23c

File tree

6 files changed

+76
-3
lines changed

6 files changed

+76
-3
lines changed

core/dbt/cli/requires.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from dbt.exceptions import DbtProjectError, FailFastError
3131
from dbt.flags import get_flag_dict, get_flags, set_flags
3232
from dbt.mp_context import get_mp_context
33+
from dbt.openlineage import OpenLineageHandler
3334
from dbt.parser.manifest import parse_manifest
3435
from dbt.plugins import set_up_plugin_manager
3536
from dbt.profiler import profiler
@@ -77,7 +78,9 @@ def wrapper(*args, **kwargs):
7778
reset_invocation_id()
7879

7980
# Logging
80-
callbacks = ctx.obj.get("callbacks", [])
81+
ol_handler = OpenLineageHandler()
82+
# todo replace here (conditional on OL config defined)
83+
callbacks = ctx.obj.get("callbacks", [ol_handler.emit])
8184
setup_event_logger(flags=flags, callbacks=callbacks)
8285

8386
# Tracking

core/dbt/events/core_types.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2262,3 +2262,14 @@ message ArtifactUploadSkippedMsg {
22622262
CoreEventInfo info = 1;
22632263
ArtifactUploadSkipped data = 2;
22642264
}
2265+
2266+
// Z064
2267+
message OpenLineageException {
2268+
string exc = 1;
2269+
string exc_info = 2;
2270+
}
2271+
2272+
message OpenLineageExceptionMsg {
2273+
CoreEventInfo info = 1;
2274+
OpenLineageException data = 2;
2275+
}

core/dbt/events/core_types_pb2.py

Lines changed: 16 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/dbt/events/types.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2212,3 +2212,16 @@ def code(self) -> str:
22122212

22132213
def message(self) -> str:
22142214
return f"Artifacts skipped for command : {self.msg}"
2215+
2216+
2217+
# todo add a Test on this
2218+
# https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/events/README.md#adding-a-new-event
2219+
class OpenLineageException(WarnLevel):
2220+
def code(self) -> str:
2221+
return "Z064"
2222+
2223+
def message(self):
2224+
return (
2225+
f"Encountered an error while creating OpenLineageEvent: {self.exc}\n"
2226+
f"{self.exc_info}"
2227+
)

core/dbt/openlineage/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .handler import OpenLineageHandler

core/dbt/openlineage/handler.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import traceback
2+
3+
from dbt.events.types import OpenLineageException
4+
from dbt_common.events.base_types import EventMsg
5+
from dbt_common.events.functions import fire_event
6+
7+
8+
class OpenLineageHandler:
9+
10+
def emit(self, e: EventMsg):
11+
"""
12+
callback passed to the eventManager
13+
"""
14+
if e.info.name == "OpenLineageException":
15+
# not relevant to us
16+
return
17+
try:
18+
self.emit_unsafe(e)
19+
except Exception as e:
20+
self._handle_exception(e)
21+
22+
def emit_unsafe(self, e: EventMsg):
23+
# we only emit events for some family of structured logs not all of them
24+
# import pdb; pdb.set_trace()
25+
26+
# todo add OL handling here
27+
28+
pass
29+
30+
def _handle_exception(self, e: Exception):
31+
fire_event(OpenLineageException(exc=str(e), exc_info=traceback.format_exc()))

0 commit comments

Comments
 (0)