diff --git a/src/main/java/com/uid2/optout/delta/ManualOverrideService.java b/src/main/java/com/uid2/optout/delta/ManualOverrideService.java new file mode 100644 index 00000000..82109d93 --- /dev/null +++ b/src/main/java/com/uid2/optout/delta/ManualOverrideService.java @@ -0,0 +1,90 @@ +package com.uid2.optout.delta; + +import com.uid2.shared.Utils; +import com.uid2.shared.cloud.CloudStorageException; +import com.uid2.shared.cloud.ICloudStorage; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.IOException; + +/** + * Service for managing manual override status in S3. + * + * The manual override allows operators to force DELAYED_PROCESSING status, + * which stops delta production until manually cleared. + * + * Override file format: + *
+ * {"manual_override": "DELAYED_PROCESSING"}
+ * 
+ */ +public class ManualOverrideService { + private static final Logger LOGGER = LoggerFactory.getLogger(ManualOverrideService.class); + + private static final String OVERRIDE_KEY = "manual_override"; + private static final String DELAYED_PROCESSING_VALUE = "DELAYED_PROCESSING"; + + private final ICloudStorage cloudStorage; + private final String overrideS3Path; + + /** + * Create a ManualOverrideService. + * + * @param cloudStorage Cloud storage client for reading/writing override file + * @param overrideS3Path S3 path where the override file is stored + */ + public ManualOverrideService(ICloudStorage cloudStorage, String overrideS3Path) { + this.cloudStorage = cloudStorage; + this.overrideS3Path = overrideS3Path; + } + + /** + * Check if DELAYED_PROCESSING override is currently set. + * + * @return true if manual override is set to DELAYED_PROCESSING + */ + public boolean isDelayedProcessing() { + return DELAYED_PROCESSING_VALUE.equalsIgnoreCase(getOverrideValue()); + } + + /** + * Set the manual override to DELAYED_PROCESSING. + * This will stop delta production until manually cleared. + * + * @return true if override was set successfully + */ + public boolean setDelayedProcessing() { + try { + JsonObject config = new JsonObject().put(OVERRIDE_KEY, DELAYED_PROCESSING_VALUE); + cloudStorage.upload(new ByteArrayInputStream(config.encode().getBytes()), overrideS3Path); + LOGGER.info("set manual override to DELAYED_PROCESSING: {}", overrideS3Path); + return true; + } catch (CloudStorageException e) { + LOGGER.error("manual_override_error: failed to set override at {}", overrideS3Path, e); + return false; + } + } + + /** + * Get the current manual override value + */ + private String getOverrideValue() { + try { + InputStream inputStream = cloudStorage.download(overrideS3Path); + JsonObject configJson = Utils.toJsonObject(inputStream); + return configJson.getString(OVERRIDE_KEY, ""); + } catch (CloudStorageException e) { + LOGGER.error("manual_override_error: no manual override file found at {}", overrideS3Path); + return ""; + } catch (IOException | DecodeException e) { + LOGGER.error("manual_override_error: failed to parse override file at {}", overrideS3Path, e); + return ""; + } + } +} + diff --git a/src/main/java/com/uid2/optout/delta/S3UploadService.java b/src/main/java/com/uid2/optout/delta/S3UploadService.java new file mode 100644 index 00000000..0d6c1c45 --- /dev/null +++ b/src/main/java/com/uid2/optout/delta/S3UploadService.java @@ -0,0 +1,85 @@ +package com.uid2.optout.delta; + +import com.uid2.optout.sqs.SqsMessageOperations; +import com.uid2.shared.cloud.ICloudStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; + +import com.uid2.shared.cloud.CloudStorageException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; + +/** + * Service for uploading data to S3 and deleting messages from SQS after successful upload. + * + * This class encapsulates the critical "upload then delete" pattern that ensures + * data is persisted to S3 before messages are removed from the queue. + */ +public class S3UploadService { + private static final Logger LOGGER = LoggerFactory.getLogger(S3UploadService.class); + + private final ICloudStorage cloudStorage; + private final SqsClient sqsClient; + private final String queueUrl; + + /** + * Callback interface for after successful upload. + */ + @FunctionalInterface + public interface UploadSuccessCallback { + /** + * Called after successful S3 upload, before SQS message deletion. + * + * @param messageCount Number of messages in the uploaded batch + */ + void onSuccess(int messageCount); + } + + /** + * Create an S3UploadService. + * + * @param cloudStorage Cloud storage client for S3 operations + * @param sqsClient SQS client for message deletion + * @param queueUrl SQS queue URL + */ + public S3UploadService(ICloudStorage cloudStorage, SqsClient sqsClient, String queueUrl) { + this.cloudStorage = cloudStorage; + this.sqsClient = sqsClient; + this.queueUrl = queueUrl; + } + + /** + * Upload data to S3 and delete associated messages from SQS after successful upload. + * + *

Critical behavior: Messages are ONLY deleted from SQS after + * the S3 upload succeeds. This ensures no data loss if upload fails.

+ * + * @param data Data to upload + * @param s3Path S3 path (key) for the upload + * @param messages SQS messages to delete after successful upload + * @param onSuccess Callback invoked after successful upload (before message deletion) + * @throws IOException if the upload fails + */ + public void uploadAndDeleteMessages(byte[] data, String s3Path, List messages, UploadSuccessCallback onSuccess) throws IOException { + LOGGER.info("uploading to s3: path={}, size={} bytes, messages={}", s3Path, data.length, messages.size()); + + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(data)) { + cloudStorage.upload(inputStream, s3Path); + + if (onSuccess != null) { + onSuccess.onSuccess(messages.size()); + } + } catch (CloudStorageException e) { + LOGGER.error("s3_error: failed to upload to path={}", s3Path, e); + throw new IOException("s3 upload failed: " + s3Path, e); + } + + if (!messages.isEmpty()) { + SqsMessageOperations.deleteMessagesFromSqs(sqsClient, queueUrl, messages); + } + } +} diff --git a/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java b/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java index afe08619..33fdf7e0 100644 --- a/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java +++ b/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java @@ -1,12 +1,19 @@ package com.uid2.optout.sqs; +import com.uid2.optout.delta.S3UploadService; import com.uid2.optout.delta.StopReason; import com.uid2.shared.optout.OptOutUtils; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.Message; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -17,15 +24,19 @@ */ public class SqsBatchProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchProcessor.class); - - private final SqsClient sqsClient; - private final String queueUrl; + private static final String MALFORMED_FILE_PREFIX = "optout-malformed-"; private final int deltaWindowSeconds; + private final S3UploadService s3UploadService; + private final String malformedRequestsS3Path; + private final int replicaId; - public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSeconds) { - this.sqsClient = sqsClient; - this.queueUrl = queueUrl; + public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSeconds, + S3UploadService s3UploadService, String malformedRequestsS3Path, + int replicaId) { this.deltaWindowSeconds = deltaWindowSeconds; + this.s3UploadService = s3UploadService; + this.malformedRequestsS3Path = malformedRequestsS3Path; + this.replicaId = replicaId; } /** @@ -74,7 +85,7 @@ public List getMessages() { * @param batchNumber The batch number (for logging) * @return BatchProcessingResult containing eligible messages and processing metadata */ - public BatchProcessingResult processBatch(List messageBatch, int batchNumber) { + public BatchProcessingResult processBatch(List messageBatch, int batchNumber) throws IOException { // Parse and sort messages by timestamp List parsedBatch = SqsMessageParser.parseAndSortMessages(messageBatch); @@ -82,8 +93,8 @@ public BatchProcessingResult processBatch(List messageBatch, int batchN if (parsedBatch.size() < messageBatch.size()) { List invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch); if (!invalidMessages.isEmpty()) { - LOGGER.error("sqs_error: found {} invalid messages in batch {}, deleting", invalidMessages.size(), batchNumber); - SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, invalidMessages); + LOGGER.error("sqs_error: found {} invalid messages in batch {}, uploading to S3 and deleting", invalidMessages.size(), batchNumber); + uploadMalformedMessages(invalidMessages); } } @@ -147,4 +158,48 @@ private List identifyInvalidMessages(List originalBatch, List< .filter(msg -> !validIds.contains(msg.messageId())) .collect(Collectors.toList()); } + + /** + * Uploads malformed messages to S3 and then deletes them from SQS. + * The destination is "malformed" folder in the dropped requests S3 bucket. + * + * @param invalidMessages The malformed messages to upload and delete + */ + private void uploadMalformedMessages(List invalidMessages) throws IOException { + if (s3UploadService == null) { + LOGGER.error("s3_error: s3UploadService is null, skipping upload of {} malformed messages", invalidMessages.size()); + return; + } + + // serialize messages to json string + JsonArray messagesJson = new JsonArray(); + for (Message msg : invalidMessages) { + messagesJson.add(new JsonObject() + .put("messageId", msg.messageId()) + .put("body", msg.body()) + .put("attributes", msg.attributesAsStrings())); + } + + // format file name and data + byte[] data = messagesJson.encodePrettily().getBytes(StandardCharsets.UTF_8); + String filename = generateMalformedMessageFileName(); + String s3Path = malformedRequestsS3Path + filename; + + // upload and delete messages + try { + s3UploadService.uploadAndDeleteMessages(data, s3Path, invalidMessages, null); + LOGGER.info("uploaded {} malformed messages to {}", invalidMessages.size(), s3Path); + } catch (IOException e) { + LOGGER.error("failed to upload and delete malformed sqs messages, path={}, filename={}, error={}", malformedRequestsS3Path, filename, e.getMessage(), e); + throw e; + } + } + + private String generateMalformedMessageFileName() { + return String.format("%s%03d_%s_%08x.json", + MALFORMED_FILE_PREFIX, + replicaId, + Instant.now().truncatedTo(ChronoUnit.SECONDS).toString().replace(':', '.'), + OptOutUtils.rand.nextInt()); + } } diff --git a/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java b/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java index 3a9fe711..c8733e79 100644 --- a/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java +++ b/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java @@ -1,5 +1,6 @@ package com.uid2.optout.sqs; +import com.uid2.optout.delta.S3UploadService; import com.uid2.optout.delta.StopReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -8,6 +9,7 @@ import java.util.ArrayList; import java.util.List; +import java.io.IOException; /** * Reads messages from SQS for complete 5-minute time windows. @@ -26,14 +28,17 @@ public class SqsWindowReader { private int maxMessagesPerWindow; public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll, - int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerWindow) { + int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerWindow, + S3UploadService malformedRequestUploadService, String malformedRequestsS3Path, + int replicaId) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.maxMessagesPerPoll = maxMessagesPerPoll; this.visibilityTimeout = visibilityTimeout; this.deltaWindowSeconds = deltaWindowSeconds; this.maxMessagesPerWindow = maxMessagesPerWindow; - this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds); + this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds, + malformedRequestUploadService, malformedRequestsS3Path, replicaId); LOGGER.info("initialized: maxMessagesPerWindow={}, maxMessagesPerPoll={}, visibilityTimeout={}, deltaWindowSeconds={}", maxMessagesPerWindow, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds); } @@ -88,7 +93,7 @@ public static WindowReadResult messageLimitExceeded(List messa * * @return WindowReadResult with messages for the window, or empty if done */ - public WindowReadResult readWindow() { + public WindowReadResult readWindow() throws IOException { List windowMessages = new ArrayList<>(); long currentWindowStart = 0; int batchNumber = 0; diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index f0986907..da9b68ef 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -158,7 +158,9 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O // Initialize window reader with memory protection limit this.windowReader = new SqsWindowReader( this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, - this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile + this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile, + null, null, // will be done in Orchestrator after refactoring + this.replicaId ); LOGGER.info("OptOutSqsLogProducer initialized with maxMessagesPerFile: {}", this.maxMessagesPerFile); } diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java deleted file mode 100644 index e69de29b..00000000 diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java deleted file mode 100644 index e69de29b..00000000 diff --git a/src/test/java/com/uid2/optout/delta/ManualOverrideServiceTest.java b/src/test/java/com/uid2/optout/delta/ManualOverrideServiceTest.java new file mode 100644 index 00000000..5dc3fbda --- /dev/null +++ b/src/test/java/com/uid2/optout/delta/ManualOverrideServiceTest.java @@ -0,0 +1,145 @@ +package com.uid2.optout.delta; + +import com.uid2.shared.cloud.CloudStorageException; +import com.uid2.shared.cloud.ICloudStorage; +import io.vertx.core.json.JsonObject; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +public class ManualOverrideServiceTest { + + private ICloudStorage mockCloudStorage; + private ManualOverrideService manualOverrideService; + + private static final String TEST_OVERRIDE_PATH = "config/manual-override.json"; + + @BeforeEach + void setUp() { + mockCloudStorage = mock(ICloudStorage.class); + manualOverrideService = new ManualOverrideService(mockCloudStorage, TEST_OVERRIDE_PATH); + } + + // ==================== isDelayedProcessing tests ==================== + + @Test + void testIsDelayedProcessing_returnsTrue_whenOverrideSet() throws Exception { + JsonObject config = new JsonObject().put("manual_override", "DELAYED_PROCESSING"); + InputStream inputStream = new ByteArrayInputStream(config.encode().getBytes(StandardCharsets.UTF_8)); + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)).thenReturn(inputStream); + + assertTrue(manualOverrideService.isDelayedProcessing()); + } + + @Test + void testIsDelayedProcessing_returnsFalse_whenOverrideNotSet() throws Exception { + JsonObject config = new JsonObject().put("manual_override", ""); + InputStream inputStream = new ByteArrayInputStream(config.encode().getBytes(StandardCharsets.UTF_8)); + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)).thenReturn(inputStream); + + assertFalse(manualOverrideService.isDelayedProcessing()); + } + + @Test + void testIsDelayedProcessing_returnsFalse_whenFileNotFound() throws Exception { + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)) + .thenThrow(new CloudStorageException("File not found")); + + assertFalse(manualOverrideService.isDelayedProcessing()); + } + + @Test + void testIsDelayedProcessing_returnsFalse_whenDifferentValue() throws Exception { + JsonObject config = new JsonObject().put("manual_override", "SOME_OTHER_VALUE"); + InputStream inputStream = new ByteArrayInputStream(config.encode().getBytes(StandardCharsets.UTF_8)); + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)).thenReturn(inputStream); + + assertFalse(manualOverrideService.isDelayedProcessing()); + } + + @Test + void testIsDelayedProcessing_returnsFalse_whenKeyMissing() throws Exception { + JsonObject config = new JsonObject().put("other_key", "value"); + InputStream inputStream = new ByteArrayInputStream(config.encode().getBytes(StandardCharsets.UTF_8)); + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)).thenReturn(inputStream); + + assertFalse(manualOverrideService.isDelayedProcessing()); + } + + @Test + void testIsDelayedProcessing_returnsFalse_whenInvalidJson() throws Exception { + InputStream inputStream = new ByteArrayInputStream("not valid json".getBytes(StandardCharsets.UTF_8)); + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)).thenReturn(inputStream); + + assertFalse(manualOverrideService.isDelayedProcessing()); + } + + @Test + void testIsDelayedProcessing_caseInsensitive() throws Exception { + JsonObject config = new JsonObject().put("manual_override", "delayed_processing"); + InputStream inputStream = new ByteArrayInputStream(config.encode().getBytes(StandardCharsets.UTF_8)); + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)).thenReturn(inputStream); + + assertTrue(manualOverrideService.isDelayedProcessing()); + } + + @Test + void testIsDelayedProcessing_emptyFile() throws Exception { + InputStream inputStream = new ByteArrayInputStream("{}".getBytes(StandardCharsets.UTF_8)); + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)).thenReturn(inputStream); + + assertFalse(manualOverrideService.isDelayedProcessing()); + } + + @Test + void testIsDelayedProcessing_nullValue() throws Exception { + InputStream inputStream = new ByteArrayInputStream("{\"manual_override\":null}".getBytes(StandardCharsets.UTF_8)); + when(mockCloudStorage.download(TEST_OVERRIDE_PATH)).thenReturn(inputStream); + + assertFalse(manualOverrideService.isDelayedProcessing()); + } + + // ==================== setDelayedProcessing tests ==================== + + @Test + void testSetDelayedProcessing_success() throws Exception { + boolean result = manualOverrideService.setDelayedProcessing(); + + assertTrue(result); + + ArgumentCaptor inputStreamCaptor = ArgumentCaptor.forClass(InputStream.class); + verify(mockCloudStorage).upload(inputStreamCaptor.capture(), eq(TEST_OVERRIDE_PATH)); + + // Verify the uploaded content + String uploadedContent = new String(inputStreamCaptor.getValue().readAllBytes(), StandardCharsets.UTF_8); + JsonObject uploadedJson = new JsonObject(uploadedContent); + assertEquals("DELAYED_PROCESSING", uploadedJson.getString("manual_override")); + } + + @Test + void testSetDelayedProcessing_failsOnCloudStorageException() throws Exception { + doThrow(new CloudStorageException("Upload failed")) + .when(mockCloudStorage).upload(any(InputStream.class), eq(TEST_OVERRIDE_PATH)); + + boolean result = manualOverrideService.setDelayedProcessing(); + + assertFalse(result); + } + + @Test + void testSetDelayedProcessing_uploadsToCorrectPath() throws Exception { + manualOverrideService.setDelayedProcessing(); + + verify(mockCloudStorage).upload(any(InputStream.class), eq(TEST_OVERRIDE_PATH)); + } +} + diff --git a/src/test/java/com/uid2/optout/delta/S3UploadServiceTest.java b/src/test/java/com/uid2/optout/delta/S3UploadServiceTest.java new file mode 100644 index 00000000..c043cd33 --- /dev/null +++ b/src/test/java/com/uid2/optout/delta/S3UploadServiceTest.java @@ -0,0 +1,201 @@ +package com.uid2.optout.delta; + +import com.uid2.shared.cloud.CloudStorageException; +import com.uid2.shared.cloud.ICloudStorage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +public class S3UploadServiceTest { + + private ICloudStorage mockCloudStorage; + private SqsClient mockSqsClient; + private S3UploadService s3UploadService; + + private static final String TEST_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/test-queue"; + private static final String TEST_S3_PATH = "test-bucket/test-file.dat"; + + @BeforeEach + void setUp() { + mockCloudStorage = mock(ICloudStorage.class); + mockSqsClient = mock(SqsClient.class); + s3UploadService = new S3UploadService(mockCloudStorage, mockSqsClient, TEST_QUEUE_URL); + } + + // ==================== uploadAndDeleteMessages tests ==================== + + @Test + void testUploadAndDeleteMessages_success() throws Exception { + // Setup - create test data and messages + byte[] data = "test data".getBytes(StandardCharsets.UTF_8); + List messages = createMessages(3); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Act - upload and delete messages + s3UploadService.uploadAndDeleteMessages(data, TEST_S3_PATH, messages, null); + + // Assert - S3 upload was called + ArgumentCaptor inputStreamCaptor = ArgumentCaptor.forClass(InputStream.class); + verify(mockCloudStorage).upload(inputStreamCaptor.capture(), eq(TEST_S3_PATH)); + + // Assert - content was uploaded + byte[] uploadedData = inputStreamCaptor.getValue().readAllBytes(); + assertArrayEquals(data, uploadedData); + + // Assert - SQS delete was called + verify(mockSqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + @Test + void testUploadAndDeleteMessages_callsSuccessCallback() throws Exception { + // Setup - create test data and messages + byte[] data = "test data".getBytes(StandardCharsets.UTF_8); + List messages = createMessages(5); + AtomicInteger callbackCount = new AtomicInteger(0); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Act - upload and delete messages + s3UploadService.uploadAndDeleteMessages(data, TEST_S3_PATH, messages, (count) -> { + callbackCount.set(count); + }); + + // Assert - callback was called + assertEquals(5, callbackCount.get()); + } + + @Test + void testUploadAndDeleteMessages_s3UploadFails_throwsIOException() throws Exception { + // Setup - create test data and messages + byte[] data = "test data".getBytes(StandardCharsets.UTF_8); + List messages = createMessages(2); + + doThrow(new CloudStorageException("S3 error")) + .when(mockCloudStorage).upload(any(InputStream.class), eq(TEST_S3_PATH)); + + // Act & Assert - upload and delete messages throws IOException + IOException exception = assertThrows(IOException.class, () -> { + s3UploadService.uploadAndDeleteMessages(data, TEST_S3_PATH, messages, null); + }); + + // Assert - exception was thrown + assertTrue(exception.getMessage().contains("s3 upload failed")); + + // Assert - SQS delete was NOT called (messages should remain in queue) + verify(mockSqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + @Test + void testUploadAndDeleteMessages_emptyMessageList_skipsSqsDelete() throws Exception { + // Setup - create test data and messages + byte[] data = "test data".getBytes(StandardCharsets.UTF_8); + List messages = new ArrayList<>(); + + // Act - upload and delete messages + s3UploadService.uploadAndDeleteMessages(data, TEST_S3_PATH, messages, null); + + // Assert - S3 upload was called + verify(mockCloudStorage).upload(any(InputStream.class), eq(TEST_S3_PATH)); + + // Assert - SQS delete was NOT called (no messages to delete) + verify(mockSqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + @Test + void testUploadAndDeleteMessages_callbackIsOptional() throws Exception { + // Setup - create test data and messages + byte[] data = "test data".getBytes(StandardCharsets.UTF_8); + List messages = createMessages(1); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Act & Assert - should not throw when callback is null + assertDoesNotThrow(() -> { + s3UploadService.uploadAndDeleteMessages(data, TEST_S3_PATH, messages, null); + }); + } + + @Test + void testUploadAndDeleteMessages_deletesCorrectMessages() throws Exception { + // Setup - create test data and messages + byte[] data = "test data".getBytes(StandardCharsets.UTF_8); + List messages = List.of( + Message.builder().messageId("msg-1").receiptHandle("receipt-1").build(), + Message.builder().messageId("msg-2").receiptHandle("receipt-2").build() + ); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Act - upload and delete messages + s3UploadService.uploadAndDeleteMessages(data, TEST_S3_PATH, messages, null); + + // Assert - SQS delete was called + ArgumentCaptor deleteCaptor = + ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsClient).deleteMessageBatch(deleteCaptor.capture()); + + // Assert - correct messages were deleted + DeleteMessageBatchRequest deleteRequest = deleteCaptor.getValue(); + assertEquals(TEST_QUEUE_URL, deleteRequest.queueUrl()); + assertEquals(2, deleteRequest.entries().size()); + } + + @Test + void testUploadAndDeleteMessages_largeData() throws Exception { + // Setup - create 1MB of data and messages + byte[] data = new byte[1024 * 1024]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) (i % 256); + } + List messages = createMessages(10); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Act - upload and delete messages + s3UploadService.uploadAndDeleteMessages(data, TEST_S3_PATH, messages, null); + + // Assert - S3 upload was called + ArgumentCaptor inputStreamCaptor = ArgumentCaptor.forClass(InputStream.class); + verify(mockCloudStorage).upload(inputStreamCaptor.capture(), eq(TEST_S3_PATH)); + + // Assert - file content is correct + byte[] uploadedData = inputStreamCaptor.getValue().readAllBytes(); + assertArrayEquals(data, uploadedData); + } + + // ==================== Helper methods ==================== + + private List createMessages(int count) { + List messages = new ArrayList<>(); + for (int i = 0; i < count; i++) { + messages.add(Message.builder() + .messageId("msg-" + i) + .receiptHandle("receipt-" + i) + .build()); + } + return messages; + } +} + diff --git a/src/test/java/com/uid2/optout/sqs/SqsBatchProcessorTest.java b/src/test/java/com/uid2/optout/sqs/SqsBatchProcessorTest.java index 312eda06..34603d4f 100644 --- a/src/test/java/com/uid2/optout/sqs/SqsBatchProcessorTest.java +++ b/src/test/java/com/uid2/optout/sqs/SqsBatchProcessorTest.java @@ -1,21 +1,29 @@ package com.uid2.optout.sqs; +import com.uid2.optout.delta.S3UploadService; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.*; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; public class SqsBatchProcessorTest { private static final String VALID_HASH_BASE64 = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="; // 32 bytes private static final String VALID_ID_BASE64 = "AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE="; // 32 bytes private static final long TEST_TIMESTAMP_MS = 1699308900000L; // Nov 7, 2023 in ms + private static final String TEST_MALFORMED_S3_PATH = "malformed/"; private static final int DEFAULT_WINDOW_SECONDS = 300; // 5 minutes @@ -23,8 +31,7 @@ public class SqsBatchProcessorTest { @BeforeEach public void setUp() { - // Pass null for SqsClient - filterEligibleMessages doesn't use it - batchProcessor = new SqsBatchProcessor(null, "test-queue-url", DEFAULT_WINDOW_SECONDS); + batchProcessor = new SqsBatchProcessor(null, "test-queue-url", DEFAULT_WINDOW_SECONDS, null, "", 0); } private Message createValidMessage(String identityHash, String advertisingId, long timestampMs) { @@ -156,7 +163,7 @@ public void testFilterEligibleMessages_preservesOrder() { @Test public void testFilterEligibleMessages_zeroWindowSeconds() { // Create processor with 0 window seconds - SqsBatchProcessor zeroWindowProcessor = new SqsBatchProcessor(null, "test-queue-url", 0); + SqsBatchProcessor zeroWindowProcessor = new SqsBatchProcessor(null, "test-queue-url", 0, null, "", 0); List messages = new ArrayList<>(); Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); @@ -168,5 +175,216 @@ public void testFilterEligibleMessages_zeroWindowSeconds() { assertEquals(1, result.size()); // With 0 window, current time messages should be eligible } + + // ==================== Malformed Message Upload Tests ==================== + + private Message createMalformedMessage(String messageId, String body) { + // use old timestamp so message is eligible for processing + long oldTimestampMs = System.currentTimeMillis() - 600_000; // 10 minutes ago + Map attributes = new HashMap<>(); + attributes.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(oldTimestampMs)); + + return Message.builder() + .body(body) + .attributes(attributes) + .messageId(messageId) + .receiptHandle("receipt-" + messageId) + .build(); + } + + @Test + public void testProcessBatch_malformedMessagesUploadedToS3() throws IOException { + // Setup - mock S3UploadService + S3UploadService mockS3UploadService = mock(S3UploadService.class); + SqsBatchProcessor processor = new SqsBatchProcessor( + null, "test-queue-url", DEFAULT_WINDOW_SECONDS, + mockS3UploadService, TEST_MALFORMED_S3_PATH, 0 + ); + + // Setup - create batch with one valid and one malformed message + Message validMessage = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + System.currentTimeMillis() - 600_000); // 10 min ago + Message malformedMessage = createMalformedMessage("malformed-1", "{ invalid json body }"); + + List batch = Arrays.asList(validMessage, malformedMessage); + + // Act - process batch + SqsBatchProcessor.BatchProcessingResult result = processor.processBatch(batch, 1); + + // Assert - S3 upload was called + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(String.class); + @SuppressWarnings("unchecked") + ArgumentCaptor> messagesCaptor = ArgumentCaptor.forClass(List.class); + + verify(mockS3UploadService).uploadAndDeleteMessages( + dataCaptor.capture(), + pathCaptor.capture(), + messagesCaptor.capture(), + isNull() + ); + + // Assert - the S3 path format: {prefix}optout-malformed-{replicaId:03d}_{timestamp}_{randomHex:08x}.json + String s3Path = pathCaptor.getValue(); + assertTrue(s3Path.startsWith(TEST_MALFORMED_S3_PATH + "optout-malformed-"), + "S3 path should start with prefix + optout-malformed-, got: " + s3Path); + assertTrue(s3Path.matches(".*optout-malformed-\\d{3}_\\d{4}-\\d{2}-\\d{2}T\\d{2}\\.\\d{2}\\.\\d{2}Z_[a-f0-9]{8}\\.json"), + "S3 path should match format with replica ID, ISO timestamp and 8-char hex, got: " + s3Path); + + // Assert - the uploaded data contains the malformed message + String uploadedJson = new String(dataCaptor.getValue(), StandardCharsets.UTF_8); + JsonArray uploadedArray = new JsonArray(uploadedJson); + assertEquals(1, uploadedArray.size()); + + JsonObject uploadedMessage = uploadedArray.getJsonObject(0); + assertEquals("malformed-1", uploadedMessage.getString("messageId")); + assertEquals("{ invalid json body }", uploadedMessage.getString("body")); + + // Assert - only the malformed message was passed for deletion + List deletedMessages = messagesCaptor.getValue(); + assertEquals(1, deletedMessages.size()); + assertEquals("malformed-1", deletedMessages.get(0).messageId()); + + // Assert - the valid message was still processed + assertTrue(result.hasMessages()); + } + + @Test + public void testProcessBatch_allMalformedMessages() throws IOException { + // Setup - mock S3UploadService + S3UploadService mockS3UploadService = mock(S3UploadService.class); + SqsBatchProcessor processor = new SqsBatchProcessor( + null, "test-queue-url", DEFAULT_WINDOW_SECONDS, + mockS3UploadService, TEST_MALFORMED_S3_PATH, 0 + ); + + // Setup - create batch with only malformed messages + Message malformed1 = createMalformedMessage("malformed-1", "{}"); // Missing required fields + Message malformed2 = createMalformedMessage("malformed-2", "not json at all"); + + List batch = Arrays.asList(malformed1, malformed2); + + // Act - process batch + SqsBatchProcessor.BatchProcessingResult result = processor.processBatch(batch, 5); + + // Assert - S3 upload was called with both malformed messages + @SuppressWarnings("unchecked") + ArgumentCaptor> messagesCaptor = ArgumentCaptor.forClass(List.class); + verify(mockS3UploadService).uploadAndDeleteMessages( + any(byte[].class), + contains("optout-malformed-"), + messagesCaptor.capture(), + isNull() + ); + + List deletedMessages = messagesCaptor.getValue(); + assertEquals(2, deletedMessages.size()); + + // Verify result indicates corrupt messages were handled + assertFalse(result.hasMessages()); + } + + @Test + public void testProcessBatch_s3UploadFailure_throwsException() throws IOException { + // Setup - mock S3UploadService and SqsClient + S3UploadService mockS3UploadService = mock(S3UploadService.class); + SqsClient mockSqsClient = mock(SqsClient.class); + + // Setup - make S3 upload fail + doThrow(new IOException("S3 connection failed")) + .when(mockS3UploadService).uploadAndDeleteMessages(any(), any(), any(), any()); + + SqsBatchProcessor processor = new SqsBatchProcessor( + mockSqsClient, "test-queue-url", DEFAULT_WINDOW_SECONDS, + mockS3UploadService, TEST_MALFORMED_S3_PATH, 0 + ); + + // Setup - create batch with malformed message + Message malformedMessage = createMalformedMessage("malformed-1", "bad data"); + List batch = Arrays.asList(malformedMessage); + + // Act - process batch - should throw IOException when S3 upload fails + IOException thrown = assertThrows(IOException.class, () -> processor.processBatch(batch, 1)); + assertEquals("S3 connection failed", thrown.getMessage()); + + // Assert - S3 upload was attempted + verify(mockS3UploadService).uploadAndDeleteMessages(any(), any(), any(), any()); + } + + @Test + public void testProcessBatch_noMalformedMessages_noS3Upload() throws IOException { + // Setup - mock S3UploadService + S3UploadService mockS3UploadService = mock(S3UploadService.class); + SqsBatchProcessor processor = new SqsBatchProcessor( + null, "test-queue-url", DEFAULT_WINDOW_SECONDS, + mockS3UploadService, TEST_MALFORMED_S3_PATH, 0 + ); + + // Setup - create batch with only valid messages (old enough to be eligible) + Message valid1 = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + System.currentTimeMillis() - 600_000); + Message valid2 = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + System.currentTimeMillis() - 700_000); + + List batch = Arrays.asList(valid1, valid2); + + // Act - process batch + SqsBatchProcessor.BatchProcessingResult result = processor.processBatch(batch, 1); + + // Assert - S3 upload was NOT called (no malformed messages) + verify(mockS3UploadService, never()).uploadAndDeleteMessages(any(), any(), any(), any()); + + // Assert - valid messages were processed + assertTrue(result.hasMessages()); + assertEquals(2, result.getMessages().size()); + } + + @Test + public void testProcessBatch_malformedMessageJsonStructure() throws IOException { + // Setup - mock S3UploadService + S3UploadService mockS3UploadService = mock(S3UploadService.class); + int testReplicaId = 42; + SqsBatchProcessor processor = new SqsBatchProcessor( + null, "test-queue-url", DEFAULT_WINDOW_SECONDS, + mockS3UploadService, TEST_MALFORMED_S3_PATH, testReplicaId + ); + + // Setup - create a malformed message with specific content + String malformedBody = "{\"some_field\": \"some_value\"}"; // Missing identity_hash and advertising_id + Message malformedMessage = createMalformedMessage("test-msg-123", malformedBody); + + List batch = Arrays.asList(malformedMessage); + processor.processBatch(batch, 1); + + // Assert - capture and verify the uploaded JSON structure + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(String.class); + + verify(mockS3UploadService).uploadAndDeleteMessages( + dataCaptor.capture(), + pathCaptor.capture(), + any(), + isNull() + ); + + // Assert - JSON structure + String uploadedJson = new String(dataCaptor.getValue(), StandardCharsets.UTF_8); + JsonArray array = new JsonArray(uploadedJson); + assertEquals(1, array.size()); + + JsonObject msgJson = array.getJsonObject(0); + assertTrue(msgJson.containsKey("messageId"), "Should contain messageId"); + assertTrue(msgJson.containsKey("body"), "Should contain body"); + assertTrue(msgJson.containsKey("attributes"), "Should contain attributes"); + + assertEquals("test-msg-123", msgJson.getString("messageId")); + assertEquals(malformedBody, msgJson.getString("body")); + + // Assert - path format: {prefix}optout-malformed-{replicaId:03d}_{timestamp}_{randomHex:08x}.json + String path = pathCaptor.getValue(); + assertTrue(path.matches(".*optout-malformed-042_.*"), "Path should contain replica ID 042, got: " + path); + assertTrue(path.matches(".*optout-malformed-042_\\d{4}-\\d{2}-\\d{2}T\\d{2}\\.\\d{2}\\.\\d{2}Z_[a-f0-9]{8}\\.json"), + "Path should match format with replica ID, ISO timestamp and 8-char hex, got: " + path); + } } diff --git a/src/test/java/com/uid2/optout/sqs/SqsWindowReaderTest.java b/src/test/java/com/uid2/optout/sqs/SqsWindowReaderTest.java index 044be5ae..b604c28c 100644 --- a/src/test/java/com/uid2/optout/sqs/SqsWindowReaderTest.java +++ b/src/test/java/com/uid2/optout/sqs/SqsWindowReaderTest.java @@ -7,6 +7,7 @@ import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.*; +import java.io.IOException; import java.util.*; import static org.junit.jupiter.api.Assertions.*; @@ -31,12 +32,13 @@ void setUp() { mockSqsClient = mock(SqsClient.class); windowReader = new SqsWindowReader( mockSqsClient, TEST_QUEUE_URL, MAX_MESSAGES_PER_POLL, - VISIBILITY_TIMEOUT, DELTA_WINDOW_SECONDS, MAX_MESSAGES_PER_WINDOW + VISIBILITY_TIMEOUT, DELTA_WINDOW_SECONDS, MAX_MESSAGES_PER_WINDOW, + null, "", 0 ); } @Test - void testReadWindow_emptyQueue() { + void testReadWindow_emptyQueue() throws IOException { when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(ReceiveMessageResponse.builder().messages(List.of()).build()); @@ -48,7 +50,7 @@ void testReadWindow_emptyQueue() { } @Test - void testReadWindow_singleBatchSingleWindow() { + void testReadWindow_singleBatchSingleWindow() throws IOException { long windowStartSeconds = System.currentTimeMillis() / 1000 - 600; // 10 minutes ago List messages = Arrays.asList( createMessage(windowStartSeconds + 10), @@ -68,7 +70,7 @@ void testReadWindow_singleBatchSingleWindow() { } @Test - void testReadWindow_multipleBatchesSameWindow() { + void testReadWindow_multipleBatchesSameWindow() throws IOException { long windowStartSeconds = System.currentTimeMillis() / 1000 - 600; // 10 minutes ago List batch1 = Arrays.asList( @@ -93,7 +95,7 @@ void testReadWindow_multipleBatchesSameWindow() { } @Test - void testReadWindow_messagesTooRecent() { + void testReadWindow_messagesTooRecent() throws IOException { long currentTimeMs = System.currentTimeMillis(); List messages = Arrays.asList( createMessageWithTimestampMs(currentTimeMs - 1000), // 1 second ago @@ -111,10 +113,11 @@ void testReadWindow_messagesTooRecent() { } @Test - void testReadWindow_messageLimitExceeded() { + void testReadWindow_messageLimitExceeded() throws IOException { SqsWindowReader smallLimitReader = new SqsWindowReader( mockSqsClient, TEST_QUEUE_URL, MAX_MESSAGES_PER_POLL, - VISIBILITY_TIMEOUT, DELTA_WINDOW_SECONDS, 5 // Only 5 messages max + VISIBILITY_TIMEOUT, DELTA_WINDOW_SECONDS, 5, // Only 5 messages max + null, "", 0 ); long windowStartSeconds = System.currentTimeMillis() / 1000 - 600; @@ -138,7 +141,7 @@ void testReadWindow_messageLimitExceeded() { } @Test - void testReadWindow_discoversNewWindow() { + void testReadWindow_discoversNewWindow() throws IOException { long window1StartSeconds = System.currentTimeMillis() / 1000 - 900; // 15 minutes ago long window2StartSeconds = window1StartSeconds + DELTA_WINDOW_SECONDS + 100; // Next window @@ -159,7 +162,7 @@ void testReadWindow_discoversNewWindow() { } @Test - void testReadWindow_multipleWindowsMultipleBatchesPerWindow() { + void testReadWindow_multipleWindowsMultipleBatchesPerWindow() throws IOException { // Window 1: 2 batches, then discovers window 2 // Window 2: 2 batches (must be > 5 min old for eligibility) long window1StartSeconds = System.currentTimeMillis() / 1000 - 1200; // 20 minutes ago @@ -220,7 +223,7 @@ void testReadWindow_multipleWindowsMultipleBatchesPerWindow() { } @Test - void testReadWindow_corruptMessagesSkipped() { + void testReadWindow_corruptMessagesSkipped() throws IOException { long windowStartSeconds = System.currentTimeMillis() / 1000 - 600; // Corrupt message (missing required fields) diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java deleted file mode 100644 index e69de29b..00000000 diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java deleted file mode 100644 index e69de29b..00000000