90 changes: 90 additions & 0 deletions src/main/java/com/uid2/optout/delta/ManualOverrideService.java
@@ -0,0 +1,90 @@
package com.uid2.optout.delta;

import com.uid2.shared.Utils;
import com.uid2.shared.cloud.CloudStorageException;
import com.uid2.shared.cloud.ICloudStorage;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/**
* Service for managing manual override status in S3.
*
* The manual override allows operators to force DELAYED_PROCESSING status,
* which stops delta production until manually cleared.
*
* Override file format:
* <pre>
* {"manual_override": "DELAYED_PROCESSING"}
* </pre>
*/
public class ManualOverrideService {
private static final Logger LOGGER = LoggerFactory.getLogger(ManualOverrideService.class);

private static final String OVERRIDE_KEY = "manual_override";
private static final String DELAYED_PROCESSING_VALUE = "DELAYED_PROCESSING";

private final ICloudStorage cloudStorage;
private final String overrideS3Path;

/**
* Create a ManualOverrideService.
*
* @param cloudStorage Cloud storage client for reading/writing override file
* @param overrideS3Path S3 path where the override file is stored
*/
public ManualOverrideService(ICloudStorage cloudStorage, String overrideS3Path) {
this.cloudStorage = cloudStorage;
this.overrideS3Path = overrideS3Path;
}

/**
* Check if DELAYED_PROCESSING override is currently set.
*
* @return true if manual override is set to DELAYED_PROCESSING
*/
public boolean isDelayedProcessing() {
return DELAYED_PROCESSING_VALUE.equalsIgnoreCase(getOverrideValue());
}

/**
* Set the manual override to DELAYED_PROCESSING.
* This will stop delta production until manually cleared.
*
* @return true if override was set successfully
*/
public boolean setDelayedProcessing() {
try {
JsonObject config = new JsonObject().put(OVERRIDE_KEY, DELAYED_PROCESSING_VALUE);
            cloudStorage.upload(new ByteArrayInputStream(config.encode().getBytes(StandardCharsets.UTF_8)), overrideS3Path);
LOGGER.info("set manual override to DELAYED_PROCESSING: {}", overrideS3Path);
return true;
} catch (CloudStorageException e) {
LOGGER.error("manual_override_error: failed to set override at {}", overrideS3Path, e);
return false;
}
}

/**
     * Gets the current manual override value, or an empty string if it is unset or cannot be read.
*/
private String getOverrideValue() {
        try (InputStream inputStream = cloudStorage.download(overrideS3Path)) {
            JsonObject configJson = Utils.toJsonObject(inputStream);
            return configJson.getString(OVERRIDE_KEY, "");
} catch (CloudStorageException e) {
LOGGER.error("manual_override_error: no manual override file found at {}", overrideS3Path);
return "";
} catch (IOException | DecodeException e) {
LOGGER.error("manual_override_error: failed to parse override file at {}", overrideS3Path, e);
return "";
}
}
}
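
As a rough usage sketch (not part of this PR), a delta producer could consult the override before each production cycle and force it from a failure handler; the caller class, S3 key, and pause policy below are assumptions:

```java
package com.uid2.optout.delta;

import com.uid2.shared.cloud.ICloudStorage;

// Hypothetical caller; the class name, override path, and pause policy are illustrative only.
final class ManualOverrideUsageSketch {
    // Assumed S3 key; in practice the path would come from service configuration.
    private static final String OVERRIDE_PATH = "optout/manual_override.json";

    private final ManualOverrideService overrides;

    ManualOverrideUsageSketch(ICloudStorage cloudStorage) {
        this.overrides = new ManualOverrideService(cloudStorage, OVERRIDE_PATH);
    }

    /** True when an operator has forced DELAYED_PROCESSING and delta production should pause. */
    boolean productionHalted() {
        return overrides.isDelayedProcessing();
    }

    /** Forces DELAYED_PROCESSING, e.g. from a failure handler; false means the write to S3 failed. */
    boolean haltProduction() {
        return overrides.setDelayedProcessing();
    }
}
```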

85 changes: 85 additions & 0 deletions src/main/java/com/uid2/optout/delta/S3UploadService.java
@@ -0,0 +1,85 @@
package com.uid2.optout.delta;

import com.uid2.optout.sqs.SqsMessageOperations;
import com.uid2.shared.cloud.CloudStorageException;
import com.uid2.shared.cloud.ICloudStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;

/**
* Service for uploading data to S3 and deleting messages from SQS after successful upload.
*
* This class encapsulates the critical "upload then delete" pattern that ensures
* data is persisted to S3 before messages are removed from the queue.
*/
public class S3UploadService {
private static final Logger LOGGER = LoggerFactory.getLogger(S3UploadService.class);

private final ICloudStorage cloudStorage;
private final SqsClient sqsClient;
private final String queueUrl;

/**
     * Callback invoked after a successful S3 upload, before the SQS messages are deleted.
*/
@FunctionalInterface
public interface UploadSuccessCallback {
/**
* Called after successful S3 upload, before SQS message deletion.
*
* @param messageCount Number of messages in the uploaded batch
*/
void onSuccess(int messageCount);
}

/**
* Create an S3UploadService.
*
* @param cloudStorage Cloud storage client for S3 operations
* @param sqsClient SQS client for message deletion
* @param queueUrl SQS queue URL
*/
public S3UploadService(ICloudStorage cloudStorage, SqsClient sqsClient, String queueUrl) {
this.cloudStorage = cloudStorage;
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
}

/**
* Upload data to S3 and delete associated messages from SQS after successful upload.
*
* <p><strong>Critical behavior:</strong> Messages are ONLY deleted from SQS after
     * the S3 upload succeeds. This ensures no data loss if the upload fails.</p>
*
* @param data Data to upload
* @param s3Path S3 path (key) for the upload
* @param messages SQS messages to delete after successful upload
     * @param onSuccess Callback invoked after a successful upload, before message deletion; may be null
* @throws IOException if the upload fails
*/
public void uploadAndDeleteMessages(byte[] data, String s3Path, List<Message> messages, UploadSuccessCallback onSuccess) throws IOException {
LOGGER.info("uploading to s3: path={}, size={} bytes, messages={}", s3Path, data.length, messages.size());

try (ByteArrayInputStream inputStream = new ByteArrayInputStream(data)) {
cloudStorage.upload(inputStream, s3Path);

if (onSuccess != null) {
onSuccess.onSuccess(messages.size());
}
} catch (CloudStorageException e) {
LOGGER.error("s3_error: failed to upload to path={}", s3Path, e);
throw new IOException("s3 upload failed: " + s3Path, e);
}

if (!messages.isEmpty()) {
SqsMessageOperations.deleteMessagesFromSqs(sqsClient, queueUrl, messages);
}
}
}
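
A usage sketch (not in this PR) of the upload-then-delete call; the wrapper class, delta key, and metric counter are assumptions:

```java
package com.uid2.optout.delta;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import com.uid2.shared.cloud.ICloudStorage;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

// Hypothetical caller; the S3 key layout and the counter are illustrative only.
final class S3UploadServiceUsageSketch {
    private final AtomicLong messagesPersisted = new AtomicLong();

    void persistDelta(ICloudStorage cloudStorage, SqsClient sqsClient, String queueUrl,
                      byte[] deltaBytes, List<Message> sourceMessages) throws IOException {
        S3UploadService uploader = new S3UploadService(cloudStorage, sqsClient, queueUrl);

        // Messages are deleted only after the upload succeeds; on failure the IOException
        // propagates and the messages become visible again once their visibility timeout expires.
        uploader.uploadAndDeleteMessages(
                deltaBytes,
                "optout/delta/optout-delta-example.dat",   // assumed key
                sourceMessages,
                count -> messagesPersisted.addAndGet(count));
    }
}
```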
73 changes: 64 additions & 9 deletions src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java
@@ -1,12 +1,19 @@
package com.uid2.optout.sqs;

import com.uid2.optout.delta.S3UploadService;
import com.uid2.optout.delta.StopReason;
import com.uid2.shared.optout.OptOutUtils;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -17,15 +24,19 @@
*/
public class SqsBatchProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchProcessor.class);

private final SqsClient sqsClient;
private final String queueUrl;
private static final String MALFORMED_FILE_PREFIX = "optout-malformed-";
private final int deltaWindowSeconds;
private final S3UploadService s3UploadService;
private final String malformedRequestsS3Path;
private final int replicaId;

public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSeconds) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSeconds,
S3UploadService s3UploadService, String malformedRequestsS3Path,
int replicaId) {
this.deltaWindowSeconds = deltaWindowSeconds;
this.s3UploadService = s3UploadService;
this.malformedRequestsS3Path = malformedRequestsS3Path;
this.replicaId = replicaId;
}

/**
@@ -74,16 +85,16 @@ public List<SqsParsedMessage> getMessages() {
     * @param batchNumber The batch number (for logging)
     * @return BatchProcessingResult containing eligible messages and processing metadata
     * @throws IOException if uploading malformed messages to S3 fails
     */
public BatchProcessingResult processBatch(List<Message> messageBatch, int batchNumber) {
public BatchProcessingResult processBatch(List<Message> messageBatch, int batchNumber) throws IOException {
// Parse and sort messages by timestamp
List<SqsParsedMessage> parsedBatch = SqsMessageParser.parseAndSortMessages(messageBatch);

// Identify and delete corrupt messages
if (parsedBatch.size() < messageBatch.size()) {
List<Message> invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch);
if (!invalidMessages.isEmpty()) {
LOGGER.error("sqs_error: found {} invalid messages in batch {}, deleting", invalidMessages.size(), batchNumber);
SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, invalidMessages);
LOGGER.error("sqs_error: found {} invalid messages in batch {}, uploading to S3 and deleting", invalidMessages.size(), batchNumber);
uploadMalformedMessages(invalidMessages);
}
}

@@ -147,4 +158,48 @@ private List<Message> identifyInvalidMessages(List<Message> originalBatch, List<
.filter(msg -> !validIds.contains(msg.messageId()))
.collect(Collectors.toList());
}

    /**
     * Uploads malformed messages to S3 and then deletes them from SQS.
     * The destination is the "malformed" folder in the dropped-requests S3 bucket.
     *
     * @param invalidMessages The malformed messages to upload and delete
     * @throws IOException if the S3 upload fails
     */
private void uploadMalformedMessages(List<Message> invalidMessages) throws IOException {
if (s3UploadService == null) {
LOGGER.error("s3_error: s3UploadService is null, skipping upload of {} malformed messages", invalidMessages.size());
return;
}

// serialize messages to json string
JsonArray messagesJson = new JsonArray();
for (Message msg : invalidMessages) {
messagesJson.add(new JsonObject()
.put("messageId", msg.messageId())
.put("body", msg.body())
.put("attributes", msg.attributesAsStrings()));
}

// format file name and data
byte[] data = messagesJson.encodePrettily().getBytes(StandardCharsets.UTF_8);
String filename = generateMalformedMessageFileName();
String s3Path = malformedRequestsS3Path + filename;

// upload and delete messages
try {
s3UploadService.uploadAndDeleteMessages(data, s3Path, invalidMessages, null);
LOGGER.info("uploaded {} malformed messages to {}", invalidMessages.size(), s3Path);
} catch (IOException e) {
LOGGER.error("failed to upload and delete malformed sqs messages, path={}, filename={}, error={}", malformedRequestsS3Path, filename, e.getMessage(), e);
throw e;
}
}

private String generateMalformedMessageFileName() {
return String.format("%s%03d_%s_%08x.json",
MALFORMED_FILE_PREFIX,
replicaId,
Instant.now().truncatedTo(ChronoUnit.SECONDS).toString().replace(':', '.'),
OptOutUtils.rand.nextInt());
}
}
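
For illustration only, with an assumed malformedRequestsS3Path of "optout/malformed/" and replicaId 1, generateMalformedMessageFileName() and uploadMalformedMessages() would produce an object whose key and pretty-printed payload look roughly like this (all values invented):

```
optout/malformed/optout-malformed-001_2025-01-02T03.04.05Z_0a1b2c3d.json

[ {
  "messageId" : "11112222-3333-4444-5555-666677778888",
  "body" : "<original, unparseable message body>",
  "attributes" : {
    "SentTimestamp" : "1735787045000"
  }
} ]
```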
11 changes: 8 additions & 3 deletions src/main/java/com/uid2/optout/sqs/SqsWindowReader.java
@@ -1,5 +1,6 @@
package com.uid2.optout.sqs;

import com.uid2.optout.delta.S3UploadService;
import com.uid2.optout.delta.StopReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -8,6 +9,7 @@

import java.util.ArrayList;
import java.util.List;
import java.io.IOException;

/**
* Reads messages from SQS for complete 5-minute time windows.
@@ -26,14 +28,17 @@ public class SqsWindowReader {
private int maxMessagesPerWindow;

public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll,
int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerWindow) {
int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerWindow,
S3UploadService malformedRequestUploadService, String malformedRequestsS3Path,
int replicaId) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
this.maxMessagesPerPoll = maxMessagesPerPoll;
this.visibilityTimeout = visibilityTimeout;
this.deltaWindowSeconds = deltaWindowSeconds;
this.maxMessagesPerWindow = maxMessagesPerWindow;
this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds);
this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds,
malformedRequestUploadService, malformedRequestsS3Path, replicaId);
LOGGER.info("initialized: maxMessagesPerWindow={}, maxMessagesPerPoll={}, visibilityTimeout={}, deltaWindowSeconds={}",
maxMessagesPerWindow, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds);
}
@@ -88,7 +93,7 @@ public static WindowReadResult messageLimitExceeded(List<SqsParsedMessage> messa
*
     * @return WindowReadResult with messages for the window, or empty if done
     * @throws IOException if uploading malformed messages to S3 fails
     */
public WindowReadResult readWindow() {
public WindowReadResult readWindow() throws IOException {
List<SqsParsedMessage> windowMessages = new ArrayList<>();
long currentWindowStart = 0;
int batchNumber = 0;
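
A wiring sketch (not part of this PR) for the extended constructor; the factory class, every literal value, and the S3 prefix below are assumptions:

```java
package com.uid2.optout.sqs;

import com.uid2.optout.delta.S3UploadService;
import com.uid2.shared.cloud.ICloudStorage;
import software.amazon.awssdk.services.sqs.SqsClient;

// Hypothetical factory; all literal values and the S3 prefix are illustrative only.
final class SqsWindowReaderWiringSketch {
    static SqsWindowReader create(ICloudStorage cloudStorage, SqsClient sqsClient, String queueUrl) {
        S3UploadService malformedUploader = new S3UploadService(cloudStorage, sqsClient, queueUrl);
        return new SqsWindowReader(
                sqsClient,
                queueUrl,
                10,                    // maxMessagesPerPoll
                300,                   // visibilityTimeout, seconds
                300,                   // deltaWindowSeconds (5-minute windows)
                100_000,               // maxMessagesPerWindow
                malformedUploader,     // malformedRequestUploadService
                "optout/malformed/",   // malformedRequestsS3Path (assumed prefix)
                1);                    // replicaId
    }
}
```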
@@ -158,7 +158,9 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O
// Initialize window reader with memory protection limit
this.windowReader = new SqsWindowReader(
this.sqsClient, this.queueUrl, this.maxMessagesPerPoll,
this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile
this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile,
                null, null, // malformed-request upload service and S3 path; to be wired in the Orchestrator after refactoring
this.replicaId
);
LOGGER.info("OptOutSqsLogProducer initialized with maxMessagesPerFile: {}", this.maxMessagesPerFile);
}
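
Note that, based on the diff shown, while these two arguments remain null, uploadMalformedMessages() in SqsBatchProcessor logs an error and returns early, so malformed messages are neither uploaded nor deleted and will reappear in the queue after their visibility timeout until the Orchestrator wiring lands.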