Skip to content

Commit 2026b8d

Browse files
authored
fix: thread safe analytics processor (#147)
* fix: thread safe analytics processor * use long adder
1 parent db55ef5 commit 2026b8d

File tree

2 files changed

+57
-27
lines changed

2 files changed

+57
-27
lines changed

src/main/java/com/flagsmith/threads/AnalyticsProcessor.java

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
import com.flagsmith.MapperFactory;
77
import com.flagsmith.interfaces.FlagsmithSdk;
88
import java.time.Instant;
9-
import java.util.HashMap;
109
import java.util.Map;
10+
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.atomic.AtomicBoolean;
12+
import java.util.concurrent.atomic.LongAdder;
13+
1114
import lombok.Getter;
1215
import lombok.ToString;
1316
import okhttp3.HttpUrl;
@@ -21,9 +24,10 @@ public class AnalyticsProcessor {
2124

2225
private final String analyticsEndpoint = "analytics/flags/";
2326
private Integer analyticsTimer = 10;
24-
private Map<String, Integer> analyticsData;
27+
private Map<String, LongAdder> analyticsData;
2528
@ToString.Exclude private FlagsmithSdk api;
2629
private Long nextFlush;
30+
private AtomicBoolean isFlushing = new AtomicBoolean(false);
2731
private RequestProcessor requestProcessor;
2832
private HttpUrl analyticsUrl;
2933
FlagsmithLogger logger;
@@ -63,7 +67,7 @@ public AnalyticsProcessor(FlagsmithSdk api, OkHttpClient client, FlagsmithLogger
6367
*/
6468
public AnalyticsProcessor(
6569
FlagsmithSdk api, FlagsmithLogger logger, RequestProcessor requestProcessor) {
66-
this.analyticsData = new HashMap<String, Integer>();
70+
this.analyticsData = new ConcurrentHashMap<String, LongAdder>();
6771
this.requestProcessor = requestProcessor;
6872
this.logger = logger;
6973
this.nextFlush = Instant.now().getEpochSecond() + analyticsTimer;
@@ -111,38 +115,44 @@ private HttpUrl getAnalyticsUrl() {
111115
* Push the analytics to the server.
112116
*/
113117
public void flush() {
114-
115-
if (analyticsData.isEmpty()) {
116-
return;
117-
}
118-
119-
String response;
120-
121-
try {
122-
ObjectMapper mapper = MapperFactory.getMapper();
123-
response = mapper.writeValueAsString(analyticsData);
124-
} catch (JsonProcessingException jpe) {
125-
logger.error("Error parsing analytics data to JSON.", jpe);
126-
return;
118+
// Make sure analytics data is only flushed once.
119+
if (isFlushing.compareAndSet(false, true)) {
120+
if (analyticsData.isEmpty()) {
121+
isFlushing.set(false);
122+
return;
123+
}
124+
125+
String response;
126+
127+
try {
128+
ObjectMapper mapper = MapperFactory.getMapper();
129+
response = mapper.writeValueAsString(analyticsData);
130+
analyticsData.clear();
131+
} catch (JsonProcessingException jpe) {
132+
logger.error("Error parsing analytics data to JSON.", jpe);
133+
isFlushing.set(false);
134+
return;
135+
}
136+
137+
MediaType json = MediaType.parse("application/json; charset=utf-8");
138+
RequestBody body = RequestBody.create(json, response);
139+
140+
Request request = api.newPostRequest(getAnalyticsUrl(), body);
141+
142+
getRequestProcessor().executeAsync(request, Boolean.FALSE);
143+
144+
setNextFlush();
145+
isFlushing.set(false);
127146
}
128-
129-
MediaType json = MediaType.parse("application/json; charset=utf-8");
130-
RequestBody body = RequestBody.create(json, response);
131-
132-
Request request = api.newPostRequest(getAnalyticsUrl(), body);
133-
134-
getRequestProcessor().executeAsync(request, Boolean.FALSE);
135-
136-
analyticsData.clear();
137-
setNextFlush();
138147
}
139148

140149
/**
141150
* Track the feature usage for analytics.
142151
* @param featureName name of the feature to track evaluation for
143152
*/
144153
public void trackFeature(String featureName) {
145-
analyticsData.put(featureName, analyticsData.getOrDefault(featureName, 0) + 1);
154+
analyticsData.computeIfAbsent(featureName, k -> new LongAdder()).increment();
155+
146156
if (nextFlush.compareTo(Instant.now().getEpochSecond()) < 0) {
147157
this.flush();
148158
}

src/test/java/com/flagsmith/threads/AnalyticsProcessorTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,23 @@
88
import static org.mockito.Mockito.verify;
99
import static org.mockito.Mockito.when;
1010

11+
import com.fasterxml.jackson.databind.ObjectMapper;
1112
import com.flagsmith.FlagsmithApiWrapper;
1213
import com.flagsmith.FlagsmithException;
1314
import com.flagsmith.FlagsmithLogger;
15+
import com.flagsmith.MapperFactory;
1416
import com.flagsmith.config.FlagsmithConfig;
1517
import com.flagsmith.config.Retry;
1618
import java.io.IOException;
19+
import java.util.Map;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.atomic.LongAccumulator;
22+
import java.util.concurrent.atomic.LongAdder;
23+
24+
import lombok.SneakyThrows;
1725
import okhttp3.Response;
1826
import okhttp3.mock.MockInterceptor;
27+
import org.junit.Assert;
1928
import org.junit.jupiter.api.Assertions;
2029
import org.junit.jupiter.api.BeforeEach;
2130
import org.junit.jupiter.api.Test;
@@ -96,4 +105,15 @@ public void testClose_ShutsDownExecutor() {
96105
// Then
97106
verify(requestProcessor, times(1)).close();
98107
}
108+
109+
@Test
110+
@SneakyThrows
111+
public void AnalyticsProcessor_longAdderGetsSerializedCorrectly() {
112+
analytics.trackFeature("foo");
113+
114+
ObjectMapper mapper = MapperFactory.getMapper();
115+
String response = mapper.writeValueAsString(analytics.getAnalyticsData());
116+
117+
Assertions.assertEquals("{\"foo\":1}", response);
118+
}
99119
}

0 commit comments

Comments
 (0)