Skip to content

Commit a3c325a

Browse files
FastLeenfx
and authored
Enabled debug logging for every job task run through a file, which is accessible from both workspace UI and Databricks CLI (#426)
This PR improves debuggability of UCX. ## More concise error messaging from job run <img width="1390" alt="image" src="https://github.com/databrickslabs/ucx/assets/259697/be4dc82c-3724-4684-b8cd-a77a05f24d67"> ## Viewing logs via Databricks CLI <img width="929" alt="image" src="https://github.com/databrickslabs/ucx/assets/259697/b1901d03-1c9a-4c63-81c7-501f0f3e4e48"> ## Added `.ucx/logs` folder in the installation <img width="313" alt="image" src="https://github.com/databrickslabs/ucx/assets/259697/a396b637-308a-4791-9cc5-dbd0494431c4"> where every executed workflow has a folder per job run, with the ability to get back to the Job Run in the Workspace UI <img width="1092" alt="image" src="https://github.com/databrickslabs/ucx/assets/259697/1bf60990-eab6-4866-b0b5-a75c8a44674e"> ## Viewing full debug logs via workspace UI <img width="1218" alt="image" src="https://github.com/databrickslabs/ucx/assets/259697/0d1a84bb-0af9-4cbc-b662-77c1715ff4b1"> Closes #86 --------- Co-authored-by: Serge Smertin <[email protected]>
1 parent 93d8f0f commit a3c325a

File tree

5 files changed

+107
-39
lines changed

5 files changed

+107
-39
lines changed

src/databricks/labs/ucx/framework/logger.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,11 @@ def format(self, record: logging.LogRecord): # noqa: A003
5454
return f"{self.GRAY}{ts}{self.RESET} {level} {color_marker}[{name}] {msg}{self.RESET}"
5555

5656

57-
def _install(level="DEBUG"):
    """Install a single stderr console handler on the root logger.

    Removes any pre-existing root handlers first, so repeated calls do not
    duplicate log output. Returns the installed handler so callers can
    detach or reconfigure it later (e.g. when redirecting logs to a file).

    :param level: threshold for the console handler (default "DEBUG")
    :return: the newly attached ``logging.StreamHandler``
    """
    for h in logging.root.handlers:
        logging.root.removeHandler(h)
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(NiceFormatter())
    console_handler.setLevel(level)
    logging.root.addHandler(console_handler)
    return console_handler

src/databricks/labs/ucx/framework/tasks.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ def decorator(func):
2929
def wrapper(*args, **kwargs):
    """Log the start and completion of the wrapped task, then return its result."""
    # Name the logger after the task function so each log line is attributable
    # to a specific task in the aggregated workflow logs.
    logger = logging.getLogger(func.__name__)
    # Lazy %-style args: message is only formatted if the record is emitted.
    logger.info("Task '%s' is starting...", workflow)
    result = func(*args, **kwargs)
    logger.info("Task '%s' is completed!", workflow)
    return result
3637

3738
deps = []
@@ -76,18 +77,60 @@ def trigger(*argv):
7677
if "config" not in args:
7778
msg = "no --config specified"
7879
raise KeyError(msg)
79-
8080
task_name = args.get("task", "not specified")
81+
# `{{parent_run_id}}` is the run of entire workflow, whereas `{{run_id}}` is the run of a task
82+
workflow_run_id = args.get("parent_run_id", "unknown_run_id")
83+
job_id = args.get("job_id")
8184
if task_name not in _TASKS:
8285
msg = f'task "{task_name}" not found. Valid tasks are: {", ".join(_TASKS.keys())}'
8386
raise KeyError(msg)
8487

8588
current_task = _TASKS[task_name]
8689
print(current_task.doc)
8790

88-
_install()
89-
90-
cfg = WorkspaceConfig.from_file(Path(args["config"]))
91-
logging.getLogger("databricks").setLevel(cfg.log_level)
92-
93-
current_task.fn(cfg)
91+
config_path = Path(args["config"])
92+
cfg = WorkspaceConfig.from_file(config_path)
93+
94+
# see https://docs.python.org/3/howto/logging-cookbook.html
95+
databricks_logger = logging.getLogger("databricks")
96+
databricks_logger.setLevel(logging.DEBUG)
97+
98+
ucx_logger = logging.getLogger("databricks.labs.ucx")
99+
ucx_logger.setLevel(logging.DEBUG)
100+
101+
log_path = config_path.parent / "logs" / current_task.workflow / f"run-{workflow_run_id}"
102+
log_path.mkdir(parents=True, exist_ok=True)
103+
104+
log_file = log_path / f"{task_name}.log"
105+
file_handler = logging.FileHandler(log_file.as_posix())
106+
log_format = "%(asctime)s %(levelname)s [%(name)s] {%(threadName)s} %(message)s"
107+
log_formatter = logging.Formatter(fmt=log_format, datefmt="%H:%M:%S")
108+
file_handler.setFormatter(log_formatter)
109+
file_handler.setLevel(logging.DEBUG)
110+
111+
console_handler = _install(cfg.log_level)
112+
databricks_logger.removeHandler(console_handler)
113+
databricks_logger.addHandler(file_handler)
114+
115+
ucx_logger.info(f"See debug logs at {log_file}")
116+
117+
log_readme = log_path.joinpath("README.md")
118+
if not log_readme.exists():
119+
# this may race when run from multiple tasks, but let's accept the risk for now.
120+
with log_readme.open(mode="w") as f:
121+
f.write(f"# Logs for the UCX {current_task.workflow} workflow\n")
122+
f.write("This folder contains UCX log files.\n\n")
123+
f.write(f"See the [{current_task.workflow} job](/#job/{job_id}) and ")
124+
f.write(f"[run #{workflow_run_id}](/#job/{job_id}/run/{workflow_run_id})\n")
125+
126+
try:
127+
current_task.fn(cfg)
128+
except BaseException as error:
129+
log_file_for_cli = str(log_file).lstrip("/Workspace")
130+
cli_command = f"databricks workspace export /{log_file_for_cli}"
131+
ucx_logger.error(f"Task crashed. Execute `{cli_command}` locally to troubleshoot with more details. {error}")
132+
databricks_logger.debug("Task crash details", exc_info=error)
133+
file_handler.flush()
134+
raise
135+
finally:
136+
file_handler.close()

src/databricks/labs/ucx/install.py

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@
2929
TAG_STEP = "step"
3030
TAG_APP = "App"
3131
NUM_USER_ATTEMPTS = 10 # number of attempts user gets at answering a question
32-
32+
# Parameters appended to every generated job task. The "{{...}}" placeholders
# are Databricks job parameter variables, substituted at run time with the
# actual identifiers; `parent_run_id` is the run of the entire workflow,
# whereas `run_id` is the run of an individual task.
EXTRA_TASK_PARAMS = {
    "job_id": "{{job_id}}",
    "run_id": "{{run_id}}",
    "parent_run_id": "{{parent_run_id}}",
}
3337
DEBUG_NOTEBOOK = """
3438
# Databricks notebook source
3539
# MAGIC %md
@@ -69,26 +73,13 @@
6973
7074
# COMMAND ----------
7175
72-
import logging
73-
import databricks.labs.ucx.runtime
74-
75-
from pathlib import Path
76-
from databricks.labs.ucx.__about__ import __version__
77-
from databricks.labs.ucx.config import WorkspaceConfig
78-
from databricks.labs.ucx.framework.tasks import _TASKS
79-
from databricks.labs.ucx.framework.logger import _install
80-
from databricks.sdk import WorkspaceClient
81-
82-
task_name = dbutils.widgets.get('task')
83-
current_task = _TASKS[task_name]
84-
85-
_install()
86-
print('UCX version: ' + __version__)
87-
logging.getLogger("databricks").setLevel('DEBUG')
88-
89-
cfg = WorkspaceConfig.from_file(Path("/Workspace{config_file}"))
76+
from databricks.labs.ucx.runtime import main
9077
91-
current_task.fn(cfg)
78+
main(f'--config=/Workspace{config_file}',
79+
f'--task=' + dbutils.widgets.get('task'),
80+
f'--job_id=' + dbutils.widgets.get('job_id'),
81+
f'--run_id=' + dbutils.widgets.get('run_id'),
82+
f'--parent_run_id=' + dbutils.widgets.get('parent_run_id'))
9283
"""
9384

9485
logger = logging.getLogger(__name__)
@@ -325,6 +316,7 @@ def _create_jobs(self):
325316
self._deployed_steps = self._deployed_steps()
326317
desired_steps = {t.workflow for t in _TASKS.values()}
327318
wheel_runner = None
319+
328320
if self._override_clusters:
329321
wheel_runner = self._upload_wheel_runner(remote_wheel)
330322
for step_name in desired_steps:
@@ -518,9 +510,8 @@ def _apply_cluster_overrides(settings: dict[str, any], overrides: dict[str, str]
518510
job_task.job_cluster_key = None
519511
if job_task.python_wheel_task is not None:
520512
job_task.python_wheel_task = None
521-
job_task.notebook_task = jobs.NotebookTask(
522-
notebook_path=wheel_runner, base_parameters={"task": job_task.task_key}
523-
)
513+
params = {"task": job_task.task_key} | EXTRA_TASK_PARAMS
514+
job_task.notebook_task = jobs.NotebookTask(notebook_path=wheel_runner, base_parameters=params)
524515
return settings
525516

526517
def _job_task(self, task: Task, dbfs_path: str) -> jobs.Task:
@@ -555,7 +546,12 @@ def _job_notebook_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
555546
notebook_task=jobs.NotebookTask(
556547
notebook_path=remote_notebook,
557548
# ES-872211: currently, we cannot read WSFS files from Scala context
558-
base_parameters={"inventory_database": self._current_config.inventory_database},
549+
base_parameters={
550+
"inventory_database": self._current_config.inventory_database,
551+
"task": task.name,
552+
"config": f"/Workspace{self._config_file}",
553+
}
554+
| EXTRA_TASK_PARAMS,
559555
),
560556
)
561557

@@ -566,7 +562,7 @@ def _job_wheel_task(self, jobs_task: jobs.Task, task: Task, dbfs_path: str) -> j
566562
python_wheel_task=jobs.PythonWheelTask(
567563
package_name="databricks_labs_ucx",
568564
entry_point="runtime", # [project.entry-points.databricks] in pyproject.toml
569-
named_parameters={"task": task.name, "config": f"/Workspace{self._config_file}"},
565+
named_parameters={"task": task.name, "config": f"/Workspace{self._config_file}"} | EXTRA_TASK_PARAMS,
570566
),
571567
)
572568

