Skip to content

Commit a6f9539

Browse files
committed
Refactor Client and SseEventBus to use AtomicInteger and AtomicReference for thread-safe operations
1 parent 314d307 commit a6f9539

File tree

3 files changed

+55
-13
lines changed

3 files changed

+55
-13
lines changed

src/main/java/ch/rasc/sse/eventbus/Client.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class Client {
2525

2626
private volatile long lastTransfer;
2727

28-
private final boolean completeAfterMessage;
28+
private volatile boolean completeAfterMessage;
2929

3030
Client(String id, SseEmitter sseEmitter, boolean completeAfterMessage) {
3131
this.id = id;
@@ -54,6 +54,10 @@ void updateEmitter(SseEmitter emitter) {
5454
this.sseEmitter = emitter;
5555
}
5656

57+
void updateCompleteAfterMessage(boolean completeAfterMessage) {
58+
this.completeAfterMessage = completeAfterMessage;
59+
}
60+
5761
boolean isCompleteAfterMessage() {
5862
return this.completeAfterMessage;
5963
}

src/main/java/ch/rasc/sse/eventbus/ClientEvent.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package ch.rasc.sse.eventbus;
1717

1818
import java.time.Duration;
19+
import java.util.concurrent.atomic.AtomicInteger;
1920

2021
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
2122
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;
@@ -28,13 +29,13 @@ public class ClientEvent {
2829

2930
private final String convertedValue;
3031

31-
private int errorCounter;
32+
private final AtomicInteger errorCounter;
3233

3334
public ClientEvent(Client client, SseEvent event, String convertedValue) {
3435
this.client = client;
3536
this.event = event;
3637
this.convertedValue = convertedValue;
37-
this.errorCounter = 0;
38+
this.errorCounter = new AtomicInteger(0);
3839
}
3940

4041
public Client getClient() {
@@ -78,11 +79,11 @@ private static void addStringData(SseEventBuilder sseBuilder, String value) {
7879
}
7980

8081
void incErrorCounter() {
81-
this.errorCounter++;
82+
this.errorCounter.incrementAndGet();
8283
}
8384

8485
public int getErrorCounter() {
85-
return this.errorCounter;
86+
return this.errorCounter.get();
8687
}
8788

8889
}

src/main/java/ch/rasc/sse/eventbus/SseEventBus.java

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ConcurrentMap;
2828
import java.util.concurrent.ScheduledExecutorService;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicReference;
3031

3132
import org.apache.commons.logging.Log;
3233
import org.apache.commons.logging.LogFactory;
@@ -235,25 +236,40 @@ public void registerClient(String clientId, SseEmitter emitter) {
235236
* first message
236237
*/
237238
public void registerClient(String clientId, SseEmitter emitter, boolean completeAfterMessage) {
239+
AtomicReference<SseEmitter> oldEmitter = new AtomicReference<>();
238240
this.clients.compute(clientId, (id, existing) -> {
239241
if (existing == null) {
240242
return new Client(id, emitter, completeAfterMessage);
241243
}
244+
oldEmitter.set(existing.sseEmitter());
242245
existing.updateEmitter(emitter);
246+
existing.updateCompleteAfterMessage(completeAfterMessage);
243247
return existing;
244248
});
249+
if (oldEmitter.get() != null) {
250+
try {
251+
oldEmitter.get().complete();
252+
}
253+
catch (Exception e) {
254+
logger.debug("Error completing old emitter for client " + clientId, e);
255+
}
256+
}
245257
}
246258

247259
/**
248260
* Unregisters a client and unsubscribes the client from all events.
249261
* @param clientId unique client identifier
250262
*/
251263
public void unregisterClient(String clientId) {
252-
unsubscribeFromAllEvents(clientId);
253-
Client removed = this.clients.remove(clientId);
254-
if (removed != null) {
264+
AtomicReference<SseEmitter> removedEmitter = new AtomicReference<>();
265+
this.clients.computeIfPresent(clientId, (id, client) -> {
266+
removedEmitter.set(client.sseEmitter());
267+
unsubscribeFromAllEvents(id);
268+
return null;
269+
});
270+
if (removedEmitter.get() != null) {
255271
try {
256-
removed.sseEmitter().complete();
272+
removedEmitter.get().complete();
257273
}
258274
catch (Exception e) {
259275
logger.debug("Error completing emitter for client " + clientId, e);
@@ -284,8 +300,8 @@ public void subscribe(String clientId, String event) {
284300
* @param event the event name
285301
*/
286302
public void subscribeOnly(String clientId, String event) {
287-
this.subscriptionRegistry.subscribe(clientId, event);
288303
this.unsubscribeFromAllEvents(clientId, event);
304+
this.subscriptionRegistry.subscribe(clientId, event);
289305
}
290306

291307
/**
@@ -493,9 +509,30 @@ private void cleanUpClients() {
493509
staleClients.add(entry.getKey());
494510
}
495511
}
496-
staleClients.forEach(this::unregisterClient);
497-
if (!staleClients.isEmpty()) {
498-
this.listener.afterClientsUnregistered(staleClients);
512+
Set<String> actuallyRemoved = new HashSet<>();
513+
for (String clientId : staleClients) {
514+
long recheckExpiration = System.currentTimeMillis() - this.clientExpiration.toMillis();
515+
AtomicReference<SseEmitter> removedEmitter = new AtomicReference<>();
516+
this.clients.computeIfPresent(clientId, (id, client) -> {
517+
if (client.lastTransfer() < recheckExpiration) {
518+
removedEmitter.set(client.sseEmitter());
519+
unsubscribeFromAllEvents(id);
520+
return null;
521+
}
522+
return client;
523+
});
524+
if (removedEmitter.get() != null) {
525+
actuallyRemoved.add(clientId);
526+
try {
527+
removedEmitter.get().complete();
528+
}
529+
catch (Exception e) {
530+
logger.debug("Error completing emitter for client " + clientId, e);
531+
}
532+
}
533+
}
534+
if (!actuallyRemoved.isEmpty()) {
535+
this.listener.afterClientsUnregistered(actuallyRemoved);
499536
}
500537
}
501538
}

0 commit comments

Comments
 (0)