|
4 | 4 | from pyflink.table.expressions import col, lit |
5 | 5 | import uuid |
6 | 6 | from functools import reduce |
7 | | -import boto3 |
8 | | -from botocore.exceptions import ClientError |
9 | | -import json |
10 | 7 | import logging |
| 8 | +import os |
| 9 | + |
| 10 | +from aws_clients_python_lib.aws_lambda import aws_lambda_function_return_json_object |
| 11 | +from aws_clients_python_lib.secrets_manager import get_secrets |
| 12 | +from cc_clients_python_lib.http_status import HttpStatus |
11 | 13 |
|
12 | 14 |
|
13 | 15 | __copyright__ = "Copyright (c) 2024-2025 Jeffrey Jonathan Jennings" |
@@ -58,41 +60,33 @@ def lambda_handler(event, context): |
58 | 60 | required_event_fields = ["catalog_name", "database_name", "ccaf_secrets_path"] |
59 | 61 | for field in required_event_fields: |
60 | 62 | if field not in event: |
61 | | - logger.error(f"Missing required field: {field}") |
62 | | - return { |
63 | | - 'statusCode': 400, |
64 | | - 'body': json.dumps({'error': f'Missing required field: {field}'}) |
65 | | - } |
| 63 | + return aws_lambda_function_return_json_object(logger, HttpStatus.BAD_REQUEST, f"Missing required field: {field}") |
| 64 | + elif field == "": |
| 65 | + return aws_lambda_function_return_json_object(logger, HttpStatus.BAD_REQUEST, f"Field is blank: {field}") |
66 | 66 |
|
67 | 67 | # Get the catalog name, database name, and secrets path from the event. |
68 | 68 | catalog_name = event.get("catalog_name", "").lower() |
69 | 69 | database_name = event.get("database_name", "").lower() |
70 | | - secrets_path = event.get("ccaf_secrets_path", "") |
| 70 | + ccaf_secrets_path = event.get("ccaf_secrets_path", "") |
71 | 71 |
|
72 | | - try: |
73 | | - get_secret_value_response = boto3.client('secretsmanager').get_secret_value(SecretId=secrets_path) |
74 | | - settings = json.loads(get_secret_value_response['SecretString']) |
75 | | - |
76 | | - # Create the TableEnvironment with the Confluent Cloud for Apache Flink settings. |
77 | | - tbl_env = TableEnvironment.create( |
78 | | - ConfluentSettings |
79 | | - .new_builder() |
80 | | - .set_cloud(settings[FLINK_CLOUD]) |
81 | | - .set_region(settings[FLINK_REGION]) |
82 | | - .set_flink_api_key(settings[FLINK_API_KEY]) |
83 | | - .set_flink_api_secret(settings[FLINK_API_SECRET]) |
84 | | - .set_organization_id(settings[ORGANIZATION_ID]) |
85 | | - .set_environment_id(settings[ENVIRONMENT_ID]) |
86 | | - .set_compute_pool_id(settings[FLINK_COMPUTE_POOL_ID]) |
87 | | - .set_principal_id(settings[FLINK_PRINCIPAL_ID]) |
88 | | - .build() |
89 | | - ) |
90 | | - except ClientError as e: |
91 | | - logger.error("Failed to get secrets from the AWS Secrets Manager because of %s.", e) |
92 | | - return { |
93 | | - 'statusCode': 500, |
94 | | - 'body': json.dumps({'error': str(e)}) |
95 | | - } |
| 72 | + # Create the TableEnvironment with the Confluent Cloud for Apache Flink settings. |
| 73 | + settings, error_message = get_secrets(os.environ['AWS_REGION'], ccaf_secrets_path) |
| 74 | + if settings == {}: |
| 75 | + return aws_lambda_function_return_json_object(logger, HttpStatus.INTERNAL_SERVER_ERROR, error_message) |
| 76 | + |
| 77 | + tbl_env = TableEnvironment.create( |
| 78 | + ConfluentSettings |
| 79 | + .new_builder() |
| 80 | + .set_cloud(settings[FLINK_CLOUD]) |
| 81 | + .set_region(settings[FLINK_REGION]) |
| 82 | + .set_flink_api_key(settings[FLINK_API_KEY]) |
| 83 | + .set_flink_api_secret(settings[FLINK_API_SECRET]) |
| 84 | + .set_organization_id(settings[ORGANIZATION_ID]) |
| 85 | + .set_environment_id(settings[ENVIRONMENT_ID]) |
| 86 | + .set_compute_pool_id(settings[FLINK_COMPUTE_POOL_ID]) |
| 87 | + .set_principal_id(settings[FLINK_PRINCIPAL_ID]) |
| 88 | + .build() |
| 89 | + ) |
96 | 90 |
|
97 | 91 | # The catalog name and database name are used to set the current catalog and database. |
98 | 92 | tbl_env.use_catalog(catalog_name) |
@@ -132,11 +126,7 @@ def lambda_handler(event, context): |
132 | 126 | else: |
133 | 127 | logger.info(f"Sink table '{flight_avro_table_path.get_full_name()}' already exists.") |
134 | 128 | except Exception as e: |
135 | | - logger.error(f"A critical error occurred during the processing of the table because {e}") |
136 | | - return { |
137 | | - 'statusCode': 500, |
138 | | - 'body': json.dumps({'error': str(e)}) |
139 | | - } |
| 129 | + return aws_lambda_function_return_json_object(logger, HttpStatus.INTERNAL_SERVER_ERROR, f"A critical error occurred during the processing {flight_avro_table_path.get_full_name()} sink table because {e}") |
140 | 130 |
|
141 | 131 | # The first table is the SkyOne table that is read in. |
142 | 132 | airline = tbl_env.from_path(f"{catalog_name}.{database_name}.skyone_avro") |
@@ -183,15 +173,6 @@ def lambda_handler(event, context): |
183 | 173 |
|
184 | 174 | # Get the processed statement name. |
185 | 175 | processed_statement_name = ConfluentTools.get_statement_name(table_result) |
186 | | - success_message = f"Data processed and inserted successfully as: {processed_statement_name}" |
187 | | - logger.info(success_message) |
188 | | - return { |
189 | | - 'statusCode': 200, |
190 | | - 'body': json.dumps({'message': success_message}) |
191 | | - } |
| 176 | + return aws_lambda_function_return_json_object(logger, HttpStatus.OK, f"Completed Flink SQL creation and deployment to CCAF: {processed_statement_name}") |
192 | 177 | except Exception as e: |
193 | | - logger.error(f"An error occurred during data insertion: {e}") |
194 | | - return { |
195 | | - 'statusCode': 500, |
196 | | - 'body': json.dumps({'error': str(e)}) |
197 | | - } |
| 178 | + return aws_lambda_function_return_json_object(logger, HttpStatus.INTERNAL_SERVER_ERROR, f"An error occurred during data insertion: {e}") |
0 commit comments