Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions src/main/java/com/uid2/optout/delta/DeltaFileWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.uid2.optout.delta;

import com.uid2.shared.optout.OptOutConst;
import com.uid2.shared.optout.OptOutEntry;
import com.uid2.shared.optout.OptOutUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
* Handles binary writing of delta file entries.
*
* Delta files have the following format:
* - Start entry: null hash (32 bytes) + null hash (32 bytes) + timestamp (8 bytes)
* - Opt-out entries: hash (32 bytes) + id (32 bytes) + timestamp (7 bytes) + metadata (1 byte)
* - End entry: ones hash (32 bytes) + ones hash (32 bytes) + timestamp (8 bytes)
*
* Each entry is 72 bytes (OptOutConst.EntrySize)
*/
public class DeltaFileWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(DeltaFileWriter.class);

private ByteBuffer buffer;

/**
* Create a DeltaFileWriter with the specified initial buffer size.
*
* @param bufferSize Initial buffer size in bytes
*/
public DeltaFileWriter(int bufferSize) {
this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN);
}

/**
* Write the start-of-delta sentinel entry.
* Uses null hash bytes and the window start timestamp.
*
* @param stream Output stream to write to
* @param windowStart Window start timestamp (epoch seconds)
* @throws IOException if write fails
*/
public void writeStartOfDelta(ByteArrayOutputStream stream, long windowStart) throws IOException {
ensureCapacity(OptOutConst.EntrySize);

buffer.put(OptOutUtils.nullHashBytes);
buffer.put(OptOutUtils.nullHashBytes);
buffer.putLong(windowStart);

flushToStream(stream);
}

/**
* Write a single opt-out entry.
*
* @param stream Output stream to write to
* @param hashBytes Hash bytes (32 bytes)
* @param idBytes ID bytes (32 bytes)
* @param timestamp Entry timestamp (epoch seconds)
* @throws IOException if write fails
*/
public void writeOptOutEntry(ByteArrayOutputStream stream, byte[] hashBytes, byte[] idBytes, long timestamp) throws IOException {
ensureCapacity(OptOutConst.EntrySize);

OptOutEntry.writeTo(buffer, hashBytes, idBytes, timestamp);

flushToStream(stream);
}

/**
* Write the end-of-delta sentinel entry.
* Uses ones hash bytes and the window end timestamp.
*
* @param stream Output stream to write to
* @param windowEnd Window end timestamp (epoch seconds)
* @throws IOException if write fails
*/
public void writeEndOfDelta(ByteArrayOutputStream stream, long windowEnd) throws IOException {
ensureCapacity(OptOutConst.EntrySize);

buffer.put(OptOutUtils.onesHashBytes);
buffer.put(OptOutUtils.onesHashBytes);
buffer.putLong(windowEnd);

flushToStream(stream);
}

/**
* Flush the buffer contents to the output stream and clear the buffer.
*/
private void flushToStream(ByteArrayOutputStream stream) throws IOException {
buffer.flip();
byte[] entry = new byte[buffer.remaining()];
buffer.get(entry);
stream.write(entry);
buffer.clear();
}

/**
* Ensure buffer has sufficient capacity, expanding if necessary.
*/
private void ensureCapacity(int dataSize) {
if (buffer.capacity() < dataSize) {
int newCapacity = Integer.highestOneBit(dataSize) << 1;
LOGGER.info("expanding buffer size: current {}, need {}, new {}", buffer.capacity(), dataSize, newCapacity);
this.buffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN);
}
}
Comment on lines +105 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is always called with dataSize = OptOutConst.EntrySize. Why don't we remove this method and the ctor's bufferSize param, and initialize buffer with the required capacity?

}

85 changes: 85 additions & 0 deletions src/main/java/com/uid2/optout/delta/DeltaProductionJobStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.uid2.optout.delta;

import io.vertx.core.json.JsonObject;
import java.time.Instant;

/**
* Represents the status and result of an async delta production job on a pod.
*
* This class tracks the lifecycle of a delta production job including its state
* (running, completed, failed), timing information, and result or error details.
*
*/
public class DeltaProductionJobStatus {
private final Instant startTime;
private volatile JobState state;
private volatile JsonObject result;
private volatile String errorMessage;
private volatile Instant endTime;

public enum JobState {
RUNNING,
COMPLETED,
FAILED
}

public DeltaProductionJobStatus() {
this.startTime = Instant.now();
this.state = JobState.RUNNING;
}

/**
* Mark the job as completed with the given result.
* @param result The result details as a JsonObject
*/
public void complete(JsonObject result) {
this.result = result;
this.state = JobState.COMPLETED;
this.endTime = Instant.now();
}

/**
* Mark the job as failed with the given error message.
* @param errorMessage Description of the failure
*/
public void fail(String errorMessage) {
this.errorMessage = errorMessage;
this.state = JobState.FAILED;
this.endTime = Instant.now();
}

/**
* Get the current state of the job.
* @return The job state
*/
public JobState getState() {
return state;
}

/**
* Convert the job status to a JSON representation for API responses.
* @return JsonObject with state, timing, and result/error information
*/
public JsonObject toJson() {
JsonObject json = new JsonObject()
.put("state", state.name().toLowerCase())
.put("start_time", startTime.toString());

if (endTime != null) {
json.put("end_time", endTime.toString());
long durationSeconds = endTime.getEpochSecond() - startTime.getEpochSecond();
json.put("duration_seconds", durationSeconds);
}

if (state == JobState.COMPLETED && result != null) {
json.put("result", result);
}

if (state == JobState.FAILED && errorMessage != null) {
json.put("error", errorMessage);
}

return json;
}
}

