@@ -59,14 +59,15 @@ def create_http_session(status_forcelist: Optional[List[int]] = None):
5959 backoff_factor = 1 ,
6060 raise_on_status = True ,
6161 )
62+ print ("XD" )
6263 adapter = HTTPAdapter (max_retries = retry_strategy )
6364 session = requests .Session ()
6465 if "AIRFLOW_API_TOKEN" in os .environ :
6566 token = os .environ ["AIRFLOW_API_TOKEN" ]
6667 session .headers .update ({"Authorization" : "Bearer " + token })
6768 session .mount ("https://" , adapter )
6869 session .mount ("http://" , adapter )
69-
70+ print ( "Session!!!" )
7071 return session
7172
7273 def get_dag (self , dag_id : str ) -> DAGModel :
@@ -84,6 +85,7 @@ def get_dag(self, dag_id: str) -> DAGModel:
8485 raise RuntimeError (res .json ().get ("title" ))
8586 dag_json = res .json ()
8687 dag = DAGModel (dag_id = dag_json ["dag_id" ], tags = dag_json ["tags" ])
88+ print ("XD" )
8789 return dag
8890
8991 def wait_for_dag (self , dag_id : str , tag : str ) -> DAGModel :
@@ -96,11 +98,14 @@ def wait_for_dag(self, dag_id: str, tag: str) -> DAGModel:
9698 session = AirflowClient .create_http_session ([404 ])
9799 count = 0
98100 while count <= self .max_retries :
101+ print ("getting" )
102+ print (f"{ self .rest_api_url } /dags/{ dag_id } " )
99103 res = session .get (
100104 url = f"{ self .rest_api_url } /dags/{ dag_id } " ,
101105 headers = {"Content-Type" : "application/json" },
102106 verify = AirflowClient .VERIFY ,
103107 )
108+ print (res )
104109 dag_json = res .json ()
105110 if res .status_code != 200 :
106111 raise RuntimeError (dag_json .get ("title" ))
0 commit comments