Skip to content

Commit 03526d2

Browse files
authored
Time dagster asset loading (#4669)
* update deps
* chore: update oso_core deps
* feat(oso_core): use structured logging from structlog
* fix oso_core logging
* fix: allow for legacy definition import timing
* fix(oso_dagster): allow early asset factories to provide useful metadata
* feat(oso_dagster): asset loading
* fix(oso_dagster): decorated functions should use functools.wraps
* chore(ops): use legacy definitions file
* fix env vars
* improve docs
* fix some old references
* fix(oso_dagster): add some logging when using deferred table loading
* add some docs
* small clean up
* doc fix
* doc fix
1 parent 69076a4 commit 03526d2

File tree

18 files changed

+1750
-1595
lines changed

18 files changed

+1750
-1595
lines changed

lib/oso-core/oso_core/logging/__init__.py

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
22
import os
3-
import sys
43
import typing as t
54

6-
import colorlog
5+
import structlog
6+
from oso_core.logging.defaults import configure_structured_logging
77

88
connected_to_application_logs = False
99

10-
logger = logging.getLogger(__name__)
10+
logger = structlog.get_logger(__name__)
1111

1212

1313
def add_oso_core_to_current_application_logging():
@@ -69,28 +69,9 @@ def setup_multiple_modules_logging(module_names: t.List[str]):
6969
def setup_module_logging(
7070
module_name: str,
7171
level: int = logging.DEBUG,
72-
override_format: str = "",
7372
color: bool = False,
7473
):
74+
configure_structured_logging()
75+
7576
logger = logging.getLogger(module_name)
7677
logger.setLevel(level) # Adjust the level as needed
77-
78-
# Create a handler that logs to stdout
79-
if color:
80-
format = "%(asctime)s - %(log_color)s%(levelname)-8s%(reset)s - %(name)s - %(message)s"
81-
stdout_handler = colorlog.StreamHandler(sys.stdout)
82-
formatter = colorlog.ColoredFormatter(format, datefmt="%Y-%m-%dT%H:%M:%S")
83-
else:
84-
format = "%(asctime)s - %(levelname)-8s - %(name)s - %(message)s"
85-
stdout_handler = logging.StreamHandler(sys.stdout)
86-
formatter = logging.Formatter(format, datefmt="%Y-%m-%dT%H:%M:%S")
87-
stdout_handler.setLevel(level) # Adjust the level as needed
88-
89-
# Add the filter to the handler
90-
stdout_handler.addFilter(ModuleFilter(module_name))
91-
92-
# Set a formatter (optional)
93-
stdout_handler.setFormatter(formatter)
94-
95-
# Add the handler to the logger
96-
logger.addHandler(stdout_handler)

lib/oso-core/oso_core/logging/decorators.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
import logging
77

88

9-
def time_function(logger: logging.Logger):
9+
def time_function(logger: logging.Logger, override_name: str = ""):
1010
"""Decorator to time a function and log the duration."""
1111

1212
def decorator(func):
13+
name = override_name or func.__name__
14+
1315
@functools.wraps(func)
1416
def wrapper(*args, **kwargs):
1517
import time
@@ -18,7 +20,11 @@ def wrapper(*args, **kwargs):
1820
result = func(*args, **kwargs)
1921
end_time = time.time()
2022
duration = end_time - start_time
21-
logger.info(f"Function {func.__name__} took {duration:.4f} seconds")
23+
duration_ms = duration * 1000
24+
logger.info(
25+
f"Function `{name}` took {duration_ms:.4f} milliseconds.",
26+
extra=dict(event_type="timing", duration_ms=duration_ms),
27+
)
2228
return result
2329

2430
return wrapper
lib/oso-core/oso_core/logging/defaults.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import logging
2+
3+
import structlog
4+
5+
# Module-level guard read and set by configure_structured_logging() so that
# the structlog / root-handler setup is not applied more than once.
STRUCTURED_LOGGING_CONFIGURED = False
6+
7+
8+
def flatten_extra_field(
    logger: logging.Logger,
    log_method: str,
    event_dict: "structlog.types.EventDict",
) -> "structlog.types.EventDict":
    """Hoist the entries of an ``extra`` mapping to the top level of the event.

    structlog processor. When ``event_dict`` carries an ``extra`` key whose
    value is a mapping, every item of that mapping is copied onto
    ``event_dict`` itself and the ``extra`` wrapper is removed.

    Args:
        logger: The logger the event was emitted on (unused).
        log_method: Name of the log method that was called (unused).
        event_dict: The mutable event dictionary being processed.

    Returns:
        The same ``event_dict`` instance, modified in place.
    """
    from collections.abc import Mapping

    extra = event_dict.get("extra")
    # A non-mapping value (e.g. ``extra=None``) has nothing to flatten; leave
    # it untouched instead of failing on ``.items()``.
    if isinstance(extra, Mapping):
        # Remove the wrapper *before* merging so that an item that is itself
        # named "extra" survives instead of being deleted afterwards.
        del event_dict["extra"]
        event_dict.update(extra)
    return event_dict
18+
19+
20+
def ensure_event_type(
    logger: logging.Logger,
    log_method: str,
    event_dict: "structlog.types.EventDict",
) -> "structlog.types.EventDict":
    """Guarantee that every event carries an ``event_type`` key.

    structlog processor. Events that do not already have an ``event_type``
    get the value ``"default"``; an existing value is left as is. This allows
    filtering on ``event_type`` in the log UI.

    Args:
        logger: The logger the event was emitted on (unused).
        log_method: Name of the log method that was called (unused).
        event_dict: The mutable event dictionary being processed.

    Returns:
        The same ``event_dict`` instance, modified in place.
    """
    event_dict.setdefault("event_type", "default")
    return event_dict
30+
31+
32+
def configure_structured_logging() -> None:
    """Configure structured (JSON) logging for the application.

    This was taken almost directly from the structlog docs. See:

    https://www.structlog.org/en/stable/standard-library.html#rendering-using-structlog-based-formatters-within-logging

    A single ``logging.StreamHandler`` whose formatter renders each entry as
    JSON is attached to the root logger. The handler is created without an
    explicit stream, so it writes to ``sys.stderr`` (the ``StreamHandler``
    default). The level of the root logger is not configured here, so it is up
    to the caller to enable the appropriate level for logs to be emitted.

    This should be called at the start of an application. Once configuration
    has completed successfully, further calls return immediately.
    """
    global STRUCTURED_LOGGING_CONFIGURED
    if STRUCTURED_LOGGING_CONFIGURED:
        return

    timestamper = structlog.processors.TimeStamper(fmt="iso", utc=True)
    shared_processors = [
        flatten_extra_field,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        timestamper,
        structlog.processors.StackInfoRenderer(),
        ensure_event_type,
    ]

    structlog.configure(
        processors=[
            *shared_processors,
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    formatter = structlog.stdlib.ProcessorFormatter(
        # These run ONLY on `logging` entries that do NOT originate within
        # structlog. For such entries the `extra=` argument of the stdlib
        # logging calls ends up as attributes on the LogRecord rather than as
        # an "extra" key, so ExtraAdder copies those attributes into the event
        # dict before the shared processors (notably ensure_event_type) run.
        foreign_pre_chain=[structlog.stdlib.ExtraAdder(), *shared_processors],
        # These run on ALL entries after the pre_chain is done.
        processors=[
            # Remove _record & _from_structlog.
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.processors.JSONRenderer(),
        ],
    )

    handler = logging.StreamHandler()
    # Use OUR `ProcessorFormatter` to format all `logging` entries.
    handler.setFormatter(formatter)
    logging.getLogger().addHandler(handler)

    # Only mark as configured once everything above succeeded, so a failed
    # attempt does not permanently disable configuration.
    STRUCTURED_LOGGING_CONFIGURED = True
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import logging
2+
from contextlib import contextmanager
3+
4+
5+
@contextmanager
def time_context(logger: logging.Logger, name: str, **log_kwargs):
    """Context manager that times a block of code and logs the duration.

    The duration is logged at INFO level when the block exits, whether it
    finished normally or raised; an exception raised by the block propagates
    unchanged after the log call.

    Args:
        logger: Logger used to emit the timing message.
        name: Label for the timed block, included in the log message.
        **log_kwargs: Additional fields passed to the logger through
            ``extra``. A caller-supplied ``event_type`` replaces the default
            ``"timing"``; ``duration_ms`` is always the measured value.

    Yields:
        None.
    """
    import time

    # perf_counter is monotonic, so the measured duration is not affected by
    # system clock adjustments the way time.time() is.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        duration_ms = (time.perf_counter() - start_time) * 1000
        # Merge instead of dict(event_type=..., **log_kwargs): a duplicate
        # keyword would raise TypeError here and mask the block's exception.
        extra = {"event_type": "timing", **log_kwargs, "duration_ms": duration_ms}
        logger.info(
            f"Timing `{name}` took {duration_ms:.4f} milliseconds.",
            extra=extra,
        )

lib/oso-core/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ dependencies = [
6868
"pyarrow-stubs>=17.16,<21.0",
6969
"gcloud-aio-storage<10.0.0,>=9.3.0",
7070
"kr8s==0.20.9",
71+
"structlog>=25.4.0",
7172
]
7273

7374
[build-system]

ops/helm-charts/oso-dagster/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: oso-dagster
33
description: Extension of the dagster template
44

55
type: application
6-
version: 0.19.0
6+
version: 0.20.0
77
appVersion: "1.0.0"
88
dependencies:
99
- name: dagster

ops/helm-charts/oso-dagster/templates/config-map.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ data:
2525
DAGSTER_MCS_K8S_SERVICE_NAME: "{{ .Values.sqlmesh.mcs.k8s.serviceName }}"
2626
DAGSTER_MCS_K8S_NAMESPACE: "{{ .Values.sqlmesh.mcs.k8s.namespace }}"
2727
DAGSTER_SQLMESH_GATEWAY: "{{ .Values.sqlmesh.gateway }}"
28+
DAGSTER_EAGERLY_LOAD_SQL_TABLES: "{{ .Values.osoDagster.eagerlyLoadSqlTables }}"
2829
SQLMESH_POSTGRES_INSTANCE_CONNECTION_STRING: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.sqlmeshPostgresInstanceStringConnectionString }}"
2930
SQLMESH_POSTGRES_USER: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.sqlmeshPostgresUser }}"
3031
SQLMESH_POSTGRES_PASSWORD: "{{ .Values.configMap.secretPrefix }}-{{ .Values.secretmanagerKeys.sqlmeshPostgresPassword }}"

ops/k8s-apps/base/dagster/dagster.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ spec:
8888
port: 3030
8989
dagsterApiGrpcArgs:
9090
- "-m"
91-
- "oso_dagster.definitions"
91+
- "oso_dagster.definitions.legacy"
92+
- "--log-format"
93+
- "json"
9294
includeConfigInLaunchedRuns:
9395
enabled: false
9496

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ dependencies = [
6666
"pyarrow-stubs>=17.16,<21.0",
6767
"gcloud-aio-storage<10.0.0,>=9.3.0",
6868
"kr8s==0.20.9",
69+
"structlog>=25.4.0",
6970
]
7071
name = "oso"
7172
version = "1.0.0"
@@ -125,7 +126,7 @@ packages = [
125126
]
126127

127128
[tool.dagster]
128-
module_name = "oso_dagster.definitions"
129+
module_name = "oso_dagster.definitions.legacy"
129130

130131
[tool.pyright]
131132
pythonVersion = "3.12"

0 commit comments

Comments
 (0)