@@ -122,16 +122,31 @@ def create_func_env_vars(self):
122122
123123class DevTaskOperator (BaseTaskOperator ):
124124 def create_task (self ):
125- from gaiaflow .core .runner import run
125+ import os
126+
127+ current_dir = os .path .dirname (os .path .abspath (__file__ ))
126128
127129 args , kwargs = self .resolve_args_kwargs ()
128130 kwargs ["params" ] = dict (self .params )
129- op_kwargs = {"func_path" : self .func_path , "args" : args , "kwargs" : kwargs }
131+ op_kwargs = {"func_path" : self .func_path , "args" : args , "kwargs" :
132+ kwargs , "current_dir" : current_dir }
133+
134+ def run_wrapper (** op_kwargs ):
135+ import sys
136+
137+ sys .path .append (op_kwargs .get ("current_dir" , "" ))
138+ from runner import run
139+
140+ return run (
141+ func_path = op_kwargs .get ("func_path" ),
142+ args = op_kwargs .get ("args" ),
143+ kwargs = op_kwargs .get ("kwargs" ),
144+ )
130145
131146 return ExternalPythonOperator (
132147 task_id = self .task_id ,
133148 python = "/home/airflow/.local/share/mamba/envs/default_user_env/bin/python" ,
134- python_callable = run ,
149+ python_callable = run_wrapper ,
135150 op_kwargs = op_kwargs ,
136151 do_xcom_push = True ,
137152 retries = self .retries ,
@@ -216,7 +231,7 @@ def create_task(self):
216231 do_xcom_push = True ,
217232 retries = self .retries ,
218233 params = self .params ,
219- container_resources = resources ,
234+ # container_resources=resources,
220235 )
221236
222237
0 commit comments