|
21 | 21 | from errors import InvalidHeaders, NoOperationPermissions |
22 | 22 | from logging_decorator import send_log_to_firehose, generate_and_send_logs |
23 | 23 | from file_level_validation import file_level_validation |
| 24 | + from batch_processor import ingestion_progress |
24 | 25 |
|
25 | 26 |
|
26 | 27 | from tests.utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import GenericSetUp, GenericTearDown |
@@ -254,4 +255,55 @@ def test_splunk_logger_unhandled_failure(self): |
254 | 255 | DeliveryStreamName=Firehose.STREAM_NAME, Record=expected_firehose_record |
255 | 256 | ) |
256 | 257 |
|
257 | | -# TODO: unit tests for ingestion decorator |
| 258 | + def test_splunk_logger_ingestion_started(self): |
| 259 | + """Tests the splunk logger is called with ingestion_progress(False, ...)""" |
| 260 | + |
| 261 | + with ( # noqa: E999 |
| 262 | + patch("logging_decorator.datetime") as mock_datetime, # noqa: E999 |
| 263 | + patch("logging_decorator.time") as mock_time, # noqa: E999 |
| 264 | + patch("logging_decorator.logger") as mock_logger, # noqa: E999 |
| 265 | + patch("logging_decorator.firehose_client") as mock_firehose_client, # noqa: E999 |
| 266 | + ): # noqa: E999 |
| 267 | + mock_time.time.side_effect = [1672531200, 1672531200.123456] |
| 268 | + mock_datetime.now.return_value = datetime(2024, 1, 1, 12, 0, 0) |
| 269 | + ingestion_progress(finished=False, message_body=deepcopy(MOCK_FILE_DETAILS.base_ingestion_event)) |
| 270 | + |
| 271 | + expected_message = "Ingestion started" |
| 272 | + expected_log_data = {**COMMON_LOG_DATA, "statusCode": 200, "message": expected_message} |
| 273 | + expected_log_data["function_name"] = "record_processor_ingestion_progress" |
| 274 | + |
| 275 | + # Log data is the first positional argument of the first call to logger.info |
| 276 | + log_data = json.loads(mock_logger.info.call_args_list[0][0][0]) |
| 277 | + self.assertEqual(log_data, expected_log_data) |
| 278 | + |
| 279 | + expected_firehose_record = {"Data": json.dumps({"event": log_data}).encode("utf-8")} |
| 280 | + mock_firehose_client.put_record.assert_called_once_with( |
| 281 | + DeliveryStreamName=Firehose.STREAM_NAME, Record=expected_firehose_record |
| 282 | + ) |
| 283 | + |
| 284 | + def test_splunk_logger_ingestion_finished(self): |
| 285 | + """Tests the splunk logger is called with ingestion_progress(True, ...)""" |
| 286 | + |
| 287 | + with ( # noqa: E999 |
| 288 | + patch("logging_decorator.datetime") as mock_datetime, # noqa: E999 |
| 289 | + patch("logging_decorator.time") as mock_time, # noqa: E999 |
| 290 | + patch("logging_decorator.logger") as mock_logger, # noqa: E999 |
| 291 | + patch("logging_decorator.firehose_client") as mock_firehose_client, # noqa: E999 |
| 292 | + ): # noqa: E999 |
| 293 | + mock_time.time.side_effect = [1672531200.123456, 1672531200.123456] |
| 294 | + mock_datetime.now.return_value = datetime(2024, 1, 1, 12, 0, 0) |
| 295 | + ingestion_progress(finished=True, message_body=deepcopy(MOCK_FILE_DETAILS.base_ingestion_event), |
| 296 | + row_count=3, start_time=1672531200) |
| 297 | + |
| 298 | + expected_message = "Ingestion finished" |
| 299 | + expected_log_data = {**COMMON_LOG_DATA, "statusCode": 200, "message": expected_message, "row_count": 3} |
| 300 | + expected_log_data["function_name"] = "record_processor_ingestion_progress" |
| 301 | + |
| 302 | + # Log data is the first positional argument of the first call to logger.info |
| 303 | + log_data = json.loads(mock_logger.info.call_args_list[0][0][0]) |
| 304 | + self.assertEqual(log_data, expected_log_data) |
| 305 | + |
| 306 | + expected_firehose_record = {"Data": json.dumps({"event": log_data}).encode("utf-8")} |
| 307 | + mock_firehose_client.put_record.assert_called_once_with( |
| 308 | + DeliveryStreamName=Firehose.STREAM_NAME, Record=expected_firehose_record |
| 309 | + ) |
0 commit comments