Skip to content

Commit c89e056

Browse files
authored
[Break Change]Storage: Move event from metrics to recods (#13276)
1 parent 6e435d8 commit c89e056

File tree

12 files changed

+167
-185
lines changed

12 files changed

+167
-185
lines changed

docs/en/changes/changes.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
* Add JSON format support for the `/debugging/config/dump` status API.
2424
* Enhance status APIs to support multiple `accept` header values, e.g. `Accept: application/json; charset=utf-8`.
2525
* Storage: separate `SpanAttachedEventRecord` for SkyWalking trace and Zipkin trace.
26-
* [Break Change]BanyanDB: Setup new Group policy.
26+
* [Break Change]BanyanDB: Setup new Group policy.
2727
* Bump up commons-beanutils to 1.11.0.
2828
* Refactor: simplify the `Accept` http header process.
29+
* [Break Change]Storage: Move `event` from metrics to recods.
2930

3031
#### UI
3132

oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import org.apache.skywalking.oap.server.core.CoreModule;
2424
import org.apache.skywalking.oap.server.core.analysis.Layer;
2525
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
26-
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
26+
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
2727
import org.apache.skywalking.oap.server.core.config.NamingControl;
28-
import org.apache.skywalking.oap.server.core.analysis.metrics.Event;
28+
import org.apache.skywalking.oap.server.core.analysis.record.Event;
2929
import org.apache.skywalking.oap.server.library.module.ModuleManager;
3030
import lombok.RequiredArgsConstructor;
3131

@@ -42,7 +42,7 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener {
4242

4343
@Override
4444
public void build() {
45-
MetricsStreamProcessor.getInstance().in(event);
45+
RecordStreamProcessor.getInstance().in(event);
4646
}
4747

4848
@Override
@@ -67,9 +67,11 @@ public void parse(final org.apache.skywalking.apm.network.event.v3.Event e) {
6767
event.setStartTime(e.getStartTime());
6868
event.setEndTime(e.getEndTime());
6969
if (e.getStartTime() > 0) {
70-
event.setTimeBucket(TimeBucket.getMinuteTimeBucket(e.getStartTime()));
70+
event.setTimeBucket(TimeBucket.getRecordTimeBucket(e.getStartTime()));
71+
event.setTimestamp(e.getStartTime());
7172
} else if (e.getEndTime() > 0) {
72-
event.setTimeBucket(TimeBucket.getMinuteTimeBucket(e.getEndTime()));
73+
event.setTimeBucket(TimeBucket.getRecordTimeBucket(e.getEndTime()));
74+
event.setTimestamp(e.getEndTime());
7375
}
7476
}
7577

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Event.java renamed to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Event.java

Lines changed: 17 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,13 @@
1616
*
1717
*/
1818

19-
package org.apache.skywalking.oap.server.core.analysis.metrics;
19+
package org.apache.skywalking.oap.server.core.analysis.record;
2020

21-
import com.google.common.base.Strings;
22-
import lombok.EqualsAndHashCode;
2321
import lombok.Getter;
2422
import lombok.Setter;
2523
import org.apache.skywalking.oap.server.core.analysis.Layer;
26-
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
2724
import org.apache.skywalking.oap.server.core.analysis.Stream;
28-
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
29-
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
30-
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
25+
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
3126
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
3227
import org.apache.skywalking.oap.server.core.storage.StorageID;
3328
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
@@ -38,19 +33,13 @@
3833
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
3934

4035
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EVENT;
41-
import static org.apache.skywalking.oap.server.library.util.StringUtil.isNotBlank;
4236

4337
@Getter
4438
@Setter
4539
@ScopeDeclaration(id = EVENT, name = "Event")
46-
@Stream(name = Event.INDEX_NAME, scopeId = EVENT, builder = Event.Builder.class, processor = MetricsStreamProcessor.class)
47-
@MetricsExtension(supportDownSampling = false, supportUpdate = true)
48-
@EqualsAndHashCode(
49-
callSuper = false,
50-
of = "uuid"
51-
)
52-
@BanyanDB.IndexMode
53-
public class Event extends Metrics {
40+
@Stream(name = Event.INDEX_NAME, scopeId = EVENT, builder = Event.Builder.class, processor = RecordStreamProcessor.class)
41+
@BanyanDB.TimestampColumn(Event.TIMESTAMP)
42+
public class Event extends Record {
5443

5544
public static final String INDEX_NAME = "events";
5645

@@ -62,7 +51,7 @@ public class Event extends Metrics {
6251

6352
public static final String ENDPOINT = "endpoint";
6453

65-
public static final String NAME = "name";
54+
public static final String NAME = "event_name";
6655

6756
public static final String TYPE = "type";
6857

@@ -78,10 +67,7 @@ public class Event extends Metrics {
7867

7968
private static final int PARAMETER_MAX_LENGTH = 4000;
8069

81-
@Override
82-
protected StorageID id0() {
83-
return new StorageID().append(UUID, getUuid());
84-
}
70+
public static final String TIMESTAMP = "timestamp";
8571

8672
@Column(name = UUID)
8773
@BanyanDB.SeriesID(index = 0)
@@ -109,7 +95,6 @@ protected StorageID id0() {
10995
private String parameters;
11096

11197
@ElasticSearch.EnableDocValues
112-
@BanyanDB.EnableSort
11398
@Column(name = START_TIME)
11499
private long startTime;
115100

@@ -119,105 +104,16 @@ protected StorageID id0() {
119104
@Column(name = LAYER)
120105
private Layer layer;
121106

122-
@Override
123-
public boolean combine(final Metrics metrics) {
124-
final Event event = (Event) metrics;
125-
126-
// Set time bucket only when it's never set.
127-
if (getTimeBucket() <= 0) {
128-
if (event.getStartTime() > 0) {
129-
setTimeBucket(TimeBucket.getMinuteTimeBucket(event.getStartTime()));
130-
} else if (event.getEndTime() > 0) {
131-
setTimeBucket(TimeBucket.getMinuteTimeBucket(event.getEndTime()));
132-
}
133-
}
134-
135-
// Set start time only when it's never set, (`start` event may come after `end` event).
136-
if (getStartTime() <= 0 && event.getStartTime() > 0) {
137-
setStartTime(event.getStartTime());
138-
}
139-
140-
if (event.getEndTime() > 0) {
141-
setEndTime(event.getEndTime());
142-
}
143-
144-
if (isNotBlank(event.getType())) {
145-
setType(event.getType());
146-
}
147-
if (isNotBlank(event.getMessage())) {
148-
setMessage(event.getMessage());
149-
}
150-
if (isNotBlank(event.getParameters())) {
151-
setParameters(event.getParameters());
152-
}
153-
return true;
154-
}
155-
156-
/**
157-
* @since 9.0.0 Limit the length of {@link #parameters}
158-
*/
159-
public void setParameters(String parameters) {
160-
this.parameters = parameters == null || parameters.length() <= PARAMETER_MAX_LENGTH ?
161-
parameters : parameters.substring(0, PARAMETER_MAX_LENGTH);
162-
}
163-
164-
@Override
165-
public void calculate() {
166-
}
167-
168-
@Override
169-
public Metrics toHour() {
170-
return null;
171-
}
172-
173-
@Override
174-
public Metrics toDay() {
175-
return null;
176-
}
177-
178-
@Override
179-
public void deserialize(final RemoteData remoteData) {
180-
setUuid(remoteData.getDataStrings(0));
181-
setService(remoteData.getDataStrings(1));
182-
setServiceInstance(remoteData.getDataStrings(2));
183-
setEndpoint(remoteData.getDataStrings(3));
184-
setName(remoteData.getDataStrings(4));
185-
setType(remoteData.getDataStrings(5));
186-
setMessage(remoteData.getDataStrings(6));
187-
setParameters(remoteData.getDataStrings(7));
188-
189-
setStartTime(remoteData.getDataLongs(0));
190-
setEndTime(remoteData.getDataLongs(1));
191-
setTimeBucket(remoteData.getDataLongs(2));
192-
193-
setLayer(Layer.valueOf(remoteData.getDataIntegers(0)));
194-
}
195-
196-
@Override
197-
public RemoteData.Builder serialize() {
198-
final RemoteData.Builder builder = RemoteData.newBuilder();
199-
200-
builder.addDataStrings(getUuid());
201-
builder.addDataStrings(getService());
202-
builder.addDataStrings(getServiceInstance());
203-
builder.addDataStrings(getEndpoint());
204-
builder.addDataStrings(getName());
205-
builder.addDataStrings(getType());
206-
builder.addDataStrings(getMessage());
207-
builder.addDataStrings(Strings.nullToEmpty(getParameters()));
208-
209-
builder.addDataLongs(getStartTime());
210-
builder.addDataLongs(getEndTime());
211-
builder.addDataLongs(getTimeBucket());
212-
213-
builder.addDataIntegers(getLayer().value());
214-
215-
return builder;
216-
}
107+
@Setter
108+
@Getter
109+
@ElasticSearch.EnableDocValues
110+
@Column(name = TIMESTAMP)
111+
private long timestamp;
217112

218113
@Override
219-
public int remoteHashCode() {
220-
return hashCode();
114+
public StorageID id() {
115+
return new StorageID().append(TIME_BUCKET, getTimeBucket())
116+
.append(UUID, uuid);
221117
}
222118

223119
public static class Builder implements StorageBuilder<Event> {
@@ -235,6 +131,7 @@ public Event storage2Entity(final Convert2Entity converter) {
235131
record.setStartTime(((Number) converter.get(START_TIME)).longValue());
236132
record.setEndTime(((Number) converter.get(END_TIME)).longValue());
237133
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
134+
record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
238135
if (converter.get(LAYER) != null) {
239136
record.setLayer(Layer.valueOf(((Number) converter.get(LAYER)).intValue()));
240137
}
@@ -254,6 +151,7 @@ public void entity2Storage(final Event storageData, final Convert2Storage conver
254151
converter.accept(START_TIME, storageData.getStartTime());
255152
converter.accept(END_TIME, storageData.getEndTime());
256153
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
154+
converter.accept(TIMESTAMP, storageData.getTimestamp());
257155
Layer layer = storageData.getLayer();
258156
converter.accept(LAYER, layer != null ? layer.value() : Layer.UNDEFINED.value());
259157
}

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/EventQueryService.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,21 @@
1818

1919
package org.apache.skywalking.oap.server.core.query;
2020

21+
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
2122
import org.apache.skywalking.oap.server.core.query.input.Duration;
23+
import org.apache.skywalking.oap.server.core.query.type.event.Event;
2224
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
2325
import org.apache.skywalking.oap.server.core.query.type.event.Events;
2426
import org.apache.skywalking.oap.server.core.storage.StorageModule;
2527
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
2628
import org.apache.skywalking.oap.server.library.module.ModuleManager;
2729
import org.apache.skywalking.oap.server.library.module.Service;
28-
30+
import java.util.Comparator;
31+
import java.util.HashMap;
2932
import java.util.List;
33+
import java.util.Map;
3034
import java.util.Objects;
35+
import java.util.stream.Collectors;
3136

3237
import static java.util.Objects.isNull;
3338
import static org.apache.skywalking.oap.server.library.util.StringUtil.isBlank;
@@ -53,18 +58,51 @@ public Events queryEvents(final EventQueryCondition condition) throws Exception
5358
if (isBlank(condition.getUuid()) && isDurationInvalid(condition.getTime())) {
5459
throw new IllegalArgumentException("time field is required when uuid is absent.");
5560
}
56-
return getDao().queryEvents(condition);
61+
Events events = getDao().queryEvents(condition);
62+
return mergeAndSortEvents(events, condition.getOrder());
5763
}
5864

5965
public Events queryEvents(final List<EventQueryCondition> conditions) throws Exception {
6066
EventQueryCondition condition = conditions.stream().filter(c -> isBlank(c.getUuid()) && isDurationInvalid(c.getTime())).findFirst().orElse(null);
6167
if (Objects.nonNull(condition)) {
6268
throw new IllegalArgumentException("time field is required when uuid is absent.");
6369
}
64-
return getDao().queryEvents(conditions);
70+
Events events = getDao().queryEvents(conditions);
71+
return mergeAndSortEvents(events, conditions.get(0).getOrder());
6572
}
6673

6774
boolean isDurationInvalid(final Duration duration) {
6875
return isNull(duration) || (isBlank(duration.getStart()) || isBlank(duration.getEnd()));
6976
}
77+
78+
private Events mergeAndSortEvents(Events events, Order order) {
79+
final Order queryOrder = isNull(order) ? Order.DES : order;
80+
Map<String, Event> mergedEvents = new HashMap<>();
81+
for (Event event : events.getEvents()) {
82+
String key = event.getUuid();
83+
if (!mergedEvents.containsKey(key)) {
84+
mergedEvents.put(key, event);
85+
} else {
86+
Event existingEvent = mergedEvents.get(key);
87+
if ((event.getStartTime() > 0 && existingEvent.getStartTime() > event.getStartTime())
88+
|| existingEvent.getStartTime() == 0) {
89+
existingEvent.setStartTime(event.getStartTime());
90+
if (existingEvent.getEndTime() == 0) {
91+
existingEvent.setTimestamp(event.getTimestamp());
92+
}
93+
} else if (event.getEndTime() > 0 && existingEvent.getEndTime() < event.getEndTime()) {
94+
event.setStartTime(existingEvent.getStartTime());
95+
mergedEvents.put(key, event);
96+
}
97+
}
98+
}
99+
List<Event> sortedEvents;
100+
if (queryOrder == Order.ASC) {
101+
sortedEvents = mergedEvents.values().stream().sorted(Comparator.comparing(Event::getTimestamp)).collect(Collectors.toList());
102+
} else {
103+
sortedEvents = mergedEvents.values().stream().sorted(Comparator.comparing(Event::getTimestamp).reversed()).collect(Collectors.toList());
104+
}
105+
106+
return new Events(sortedEvents);
107+
}
70108
}

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public class Event {
4949

5050
private long endTime;
5151

52+
// The timestamp of the event. If the end time is set, it will be used as the timestamp, otherwise, the start time will be used.
53+
private long timestamp;
54+
5255
private String layer;
5356

5457
public void setParameters(final List<KeyValue> parameters) {

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
6161
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
6262
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEBPFProfilingScheduleQueryDAO;
63-
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEventQueryDAO;
63+
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEventQueryDAO;
6464
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBHierarchyQueryDAO;
6565
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO;
6666
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetricsQueryDAO;

0 commit comments

Comments
 (0)