-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Contribute Openlineage to dbt-core
#11688
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
|
Thank you for your pull request! We could not find a changelog entry for this change. For details on how to document a change, see the contributing guide. |
a84a7ec to
9ccb1ed
Compare
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
9ccb1ed to
f26d822
Compare
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
4fc0388 to
d5c0d5c
Compare
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
d5c0d5c to
d77afdd
Compare
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
d77afdd to
4167064
Compare
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
4167064 to
9e2209b
Compare
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
9e2209b to
a1d67cf
Compare
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
a1d67cf to
fd8fea2
Compare
|
Thanks for your pull request, and welcome to our community! We require contributors to sign our Contributor License Agreement and we don't seem to have your signature on file. Check out this article for more information on why we have a CLA. In order for us to review and merge your code, please submit the Individual Contributor License Agreement form attached above above. If you have questions about the CLA, or if you believe you've received this message in error, please reach out through a comment on this PR. CLA has not been signed by users: @MassyB |
|
|
||
| def add_to_parser(self, parser: OptionParser, ctx: Context): | ||
| def parser_process(value: str, state: ParsingState): | ||
| @t.no_type_check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my pre-commit mypy step was failing on this so I added some annotations and mypy ignore comments
fd8fea2 to
4e338af
Compare
| ol_handler = OpenLineageHandler(ctx) | ||
| callbacks = ctx.obj.get("callbacks", []) | ||
| if is_runnable_dbt_command(flags): | ||
| callbacks.append(ol_handler.handle) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is where the OL callback is added
| ) | ||
|
|
||
|
|
||
| ALL_PROTO_TYPES: Dict[str, Any] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useful to convert a dict to an actual type defined in proto
| return f"Artifacts skipped for command : {self.msg}" | ||
|
|
||
|
|
||
| class OpenLineageException(WarnLevel): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now that all the events have been moved to https://github.com/dbt-labs/proto-python-public
How do we do to add an event ?
the documentation still references core_types.proto but I couldn’t find it
| @@ -0,0 +1,410 @@ | |||
| import traceback | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the most important part of the PR where we construct OL events out of the dbt structured logs
| "pydantic<2", | ||
| # ---- | ||
| # OpenLineage Dependencies | ||
| "openlineage-python==1.30.1", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only the python client is added from OL
| return ParseDict(e, msg_cls()) | ||
|
|
||
|
|
||
| def assert_ol_events_match(expected_event, actual_event): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main function used in functional tests to assert that two sets of events are the same.
The interesting part is the usage of regex-like feature where patterns like {{ .* }} is used to match a given string.
You can use a regex by enclosing it like so
{{<space><YOUR-REGEX-HERE><space>}}
| try: | ||
| self.handle_unsafe(e) | ||
| except Exception as exception: | ||
| self._handle_exception(exception) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all exceptions related to OL are non critical. They don't make dbt fail
| self._handle_exception(exception) | ||
|
|
||
| def _handle_exception(self, e: Exception): | ||
| fire_event(OpenLineageException(exc=str(e), exc_info=traceback.format_exc())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need help to add this new event OpenLineageException following the new public proto
| @@ -0,0 +1,1010 @@ | |||
| [ | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is an example of OL events generated
| @@ -0,0 +1,1010 @@ | |||
| [ | |||
| { | |||
| "eventTime":"{{ .* }}", | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
regexes have to be defined following:
{{<space><REGEX><space>}}
Signed-off-by: Massy Bourennani <[email protected]>
Signed-off-by: Massy Bourennani <[email protected]>
6dd1a3d to
b9b0e9c
Compare
dbt-coredbt-core
dbt-coredbt-core
Resolves #11750
TL;DR
This PR integrates dbt with Openlineage. It unlocks lineage tracking and observability of the dbt pipelines.
Openlineage is an open source standard. From its main page:
Openlineage (OL) defines events according to a specification. This PR constructs those OL events by consuming the dbt structured logs and sends them to an endpoint.
The endpoint that consumes OL events is totally configurable by the user. It can be Marquez, Datadog or something else. Examples in this PR are using Datadog.
Problem
Let's build the jaffle shop project using the following command
We have this output
I've truncated the output but:
This output doesn't tell us the SQL queries executed by every model. We can use the
--debugfor that:We have the following output:
12:11:28 Running with dbt=1.9.0 12:11:28 running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/usr/src/dbt/profiles', 'version_check': 'True', 'warn_error': 'None', 'log_path': '/usr/src/dbt/logs', 'fail_fast': 'False', 'debug': 'True', 'use_colors': 'False', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'empty': 'False', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'introspect': 'True', 'invocation_command': 'dbt build --debug', 'log_format': 'default', 'target_path': 'None', 'static_parser': 'True', 'send_anonymous_usage_stats': 'False'} 12:11:28 [WARNING]: Deprecated functionality User config should be moved from the 'config' key in profiles.yml to the 'flags' key in dbt_project.yml. 12:11:29 Registered adapter: postgres=1.9.0 12:11:30 checksum: c99e828bba267739642b5a3ce85f17518764ea526e0e6c4fdc649171c1a66bff, vars: {}, profile: , target: , version: 1.9.0 12:11:31 Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed. 12:11:31 Partial parsing enabled, no changes found, skipping parsing 12:11:32 Wrote artifact WritableManifest to /usr/src/dbt/target/manifest.json 12:11:32 Wrote artifact SemanticManifest to /usr/src/dbt/target/semantic_manifest.json 12:11:33 Found 5 models, 1 snapshot, 3 seeds, 23 data tests, 1 source, 434 macros 12:11:33 12:11:33 Concurrency: 2 threads (target='pg') 12:11:33 12:11:33 Acquiring new postgres connection 'master' 12:11:33 Acquiring new postgres connection 'list_postgres' 12:11:33 Acquiring new postgres connection 'list_postgres' 12:11:33 Using postgres connection "list_postgres" 12:11:33 Using postgres connection "list_postgres" 12:11:33 On list_postgres: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "jaffle_shop", "target_name": "pg", "connection_name": "list_postgres"} */ select distinct nspname from pg_namespace 12:11:33 On list_postgres: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "jaffle_shop", "target_name": "pg", "connection_name": "list_postgres"} */ select distinct nspname from pg_namespace 12:11:33 Opening a new connection, currently in state init 12:11:33 Opening a new connection, currently in state init 12:11:33 SQL status: SELECT 11 in 0.136 seconds 12:11:33 SQL status: SELECT 11 in 0.134 seconds 12:11:33 On list_postgres: Close 12:11:33 On list_postgres: Close 12:11:33 Re-using an available connection from the pool (formerly list_postgres, now list_postgres_snapshots) 12:11:33 Re-using an available connection from the pool (formerly list_postgres, now list_postgres_public) 12:11:33 Using postgres connection "list_postgres_snapshots" 12:11:33 Using postgres connection "list_postgres_public" 12:11:33 On list_postgres_snapshots: BEGIN 12:11:33 On list_postgres_public: BEGIN 12:11:33 Opening a new connection, currently in state closed 12:11:33 Opening a new connection, currently in state closed 12:11:33 SQL status: BEGIN in 0.086 seconds 12:11:33 SQL status: BEGIN in 0.084 seconds 12:11:33 Using postgres connection "list_postgres_snapshots" 12:11:33 Using postgres connection "list_postgres_public" .... 12:11:42 On test.jaffle_shop.unique_customers_customer_id.c5af1ff4b1: ROLLBACK 12:11:42 SQL status: SELECT 1 in 0.002 seconds 12:11:42 On test.jaffle_shop.unique_customers_customer_id.c5af1ff4b1: Close 12:11:42 On test.jaffle_shop.accepted_values_orders_status__placed__shipped__completed__return_pending__returned.be6b5b5ec3: ROLLBACK 12:11:42 21 of 32 PASS unique_customers_customer_id ..................................... [PASS in 0.43s] 12:11:42 On test.jaffle_shop.accepted_values_orders_status__placed__shipped__completed__return_pending__returned.be6b5b5ec3: Close 12:11:42 Finished running node test.jaffle_shop.unique_customers_customer_id.c5af1ff4b1 12:11:42 22 of 32 PASS accepted_values_orders_status__placed__shipped__completed__return_pending__returned [PASS in 0.45s] 12:11:42 Began running node test.jaffle_shop.foo_bar_test_orders_bim.45fc81421f 12:11:42 Finished running node test.jaffle_shop.accepted_values_orders_status__placed__shipped__completed__return_pending__returned.be6b5b5ec3 12:11:42 23 of 32 START test foo_bar_test_orders_bim .................................... [RUN] 12:11:42 Began running node test.jaffle_shop.not_null_orders_amount.106140f9fd ... 12:11:45 Done. PASS=32 WARN=0 ERROR=0 SKIP=0 TOTAL=32 12:11:45 Resource report: {"command_name": "build", "command_success": true, "command_wall_clock_time": 17.469067, "process_in_blocks": "0", "process_kernel_time": 0.741213, "process_mem_max_rss": "176572", "process_out_blocks": "0", "process_user_time": 22.572857} 12:11:45 Command `dbt build` succeeded at 12:11:45.380948 after 17.48 secondsObservability of the dbt pipeline is not ideal:
This PR is about enhancing the observability of dbt pipelines and addressing the shortcomings mentioned above.
Solution
Instead of relying on the textual logs to report progress of the dbt pipeline, This PR integrates dbt-core with Openlineage. Like what has been done for Apache Airflow.
Below are examples on how we leverage those OL events in Datadog to report on the progression of dbt pipelines.
When running:
In the waterfall view we can see:
This is when we build the entire jaffle shop project:
An interesting flame graph view when the jaffle shop project is executed using two threads
PR details
You can see a presentation of the integration in this short YT video (relevant part is ~10 minutes long). Be sure to check the linked PRs in order to have more context.
in a nutshell this PR adds a callback that listens for particular dbt structured logs events.
for each of those events an OL event is generated and emitted.
How to test
This PR adds functional tests that checks the generated OL events against expected ones.
You can run them by setting up a dev environment and execute the following command
pytest "tests/functional/openlineage/openlineage_project.py"If there is a failure you will get a json-path-like to the attribute that has a discrepancy.
For unit tests you can run
pytest "tests/unit/openlineage/"Linked PRs/Issues
Additional context form the Openlineage repository
testandbuildcommands OpenLineage/OpenLineage#3362testandbuildcommands OpenLineage/OpenLineage#3362Checklist
PS
Perhaps the most important motivation of this PR: tell your dbt teammate @le-brice Massy says hi.