It seems this Airflow plugin is out of date with the latest Lithops API: it passes chunk_size to FunctionExecutor.map(), which I suspect is now called chunksize.
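One way to check which keyword names a given release accepts is to read the callable's signature with `inspect`. The following is a minimal sketch, not the plugin's or Lithops' own code: `accepted_keywords` is a hypothetical helper, and `stand_in_map` is a made-up stand-in whose parameter list is only an assumption for illustration.

```python
import inspect


def accepted_keywords(func):
    """Return the parameter names that func accepts as keyword arguments."""
    params = inspect.signature(func).parameters.values()
    return [
        p.name
        for p in params
        if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
    ]


# Hypothetical stand-in for the installed FunctionExecutor.map. Its parameter
# list is an assumption; read the real one from the installed lithops version.
def stand_in_map(self, map_function, map_iterdata, chunksize=None, extra_args=None):
    return None


names = accepted_keywords(stand_in_map)
print("chunk_size" in names, "chunksize" in names)
```

With Lithops installed in the Airflow container, the same helper could be pointed at `lithops.FunctionExecutor.map` in place of the stand-in to see the real parameter names.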
I'm running Airflow 2.9.0 via docker-compose and installing Lithops v3.6.0 (Python 3.12) via _PIP_ADDITIONAL_REQUIREMENTS:
$ grep lithops docker-compose.yml
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-lithops}
Installed this airflow-plugin by manually copying the lithops_airflow_plugin/ directory into the plugins/ directory (and changing __init__.py to from lithops_airflow_plugin.lithops_plugin import *)
Running this
from airflow import DAG
from airflow.operators.python import PythonOperator
from lithops_airflow_plugin import LithopsMapOperator
import random

args = {
    'owner': 'lithops',
}

def add(x, y):
    return x+y

dag = DAG(
    dag_id='LithopsTest',
    default_args=args,
    schedule_interval=None,
)

gen_list = PythonOperator(
    task_id='gen_list',
    python_callable=lambda: [random.randint(1, 100) for _ in range(10)],
    dag=dag
)

mult_num_map = LithopsMapOperator(
    task_id='mult_num_map',
    map_function=add,
    iterdata_from_task={'a': 'gen_list'},
    extra_args={'b': 10},
    dag=dag,
    backend='localhost'
)

gen_list >> mult_num_map
I get
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/plugins/lithops_airflow_plugin/operators/lithops_operator.py", line 86, in execute
    self._futures = self.execute_callable(context)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/plugins/lithops_airflow_plugin/operators/lithops_operator.py", line 232, in execute_callable
    return self._executor.map(map_function=self.map_function,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: FunctionExecutor.map() got an unexpected keyword argument 'chunk_size'
[2025-07-11, 06:03:29 UTC] {taskinstance.py:1205} INFO - Marking task as FAILED. dag_id=LithopsTest, task_id=mult_num_map, execution_date=20250711T060325, start_date=20250711T060329, end_date=20250711T060329
[2025-07-11, 06:03:29 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 15 for task mult_num_map (FunctionExecutor.map() got an unexpected keyword argument 'chunk_size'; 98)
[2025-07-11, 06:03:29 UTC] {local_task_job_runner.py:240} INFO - Task exited with return code 1
[2025-07-11, 06:03:29 UTC] {taskinstance.py:3482} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2025-07-11, 06:03:29 UTC] {local_task_job_runner.py:222}
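A possible stopgap until the plugin is updated is to translate legacy keyword names just before the call. The sketch below is only an illustration under stated assumptions: `call_with_renamed_kwargs` and `stand_in_map` are hypothetical names, and the `chunk_size` to `chunksize` mapping is the suspicion from this report, not a confirmed equivalence (the two parameters may not mean the same thing).

```python
import inspect


def call_with_renamed_kwargs(func, renames, /, *args, **kwargs):
    """Call func, renaming keyword arguments it does not accept.

    renames maps a legacy keyword name to its suspected replacement.
    Keywords that func already accepts are passed through unchanged.
    """
    accepted = inspect.signature(func).parameters
    translated = {}
    for name, value in kwargs.items():
        if name not in accepted and name in renames:
            name = renames[name]
        translated[name] = value
    return func(*args, **translated)


# Hypothetical stand-in for FunctionExecutor.map; the signature is an
# assumption for illustration and is not taken from the lithops source.
def stand_in_map(map_function, map_iterdata, chunksize=None, extra_args=None):
    extra = tuple(extra_args or ())
    return [map_function(item, *extra) for item in map_iterdata]


result = call_with_renamed_kwargs(
    stand_in_map,
    {"chunk_size": "chunksize"},  # assumed rename, unverified
    map_function=lambda x, y: x + y,
    map_iterdata=[1, 2, 3],
    extra_args=(10,),
    chunk_size=None,  # legacy name, as the plugin passes it
)
print(result)
```

Renaming only when the target callable rejects the old name keeps the wrapper harmless on versions where the original keyword is still accepted.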