11import pytest
2- from airflow .models import DagBag
32
3+ airflow = pytest .importorskip ("airflow" )
4+ from airflow .models import DagBag # noqa: E402
45
5- class TestChurnPipelineDAG :
66
7+ class TestChurnPipelineDAG :
78 @pytest .fixture
89 def dagbag (self ):
910 """Load DAG bag"""
@@ -17,11 +18,9 @@ def test_dag_loaded(self, dagbag):
1718 def test_dag_structure (self , dagbag ):
1819 """Test DAG has expected structure"""
1920 dag = dagbag .get_dag ("churn_training_pipeline" )
20-
2121 assert dag is not None
2222 assert len (dag .tasks ) > 0
2323
24- # Check expected tasks exist
2524 task_ids = [task .task_id for task in dag .tasks ]
2625 assert "validate_data" in task_ids
2726 assert "train_model" in task_ids
@@ -34,20 +33,9 @@ def test_dag_dependencies(self, dagbag):
3433 validate_task = dag .get_task ("validate_data" )
3534 train_task = dag .get_task ("train_model" )
3635
37- # validate should come before train
3836 assert train_task in validate_task .downstream_list
3937
4038 def test_dag_schedule (self , dagbag ):
4139 """Test DAG schedule is set correctly"""
4240 dag = dagbag .get_dag ("churn_training_pipeline" )
43-
44- assert dag .schedule_interval is not None
45- # Should run weekly
4641 assert dag .schedule_interval == "@weekly"
47-
48- def test_dag_tags (self , dagbag ):
49- """Test DAG has appropriate tags"""
50- dag = dagbag .get_dag ("churn_training_pipeline" )
51-
52- assert "ml" in dag .tags
53- assert "churn" in dag .tags
0 commit comments