Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,28 @@ tasks:
python_callable: 'print_params'
op_kwargs:
param1: 'value1'
- name: 't4'
strategy: 'NotebookOperatorStrategy'
depends_on:
- 't3'
args:
retries: 2
trigger_rule: 'all_success'
provide_context: True
runner: 'DATABRICKS'
databricks_conn_id: '<CONNECTION_ID (Optional)>'
notebook_path: '<NOTEBOOK_PATH_IN_DATABRICKS_WORKSPACE>'
cluster_id: '<CLUSTER_ID (provide cluster_id or `new_cluster` definition)>'
new_cluster:
spark_version: '2.1.0-db3-scala2.11'
node_type_id: 'r3.xlarge'
aws_attributes:
availability: 'ON_DEMAND'
num_workers: 8
- name: 'end'
strategy: 'DummyOperatorStrategy'
depends_on:
- 't3'
depends_on:
- 't4'
```

# DAG Factory
Expand All @@ -118,11 +136,13 @@ A task strategy represents a strategy in which a task can be executed. A strateg
The strategies supported by Castor at this moment in time are:
- [DummyOperatorStrategy](castor/task_creator/strategies/dummy_operator_strategy.py)
- [PythonOperatorStrategy](castor/task_creator/strategies/python_operator_strategy.py)
- [NotebookOperatorStrategy](castor/task_creator/strategies/notebook_operator_strategy.py)


# Operator factory
It is responsible for creating Airflow Operators based on a set of parameters supplied by the DAG Factory.

The operators supported by Castor at this moment in time are:
- [DummyOperator](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/dummy/index.html)
- [PythonOperator](https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/operators/python.html#PythonOperator)
- [PythonOperator](https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/operators/python.html#PythonOperator)
- [DatabricksOperator](https://airflow.apache.org/docs/apache-airflow-providers-databricks/stable/operators.html#databrickssubmitrunoperator)
32 changes: 25 additions & 7 deletions castor/config_files/init_castor_dag.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ tasks:
strategy: 'DummyOperatorStrategy'
- name: 't1'
strategy: 'PythonOperatorStrategy'
depends_on:
depends_on:
- 'start'
args:
retries: 2
trigger_rule: 'all_success'
provide_context: True
python_callable: 'print_params'
op_kwargs:
param1: 'value1'
param1: 'value1'
- name: 't2'
strategy: 'PythonOperatorStrategy'
depends_on:
depends_on:
- 'start'
args:
retries: 2
Expand All @@ -32,7 +32,7 @@ tasks:
param1: 'value1'
- name: 't3'
strategy: 'PythonOperatorStrategy'
depends_on:
depends_on:
- 't1'
- 't2'
args:
Expand All @@ -41,8 +41,26 @@ tasks:
provide_context: True
python_callable: 'print_params'
op_kwargs:
param1: 'value1'
param1: 'value1'
- name: 't4'
strategy: 'NotebookOperatorStrategy'
depends_on:
- 't3'
args:
retries: 2
trigger_rule: 'all_success'
provide_context: True
runner: 'DATABRICKS'
databricks_conn_id: '<CONNECTION_ID (Optional)>'
notebook_path: '<NOTEBOOK_PATH_IN_DATABRICKS_WORKSPACE>'
cluster_id: '<CLUSTER_ID (optional)>'
new_cluster:
spark_version: '2.1.0-db3-scala2.11'
node_type_id: 'r3.xlarge'
aws_attributes:
availability: 'ON_DEMAND'
num_workers: 8
- name: 'end'
strategy: 'DummyOperatorStrategy'
depends_on:
- 't3'
depends_on:
- 't4'
38 changes: 38 additions & 0 deletions castor/operator_factory/airflow_operator_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))

Expand All @@ -25,3 +26,40 @@ def get_python_operator(dag, task_id, args):
**args
)
return task

@staticmethod
def get_databricks_operator(dag, task_id, args):
notebook_task_params = AirflowOperatorFactory._get_databricks_job_params(args)
conn_id = args['databricks_conn_id'] if "databricks_conn_id" in args else 'databricks_default'

return DatabricksSubmitRunOperator(
task_id=task_id,
databricks_conn_id=conn_id,
dag=dag,
json=notebook_task_params)

@staticmethod
def _get_databricks_job_params(args):
notebook_task_params: dict = dict()
if 'cluster_id' in args:
notebook_task_params.update({'existing_cluster_id': args['cluster_id']})
elif 'new_cluster' in args:
notebook_task_params.update({'new_cluster': args['new_cluster']})
else:
new_cluster = {
'spark_version': os.getenv('DATABRICKS_SPARK_VERSION', '2.1.0-db3-scala2.11'),
'node_type_id': os.getenv('DATABRICKS_NODE_TYPE_ID', 'r3.xlarge'),
'aws_attributes': {
'availability': os.getenv('DATABRICKS_AWS_ATTRIBUTES_AVAILABILITY', 'ON_DEMAND'),
},
'num_workers': os.getenv('DATABRICKS_NUM_WORKERS', 8),
}
notebook_task_params.update({'new_cluster': new_cluster})

if 'notebook_path' in args:
notebook_task_params.update({
'notebook_task': {
'notebook_path': args['notebook_path'],
},
})
return notebook_task_params
21 changes: 21 additions & 0 deletions castor/task_creator/strategies/notebook_operator_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os
import sys

sys.path.append(os.path.abspath(os.path.dirname(__file__)))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, os.path.pardir)))
from task_creator.task_strategy import TaskStrategy
from operator_factory.airflow_operator_factory import AirflowOperatorFactory


class NotebookOperatorStrategy(TaskStrategy):
    """Task strategy that runs a notebook on an external runner.

    Currently only the 'DATABRICKS' runner is supported; the operator itself
    is built by AirflowOperatorFactory.get_databricks_operator.
    """

    def __init__(self, task_name, args):
        self.task_name = task_name  # becomes the Airflow task_id
        self.args = args            # raw task parameters from the YAML config

    def create_task(self, dag):
        """Create the Airflow task for this strategy.

        :param dag: the DAG the task is attached to.
        :return: a Databricks submit-run operator.
        :raises NameError: if 'runner' is missing or not 'DATABRICKS'.
        """
        # .get() avoids a KeyError when 'runner' is absent, so the intended
        # NameError below is raised (and its message can be formatted) instead.
        runner = self.args.get('runner')
        if runner == "DATABRICKS":
            return AirflowOperatorFactory.get_databricks_operator(dag, self.task_name, self.args)
        msg = "Unknown Notebook strategy runner: {}, Supported runner `DATABRICKS`"
        raise NameError(msg.format(runner))
4 changes: 4 additions & 0 deletions castor/task_creator/task_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from task_creator.task_strategy import TaskStrategy
from task_creator.strategies.python_operator_strategy import PythonOperatorStrategy
from task_creator.strategies.dummy_operator_strategy import DummyOperatorStrategy
from task_creator.strategies.notebook_operator_strategy import NotebookOperatorStrategy


class TaskCreator:
def __init__(self, task) -> None:
Expand All @@ -17,6 +19,8 @@ def create_task(self, dag):
self._strategy = PythonOperatorStrategy(self.name, self.args)
elif self._strategy == 'DummyOperatorStrategy':
self._strategy = DummyOperatorStrategy(self.name)
elif self._strategy == 'NotebookOperatorStrategy':
self._strategy = NotebookOperatorStrategy(self.name, self.args)
else:
msg = "Unknown Castor strategy: {}"
raise NameError(msg.format(self._strategy))
Expand Down