Skip to content

Conversation

@Ian-Nara
Copy link
Contributor

@Ian-Nara Ian-Nara commented Dec 9, 2025

No description provided.

@Ian-Nara Ian-Nara changed the base branch from main to ian-UID2-6345-sqs-components December 9, 2025 00:59
@Ian-Nara Ian-Nara marked this pull request as ready for review December 9, 2025 01:00
@Ian-Nara Ian-Nara force-pushed the ian-UID2-6345-traffic-components branch from 2023858 to 52e7818 Compare December 9, 2025 23:44
@Ian-Nara Ian-Nara force-pushed the ian-UID2-6345-sqs-components branch 2 times, most recently from 6e9fa46 to bbda3f2 Compare December 10, 2025 03:55
Copy link

@asloobq asloobq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

submitting partial review

// process delta files and count records in [deltaWindowStart, newestDeltaTs]
// files are sorted newest to oldest, records within files are sorted newest to oldest
// stop when the newest record in a file is older than the window
int sum = 0;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: rename sum to totalRecords or totalRequests

/**
* Extract timestamp from SQS message (from SentTimestamp attribute)
*/
private Long extractTimestampFromMessage(Message msg) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you reuse this method from SQSMessageParser.extractTimestamp()

}

/**
* Count SQS messages from oldestQueueTs to oldestQueueTs + 5 minutes
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remind me why the unprocessed messages in queue are capped upto 5 mins ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not wish to read the entire queue (to apply the allowlist based on timestamp), just 5 minutes at a time.

for (Message msg : sqsMessages) {
Long ts = extractTimestampFromMessage(msg);

if (ts < oldestQueueTs || ts > windowEnd) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ts < oldestQueueTs I think this is not possible because oldestQueueTs was determined from the same sqsMessages

* Determine traffic status based on current vs baseline traffic.
* Logs warnings at 50%, 75%, and 90% of the circuit breaker threshold.
*/
TrafficStatus determineStatus(int sumCurrent, int baselineTraffic) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: rename sumCurrent to last24HrTraffic or similar

LOGGER.warn("high_message_volume: 90% of threshold reached, sumCurrent={}, threshold={} ({}x{}), thresholdPercent={}%",
sumCurrent, threshold, thresholdMultiplier, baselineTraffic, String.format("%.1f", thresholdPercent));
} else if (thresholdPercent >= 75.0) {
LOGGER.warn("high_message_volume: 75% of threshold reached, sumCurrent={}, threshold={} ({}x{}), thresholdPercent={}%",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the log messages are same except for 90/75/50. I think you can do it with one log message

}

if (sumCurrent >= threshold) {
LOGGER.error("circuit_breaker_triggered: traffic threshold breached, sumCurrent={}, threshold={} ({}x{})",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the log type ERROR on purpose ? I think the system is working fine so this shouldn't be an error. May be WARN ?

* @param sqsMessages List of SQS messages this consumer has read (non-denylisted)
* @param queueAttributes Queue attributes including invisible message count (can be null)
* @param denylistedCount Number of denylisted messages read by this consumer
* @param filteredAsTooRecentCount Number of messages filtered as "too recent" by window reader
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't these be part of sqsMessages ?

Copy link
Contributor Author

@Ian-Nara Ian-Nara Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, these are not returned by SqsBatchProcessor

// Filter for eligible messages (>= deltaWindowSeconds old)
List<SqsParsedMessage> eligibleMessages = filterEligibleMessages(parsedBatch, currentTime);

return BatchProcessingResult.withMessages(eligibleMessages);

But they will appear in the invisible message count returned by getQueueAttributes. We remove / "deduplicate" this from the invisible count, since we are using that to represent messages from other consumers

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this is a bit confusing TBH.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I will add to the list, to revisit this when the usage of trafficcalculator is added.

List<TrafficFilterRule> rules = new ArrayList<>();
try {
JsonArray denylistRequests = config.getJsonArray("denylist_requests");
if (denylistRequests == null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so config should at least have an empty list ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, just to avoid key typos when configuring


TrafficFilterRule rule = new TrafficFilterRule(range, ipAddresses);

LOGGER.info("loaded traffic filter rule: range=[{}, {}], IPs={}", rule.getRangeStart(), rule.getRangeEnd(), rule.getIpAddresses());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this log all the IP addresses or just the size of the list ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would log the IP addresses we configured. Do you think we should avoid this?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On one hand, this could flood the logs if the list is too big, on the other hand it would be useful for debugging. Let's keep it for now.

*/
private static class TrafficFilterRule {
private final List<Long> range;
private final List<String> ipAddresses;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably a Set is better to do contains() efficiently

Base automatically changed from ian-UID2-6345-sqs-components to main December 10, 2025 21:19
@Ian-Nara Ian-Nara merged commit b7161ce into main Dec 11, 2025
4 checks passed
@Ian-Nara Ian-Nara deleted the ian-UID2-6345-traffic-components branch December 11, 2025 21:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants