Merged
17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,23 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.3.3] - 2025-04-30

### Updated

- Cognito user invitation email template

### Fixed

- NGINX service startup issue and optimized health check settings for OpenSearch Access Proxy instances
- OpenSearch bulk loading to split large payloads into smaller batches to comply with OpenSearch's HTTP request size limits
- PostgreSQL log ingestion parser to parse duration and log message into separate fields

### Security

- Updated http-proxy-middleware to version 2.0.9
- Updated AWS CDK packages to latest versions

## [2.3.2] - 2025-03-14

### Fixed
1 change: 1 addition & 0 deletions NOTICE.txt
@@ -106,6 +106,7 @@ xmltodict under the MIT license
@aws-cdk/aws-kinesisfirehose-alpha under the Apache-2.0 license
@aws-cdk/aws-kinesisfirehose-destinations-alpha under the Apache-2.0 license
@aws-solutions-constructs/aws-cloudfront-s3 under the Apache-2.0 license
@aws-solutions-constructs/aws-cloudfront-oai-s3 under the Apache-2.0 license
@typescript-eslint/eslint-plugin under the MIT license
@typescript-eslint/parser under the BSD-2-Clause
eslint under the MIT license
4 changes: 2 additions & 2 deletions deployment/ecr/clo-s3-list-objects/Dockerfile
@@ -1,4 +1,4 @@
FROM public.ecr.aws/lambda/python:3.12.2025.01.24.11 AS builder
FROM public.ecr.aws/lambda/python:3.12.2025.04.03.11 AS builder

WORKDIR /build

@@ -14,7 +14,7 @@ RUN python -m venv .venv && \
cd common-lib && \
poetry build

FROM public.ecr.aws/lambda/python:3.12.2025.01.24.11
FROM public.ecr.aws/lambda/python:3.12.2025.04.03.11

WORKDIR /ws

2 changes: 1 addition & 1 deletion source/constructs/bin/main.ts
@@ -14,7 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

import 'source-map-support/register';
import {
App,
Aspects,
@@ -31,6 +30,7 @@ import {
NagSuppressions,
} from 'cdk-nag';
import { IConstruct } from 'constructs';
import 'source-map-support/register';
import { MainStack } from '../lib/main-stack';
import { MicroBatchMainStack } from '../lib/microbatch/main/microbatch-main-stack';
import { MicroBatchApplicationFluentBitPipelineStack } from '../lib/microbatch/pipeline/application-fluent-bit-stack';
3 changes: 2 additions & 1 deletion source/constructs/lambda/microbatch/connector/source/rds.py
@@ -161,7 +161,7 @@ def _transform(cls, data: dict) -> dict:


class PostgresQueryLogFormat(metaclass=LogFormat):
PATTERN = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3})\s*(?::(\S*?)\({0,1}(\d*)\){0,1}:(\w*)@(\w*):)?\s*\[(\d+)\][:| ](\w+):\s*(.*?(?=(?:(?:\n\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3}\s*(?::\S*?\({0,1}(\d*)\){0,1}:\w*@\w*:)?\s*\[\d+\][:| ]\w+)|$)))"
PATTERN = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3})\s*(?::(\S*?)\({0,1}(\d*)\){0,1}:(\w*)@(\w*):)?\s*\[(\d+)\][:| ](\w+):\s*(?:(?:duration:\s*([\d.]+)\s*ms(?:\s+statement:\s*)?)|(?:statement:\s*)|(?:\s*))(.*?(?=(?:(?:\n\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3}\s*(?::\S*?\({0,1}(\d*)\){0,1}:\w*@\w*:)?\s*\[\d+\][:| ]\w+)|$)))"
NAME = (
"timestamp",
"host",
@@ -170,6 +170,7 @@ class PostgresQueryLogFormat(metaclass=LogFormat):
"database",
"connection_id",
"operation",
"duration",
"object",
)
FLAGS = re.DOTALL
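A quick way to see what the new pattern captures: the minimal sketch below (not code from this PR) runs the regex against a log line modeled on the updated mock data in the test file that follows. Group indices (8 for duration, 9 for the statement text) are inferred from the pattern's capture order.

import re

# New PostgresQueryLogFormat pattern, copied verbatim from the diff above
PATTERN = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3})\s*(?::(\S*?)\({0,1}(\d*)\){0,1}:(\w*)@(\w*):)?\s*\[(\d+)\][:| ](\w+):\s*(?:(?:duration:\s*([\d.]+)\s*ms(?:\s+statement:\s*)?)|(?:statement:\s*)|(?:\s*))(.*?(?=(?:(?:\n\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3}\s*(?::\S*?\({0,1}(\d*)\){0,1}:\w*@\w*:)?\s*\[\d+\][:| ]\w+)|$)))"

sample = '2023-11-29 09:23:51 UTC:10.0.2.55(57562):postgres@postgres:[29735]:LOG: duration: 100.50 ms statement: SELECT 1'

match = re.match(PATTERN, sample, flags=re.DOTALL)
assert match is not None
print(match.group(7))  # LOG       (operation)
print(match.group(8))  # 100.50    (duration, now a separate field)
print(match.group(9))  # SELECT 1  (object, i.e. the statement text)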
2 changes: 1 addition & 1 deletion source/constructs/lambda/microbatch/test/mock.py
@@ -1548,7 +1548,7 @@ def mock_rds_context():
"POSTGRES_QUERY_LOGS"
] = """2023-11-29 09:23:46 UTC::@:[560]:LOG: checkpoint starting: time
2023-11-29 09:23:46 UTC::@:[560]:LOG: checkpoint complete: wrote 0 buffers (0.0%); 0 WAL file(s) added, 0 removed, 0 recycled; write=0.001 s, sync=0.001 s, total=0.001 s; sync files=0, longest=0.000 s, average=0.000 s; distance=0 kB, estimate=0 kB
2023-11-29 09:23:51 UTC:10.0.2.55(57562):postgres@postgres:[29735]:LOG: statement: SELECT d.datname as "Name",
2023-11-29 09:23:51 UTC:10.0.2.55(57562):postgres@postgres:[29735]:LOG: duration: 100.50 ms statement: SELECT d.datname as "Name",
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
pg_catalog.pg_encoding_to_char(d.encoding) as "Encoding",
d.datcollate as "Collate",
44 changes: 28 additions & 16 deletions source/constructs/lambda/microbatch/test/test_connector.py

Large diffs are not rendered by default.

@@ -28,9 +28,11 @@
SLEEP_INTERVAL = 10
DEFAULT_BULK_BATCH_SIZE = "10000"
BULK_ACTION = "index"
DEFAULT_MAX_PAYLOAD_SIZE = 100

# batch size can be overwritten via Env. var.
batch_size = int(os.environ.get("BULK_BATCH_SIZE", DEFAULT_BULK_BATCH_SIZE))
MAX_PAYLOAD_SIZE = int(os.environ.get("MAX_HTTP_PAYLOAD_SIZE_IN_MB", DEFAULT_MAX_PAYLOAD_SIZE))
bucket_name = os.environ.get("LOG_BUCKET_NAME")

default_region = os.environ.get("AWS_REGION")
@@ -367,35 +369,75 @@ def process_event(self, event):
if not self.is_event_valid(event):
return

failed_records_count = 0

total_logs_counter = Counter()
for bucket, key in self.get_bucket_and_keys(event):
lines = self.s3_read_object_by_lines(bucket, key)
if CONFIG_JSON:
log_entry_iter = self.get_log_entry_iter(lines)
# log format is LogEntry type
logs = self._counter_iter(
(log.dict(self._time_key) for log in log_entry_iter),
total_logs_counter,
)
self.process_s3_log_file(total_logs_counter, bucket, key)

else:
# service log and application log with s3 data buffer
log_iter = self._log_parser.parse(lines)

logs = self._counter_iter(log_iter, total_logs_counter)
def process_s3_log_file(self, total_logs_counter, bucket, key):
lines = self.s3_read_object_by_lines(bucket, key)
logs = self.get_log_records(total_logs_counter, lines)

batches = self._batch_iter(logs, batch_size)
for idx, records in enumerate(batches):
total, failed_records = self._bulk(records)
failed_records_count = 0
current_batch = []
current_batch_size = 0
batch_number = 0
# If configured MAX_PAYLOAD_SIZE is >= 100MB, reserve 5MB buffer
# Otherwise for smaller payload sizes (10 MB), reserve 1MB buffer
if MAX_PAYLOAD_SIZE >= 100:
BUFFER_MB = 5
else:
BUFFER_MB = 1
## Calculate actual usable payload size in bytes by subtracting buffer from max payload size
MAX_PAYLOAD_SIZE_BYTES = (MAX_PAYLOAD_SIZE - BUFFER_MB) * 1024 * 1024
for record in logs:
record_size = idx_svc.calculate_record_size(record)
should_process_current_batch = (
current_batch_size + record_size > MAX_PAYLOAD_SIZE_BYTES or
len(current_batch) == batch_size
)
if should_process_current_batch and current_batch:
logger.debug(f"Processing batch_number: {batch_number}, record_count: {len(current_batch)}")
_, failed_records = self._bulk(current_batch)
failed_records_count += len(failed_records)
if failed_records:
restorer.export_failed_records(
plugin_modules,
failed_records,
restorer._get_export_prefix(batch_number, bucket, key),
)
batch_number += 1
current_batch = []
current_batch_size = 0
current_batch.append(record)
current_batch_size += record_size
if current_batch:
logger.debug(f"Processing batch_number: {batch_number}, record_count: {len(current_batch)}")
_, failed_records = self._bulk(current_batch)
failed_records_count += len(failed_records)
if failed_records:
restorer.export_failed_records(
plugin_modules,
failed_records,
restorer._get_export_prefix(idx, bucket, key),
plugin_modules,
failed_records,
restorer._get_export_prefix(batch_number, bucket, key),
)
self._put_metric(total_logs_counter.value, failed_records_count)

def get_log_records(self, total_logs_counter, lines):
if CONFIG_JSON:
log_entry_iter = self.get_log_entry_iter(lines)
# log format is LogEntry type
logs = self._counter_iter(
(log.dict(self._time_key) for log in log_entry_iter),
total_logs_counter,
)
self._put_metric(total_logs_counter.value, failed_records_count)

else:
# service log and application log with s3 data buffer
log_iter = self._log_parser.parse(lines)
logs = self._counter_iter(log_iter, total_logs_counter)
return logs

def get_log_entry_iter(self, lines):
if self._parser_name != "JSONWithS3":
# regex log from s3 source
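The new process_s3_log_file above flushes a batch as soon as adding the next record would exceed either the usable payload budget in bytes or the BULK_BATCH_SIZE record cap. A condensed sketch of that strategy (not the Lambda's own code; the helper name batch_by_size and the example limits are invented for illustration, and the byte estimate only loosely mirrors idx_svc.calculate_record_size):

import json
from typing import Iterable, Iterator, List

def batch_by_size(records: Iterable[dict],
                  max_payload_bytes: int,
                  max_records: int) -> Iterator[List[dict]]:
    """Yield batches that stay under both a byte budget and a record-count cap."""
    batch: List[dict] = []
    batch_bytes = 0
    for record in records:
        # +1 approximates the trailing newline each bulk line carries
        record_bytes = len(json.dumps(record).encode("utf-8")) + 1
        if batch and (batch_bytes + record_bytes > max_payload_bytes
                      or len(batch) == max_records):
            yield batch            # would map to one _bulk call in the Lambda
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += record_bytes
    if batch:
        yield batch                # flush the final partial batch

# e.g. a 10 MB limit with a 1 MB safety buffer, at most 10,000 records per request
budget_bytes = (10 - 1) * 1024 * 1024
for chunk in batch_by_size(({"seq": i} for i in range(50_000)), budget_bytes, 10_000):
    pass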
38 changes: 36 additions & 2 deletions source/constructs/lambda/pipeline/log-processor/idx/idx_svc.py
@@ -324,6 +324,27 @@ def _create_bulk_records(self, records: list, need_json_serial=False) -> str:
bulk_body.append(json.dumps(record) + "\n")
data = "".join(bulk_body)
return data

def calculate_record_size(self, record):
"""
Calculate size of a single record including bulk format overhead

Args:
record (dict): The record to be indexed

Returns:
int: Size in bytes of the record including bulk format
"""
try:
action_size = len(json.dumps({BULK_ACTION: {}}).encode('utf-8')) + 1

record_size = len(json.dumps(record).encode('utf-8')) + 1
total_size = action_size + record_size
return total_size
except Exception as e:
logger.error(f"Error calculating record size: {str(e)}")
# Return a large number to force a new batch in case of error
return 1024 * 1024 # Returning 1 MB as record size

def json_serial(self, obj):
"""JSON serializer for objects not serializable by default json code"""
@@ -399,14 +420,27 @@ def adjust_bulk_batch_size(self, func_name=function_name):
response = lambda_client.get_function_configuration(FunctionName=func_name)
variables = response["Environment"]["Variables"]

updated = False

if (
variables.get("BULK_BATCH_SIZE")
and int(variables["BULK_BATCH_SIZE"]) >= 4000
):
variables["BULK_BATCH_SIZE"] = str(int(variables["BULK_BATCH_SIZE"]) - 2000)
updated = True
if variables.get("MAX_HTTP_PAYLOAD_SIZE_IN_MB"):
current_payload_size = int(variables["MAX_HTTP_PAYLOAD_SIZE_IN_MB"])
if current_payload_size > 10:
variables["MAX_HTTP_PAYLOAD_SIZE_IN_MB"] = "10"
updated = True
else:
variables["MAX_HTTP_PAYLOAD_SIZE_IN_MB"] = "10"
updated = True
if updated:
lambda_client.update_function_configuration(
FunctionName=func_name, Environment={"Variables": variables}
)
FunctionName=func_name,
Environment={"Variables": variables}
)

def adjust_lambda_env_var(self, env_name: str, val, func_name=function_name):
response = lambda_client.get_function_configuration(FunctionName=function_name)
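For intuition on what calculate_record_size measures: each document sent through the _bulk API contributes one newline-terminated action line plus one newline-terminated source line, so the per-record estimate is the sum of both. A small worked check (the sample record is made up):

import json

BULK_ACTION = "index"  # same default action the log processor uses

record = {"timestamp": "2023-11-29 09:23:51 UTC", "operation": "LOG", "duration": "100.50"}

action_size = len(json.dumps({BULK_ACTION: {}}).encode("utf-8")) + 1  # '{"index": {}}' plus newline
record_size = len(json.dumps(record).encode("utf-8")) + 1             # serialized document plus newline
print(action_size + record_size)  # estimated bytes this record adds to a bulk payload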
47 changes: 24 additions & 23 deletions source/constructs/lambda/pipeline/log-processor/poetry.lock

Some generated files are not rendered by default.

@@ -14,7 +14,7 @@ requests = "^2.32.3"
requests-aws4auth = "^1.3.1"
boto3 = "^1.36.1"
botocore = "^1.36.1"
moto = "~4.2.13"
moto = "~5.1.3"
pytest = "^8.3.4"
pytest-cov = "^6.0.0"
pytest-mock = "^3.14.0"