src/databricks/labs/ucx/runtime.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,12 +249,14 @@ def destroy_schema(cfg: WorkspaceConfig):
249249
RuntimeBackend().execute(f"DROP DATABASE {cfg.inventory_database} CASCADE")
250250

251251

252-
def main(*argv):
    """Entry point for running a workflow task.

    When called with no arguments (e.g. from the generated notebook wrapper
    that passes explicit ``--config``/``--task`` flags, or from the CLI),
    falls back to ``sys.argv`` so console-script invocation keeps working.
    """
    if len(argv) == 0:
        argv = sys.argv
    trigger(*argv)
254256

255257

256258
if __name__ == "__main__":
    # Refuse to run outside a Databricks cluster: this module drives job
    # tasks and relies on runtime-only facilities.
    if "DATABRICKS_RUNTIME_VERSION" not in os.environ:
        msg = "Only intended to run in Databricks Runtime"
        raise SystemExit(msg)
    main(*sys.argv)

tests/integration/test_installation.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,32 @@ def test_destroying_non_existing_schema_fails_with_correct_message(ws, sql_backe
4444
assert "cannot be found" in str(failure.value)
4545

4646

47+
def test_logs_are_available(ws, sql_backend, env_or_skip, make_random):
    """A failed workflow run must still leave a per-run log folder in the installation."""
    default_cluster_id = env_or_skip("TEST_DEFAULT_CLUSTER_ID")
    ws.clusters.ensure_cluster_is_running(default_cluster_id)

    install = WorkspaceInstaller.run_for_config(
        ws,
        WorkspaceConfig(
            inventory_database=f"ucx_{make_random(4)}",
            instance_pool_id=env_or_skip("TEST_INSTANCE_POOL_ID"),
            groups=GroupsConfig(auto=True),
            log_level="INFO",
        ),
        prefix=make_random(4),
        override_clusters={
            "main": default_cluster_id,
        },
    )

    # destroy-schema is expected to fail on a fresh install (nothing to drop);
    # a crashed task must still produce log files.
    with pytest.raises(OperationFailed):
        install.run_workflow("destroy-schema")

    # One workflow run should yield exactly one entry under <install>/logs.
    workflow_run_logs = list(ws.workspace.list(f"{install._install_folder}/logs"))
    assert len(workflow_run_logs) == 1
71+
72+
4773
def test_jobs_with_no_inventory_database(
4874
ws,
4975
sql_backend,

0 commit comments

Comments
 (0)