Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
* Add JSON format support for the `/debugging/config/dump` status API.
* Enhance status APIs to support multiple `accept` header values, e.g. `Accept: application/json; charset=utf-8`.
* Storage: separate `SpanAttachedEventRecord` for SkyWalking trace and Zipkin trace.
* [Break Change]BanyanDB: Setup new Group policy.
* [Break Change]BanyanDB: Setup new Group policy.
* Bump up commons-beanutils to 1.11.0.
* Refactor: simplify the `Accept` http header process.
* [Break Change]Storage: Move `event` from metrics to recods.

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.analysis.metrics.Event;
import org.apache.skywalking.oap.server.core.analysis.record.Event;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import lombok.RequiredArgsConstructor;

Expand All @@ -42,7 +42,7 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener {

@Override
public void build() {
MetricsStreamProcessor.getInstance().in(event);
RecordStreamProcessor.getInstance().in(event);
}

@Override
Expand All @@ -67,9 +67,11 @@ public void parse(final org.apache.skywalking.apm.network.event.v3.Event e) {
event.setStartTime(e.getStartTime());
event.setEndTime(e.getEndTime());
if (e.getStartTime() > 0) {
event.setTimeBucket(TimeBucket.getMinuteTimeBucket(e.getStartTime()));
event.setTimeBucket(TimeBucket.getRecordTimeBucket(e.getStartTime()));
event.setTimestamp(e.getStartTime());
} else if (e.getEndTime() > 0) {
event.setTimeBucket(TimeBucket.getMinuteTimeBucket(e.getEndTime()));
event.setTimeBucket(TimeBucket.getRecordTimeBucket(e.getEndTime()));
event.setTimestamp(e.getEndTime());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,13 @@
*
*/

package org.apache.skywalking.oap.server.core.analysis.metrics;
package org.apache.skywalking.oap.server.core.analysis.record;

import com.google.common.base.Strings;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageID;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
Expand All @@ -38,19 +33,13 @@
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;

import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EVENT;
import static org.apache.skywalking.oap.server.library.util.StringUtil.isNotBlank;

@Getter
@Setter
@ScopeDeclaration(id = EVENT, name = "Event")
@Stream(name = Event.INDEX_NAME, scopeId = EVENT, builder = Event.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = true)
@EqualsAndHashCode(
callSuper = false,
of = "uuid"
)
@BanyanDB.IndexMode
public class Event extends Metrics {
@Stream(name = Event.INDEX_NAME, scopeId = EVENT, builder = Event.Builder.class, processor = RecordStreamProcessor.class)
@BanyanDB.TimestampColumn(Event.TIMESTAMP)
public class Event extends Record {

public static final String INDEX_NAME = "events";

Expand All @@ -62,7 +51,7 @@ public class Event extends Metrics {

public static final String ENDPOINT = "endpoint";

public static final String NAME = "name";
public static final String NAME = "event_name";

public static final String TYPE = "type";

Expand All @@ -78,10 +67,7 @@ public class Event extends Metrics {

private static final int PARAMETER_MAX_LENGTH = 4000;

@Override
protected StorageID id0() {
return new StorageID().append(UUID, getUuid());
}
public static final String TIMESTAMP = "timestamp";

@Column(name = UUID)
@BanyanDB.SeriesID(index = 0)
Expand Down Expand Up @@ -109,7 +95,6 @@ protected StorageID id0() {
private String parameters;

@ElasticSearch.EnableDocValues
@BanyanDB.EnableSort
@Column(name = START_TIME)
private long startTime;

Expand All @@ -119,105 +104,16 @@ protected StorageID id0() {
@Column(name = LAYER)
private Layer layer;

@Override
public boolean combine(final Metrics metrics) {
final Event event = (Event) metrics;

// Set time bucket only when it's never set.
if (getTimeBucket() <= 0) {
if (event.getStartTime() > 0) {
setTimeBucket(TimeBucket.getMinuteTimeBucket(event.getStartTime()));
} else if (event.getEndTime() > 0) {
setTimeBucket(TimeBucket.getMinuteTimeBucket(event.getEndTime()));
}
}

// Set start time only when it's never set, (`start` event may come after `end` event).
if (getStartTime() <= 0 && event.getStartTime() > 0) {
setStartTime(event.getStartTime());
}

if (event.getEndTime() > 0) {
setEndTime(event.getEndTime());
}

if (isNotBlank(event.getType())) {
setType(event.getType());
}
if (isNotBlank(event.getMessage())) {
setMessage(event.getMessage());
}
if (isNotBlank(event.getParameters())) {
setParameters(event.getParameters());
}
return true;
}

/**
* @since 9.0.0 Limit the length of {@link #parameters}
*/
public void setParameters(String parameters) {
this.parameters = parameters == null || parameters.length() <= PARAMETER_MAX_LENGTH ?
parameters : parameters.substring(0, PARAMETER_MAX_LENGTH);
}

@Override
public void calculate() {
}

@Override
public Metrics toHour() {
return null;
}

@Override
public Metrics toDay() {
return null;
}

@Override
public void deserialize(final RemoteData remoteData) {
setUuid(remoteData.getDataStrings(0));
setService(remoteData.getDataStrings(1));
setServiceInstance(remoteData.getDataStrings(2));
setEndpoint(remoteData.getDataStrings(3));
setName(remoteData.getDataStrings(4));
setType(remoteData.getDataStrings(5));
setMessage(remoteData.getDataStrings(6));
setParameters(remoteData.getDataStrings(7));

setStartTime(remoteData.getDataLongs(0));
setEndTime(remoteData.getDataLongs(1));
setTimeBucket(remoteData.getDataLongs(2));

setLayer(Layer.valueOf(remoteData.getDataIntegers(0)));
}

@Override
public RemoteData.Builder serialize() {
final RemoteData.Builder builder = RemoteData.newBuilder();

builder.addDataStrings(getUuid());
builder.addDataStrings(getService());
builder.addDataStrings(getServiceInstance());
builder.addDataStrings(getEndpoint());
builder.addDataStrings(getName());
builder.addDataStrings(getType());
builder.addDataStrings(getMessage());
builder.addDataStrings(Strings.nullToEmpty(getParameters()));

builder.addDataLongs(getStartTime());
builder.addDataLongs(getEndTime());
builder.addDataLongs(getTimeBucket());

builder.addDataIntegers(getLayer().value());

return builder;
}
@Setter
@Getter
@ElasticSearch.EnableDocValues
@Column(name = TIMESTAMP)
private long timestamp;

@Override
public int remoteHashCode() {
return hashCode();
public StorageID id() {
return new StorageID().append(TIME_BUCKET, getTimeBucket())
.append(UUID, uuid);
}

public static class Builder implements StorageBuilder<Event> {
Expand All @@ -235,6 +131,7 @@ public Event storage2Entity(final Convert2Entity converter) {
record.setStartTime(((Number) converter.get(START_TIME)).longValue());
record.setEndTime(((Number) converter.get(END_TIME)).longValue());
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
if (converter.get(LAYER) != null) {
record.setLayer(Layer.valueOf(((Number) converter.get(LAYER)).intValue()));
}
Expand All @@ -254,6 +151,7 @@ public void entity2Storage(final Event storageData, final Convert2Storage conver
converter.accept(START_TIME, storageData.getStartTime());
converter.accept(END_TIME, storageData.getEndTime());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
converter.accept(TIMESTAMP, storageData.getTimestamp());
Layer layer = storageData.getLayer();
converter.accept(LAYER, layer != null ? layer.value() : Layer.UNDEFINED.value());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@

package org.apache.skywalking.oap.server.core.query;

import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.Event;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.Events;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static java.util.Objects.isNull;
import static org.apache.skywalking.oap.server.library.util.StringUtil.isBlank;
Expand All @@ -53,18 +58,51 @@ public Events queryEvents(final EventQueryCondition condition) throws Exception
if (isBlank(condition.getUuid()) && isDurationInvalid(condition.getTime())) {
throw new IllegalArgumentException("time field is required when uuid is absent.");
}
return getDao().queryEvents(condition);
Events events = getDao().queryEvents(condition);
return mergeAndSortEvents(events, condition.getOrder());
}

public Events queryEvents(final List<EventQueryCondition> conditions) throws Exception {
EventQueryCondition condition = conditions.stream().filter(c -> isBlank(c.getUuid()) && isDurationInvalid(c.getTime())).findFirst().orElse(null);
if (Objects.nonNull(condition)) {
throw new IllegalArgumentException("time field is required when uuid is absent.");
}
return getDao().queryEvents(conditions);
Events events = getDao().queryEvents(conditions);
return mergeAndSortEvents(events, conditions.get(0).getOrder());
}

boolean isDurationInvalid(final Duration duration) {
return isNull(duration) || (isBlank(duration.getStart()) || isBlank(duration.getEnd()));
}

private Events mergeAndSortEvents(Events events, Order order) {
final Order queryOrder = isNull(order) ? Order.DES : order;
Map<String, Event> mergedEvents = new HashMap<>();
for (Event event : events.getEvents()) {
String key = event.getUuid();
if (!mergedEvents.containsKey(key)) {
mergedEvents.put(key, event);
} else {
Event existingEvent = mergedEvents.get(key);
if ((event.getStartTime() > 0 && existingEvent.getStartTime() > event.getStartTime())
|| existingEvent.getStartTime() == 0) {
existingEvent.setStartTime(event.getStartTime());
if (existingEvent.getEndTime() == 0) {
existingEvent.setTimestamp(event.getTimestamp());
}
} else if (event.getEndTime() > 0 && existingEvent.getEndTime() < event.getEndTime()) {
event.setStartTime(existingEvent.getStartTime());
mergedEvents.put(key, event);
}
}
}
List<Event> sortedEvents;
if (queryOrder == Order.ASC) {
sortedEvents = mergedEvents.values().stream().sorted(Comparator.comparing(Event::getTimestamp)).collect(Collectors.toList());
} else {
sortedEvents = mergedEvents.values().stream().sorted(Comparator.comparing(Event::getTimestamp).reversed()).collect(Collectors.toList());
}

return new Events(sortedEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class Event {

private long endTime;

// The timestamp of the event. If the end time is set, it will be used as the timestamp, otherwise, the start time will be used.
private long timestamp;

private String layer;

public void setParameters(final List<KeyValue> parameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEBPFProfilingScheduleQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEventQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEventQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBHierarchyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetricsQueryDAO;
Expand Down
Loading
Loading