@@ -23,6 +23,18 @@ class Task:
2323 dashboard : str = None
2424
2525
26+ @staticmethod
27+ def _remove_extra_indentation (doc : str ) -> str :
28+ lines = doc .splitlines ()
29+ stripped = []
30+ for line in lines :
31+ if line .startswith (" " * 4 ):
32+ stripped .append (line [4 :])
33+ else :
34+ stripped .append (line )
35+ return "\n " .join (stripped )
36+
37+
2638def task (workflow , * , depends_on = None , job_cluster = "main" , notebook : str | None = None , dashboard : str | None = None ):
2739 def decorator (func ):
2840 @wraps (func )
@@ -59,7 +71,7 @@ def wrapper(*args, **kwargs):
5971 task_id = len (_TASKS ),
6072 workflow = workflow ,
6173 name = func .__name__ ,
62- doc = func .__doc__ ,
74+ doc = _remove_extra_indentation ( func .__doc__ ) ,
6375 fn = func ,
6476 depends_on = deps ,
6577 job_cluster = job_cluster ,
@@ -77,60 +89,18 @@ def trigger(*argv):
7789 if "config" not in args :
7890 msg = "no --config specified"
7991 raise KeyError (msg )
92+
8093 task_name = args .get ("task" , "not specified" )
81- # `{{parent_run_id}}` is the run of entire workflow, whereas `{{run_id}}` is the run of a task
82- workflow_run_id = args .get ("parent_run_id" , "unknown_run_id" )
83- job_id = args .get ("job_id" )
8494 if task_name not in _TASKS :
8595 msg = f'task "{ task_name } " not found. Valid tasks are: { ", " .join (_TASKS .keys ())} '
8696 raise KeyError (msg )
8797
8898 current_task = _TASKS [task_name ]
8999 print (current_task .doc )
90100
91- config_path = Path (args ["config" ])
92- cfg = WorkspaceConfig .from_file (config_path )
93-
94- # see https://docs.python.org/3/howto/logging-cookbook.html
95- databricks_logger = logging .getLogger ("databricks" )
96- databricks_logger .setLevel (logging .DEBUG )
97-
98- ucx_logger = logging .getLogger ("databricks.labs.ucx" )
99- ucx_logger .setLevel (logging .DEBUG )
100-
101- log_path = config_path .parent / "logs" / current_task .workflow / f"run-{ workflow_run_id } "
102- log_path .mkdir (parents = True , exist_ok = True )
103-
104- log_file = log_path / f"{ task_name } .log"
105- file_handler = logging .FileHandler (log_file .as_posix ())
106- log_format = "%(asctime)s %(levelname)s [%(name)s] {%(threadName)s} %(message)s"
107- log_formatter = logging .Formatter (fmt = log_format , datefmt = "%H:%M:%S" )
108- file_handler .setFormatter (log_formatter )
109- file_handler .setLevel (logging .DEBUG )
110-
111- console_handler = _install (cfg .log_level )
112- databricks_logger .removeHandler (console_handler )
113- databricks_logger .addHandler (file_handler )
114-
115- ucx_logger .info (f"See debug logs at { log_file } " )
116-
117- log_readme = log_path .joinpath ("README.md" )
118- if not log_readme .exists ():
119- # this may race when run from multiple tasks, but let's accept the risk for now.
120- with log_readme .open (mode = "w" ) as f :
121- f .write (f"# Logs for the UCX { current_task .workflow } workflow\n " )
122- f .write ("This folder contains UCX log files.\n \n " )
123- f .write (f"See the [{ current_task .workflow } job](/#job/{ job_id } ) and " )
124- f .write (f"[run #{ workflow_run_id } ](/#job/{ job_id } /run/{ workflow_run_id } )\n " )
125-
126- try :
127- current_task .fn (cfg )
128- except BaseException as error :
129- log_file_for_cli = str (log_file ).lstrip ("/Workspace" )
130- cli_command = f"databricks workspace export /{ log_file_for_cli } "
131- ucx_logger .error (f"Task crashed. Execute `{ cli_command } ` locally to troubleshoot with more details. { error } " )
132- databricks_logger .debug ("Task crash details" , exc_info = error )
133- file_handler .flush ()
134- raise
135- finally :
136- file_handler .close ()
101+ _install ()
102+
103+ cfg = WorkspaceConfig .from_file (Path (args ["config" ]))
104+ logging .getLogger ("databricks" ).setLevel (cfg .log_level )
105+
106+ current_task .fn (cfg )
0 commit comments