60 changes: 60 additions & 0 deletions src/main/java/com/uid2/optout/delta/DeltaProductionMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.uid2.optout.delta;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;

/**
* Metrics counters for delta production operations.
*
* Tracks:
* - Number of delta files produced
* - Number of opt-out entries processed
* - Number of dropped request files produced
* - Number of dropped requests processed
*/
public class DeltaProductionMetrics {

private final Counter deltasProduced;
private final Counter entriesProcessed;
private final Counter droppedRequestFilesProduced;
private final Counter droppedRequestsProcessed;

public DeltaProductionMetrics() {
this.deltasProduced = Counter
.builder("uid2_optout_sqs_delta_produced_total")
.description("counter for how many optout delta files are produced from SQS")
.register(Metrics.globalRegistry);

this.entriesProcessed = Counter
.builder("uid2_optout_sqs_entries_processed_total")
.description("counter for how many optout entries are processed from SQS")
.register(Metrics.globalRegistry);

this.droppedRequestFilesProduced = Counter
.builder("uid2_optout_sqs_dropped_request_files_produced_total")
.description("counter for how many optout dropped request files are produced from SQS")
.register(Metrics.globalRegistry);

this.droppedRequestsProcessed = Counter
.builder("uid2_optout_sqs_dropped_requests_processed_total")
.description("counter for how many optout dropped requests are processed from SQS")
.register(Metrics.globalRegistry);
}

/**
* Record that a delta file was produced with the given number of entries.
*/
public void recordDeltaProduced(int entryCount) {
deltasProduced.increment();
entriesProcessed.increment(entryCount);
}

/**
* Record that a dropped requests file was produced with the given number of entries.
*/
public void recordDroppedRequestsProduced(int requestCount) {
droppedRequestFilesProduced.increment();
droppedRequestsProcessed.increment(requestCount);
}
}

126 changes: 126 additions & 0 deletions src/main/java/com/uid2/optout/delta/DeltaProductionResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.uid2.optout.delta;

import io.vertx.core.json.JsonObject;

/**
* Immutable result containing statistics from a delta production job.
*
* <p>This class holds production counts and the stop reason, with methods for JSON serialization.
* Use {@link Builder} to accumulate statistics during production, then call {@link Builder#build()}
* to create the immutable result.</p>
*
* <p>Note: Job duration is tracked by {@link DeltaProductionJobStatus}, not this class.</p>
*/
public class DeltaProductionResult {
private final int deltasProduced;
private final int entriesProcessed;
private final int droppedRequestFilesProduced;
private final int droppedRequestsProcessed;
private final StopReason stopReason;

/**
* Private constructor. Use {@link #builder()} to create instances.
*/
private DeltaProductionResult(int deltasProduced, int entriesProcessed,
int droppedRequestFilesProduced, int droppedRequestsProcessed,
StopReason stopReason) {
this.deltasProduced = deltasProduced;
this.entriesProcessed = entriesProcessed;
this.droppedRequestFilesProduced = droppedRequestFilesProduced;
this.droppedRequestsProcessed = droppedRequestsProcessed;
this.stopReason = stopReason;
}

/**
* Creates a new Builder for accumulating production statistics.
*/
public static Builder builder() {
return new Builder();
}

// ==================== Getters ====================

public int getDeltasProduced() {
return deltasProduced;
}

public int getEntriesProcessed() {
return entriesProcessed;
}

public int getDroppedRequestFilesProduced() {
return droppedRequestFilesProduced;
}

public int getDroppedRequestsProcessed() {
return droppedRequestsProcessed;
}

public StopReason getStopReason() {
return stopReason;
}

// ==================== JSON Serialization ====================

public JsonObject toJson() {
return new JsonObject()
.put("deltas_produced", deltasProduced)
.put("entries_processed", entriesProcessed)
.put("dropped_request_files_produced", droppedRequestFilesProduced)
.put("dropped_requests_processed", droppedRequestsProcessed)
.put("stop_reason", stopReason.name());
}

public JsonObject toJsonWithStatus(String status) {
return toJson().put("status", status);
}

@Override
public String toString() {
return String.format(
"DeltaProductionResult{deltasProduced=%d, entriesProcessed=%d, " +
"droppedRequestFilesProduced=%d, droppedRequestsProcessed=%d, stopReason=%s}",
deltasProduced, entriesProcessed, droppedRequestFilesProduced,
droppedRequestsProcessed, stopReason);
}

// ==================== Builder ====================

/**
* Mutable builder for accumulating production statistics.
*
* <p>Use this builder to track stats during delta production jobs,
* then call {@link #build()} to create the immutable result.</p>
*/
public static class Builder {
private int deltasProduced;
private int entriesProcessed;
private int droppedRequestFilesProduced;
private int droppedRequestsProcessed;
private StopReason stopReason = StopReason.NONE;

public Builder incrementDeltas(int count) {
deltasProduced++;
entriesProcessed += count;
return this;
}

public Builder incrementDroppedRequests(int count) {
droppedRequestFilesProduced++;
droppedRequestsProcessed += count;
return this;
}

/**
* Builds the DeltaProductionResult with the accumulated statistics.
*/
public DeltaProductionResult build() {
return new DeltaProductionResult(
deltasProduced,
entriesProcessed,
droppedRequestFilesProduced,
droppedRequestsProcessed,
stopReason);
}
}
}
Loading
Loading