Commit 0d206ec

Merge pull request #328 from aws-solutions/feature/v2.3.3

Update to version v2.3.3

2 parents e57392b + 783665e

19 files changed: +409 −111 lines

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
@@ -5,6 +5,23 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [2.3.3] - 2025-04-30
+
+### Updated
+
+- Cognito user invitation email template
+
+### Fixed
+
+- NGINX service startup issue and optimized health check settings for OpenSearch Access Proxy instances
+- OpenSearch bulk loading to split large payloads into smaller batches to comply with OpenSearch's HTTP request size limits
+- PostgreSQL log ingestion parser to parse duration and log message into separate fields
+
+### Security
+
+- Updated http-proxy-middleware to version 2.0.9
+- Updated AWS CDK packages to latest versions
+
 ## [2.3.2] - 2025-03-14
 
 ### Fixed

NOTICE.txt

Lines changed: 1 addition & 0 deletions
@@ -106,6 +106,7 @@ xmltodict under the MIT license
 @aws-cdk/aws-kinesisfirehose-alpha under the Apache-2.0 license
 @aws-cdk/aws-kinesisfirehose-destinations-alpha under the Apache-2.0 license
 @aws-solutions-constructs/aws-cloudfront-s3 under the Apache-2.0 license
+@aws-solutions-constructs/aws-cloudfront-oai-s3 under the Apache-2.0 license
 @typescript-eslint/eslint-plugin under the MIT license
 @typescript-eslint/parser under the BSD-2-Clause
 eslint under the MIT license

deployment/ecr/clo-s3-list-objects/Dockerfile

Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
-FROM public.ecr.aws/lambda/python:3.12.2025.01.24.11 AS builder
+FROM public.ecr.aws/lambda/python:3.12.2025.04.03.11 AS builder
 
 WORKDIR /build
 
@@ -14,7 +14,7 @@ RUN python -m venv .venv && \
     cd common-lib && \
     poetry build
 
-FROM public.ecr.aws/lambda/python:3.12.2025.01.24.11
+FROM public.ecr.aws/lambda/python:3.12.2025.04.03.11
 
 WORKDIR /ws

source/constructs/bin/main.ts

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-import 'source-map-support/register';
 import {
   App,
   Aspects,
@@ -31,6 +30,7 @@ import {
   NagSuppressions,
 } from 'cdk-nag';
 import { IConstruct } from 'constructs';
+import 'source-map-support/register';
 import { MainStack } from '../lib/main-stack';
 import { MicroBatchMainStack } from '../lib/microbatch/main/microbatch-main-stack';
 import { MicroBatchApplicationFluentBitPipelineStack } from '../lib/microbatch/pipeline/application-fluent-bit-stack';

source/constructs/lambda/microbatch/connector/source/rds.py

Lines changed: 2 additions & 1 deletion
@@ -161,7 +161,7 @@ def _transform(cls, data: dict) -> dict:
 
 
 class PostgresQueryLogFormat(metaclass=LogFormat):
-    PATTERN = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3})\s*(?::(\S*?)\({0,1}(\d*)\){0,1}:(\w*)@(\w*):)?\s*\[(\d+)\][:| ](\w+):\s*(.*?(?=(?:(?:\n\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3}\s*(?::\S*?\({0,1}(\d*)\){0,1}:\w*@\w*:)?\s*\[\d+\][:| ]\w+)|$)))"
+    PATTERN = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3})\s*(?::(\S*?)\({0,1}(\d*)\){0,1}:(\w*)@(\w*):)?\s*\[(\d+)\][:| ](\w+):\s*(?:(?:duration:\s*([\d.]+)\s*ms(?:\s+statement:\s*)?)|(?:statement:\s*)|(?:\s*))(.*?(?=(?:(?:\n\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3}\s*(?::\S*?\({0,1}(\d*)\){0,1}:\w*@\w*:)?\s*\[\d+\][:| ]\w+)|$)))"
     NAME = (
         "timestamp",
         "host",
@@ -170,6 +170,7 @@ class PostgresQueryLogFormat(metaclass=LogFormat):
         "database",
         "connection_id",
         "operation",
+        "duration",
         "object",
     )
     FLAGS = re.DOTALL
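
The updated PATTERN captures a "duration: ... ms" prefix as its own field instead of folding it into the log message. A minimal sketch of that behaviour, using the sample line from the updated POSTGRES_QUERY_LOGS fixture shown in the next file; the group indices below are inferred from the pattern's capture-group order and are not part of the diff itself:

import re

# Updated PostgresQueryLogFormat pattern, copied from rds.py above.
PATTERN = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3})\s*(?::(\S*?)\({0,1}(\d*)\){0,1}:(\w*)@(\w*):)?\s*\[(\d+)\][:| ](\w+):\s*(?:(?:duration:\s*([\d.]+)\s*ms(?:\s+statement:\s*)?)|(?:statement:\s*)|(?:\s*))(.*?(?=(?:(?:\n\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d{3})? \w{3}\s*(?::\S*?\({0,1}(\d*)\){0,1}:\w*@\w*:)?\s*\[\d+\][:| ]\w+)|$)))"

# Sample line taken from the updated POSTGRES_QUERY_LOGS fixture in mock.py.
line = ('2023-11-29 09:23:51 UTC:10.0.2.55(57562):postgres@postgres:[29735]:LOG: '
        'duration: 100.50 ms statement: SELECT d.datname as "Name",')

m = re.search(PATTERN, line, re.DOTALL)
print(m.group(1))  # 2023-11-29 09:23:51 UTC      (timestamp)
print(m.group(7))  # LOG                          (operation)
print(m.group(8))  # 100.50                       (duration, the new capture group)
print(m.group(9))  # SELECT d.datname as "Name",  (object, i.e. the log message)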

source/constructs/lambda/microbatch/test/mock.py

Lines changed: 1 addition & 1 deletion
@@ -1548,7 +1548,7 @@ def mock_rds_context():
        "POSTGRES_QUERY_LOGS"
    ] = """2023-11-29 09:23:46 UTC::@:[560]:LOG: checkpoint starting: time
 2023-11-29 09:23:46 UTC::@:[560]:LOG: checkpoint complete: wrote 0 buffers (0.0%); 0 WAL file(s) added, 0 removed, 0 recycled; write=0.001 s, sync=0.001 s, total=0.001 s; sync files=0, longest=0.000 s, average=0.000 s; distance=0 kB, estimate=0 kB
-2023-11-29 09:23:51 UTC:10.0.2.55(57562):postgres@postgres:[29735]:LOG: statement: SELECT d.datname as "Name",
+2023-11-29 09:23:51 UTC:10.0.2.55(57562):postgres@postgres:[29735]:LOG: duration: 100.50 ms statement: SELECT d.datname as "Name",
 pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
 pg_catalog.pg_encoding_to_char(d.encoding) as "Encoding",
 d.datcollate as "Collate",

source/constructs/lambda/microbatch/test/test_connector.py

Lines changed: 28 additions & 16 deletions
Large diffs are not rendered by default.

source/constructs/lambda/pipeline/log-processor/event/event_parser.py

Lines changed: 63 additions & 21 deletions
@@ -28,9 +28,11 @@
 SLEEP_INTERVAL = 10
 DEFAULT_BULK_BATCH_SIZE = "10000"
 BULK_ACTION = "index"
+DEFAULT_MAX_PAYLOAD_SIZE = 100
 
 # batch size can be overwritten via Env. var.
 batch_size = int(os.environ.get("BULK_BATCH_SIZE", DEFAULT_BULK_BATCH_SIZE))
+MAX_PAYLOAD_SIZE = int(os.environ.get("MAX_HTTP_PAYLOAD_SIZE_IN_MB", DEFAULT_MAX_PAYLOAD_SIZE))
 bucket_name = os.environ.get("LOG_BUCKET_NAME")
 
 default_region = os.environ.get("AWS_REGION")
@@ -367,35 +369,75 @@ def process_event(self, event):
         if not self.is_event_valid(event):
             return
 
-        failed_records_count = 0
+
         total_logs_counter = Counter()
         for bucket, key in self.get_bucket_and_keys(event):
-            lines = self.s3_read_object_by_lines(bucket, key)
-            if CONFIG_JSON:
-                log_entry_iter = self.get_log_entry_iter(lines)
-                # log format is LogEntry type
-                logs = self._counter_iter(
-                    (log.dict(self._time_key) for log in log_entry_iter),
-                    total_logs_counter,
-                )
+            self.process_s3_log_file(total_logs_counter, bucket, key)
 
-            else:
-                # service log and application log with s3 data buffer
-                log_iter = self._log_parser.parse(lines)
-
-                logs = self._counter_iter(log_iter, total_logs_counter)
+    def process_s3_log_file(self, total_logs_counter, bucket, key):
+        lines = self.s3_read_object_by_lines(bucket, key)
+        logs = self.get_log_records(total_logs_counter, lines)
 
-            batches = self._batch_iter(logs, batch_size)
-            for idx, records in enumerate(batches):
-                total, failed_records = self._bulk(records)
+        failed_records_count = 0
+        current_batch = []
+        current_batch_size = 0
+        batch_number = 0
+        # If configured MAX_PAYLOAD_SIZE is >= 100MB, reserve 5MB buffer
+        # Otherwise for smaller payload sizes (10 MB), reserve 1MB buffer
+        if MAX_PAYLOAD_SIZE >= 100:
+            BUFFER_MB = 5
+        else:
+            BUFFER_MB = 1
+        ## Calculate actual usable payload size in bytes by subtracting buffer from max payload size
+        MAX_PAYLOAD_SIZE_BYTES = (MAX_PAYLOAD_SIZE - BUFFER_MB) * 1024 * 1024
+        for record in logs:
+            record_size = idx_svc.calculate_record_size(record)
+            should_process_current_batch = (
+                current_batch_size + record_size > MAX_PAYLOAD_SIZE_BYTES or
+                len(current_batch) == batch_size
+            )
+            if should_process_current_batch and current_batch:
+                logger.debug(f"Processing batch_number: {batch_number}, record_count: {len(current_batch)}")
+                _, failed_records = self._bulk(current_batch)
                 failed_records_count += len(failed_records)
+                if failed_records:
+                    restorer.export_failed_records(
+                        plugin_modules,
+                        failed_records,
+                        restorer._get_export_prefix(batch_number, bucket, key),
+                    )
+                batch_number += 1
+                current_batch = []
+                current_batch_size = 0
+            current_batch.append(record)
+            current_batch_size += record_size
+        if current_batch:
+            logger.debug(f"Processing batch_number: {batch_number}, record_count: {len(current_batch)}")
+            _, failed_records = self._bulk(current_batch)
+            failed_records_count += len(failed_records)
+            if failed_records:
                 restorer.export_failed_records(
-                    plugin_modules,
-                    failed_records,
-                    restorer._get_export_prefix(idx, bucket, key),
+                    plugin_modules,
+                    failed_records,
+                    restorer._get_export_prefix(batch_number, bucket, key),
                 )
+        self._put_metric(total_logs_counter.value, failed_records_count)
+
+    def get_log_records(self, total_logs_counter, lines):
+        if CONFIG_JSON:
+            log_entry_iter = self.get_log_entry_iter(lines)
+            # log format is LogEntry type
+            logs = self._counter_iter(
+                (log.dict(self._time_key) for log in log_entry_iter),
+                total_logs_counter,
+            )
-        self._put_metric(total_logs_counter.value, failed_records_count)
 
+        else:
+            # service log and application log with s3 data buffer
+            log_iter = self._log_parser.parse(lines)
+            logs = self._counter_iter(log_iter, total_logs_counter)
+        return logs
+
     def get_log_entry_iter(self, lines):
         if self._parser_name != "JSONWithS3":
             # regex log from s3 source
source/constructs/lambda/pipeline/log-processor/idx/idx_svc.py

Lines changed: 36 additions & 2 deletions
@@ -324,6 +324,27 @@ def _create_bulk_records(self, records: list, need_json_serial=False) -> str:
             bulk_body.append(json.dumps(record) + "\n")
         data = "".join(bulk_body)
         return data
+
+    def calculate_record_size(self, record):
+        """
+        Calculate size of a single record including bulk format overhead
+
+        Args:
+            record (dict): The record to be indexed
+
+        Returns:
+            int: Size in bytes of the record including bulk format
+        """
+        try:
+            action_size = len(json.dumps({BULK_ACTION: {}}).encode('utf-8')) + 1
+
+            record_size = len(json.dumps(record).encode('utf-8')) + 1
+            total_size = action_size + record_size
+            return total_size
+        except Exception as e:
+            logger.error(f"Error calculating record size: {str(e)}")
+            # Return a large number to force a new batch in case of error
+            return 1024 * 1024  # Returning 1 MB as record size
 
     def json_serial(self, obj):
         """JSON serializer for objects not serializable by default json code"""
@@ -399,14 +420,27 @@ def adjust_bulk_batch_size(self, func_name=function_name):
         response = lambda_client.get_function_configuration(FunctionName=func_name)
         variables = response["Environment"]["Variables"]
 
+        updated = False
+
         if (
             variables.get("BULK_BATCH_SIZE")
             and int(variables["BULK_BATCH_SIZE"]) >= 4000
         ):
             variables["BULK_BATCH_SIZE"] = str(int(variables["BULK_BATCH_SIZE"]) - 2000)
+            updated = True
+        if variables.get("MAX_HTTP_PAYLOAD_SIZE_IN_MB"):
+            current_payload_size = int(variables["MAX_HTTP_PAYLOAD_SIZE_IN_MB"])
+            if current_payload_size > 10:
+                variables["MAX_HTTP_PAYLOAD_SIZE_IN_MB"] = "10"
+                updated = True
+        else:
+            variables["MAX_HTTP_PAYLOAD_SIZE_IN_MB"] = "10"
+            updated = True
+        if updated:
             lambda_client.update_function_configuration(
-                FunctionName=func_name, Environment={"Variables": variables}
-            )
+                FunctionName=func_name,
+                Environment={"Variables": variables}
+            )
 
     def adjust_lambda_env_var(self, env_name: str, val, func_name=function_name):
         response = lambda_client.get_function_configuration(FunctionName=function_name)
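
For a concrete feel of the sizing, here is a quick sketch of what calculate_record_size computes for a small document, assuming BULK_ACTION = "index" as shown in event_parser.py above; the record below is made up for illustration:

import json

BULK_ACTION = "index"  # same bulk action constant the log processor uses

record = {"timestamp": "2023-11-29 09:23:51 UTC", "operation": "LOG"}

# Mirrors calculate_record_size: the bulk action line plus the serialized
# document, each with one extra byte for its trailing newline.
action_size = len(json.dumps({BULK_ACTION: {}}).encode("utf-8")) + 1  # len('{"index": {}}') + 1 == 14
record_size = len(json.dumps(record).encode("utf-8")) + 1
print(action_size + record_size)  # estimated bytes this record adds to a _bulk payload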

source/constructs/lambda/pipeline/log-processor/poetry.lock

Lines changed: 24 additions & 23 deletions
Some generated files are not rendered by default.
