Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public SubscriptionManager(SubscriptionCollection subscriptionCollection) {
this.subscriptionCollection = subscriptionCollection;
}

@Scheduled(every = "PT12H")
@Scheduled(every = "${subscription.cleanup.interval}")
public void cleanup() {
this.subscriptionCollection.cleanUp();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.Map;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.iris_events.subscription.model.Resource;
Expand Down Expand Up @@ -89,6 +90,8 @@ public void remove(final String sessionId) {
final var sessionSubscriptionsSetId = Utils.getSessionSubscriptionsSetId(sessionId);
final var sessionSubscriptionIds = redisClient.smembers(sessionSubscriptionsSetId).stream().map(Response::toString)
.toList();

removeFromResourceIndexes(sessionSubscriptionIds);
final var idsToRemove = Stream.concat(Stream.of(sessionSubscriptionsSetId), sessionSubscriptionIds.stream()).toList();
redisClient.del(idsToRemove);
}
Expand Down Expand Up @@ -154,6 +157,26 @@ public void cleanUp() {
cleanSubscriptionPointers(SESSION_SUB_TEMPLATE);
}

private void removeFromResourceIndexes(final List<String> subscriptionIds) {
// Group operations by resource set key to batch them
Map<String, List<String>> removalsByResourceSet = subscriptionIds.stream()
.filter(Utils::isValidSubscriptionId)
.collect(Collectors.groupingBy(Utils::getResourceSetKey));

// Execute batched removals
for (Map.Entry<String, List<String>> entry : removalsByResourceSet.entrySet()) {
String resourceSetKey = entry.getKey();
List<String> subscriptionsToRemove = entry.getValue();

List<String> sremArgs = new ArrayList<>(subscriptionsToRemove.size() + 1);
sremArgs.add(resourceSetKey);
sremArgs.addAll(subscriptionsToRemove);

redisClient.srem(sremArgs);
}

}

private void cleanSubscriptionPointers(String subTemplate) {
var scanCursor = "0";
var subscriptionIdsToRemove = new HashMap<String, List<String>>();
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/iris_events/subscription/collection/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ public class Utils {
public static final String RESOURCE_SNAP_TEMPLATE = "resTypeResIdSnap|%s";
public static final String RESOURCE_SUB_TEMPLATE = "resTypeResIdSub|%s";
public static final String PIPE = "|";
public static final String SUBSCRIPTION_ID_DELIMITER = "\\|";
public static final int MIN_SUBSCRIPTION_PARTS = 4;
public static final int RESOURCE_TYPE_INDEX = 2;
public static final int RESOURCE_ID_INDEX = 3;


public static String getResourceSubscriptionsSetId(final String resourceType, final String resourceId) {
final var uniqueResId = getUniqueResId(resourceType, resourceId);
Expand All @@ -32,4 +37,14 @@ public static String generateSubscriptionId(Subscription subscription) {
final var subId = subscription.sessionId() + PIPE + subscription.resourceType() + PIPE + subscription.resourceId();
return String.format(SUB_TEMPLATE, subId);
}

public static boolean isValidSubscriptionId(String subscriptionId) {
return subscriptionId != null &&
subscriptionId.split(SUBSCRIPTION_ID_DELIMITER).length >= MIN_SUBSCRIPTION_PARTS;
}

public static String getResourceSetKey(String subscriptionId) {
String[] parts = subscriptionId.split(SUBSCRIPTION_ID_DELIMITER);
return Utils.getResourceSubscriptionsSetId(parts[RESOURCE_TYPE_INDEX], parts[RESOURCE_ID_INDEX]);
}
}
3 changes: 2 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ quarkus.health.openapi.included=true

# SUBSCRIPTION COLLECTION REDIS
subscription.collection.redis.ttl=86400
subscription.cleanup.interval=${SUBS_CLEAN_INTERVAL:PT10M}

quarkus.otel.exporter.otlp.traces.endpoint=${OTLP_ENDPOINT:http://localhost:4318}
quarkus.otel.exporter.otlp.traces.protocol=http/protobuf
quarkus.otel.exporter.otlp.traces.protocol=${OTLP_PROTOCOL:http/protobuf}
quarkus.otel.enabled=false
%prod.quarkus.otel.enabled=true

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package org.iris_events.subscription.collection;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import io.vertx.redis.client.Response;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -113,6 +114,97 @@ void removeBySessionId() {
assertThat(size, is(2));
}

@Test
void testBatchedRemovalByResourceSet() {
final var resourceType = "sharedResourceType";
final var resourceId = "sharedResourceId";

subscriptionCollection.insert(new Subscription(resourceType, resourceId, "session1"));
subscriptionCollection.insert(new Subscription(resourceType, resourceId, "session2"));
subscriptionCollection.insert(new Subscription(resourceType, resourceId, "session3"));

subscriptionCollection.insert(new Subscription("otherType", "otherId", "session1"));

assertThat(subscriptionCollection.size(), is(8)); // 4 original + 4 new

// Remove session1 (should remove 2 subscriptions)
subscriptionCollection.remove("session1");

// Verify results
assertThat(subscriptionCollection.size(), is(6)); // 8 - 2 = 6
assertThat(subscriptionCollection.get("session1").size(), is(0));
assertThat(subscriptionCollection.get(new Resource(resourceType, resourceId)).size(), is(2)); // session2, session3
}

@Test
void testRemovalFromResourceSets() {
// Setup: Create subscriptions
final var sessionId = getSessionId("1");
final var resourceType = getResourceTypeId("1");
final var resourceId = getResourceId("1");

subscriptionCollection.insert(getSubscription("1", "1", "1"));
subscriptionCollection.insert(getSubscription("1", "1", "2")); // Same resource, different session

// Verify setup: Both subscriptions should be in the resource set
final var resourceSetKey = Utils.getResourceSubscriptionsSetId(resourceType, resourceId);
final var subscriptionsInSet = redisClient.smembers(resourceSetKey);
assertThat(subscriptionsInSet.size(), is(2));

// Remove session1
subscriptionCollection.remove(sessionId);

// Verify: Only session1's subscription should be removed from resource set
final var subscriptionsAfterRemoval = redisClient.smembers(resourceSetKey);
assertThat(subscriptionsAfterRemoval.size(), is(1));

// Verify the remaining subscription belongs to session2
final var remainingSubscription = subscriptionsAfterRemoval.stream()
.map(Response::toString)
.findFirst()
.orElse("");
assertThat(remainingSubscription, containsString("sessionId2"));
}

@Test
void testRemovalFromMultipleResourceSets() {
final var sessionId = getSessionId("1");

// Session1 has subscriptions to 2 different resources
subscriptionCollection.insert(getSubscription("1", "1", "1")); // resource1
subscriptionCollection.insert(getSubscription("2", "2", "1")); // resource2

// Other sessions also have subscriptions to these resources
subscriptionCollection.insert(getSubscription("1", "1", "2")); // resource1
subscriptionCollection.insert(getSubscription("2", "2", "3")); // resource2

// Get resource set keys
final var resourceSet1Key = Utils.getResourceSubscriptionsSetId(getResourceTypeId("1"), getResourceId("1"));
final var resourceSet2Key = Utils.getResourceSubscriptionsSetId(getResourceTypeId("2"), getResourceId("2"));

// Verify setup: Each resource set should have 2 subscriptions
assertThat(redisClient.smembers(resourceSet1Key).size(), is(2));
assertThat(redisClient.smembers(resourceSet2Key).size(), is(2));

// Remove session1
subscriptionCollection.remove(sessionId);

// Verify: Session1's subscriptions removed from both resource sets
assertThat(redisClient.smembers(resourceSet1Key).size(), is(1));
assertThat(redisClient.smembers(resourceSet2Key).size(), is(1));

// Verify remaining subscriptions don't belong to session1
final var remaining1 = redisClient.smembers(resourceSet1Key).stream()
.map(Response::toString)
.findFirst().orElse("");
final var remaining2 = redisClient.smembers(resourceSet2Key).stream()
.map(Response::toString)
.findFirst().orElse("");

assertThat(remaining1, not(containsString("sessionId1")));
assertThat(remaining2, not(containsString("sessionId1")));
}

@Test
void removeBySessionIdResTypeResId() {
final var sessionId1 = getSessionId("1");
Expand Down
Loading