diff --git a/src/main/java/com/uid2/optout/delta/DeltaFileWriter.java b/src/main/java/com/uid2/optout/delta/DeltaFileWriter.java new file mode 100644 index 0000000..b9d54bc --- /dev/null +++ b/src/main/java/com/uid2/optout/delta/DeltaFileWriter.java @@ -0,0 +1,113 @@ +package com.uid2.optout.delta; + +import com.uid2.shared.optout.OptOutConst; +import com.uid2.shared.optout.OptOutEntry; +import com.uid2.shared.optout.OptOutUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Handles binary writing of delta file entries. + * + * Delta files have the following format: + * - Start entry: null hash (32 bytes) + null hash (32 bytes) + timestamp (8 bytes) + * - Opt-out entries: hash (32 bytes) + id (32 bytes) + timestamp (7 bytes) + metadata (1 byte) + * - End entry: ones hash (32 bytes) + ones hash (32 bytes) + timestamp (8 bytes) + * + * Each entry is 72 bytes (OptOutConst.EntrySize) + */ +public class DeltaFileWriter { + private static final Logger LOGGER = LoggerFactory.getLogger(DeltaFileWriter.class); + + private ByteBuffer buffer; + + /** + * Create a DeltaFileWriter with the specified initial buffer size. + * + * @param bufferSize Initial buffer size in bytes + */ + public DeltaFileWriter(int bufferSize) { + this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); + } + + /** + * Write the start-of-delta sentinel entry. + * Uses null hash bytes and the window start timestamp. + * + * @param stream Output stream to write to + * @param windowStart Window start timestamp (epoch seconds) + * @throws IOException if write fails + */ + public void writeStartOfDelta(ByteArrayOutputStream stream, long windowStart) throws IOException { + ensureCapacity(OptOutConst.EntrySize); + + buffer.put(OptOutUtils.nullHashBytes); + buffer.put(OptOutUtils.nullHashBytes); + buffer.putLong(windowStart); + + flushToStream(stream); + } + + /** + * Write a single opt-out entry. + * + * @param stream Output stream to write to + * @param hashBytes Hash bytes (32 bytes) + * @param idBytes ID bytes (32 bytes) + * @param timestamp Entry timestamp (epoch seconds) + * @throws IOException if write fails + */ + public void writeOptOutEntry(ByteArrayOutputStream stream, byte[] hashBytes, byte[] idBytes, long timestamp) throws IOException { + ensureCapacity(OptOutConst.EntrySize); + + OptOutEntry.writeTo(buffer, hashBytes, idBytes, timestamp); + + flushToStream(stream); + } + + /** + * Write the end-of-delta sentinel entry. + * Uses ones hash bytes and the window end timestamp. + * + * @param stream Output stream to write to + * @param windowEnd Window end timestamp (epoch seconds) + * @throws IOException if write fails + */ + public void writeEndOfDelta(ByteArrayOutputStream stream, long windowEnd) throws IOException { + ensureCapacity(OptOutConst.EntrySize); + + buffer.put(OptOutUtils.onesHashBytes); + buffer.put(OptOutUtils.onesHashBytes); + buffer.putLong(windowEnd); + + flushToStream(stream); + } + + /** + * Flush the buffer contents to the output stream and clear the buffer. + */ + private void flushToStream(ByteArrayOutputStream stream) throws IOException { + buffer.flip(); + byte[] entry = new byte[buffer.remaining()]; + buffer.get(entry); + stream.write(entry); + buffer.clear(); + } + + /** + * Ensure buffer has sufficient capacity, expanding if necessary. + */ + private void ensureCapacity(int dataSize) { + if (buffer.capacity() < dataSize) { + int newCapacity = Integer.highestOneBit(dataSize) << 1; + LOGGER.info("expanding buffer size: current {}, need {}, new {}", buffer.capacity(), dataSize, newCapacity); + this.buffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN); + } + } +} + diff --git a/src/main/java/com/uid2/optout/delta/DeltaProductionJobStatus.java b/src/main/java/com/uid2/optout/delta/DeltaProductionJobStatus.java new file mode 100644 index 0000000..d649eaa --- /dev/null +++ b/src/main/java/com/uid2/optout/delta/DeltaProductionJobStatus.java @@ -0,0 +1,85 @@ +package com.uid2.optout.delta; + +import io.vertx.core.json.JsonObject; +import java.time.Instant; + +/** + * Represents the status and result of an async delta production job on a pod. + * + * This class tracks the lifecycle of a delta production job including its state + * (running, completed, failed), timing information, and result or error details. + * + */ +public class DeltaProductionJobStatus { + private final Instant startTime; + private volatile JobState state; + private volatile JsonObject result; + private volatile String errorMessage; + private volatile Instant endTime; + + public enum JobState { + RUNNING, + COMPLETED, + FAILED + } + + public DeltaProductionJobStatus() { + this.startTime = Instant.now(); + this.state = JobState.RUNNING; + } + + /** + * Mark the job as completed with the given result. + * @param result The result details as a JsonObject + */ + public void complete(JsonObject result) { + this.result = result; + this.state = JobState.COMPLETED; + this.endTime = Instant.now(); + } + + /** + * Mark the job as failed with the given error message. + * @param errorMessage Description of the failure + */ + public void fail(String errorMessage) { + this.errorMessage = errorMessage; + this.state = JobState.FAILED; + this.endTime = Instant.now(); + } + + /** + * Get the current state of the job. + * @return The job state + */ + public JobState getState() { + return state; + } + + /** + * Convert the job status to a JSON representation for API responses. + * @return JsonObject with state, timing, and result/error information + */ + public JsonObject toJson() { + JsonObject json = new JsonObject() + .put("state", state.name().toLowerCase()) + .put("start_time", startTime.toString()); + + if (endTime != null) { + json.put("end_time", endTime.toString()); + long durationSeconds = endTime.getEpochSecond() - startTime.getEpochSecond(); + json.put("duration_seconds", durationSeconds); + } + + if (state == JobState.COMPLETED && result != null) { + json.put("result", result); + } + + if (state == JobState.FAILED && errorMessage != null) { + json.put("error", errorMessage); + } + + return json; + } +} + diff --git a/src/main/java/com/uid2/optout/delta/DeltaProductionMetrics.java b/src/main/java/com/uid2/optout/delta/DeltaProductionMetrics.java new file mode 100644 index 0000000..eca36d2 --- /dev/null +++ b/src/main/java/com/uid2/optout/delta/DeltaProductionMetrics.java @@ -0,0 +1,60 @@ +package com.uid2.optout.delta; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; + +/** + * Metrics counters for delta production operations. + * + * Tracks: + * - Number of delta files produced + * - Number of opt-out entries processed + * - Number of dropped request files produced + * - Number of dropped requests processed + */ +public class DeltaProductionMetrics { + + private final Counter deltasProduced; + private final Counter entriesProcessed; + private final Counter droppedRequestFilesProduced; + private final Counter droppedRequestsProcessed; + + public DeltaProductionMetrics() { + this.deltasProduced = Counter + .builder("uid2_optout_sqs_delta_produced_total") + .description("counter for how many optout delta files are produced from SQS") + .register(Metrics.globalRegistry); + + this.entriesProcessed = Counter + .builder("uid2_optout_sqs_entries_processed_total") + .description("counter for how many optout entries are processed from SQS") + .register(Metrics.globalRegistry); + + this.droppedRequestFilesProduced = Counter + .builder("uid2_optout_sqs_dropped_request_files_produced_total") + .description("counter for how many optout dropped request files are produced from SQS") + .register(Metrics.globalRegistry); + + this.droppedRequestsProcessed = Counter + .builder("uid2_optout_sqs_dropped_requests_processed_total") + .description("counter for how many optout dropped requests are processed from SQS") + .register(Metrics.globalRegistry); + } + + /** + * Record that a delta file was produced with the given number of entries. + */ + public void recordDeltaProduced(int entryCount) { + deltasProduced.increment(); + entriesProcessed.increment(entryCount); + } + + /** + * Record that a dropped requests file was produced with the given number of entries. + */ + public void recordDroppedRequestsProduced(int requestCount) { + droppedRequestFilesProduced.increment(); + droppedRequestsProcessed.increment(requestCount); + } +} + diff --git a/src/main/java/com/uid2/optout/delta/DeltaProductionResult.java b/src/main/java/com/uid2/optout/delta/DeltaProductionResult.java new file mode 100644 index 0000000..73be044 --- /dev/null +++ b/src/main/java/com/uid2/optout/delta/DeltaProductionResult.java @@ -0,0 +1,126 @@ +package com.uid2.optout.delta; + +import io.vertx.core.json.JsonObject; + +/** + * Immutable result containing statistics from a delta production job. + * + *

This class holds production counts and the stop reason, with methods for JSON serialization. + * Use {@link Builder} to accumulate statistics during production, then call {@link Builder#build()} + * to create the immutable result.

+ * + *

Note: Job duration is tracked by {@link DeltaProductionJobStatus}, not this class.

+ */ +public class DeltaProductionResult { + private final int deltasProduced; + private final int entriesProcessed; + private final int droppedRequestFilesProduced; + private final int droppedRequestsProcessed; + private final StopReason stopReason; + + /** + * Private constructor. Use {@link #builder()} to create instances. + */ + private DeltaProductionResult(int deltasProduced, int entriesProcessed, + int droppedRequestFilesProduced, int droppedRequestsProcessed, + StopReason stopReason) { + this.deltasProduced = deltasProduced; + this.entriesProcessed = entriesProcessed; + this.droppedRequestFilesProduced = droppedRequestFilesProduced; + this.droppedRequestsProcessed = droppedRequestsProcessed; + this.stopReason = stopReason; + } + + /** + * Creates a new Builder for accumulating production statistics. + */ + public static Builder builder() { + return new Builder(); + } + + // ==================== Getters ==================== + + public int getDeltasProduced() { + return deltasProduced; + } + + public int getEntriesProcessed() { + return entriesProcessed; + } + + public int getDroppedRequestFilesProduced() { + return droppedRequestFilesProduced; + } + + public int getDroppedRequestsProcessed() { + return droppedRequestsProcessed; + } + + public StopReason getStopReason() { + return stopReason; + } + + // ==================== JSON Serialization ==================== + + public JsonObject toJson() { + return new JsonObject() + .put("deltas_produced", deltasProduced) + .put("entries_processed", entriesProcessed) + .put("dropped_request_files_produced", droppedRequestFilesProduced) + .put("dropped_requests_processed", droppedRequestsProcessed) + .put("stop_reason", stopReason.name()); + } + + public JsonObject toJsonWithStatus(String status) { + return toJson().put("status", status); + } + + @Override + public String toString() { + return String.format( + "DeltaProductionResult{deltasProduced=%d, entriesProcessed=%d, " + + "droppedRequestFilesProduced=%d, droppedRequestsProcessed=%d, stopReason=%s}", + deltasProduced, entriesProcessed, droppedRequestFilesProduced, + droppedRequestsProcessed, stopReason); + } + + // ==================== Builder ==================== + + /** + * Mutable builder for accumulating production statistics. + * + *

Use this builder to track stats during delta production jobs, + * then call {@link #build()} to create the immutable result.

+ */ + public static class Builder { + private int deltasProduced; + private int entriesProcessed; + private int droppedRequestFilesProduced; + private int droppedRequestsProcessed; + private StopReason stopReason = StopReason.NONE; + + public Builder incrementDeltas(int count) { + deltasProduced++; + entriesProcessed += count; + return this; + } + + public Builder incrementDroppedRequests(int count) { + droppedRequestFilesProduced++; + droppedRequestsProcessed += count; + return this; + } + + /** + * Builds the DeltaProductionResult with the accumulated statistics. + */ + public DeltaProductionResult build() { + return new DeltaProductionResult( + deltasProduced, + entriesProcessed, + droppedRequestFilesProduced, + droppedRequestsProcessed, + stopReason); + } + } +} diff --git a/src/test/java/com/uid2/optout/delta/DeltaFileWriterTest.java b/src/test/java/com/uid2/optout/delta/DeltaFileWriterTest.java new file mode 100644 index 0000000..0864d6a --- /dev/null +++ b/src/test/java/com/uid2/optout/delta/DeltaFileWriterTest.java @@ -0,0 +1,257 @@ +package com.uid2.optout.delta; + +import com.uid2.shared.optout.OptOutConst; +import com.uid2.shared.optout.OptOutEntry; +import com.uid2.shared.optout.OptOutUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.*; + +class DeltaFileWriterTest { + + // Test constants + private static final long WINDOW_START = 1700000000L; + private static final long WINDOW_END = 1700003600L; + private static final long ENTRY_TIMESTAMP = 1700001000L; + + private DeltaFileWriter writer; + private ByteArrayOutputStream outputStream; + + @BeforeEach + void setUp() { + writer = new DeltaFileWriter(OptOutConst.EntrySize); + outputStream = new ByteArrayOutputStream(); + } + + // ==================== writeStartOfDelta tests ==================== + + @Test + void testWriteStartOfDelta_writesCorrectSize() throws IOException { + writer.writeStartOfDelta(outputStream, WINDOW_START); + + assertEquals(OptOutConst.EntrySize, outputStream.size()); + } + + @Test + void testWriteStartOfDelta_writesNullHashBytes() throws IOException { + writer.writeStartOfDelta(outputStream, WINDOW_START); + + byte[] result = outputStream.toByteArray(); + assertArrayEquals(OptOutUtils.nullHashBytes, extractFirstHash(result)); + assertArrayEquals(OptOutUtils.nullHashBytes, extractSecondHash(result)); + } + + @Test + void testWriteStartOfDelta_writesTimestamp() throws IOException { + writer.writeStartOfDelta(outputStream, WINDOW_START); + + assertEquals(WINDOW_START, extractTimestamp(outputStream.toByteArray())); + } + + // ==================== writeEndOfDelta tests ==================== + + @Test + void testWriteEndOfDelta_writesCorrectSize() throws IOException { + writer.writeEndOfDelta(outputStream, WINDOW_END); + + assertEquals(OptOutConst.EntrySize, outputStream.size()); + } + + @Test + void testWriteEndOfDelta_writesOnesHashBytes() throws IOException { + writer.writeEndOfDelta(outputStream, WINDOW_END); + + byte[] result = outputStream.toByteArray(); + assertArrayEquals(OptOutUtils.onesHashBytes, extractFirstHash(result)); + assertArrayEquals(OptOutUtils.onesHashBytes, extractSecondHash(result)); + } + + @Test + void testWriteEndOfDelta_writesTimestamp() throws IOException { + writer.writeEndOfDelta(outputStream, WINDOW_END); + + assertEquals(WINDOW_END, extractTimestamp(outputStream.toByteArray())); + } + + // ==================== writeOptOutEntry tests ==================== + + @Test + void testWriteOptOutEntry_writesCorrectSize() throws IOException { + writeTestOptOutEntry((byte) 0xAA, (byte) 0xBB, ENTRY_TIMESTAMP); + + assertEquals(OptOutConst.EntrySize, outputStream.size()); + } + + @Test + void testWriteOptOutEntry_writesHashBytes() throws IOException { + byte[] hashBytes = createTestBytes(32, (byte) 0xAA); + writeTestOptOutEntry((byte) 0xAA, (byte) 0xBB, ENTRY_TIMESTAMP); + + assertArrayEquals(hashBytes, extractFirstHash(outputStream.toByteArray())); + } + + @Test + void testWriteOptOutEntry_writesIdBytes() throws IOException { + byte[] idBytes = createTestBytes(32, (byte) 0xBB); + writeTestOptOutEntry((byte) 0xAA, (byte) 0xBB, ENTRY_TIMESTAMP); + + assertArrayEquals(idBytes, extractSecondHash(outputStream.toByteArray())); + } + + @Test + void testWriteOptOutEntry_canBeReadByOptOutEntry() throws IOException { + byte[] hashBytes = createTestBytes(32, (byte) 0x11); + byte[] idBytes = createTestBytes(32, (byte) 0x22); + long timestamp = 1700001234L; + + writer.writeOptOutEntry(outputStream, hashBytes, idBytes, timestamp); + + OptOutEntry entry = OptOutEntry.parse(outputStream.toByteArray(), 0); + assertArrayEquals(hashBytes, entry.identityHash); + assertArrayEquals(idBytes, entry.advertisingId); + assertEquals(timestamp, entry.timestamp); + } + + // ==================== Multiple entries tests ==================== + + @Test + void testWriteMultipleEntries_writesCorrectTotalSize() throws IOException { + writeCompleteDeltaFile(2); + + // 4 entries (start + 2 opt-out + end) * 72 bytes = 288 bytes + assertEquals(4 * OptOutConst.EntrySize, outputStream.size()); + } + + @Test + void testWriteMultipleEntries_entriesAreContiguous() throws IOException { + byte[] hashBytes = createTestBytes(32, (byte) 0xEE); + + writer.writeStartOfDelta(outputStream, WINDOW_START); + writeTestOptOutEntry((byte) 0xEE, (byte) 0xFF, WINDOW_START + 500); + writer.writeEndOfDelta(outputStream, WINDOW_END); + + byte[] result = outputStream.toByteArray(); + + // Verify start entry - null hashes at offset 0 + assertArrayEquals(OptOutUtils.nullHashBytes, extractHashAtOffset(result, 0)); + + // Verify opt-out entry - custom hash at offset 72 + assertArrayEquals(hashBytes, extractHashAtOffset(result, OptOutConst.EntrySize)); + + // Verify end entry - ones hashes at offset 144 + assertArrayEquals(OptOutUtils.onesHashBytes, extractHashAtOffset(result, 2 * OptOutConst.EntrySize)); + } + + // ==================== Buffer capacity tests ==================== + + @Test + void testSmallBufferSize_expandsAutomatically() throws IOException { + DeltaFileWriter smallBufferWriter = new DeltaFileWriter(16); + + assertDoesNotThrow(() -> smallBufferWriter.writeStartOfDelta(outputStream, WINDOW_START)); + assertEquals(OptOutConst.EntrySize, outputStream.size()); + } + + @Test + void testSmallBuffer_multipleWrites() throws IOException { + DeltaFileWriter smallBufferWriter = new DeltaFileWriter(8); + + smallBufferWriter.writeStartOfDelta(outputStream, WINDOW_START); + writeTestOptOutEntry(smallBufferWriter, (byte) 0x12, (byte) 0x34, ENTRY_TIMESTAMP); + smallBufferWriter.writeEndOfDelta(outputStream, WINDOW_END); + + assertEquals(3 * OptOutConst.EntrySize, outputStream.size()); + } + + @Test + void testLargeBufferSize_worksCorrectly() throws IOException { + DeltaFileWriter largeBufferWriter = new DeltaFileWriter(1024 * 1024); + + largeBufferWriter.writeStartOfDelta(outputStream, WINDOW_START); + + assertEquals(OptOutConst.EntrySize, outputStream.size()); + } + + // ==================== Edge cases ==================== + + @Test + void testZeroTimestamp() throws IOException { + writer.writeStartOfDelta(outputStream, 0L); + + assertEquals(0L, extractTimestamp(outputStream.toByteArray())); + } + + @Test + void testMaxTimestamp() throws IOException { + writer.writeStartOfDelta(outputStream, Long.MAX_VALUE); + + assertEquals(Long.MAX_VALUE, extractTimestamp(outputStream.toByteArray())); + } + + @Test + void testReuseWriterForMultipleFiles() throws IOException { + ByteArrayOutputStream stream1 = new ByteArrayOutputStream(); + ByteArrayOutputStream stream2 = new ByteArrayOutputStream(); + + // Write first file + writer.writeStartOfDelta(stream1, WINDOW_START); + writer.writeEndOfDelta(stream1, WINDOW_END); + + // Write second file with same writer + writer.writeStartOfDelta(stream2, WINDOW_START + 10000); + writer.writeEndOfDelta(stream2, WINDOW_END + 10000); + + assertEquals(2 * OptOutConst.EntrySize, stream1.size()); + assertEquals(2 * OptOutConst.EntrySize, stream2.size()); + assertNotEquals(extractTimestamp(stream1.toByteArray()), extractTimestamp(stream2.toByteArray())); + } + + // ==================== Helper methods ==================== + + private byte[] createTestBytes(int size, byte fillValue) { + byte[] bytes = new byte[size]; + Arrays.fill(bytes, fillValue); + return bytes; + } + + private void writeTestOptOutEntry(byte hashFill, byte idFill, long timestamp) throws IOException { + writeTestOptOutEntry(writer, hashFill, idFill, timestamp); + } + + private void writeTestOptOutEntry(DeltaFileWriter w, byte hashFill, byte idFill, long timestamp) throws IOException { + byte[] hashBytes = createTestBytes(32, hashFill); + byte[] idBytes = createTestBytes(32, idFill); + w.writeOptOutEntry(outputStream, hashBytes, idBytes, timestamp); + } + + private void writeCompleteDeltaFile(int numOptOutEntries) throws IOException { + writer.writeStartOfDelta(outputStream, WINDOW_START); + for (int i = 0; i < numOptOutEntries; i++) { + writeTestOptOutEntry((byte) (0xCC + i), (byte) (0xDD + i), ENTRY_TIMESTAMP + i * 100); + } + writer.writeEndOfDelta(outputStream, WINDOW_END); + } + + private static byte[] extractFirstHash(byte[] data) { + return Arrays.copyOfRange(data, 0, 32); + } + + private static byte[] extractSecondHash(byte[] data) { + return Arrays.copyOfRange(data, 32, 64); + } + + private static byte[] extractHashAtOffset(byte[] data, int offset) { + return Arrays.copyOfRange(data, offset, offset + 32); + } + + private static long extractTimestamp(byte[] data) { + return ByteBuffer.wrap(data, 64, 8).order(ByteOrder.LITTLE_ENDIAN).getLong(); + } +}