Skip to content

Commit cd953b6

Browse files
authored
Prevent traffic metrics inconsistent between in-memory and database server. (#13045)
1 parent 028757f commit cd953b6

File tree

4 files changed

+31
-3
lines changed

4 files changed

+31
-3
lines changed

docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
* Bump up netty to 4.1.118 to fix CVE-2025-24970.
7272
* Add `Get Alarm Runtime Status` API.
7373
* Add `lock` when querying the Alarm metrics window values.
74+
* Add a fail-safe mechanism to prevent traffic metrics from becoming inconsistent between the in-memory cache and the database server.
7475

7576
#### UI
7677

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> implemen
107107
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
108108
super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
109109
this.model = model;
110-
this.sessionCache = new MetricsSessionCache(storageSessionTimeout);
110+
this.sessionCache = new MetricsSessionCache(storageSessionTimeout, supportUpdate);
111111
this.metricsDAO = metricsDAO;
112112
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
113113
this.nextExportWorker = Optional.ofNullable(nextExportWorker);

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsSessionCache.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,23 @@
1919
package org.apache.skywalking.oap.server.core.analysis.worker;
2020

2121
import java.util.Iterator;
22+
import java.util.List;
2223
import java.util.Map;
2324
import java.util.concurrent.ConcurrentHashMap;
2425
import lombok.AccessLevel;
2526
import lombok.Setter;
27+
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
28+
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
2629
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
30+
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
31+
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
32+
import org.apache.skywalking.oap.server.core.storage.model.Model;
2733
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
2834
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
2935

3036
/**
3137
* MetricsSessionCache is a key-value cache that holds hot metrics in memory to reduce the payload of pre-reads.
38+
* Every instance of MetricsSessionCache maps to an instance of {@link MetricsPersistentWorker}.
3239
*
3340
* There are two ways to make sure metrics are in the cache,
3441
* 1. Metrics is read from the Database through {@link MetricsPersistentWorker}.loadFromStorage
@@ -45,14 +52,16 @@ public class MetricsSessionCache {
4552
private final Map<Metrics, Metrics> sessionCache;
4653
@Setter(AccessLevel.PACKAGE)
4754
private long timeoutThreshold;
55+
private final boolean supportUpdate;
4856

49-
public MetricsSessionCache(long timeoutThreshold) {
57+
public MetricsSessionCache(long timeoutThreshold, final boolean supportUpdate) {
5058
// Because the cache is updated depending on the final storage implementation,
5159
// the map/cache could be updated concurrently.
5260
// Set to ConcurrentHashMap in order to avoid HashMap deadlock.
5361
// Since 9.3.0
5462
this.sessionCache = new ConcurrentHashMap<>(100);
5563
this.timeoutThreshold = timeoutThreshold;
64+
this.supportUpdate = supportUpdate;
5665
}
5766

5867
Metrics get(Metrics metrics) {
@@ -67,6 +76,24 @@ public void put(Metrics metrics) {
6776
sessionCache.put(metrics, metrics);
6877
}
6978

79+
/**
80+
* This method relies on the response of database flush callback.
81+
* Push the data into the in-memory cache for all metrics except those with {@link MetricsExtension#supportUpdate()} labeled
82+
* as false.
83+
* Because such data (e.g. {@link ServiceTraffic}) is written only once in the whole TTL period, and some
84+
* databases (e.g. BanyanDB) have an in-memory cache at the server side to improve performance but trade off the 100%
85+
* eventual consistency of writes, which means the database server could respond
86+
* {@link SessionCacheCallback#onInsertCompleted()} but the write eventually fails due to a crash.
87+
* This fail-safe mechanism requires that the cache of this kind of metric be read through
88+
* {@link IMetricsDAO#multiGet(Model, List)}, which guarantees data existence.
89+
*/
90+
public void cacheAfterFlush(Metrics metrics) {
91+
if (supportUpdate) {
92+
put(metrics);
93+
}
94+
// Don't add into the cache. Rely on multiGet from Database.
95+
}
96+
7097
void removeExpired() {
7198
Iterator<Metrics> iterator = sessionCache.values().iterator();
7299
long timestamp = System.currentTimeMillis();

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public void onInsertCompleted() {
4040
if (isFailed) {
4141
return;
4242
}
43-
sessionCache.put(metrics);
43+
sessionCache.cacheAfterFlush(metrics);
4444
}
4545

4646
public void onUpdateFailure() {

0 commit comments

Comments
 (0)