 import pytest
 from botocore.exceptions import ClientError
 from moto import mock_aws
+from tests_common.test_utils.config import conf_vars
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

 from airflow.models import DAG, DagRun, TaskInstance
-from airflow.operators.empty import EmptyOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
+from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.timezone import datetime
-from tests.test_utils.config import conf_vars


 @pytest.fixture(autouse=True)
@@ -43,26 +44,40 @@ def s3mock():


 @pytest.mark.db_test
-class TestS3TaskHandler:
-    @conf_vars({("logging", "remote_log_conn_id"): "aws_default"})
+class TestS3RemoteLogIO:
     @pytest.fixture(autouse=True)
     def setup_tests(self, create_log_template, tmp_path_factory, session):
-        self.remote_log_base = "s3://bucket/remote/log/location"
-        self.remote_log_location = "s3://bucket/remote/log/location/1.log"
-        self.remote_log_key = "remote/log/location/1.log"
-        self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
-        create_log_template("{try_number}.log")
-        self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
-        # Verify the hook now with the config override
-        assert self.s3_task_handler.hook is not None
-
-        date = datetime(2016, 1, 1)
-        self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
-        task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
-        dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="manual")
-        session.add(dag_run)
-        session.commit()
-        session.refresh(dag_run)
+        with conf_vars({("logging", "remote_log_conn_id"): "aws_default"}):
+            self.remote_log_base = "s3://bucket/remote/log/location"
+            self.remote_log_location = "s3://bucket/remote/log/location/1.log"
+            self.remote_log_key = "remote/log/location/1.log"
+            self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
+            create_log_template("{try_number}.log")
+            self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
+            # Verify the hook now with the config override
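+            # The handler delegates remote reads/writes to its S3RemoteLogIO helper, exposed as ".io"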
+            self.subject = self.s3_task_handler.io
+            assert self.subject.hook is not None
+
+            date = datetime(2016, 1, 1)
+            self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
+            task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
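+            # Airflow 3 renamed DagRun's "execution_date" argument to "logical_date"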
+            if AIRFLOW_V_3_0_PLUS:
+                dag_run = DagRun(
+                    dag_id=self.dag.dag_id,
+                    logical_date=date,
+                    run_id="test",
+                    run_type="manual",
+                )
+            else:
+                dag_run = DagRun(
+                    dag_id=self.dag.dag_id,
+                    execution_date=date,
+                    run_id="test",
+                    run_type="manual",
+                )
+            session.add(dag_run)
+            session.commit()
+            session.refresh(dag_run)

         self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
         self.ti.dag_run = dag_run
@@ -83,71 +98,30 @@ def setup_tests(self, create_log_template, tmp_path_factory, session):
                 os.remove(self.s3_task_handler.handler.baseFilename)

     def test_hook(self):
-        assert isinstance(self.s3_task_handler.hook, S3Hook)
-        assert self.s3_task_handler.hook.transfer_config.use_threads is False
+        assert isinstance(self.subject.hook, S3Hook)
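+        # Threaded boto transfers are disabled on the log hook, presumably to stay safe in forked task processes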
+        assert self.subject.hook.transfer_config.use_threads is False

     def test_log_exists(self):
         self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"")
-        assert self.s3_task_handler.s3_log_exists(self.remote_log_location)
+        assert self.subject.s3_log_exists(self.remote_log_location)

     def test_log_exists_none(self):
-        assert not self.s3_task_handler.s3_log_exists(self.remote_log_location)
+        assert not self.subject.s3_log_exists(self.remote_log_location)

     def test_log_exists_raises(self):
-        assert not self.s3_task_handler.s3_log_exists("s3://nonexistentbucket/foo")
+        assert not self.subject.s3_log_exists("s3://nonexistentbucket/foo")

     def test_log_exists_no_hook(self):
-        handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
+        subject = S3TaskHandler(self.local_log_location, self.remote_log_base).io
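+        # Force hook creation to fail so we can check that connection errors propagate to the caller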
         with mock.patch.object(S3Hook, "__init__", spec=S3Hook) as mock_hook:
             mock_hook.side_effect = ConnectionError("Fake: Failed to connect")
             with pytest.raises(ConnectionError, match="Fake: Failed to connect"):
-                handler.s3_log_exists(self.remote_log_location)
-
-    def test_set_context_raw(self):
-        self.ti.raw = True
-        mock_open = mock.mock_open()
-        with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.open", mock_open):
-            self.s3_task_handler.set_context(self.ti)
-
-        assert not self.s3_task_handler.upload_on_close
-        mock_open.assert_not_called()
-
-    def test_set_context_not_raw(self):
-        mock_open = mock.mock_open()
-        with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.open", mock_open):
-            self.s3_task_handler.set_context(self.ti)
-
-        assert self.s3_task_handler.upload_on_close
-        mock_open.assert_called_once_with(os.path.join(self.local_log_location, "1.log"), "w")
-        mock_open().write.assert_not_called()
-
-    def test_read(self):
-        self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"Log line\n")
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        log, metadata = self.s3_task_handler.read(ti)
-        actual = log[0][0][-1]
-        assert "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\n" in actual
-        assert actual.endswith("Log line")
-        assert metadata == [{"end_of_log": True, "log_pos": 8}]
-
-    def test_read_when_s3_log_missing(self):
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        self.s3_task_handler._read_from_logs_server = mock.Mock(return_value=([], []))
-        log, metadata = self.s3_task_handler.read(ti)
-        assert 1 == len(log)
-        assert len(log) == len(metadata)
-        actual = log[0][0][-1]
-        expected = "*** No logs found on s3 for ti=<TaskInstance: dag_for_testing_s3_task_handler.task_for_testing_s3_log_handler test [success]>\n"
-        assert expected in actual
-        assert {"end_of_log": True, "log_pos": 0} == metadata[0]
+                subject.s3_log_exists(self.remote_log_location)

     def test_s3_read_when_log_missing(self):
-        handler = self.s3_task_handler
         url = "s3://bucket/foo"
-        with mock.patch.object(handler.log, "error") as mock_error:
-            result = handler.s3_read(url, return_error=True)
+        with mock.patch.object(self.subject.log, "error") as mock_error:
+            result = self.subject.s3_read(url, return_error=True)
         msg = (
             f"Could not read logs from {url} with error: An error occurred (404) when calling the "
             f"HeadObject operation: Not Found"
@@ -156,10 +130,9 @@ def test_s3_read_when_log_missing(self):
         mock_error.assert_called_once_with(msg, exc_info=True)

     def test_read_raises_return_error(self):
-        handler = self.s3_task_handler
         url = "s3://nonexistentbucket/foo"
-        with mock.patch.object(handler.log, "error") as mock_error:
-            result = handler.s3_read(url, return_error=True)
+        with mock.patch.object(self.subject.log, "error") as mock_error:
+            result = self.subject.s3_read(url, return_error=True)
         msg = (
             f"Could not read logs from {url} with error: An error occurred (NoSuchBucket) when "
             f"calling the HeadObject operation: The specified bucket does not exist"
@@ -168,8 +141,8 @@ def test_read_raises_return_error(self):
         mock_error.assert_called_once_with(msg, exc_info=True)

     def test_write(self):
-        with mock.patch.object(self.s3_task_handler.log, "error") as mock_error:
-            self.s3_task_handler.s3_write("text", self.remote_log_location)
+        with mock.patch.object(self.subject.log, "error") as mock_error:
+            self.subject.write("text", self.remote_log_location)
             # We shouldn't expect any error logs in the default working case.
             mock_error.assert_not_called()
         body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()
@@ -178,18 +151,132 @@ def test_write(self):

     def test_write_existing(self):
         self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"previous ")
-        self.s3_task_handler.s3_write("text", self.remote_log_location)
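+        # write() appends to an existing remote log, separated by a newline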
+        self.subject.write("text", self.remote_log_location)
         body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()

         assert body == b"previous \ntext"

     def test_write_raises(self):
-        handler = self.s3_task_handler
         url = "s3://nonexistentbucket/foo"
-        with mock.patch.object(handler.log, "error") as mock_error:
-            handler.s3_write("text", url)
+        with mock.patch.object(self.subject.log, "error") as mock_error:
+            self.subject.write("text", url)
         mock_error.assert_called_once_with("Could not write logs to %s", url, exc_info=True)

+
+@pytest.mark.db_test
+class TestS3TaskHandler:
+    @conf_vars({("logging", "remote_log_conn_id"): "aws_default"})
+    @pytest.fixture(autouse=True)
+    def setup_tests(self, create_log_template, tmp_path_factory, session):
+        self.remote_log_base = "s3://bucket/remote/log/location"
+        self.remote_log_location = "s3://bucket/remote/log/location/1.log"
+        self.remote_log_key = "remote/log/location/1.log"
+        self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
+        create_log_template("{try_number}.log")
+        self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
+        # Verify the hook now with the config override
+        assert self.s3_task_handler.io.hook is not None
+
+        date = datetime(2016, 1, 1)
+        self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
+        task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
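+        # As in TestS3RemoteLogIO above: Airflow 3 takes "logical_date" where Airflow 2 took "execution_date"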
+        if AIRFLOW_V_3_0_PLUS:
+            dag_run = DagRun(
+                dag_id=self.dag.dag_id,
+                logical_date=date,
+                run_id="test",
+                run_type="manual",
+            )
+        else:
+            dag_run = DagRun(
+                dag_id=self.dag.dag_id,
+                execution_date=date,
+                run_id="test",
+                run_type="manual",
+            )
+        session.add(dag_run)
+        session.commit()
+        session.refresh(dag_run)
+
+        self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
+        self.ti.dag_run = dag_run
+        self.ti.try_number = 1
+        self.ti.state = State.RUNNING
+        session.add(self.ti)
+        session.commit()
+
+        self.conn = boto3.client("s3")
+        self.conn.create_bucket(Bucket="bucket")
+        yield
+
+        self.dag.clear()
+
+        session.query(DagRun).delete()
+        if self.s3_task_handler.handler:
+            with contextlib.suppress(Exception):
+                os.remove(self.s3_task_handler.handler.baseFilename)
+
+    def test_set_context_raw(self):
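+        # A "raw" task instance should not open a local log file or trigger an upload on close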
+        self.ti.raw = True
+        mock_open = mock.mock_open()
+        with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.open", mock_open):
+            self.s3_task_handler.set_context(self.ti)
+
+        assert not self.s3_task_handler.upload_on_close
+        mock_open.assert_not_called()
+
+    def test_set_context_not_raw(self):
+        mock_open = mock.mock_open()
+        with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.open", mock_open):
+            self.s3_task_handler.set_context(self.ti)
+
+        assert self.s3_task_handler.upload_on_close
+        mock_open.assert_called_once_with(os.path.join(self.local_log_location, "1.log"), "w")
+        mock_open().write.assert_not_called()
+
+    def test_read(self):
+        # Test what happens when we have two log files to read
+        self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"Log line\nLine 2\n")
+        self.conn.put_object(
+            Bucket="bucket", Key=self.remote_log_key + ".trigger.log", Body=b"Log line 3\nLine 4\n"
+        )
+        ti = copy.copy(self.ti)
+        ti.state = TaskInstanceState.SUCCESS
+        log, metadata = self.s3_task_handler.read(ti)
+
+        expected_s3_uri = f"s3://bucket/{self.remote_log_key}"
+
+        if AIRFLOW_V_3_0_PLUS:
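+            # Airflow 3 returns structured log messages; the source listing is wrapped in ::group::/::endgroup:: markers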
+            assert log[0].event == "::group::Log message source details"
+            assert expected_s3_uri in log[0].sources
+            assert log[1].event == "::endgroup::"
+            assert log[2].event == "Log line"
+            assert log[3].event == "Line 2"
+            assert log[4].event == "Log line 3"
+            assert log[5].event == "Line 4"
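+            # Here log_pos counts the four task log lines; the Airflow 2 branch below counts characters instead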
+            assert metadata == {"end_of_log": True, "log_pos": 4}
+        else:
+            actual = log[0][0][-1]
+            assert f"*** Found logs in s3:\n*** * {expected_s3_uri}\n" in actual
+            assert actual.endswith("Line 4")
+            assert metadata == [{"end_of_log": True, "log_pos": 33}]
+
+    def test_read_when_s3_log_missing(self):
+        ti = copy.copy(self.ti)
+        ti.state = TaskInstanceState.SUCCESS
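+        # Stub the log-server fallback so the read consults only S3, which has no logs here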
+        self.s3_task_handler._read_from_logs_server = mock.Mock(return_value=([], []))
+        log, metadata = self.s3_task_handler.read(ti)
+        if AIRFLOW_V_3_0_PLUS:
+            assert len(log) == 2
+            assert metadata == {"end_of_log": True, "log_pos": 0}
+        else:
+            assert len(log) == 1
+            assert len(log) == len(metadata)
+            actual = log[0][0][-1]
+            expected = "*** No logs found on s3 for ti=<TaskInstance: dag_for_testing_s3_task_handler.task_for_testing_s3_log_handler test [success]>\n"
+            assert expected in actual
+            assert metadata[0] == {"end_of_log": True, "log_pos": 0}
+
     def test_close(self):
         self.s3_task_handler.set_context(self.ti)
         assert self.s3_task_handler.upload_on_close
@@ -221,3 +308,7 @@ def test_close_with_delete_local_logs_conf(self, delete_local_copy, expected_existence_of_local_copy):

         handler.close()
         assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy
+
+    def test_filename_template_for_backward_compatibility(self):
+        # filename_template arg support for running the latest provider on airflow 2
+        S3TaskHandler(self.local_log_location, self.remote_log_base, filename_template=None)