diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 01899d88a329..fd650c4fda9c 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -23,9 +23,10 @@ * Add JSON format support for the `/debugging/config/dump` status API. * Enhance status APIs to support multiple `accept` header values, e.g. `Accept: application/json; charset=utf-8`. * Storage: separate `SpanAttachedEventRecord` for SkyWalking trace and Zipkin trace. -* [Break Change]BanyanDB: Setup new Group policy. +* [Break Change]BanyanDB: Setup new Group policy. * Bump up commons-beanutils to 1.11.0. * Refactor: simplify the `Accept` http header process. +* [Break Change]Storage: Move `event` from metrics to recods. #### UI diff --git a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java index f2e957469763..bd1f123f345f 100644 --- a/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java +++ b/oap-server/analyzer/event-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/event/listener/EventRecordAnalyzerListener.java @@ -23,9 +23,9 @@ import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.Layer; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; -import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; +import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.config.NamingControl; -import org.apache.skywalking.oap.server.core.analysis.metrics.Event; +import org.apache.skywalking.oap.server.core.analysis.record.Event; import org.apache.skywalking.oap.server.library.module.ModuleManager; import lombok.RequiredArgsConstructor; @@ -42,7 +42,7 @@ public class EventRecordAnalyzerListener implements EventAnalyzerListener { @Override public void build() { - MetricsStreamProcessor.getInstance().in(event); + RecordStreamProcessor.getInstance().in(event); } @Override @@ -67,9 +67,11 @@ public void parse(final org.apache.skywalking.apm.network.event.v3.Event e) { event.setStartTime(e.getStartTime()); event.setEndTime(e.getEndTime()); if (e.getStartTime() > 0) { - event.setTimeBucket(TimeBucket.getMinuteTimeBucket(e.getStartTime())); + event.setTimeBucket(TimeBucket.getRecordTimeBucket(e.getStartTime())); + event.setTimestamp(e.getStartTime()); } else if (e.getEndTime() > 0) { - event.setTimeBucket(TimeBucket.getMinuteTimeBucket(e.getEndTime())); + event.setTimeBucket(TimeBucket.getRecordTimeBucket(e.getEndTime())); + event.setTimestamp(e.getEndTime()); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Event.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Event.java similarity index 57% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Event.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Event.java index 98d2fbe220ac..32f052501297 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Event.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Event.java @@ -16,18 +16,13 @@ * */ -package org.apache.skywalking.oap.server.core.analysis.metrics; +package org.apache.skywalking.oap.server.core.analysis.record; -import com.google.common.base.Strings; -import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import org.apache.skywalking.oap.server.core.analysis.Layer; -import org.apache.skywalking.oap.server.core.analysis.MetricsExtension; import org.apache.skywalking.oap.server.core.analysis.Stream; -import org.apache.skywalking.oap.server.core.analysis.TimeBucket; -import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; -import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.source.ScopeDeclaration; import org.apache.skywalking.oap.server.core.storage.StorageID; import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB; @@ -38,19 +33,13 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EVENT; -import static org.apache.skywalking.oap.server.library.util.StringUtil.isNotBlank; @Getter @Setter @ScopeDeclaration(id = EVENT, name = "Event") -@Stream(name = Event.INDEX_NAME, scopeId = EVENT, builder = Event.Builder.class, processor = MetricsStreamProcessor.class) -@MetricsExtension(supportDownSampling = false, supportUpdate = true) -@EqualsAndHashCode( - callSuper = false, - of = "uuid" -) -@BanyanDB.IndexMode -public class Event extends Metrics { +@Stream(name = Event.INDEX_NAME, scopeId = EVENT, builder = Event.Builder.class, processor = RecordStreamProcessor.class) +@BanyanDB.TimestampColumn(Event.TIMESTAMP) +public class Event extends Record { public static final String INDEX_NAME = "events"; @@ -62,7 +51,7 @@ public class Event extends Metrics { public static final String ENDPOINT = "endpoint"; - public static final String NAME = "name"; + public static final String NAME = "event_name"; public static final String TYPE = "type"; @@ -78,10 +67,7 @@ public class Event extends Metrics { private static final int PARAMETER_MAX_LENGTH = 4000; - @Override - protected StorageID id0() { - return new StorageID().append(UUID, getUuid()); - } + public static final String TIMESTAMP = "timestamp"; @Column(name = UUID) @BanyanDB.SeriesID(index = 0) @@ -109,7 +95,6 @@ protected StorageID id0() { private String parameters; @ElasticSearch.EnableDocValues - @BanyanDB.EnableSort @Column(name = START_TIME) private long startTime; @@ -119,105 +104,16 @@ protected StorageID id0() { @Column(name = LAYER) private Layer layer; - @Override - public boolean combine(final Metrics metrics) { - final Event event = (Event) metrics; - - // Set time bucket only when it's never set. - if (getTimeBucket() <= 0) { - if (event.getStartTime() > 0) { - setTimeBucket(TimeBucket.getMinuteTimeBucket(event.getStartTime())); - } else if (event.getEndTime() > 0) { - setTimeBucket(TimeBucket.getMinuteTimeBucket(event.getEndTime())); - } - } - - // Set start time only when it's never set, (`start` event may come after `end` event). - if (getStartTime() <= 0 && event.getStartTime() > 0) { - setStartTime(event.getStartTime()); - } - - if (event.getEndTime() > 0) { - setEndTime(event.getEndTime()); - } - - if (isNotBlank(event.getType())) { - setType(event.getType()); - } - if (isNotBlank(event.getMessage())) { - setMessage(event.getMessage()); - } - if (isNotBlank(event.getParameters())) { - setParameters(event.getParameters()); - } - return true; - } - - /** - * @since 9.0.0 Limit the length of {@link #parameters} - */ - public void setParameters(String parameters) { - this.parameters = parameters == null || parameters.length() <= PARAMETER_MAX_LENGTH ? - parameters : parameters.substring(0, PARAMETER_MAX_LENGTH); - } - - @Override - public void calculate() { - } - - @Override - public Metrics toHour() { - return null; - } - - @Override - public Metrics toDay() { - return null; - } - - @Override - public void deserialize(final RemoteData remoteData) { - setUuid(remoteData.getDataStrings(0)); - setService(remoteData.getDataStrings(1)); - setServiceInstance(remoteData.getDataStrings(2)); - setEndpoint(remoteData.getDataStrings(3)); - setName(remoteData.getDataStrings(4)); - setType(remoteData.getDataStrings(5)); - setMessage(remoteData.getDataStrings(6)); - setParameters(remoteData.getDataStrings(7)); - - setStartTime(remoteData.getDataLongs(0)); - setEndTime(remoteData.getDataLongs(1)); - setTimeBucket(remoteData.getDataLongs(2)); - - setLayer(Layer.valueOf(remoteData.getDataIntegers(0))); - } - - @Override - public RemoteData.Builder serialize() { - final RemoteData.Builder builder = RemoteData.newBuilder(); - - builder.addDataStrings(getUuid()); - builder.addDataStrings(getService()); - builder.addDataStrings(getServiceInstance()); - builder.addDataStrings(getEndpoint()); - builder.addDataStrings(getName()); - builder.addDataStrings(getType()); - builder.addDataStrings(getMessage()); - builder.addDataStrings(Strings.nullToEmpty(getParameters())); - - builder.addDataLongs(getStartTime()); - builder.addDataLongs(getEndTime()); - builder.addDataLongs(getTimeBucket()); - - builder.addDataIntegers(getLayer().value()); - - return builder; - } + @Setter + @Getter + @ElasticSearch.EnableDocValues + @Column(name = TIMESTAMP) + private long timestamp; @Override - public int remoteHashCode() { - return hashCode(); + public StorageID id() { + return new StorageID().append(TIME_BUCKET, getTimeBucket()) + .append(UUID, uuid); } public static class Builder implements StorageBuilder { @@ -235,6 +131,7 @@ public Event storage2Entity(final Convert2Entity converter) { record.setStartTime(((Number) converter.get(START_TIME)).longValue()); record.setEndTime(((Number) converter.get(END_TIME)).longValue()); record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue()); + record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue()); if (converter.get(LAYER) != null) { record.setLayer(Layer.valueOf(((Number) converter.get(LAYER)).intValue())); } @@ -254,6 +151,7 @@ public void entity2Storage(final Event storageData, final Convert2Storage conver converter.accept(START_TIME, storageData.getStartTime()); converter.accept(END_TIME, storageData.getEndTime()); converter.accept(TIME_BUCKET, storageData.getTimeBucket()); + converter.accept(TIMESTAMP, storageData.getTimestamp()); Layer layer = storageData.getLayer(); converter.accept(LAYER, layer != null ? layer.value() : Layer.UNDEFINED.value()); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/EventQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/EventQueryService.java index 43cf3ea79105..5aafe2f6bd2e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/EventQueryService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/EventQueryService.java @@ -18,16 +18,21 @@ package org.apache.skywalking.oap.server.core.query; +import org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.input.Duration; +import org.apache.skywalking.oap.server.core.query.type.event.Event; import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition; import org.apache.skywalking.oap.server.core.query.type.event.Events; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.Service; - +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import static java.util.Objects.isNull; import static org.apache.skywalking.oap.server.library.util.StringUtil.isBlank; @@ -53,7 +58,8 @@ public Events queryEvents(final EventQueryCondition condition) throws Exception if (isBlank(condition.getUuid()) && isDurationInvalid(condition.getTime())) { throw new IllegalArgumentException("time field is required when uuid is absent."); } - return getDao().queryEvents(condition); + Events events = getDao().queryEvents(condition); + return mergeAndSortEvents(events, condition.getOrder()); } public Events queryEvents(final List conditions) throws Exception { @@ -61,10 +67,42 @@ public Events queryEvents(final List conditions) throws Exc if (Objects.nonNull(condition)) { throw new IllegalArgumentException("time field is required when uuid is absent."); } - return getDao().queryEvents(conditions); + Events events = getDao().queryEvents(conditions); + return mergeAndSortEvents(events, conditions.get(0).getOrder()); } boolean isDurationInvalid(final Duration duration) { return isNull(duration) || (isBlank(duration.getStart()) || isBlank(duration.getEnd())); } + + private Events mergeAndSortEvents(Events events, Order order) { + final Order queryOrder = isNull(order) ? Order.DES : order; + Map mergedEvents = new HashMap<>(); + for (Event event : events.getEvents()) { + String key = event.getUuid(); + if (!mergedEvents.containsKey(key)) { + mergedEvents.put(key, event); + } else { + Event existingEvent = mergedEvents.get(key); + if ((event.getStartTime() > 0 && existingEvent.getStartTime() > event.getStartTime()) + || existingEvent.getStartTime() == 0) { + existingEvent.setStartTime(event.getStartTime()); + if (existingEvent.getEndTime() == 0) { + existingEvent.setTimestamp(event.getTimestamp()); + } + } else if (event.getEndTime() > 0 && existingEvent.getEndTime() < event.getEndTime()) { + event.setStartTime(existingEvent.getStartTime()); + mergedEvents.put(key, event); + } + } + } + List sortedEvents; + if (queryOrder == Order.ASC) { + sortedEvents = mergedEvents.values().stream().sorted(Comparator.comparing(Event::getTimestamp)).collect(Collectors.toList()); + } else { + sortedEvents = mergedEvents.values().stream().sorted(Comparator.comparing(Event::getTimestamp).reversed()).collect(Collectors.toList()); + } + + return new Events(sortedEvents); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java index bd6bc0c320cd..4e476204559f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/event/Event.java @@ -49,6 +49,9 @@ public class Event { private long endTime; + // The timestamp of the event. If the end time is set, it will be used as the timestamp, otherwise, the start time will be used. + private long timestamp; + private String layer; public void setParameters(final List parameters) { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java index d224e7498ad4..886856d855b0 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java @@ -60,7 +60,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEBPFProfilingScheduleQueryDAO; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEventQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEventQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBHierarchyQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetricsQueryDAO; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java similarity index 71% rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java index 5ecf61bda74d..abaee1e18906 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import java.util.ArrayList; import java.util.Collections; @@ -26,10 +26,10 @@ import com.google.common.collect.ImmutableSet; import org.apache.skywalking.banyandb.v1.client.AbstractCriteria; import org.apache.skywalking.banyandb.v1.client.AbstractQuery; -import org.apache.skywalking.banyandb.v1.client.DataPoint; -import org.apache.skywalking.banyandb.v1.client.MeasureQuery; -import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; +import org.apache.skywalking.banyandb.v1.client.Element; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.Layer; import org.apache.skywalking.oap.server.core.query.PaginationUtils; @@ -39,11 +39,10 @@ import org.apache.skywalking.oap.server.core.query.type.event.EventType; import org.apache.skywalking.oap.server.core.query.type.event.Events; import org.apache.skywalking.oap.server.core.query.type.event.Source; -import org.apache.skywalking.oap.server.core.analysis.metrics.Event; +import org.apache.skywalking.oap.server.core.analysis.record.Event; import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; import static com.google.common.base.Strings.isNullOrEmpty; import static java.util.Objects.isNull; @@ -51,7 +50,7 @@ public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEventQueryDAO { private static final Set TAGS = ImmutableSet.of( Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME, - Event.MESSAGE, Event.TYPE, Event.START_TIME, Event.END_TIME, Event.PARAMETERS, Event.LAYER); + Event.MESSAGE, Event.TYPE, Event.START_TIME, Event.END_TIME, Event.PARAMETERS, Event.LAYER, Event.TIMESTAMP); public BanyanDBEventQueryDAO(final BanyanDBStorageClient client) { super(client); @@ -59,17 +58,15 @@ public BanyanDBEventQueryDAO(final BanyanDBStorageClient client) { @Override public Events queryEvents(EventQueryCondition condition) throws Exception { - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(Event.INDEX_NAME, DownSampling.Minute); final Duration time = condition.getTime(); boolean isColdStage = time != null && time.isColdStage(); - MeasureQueryResponse resp = query(isColdStage, schema, TAGS, - Collections.emptySet(), getTimestampRange(time), buildQuery(Collections.singletonList(condition))); + StreamQueryResponse resp = query(isColdStage, Event.INDEX_NAME, TAGS, getTimestampRange(time), buildQuery(Collections.singletonList(condition))); Events events = new Events(); if (resp.size() == 0) { return events; } - for (final DataPoint dataPoint : resp.getDataPoints()) { - events.getEvents().add(buildEventView(dataPoint)); + for (final Element e : resp.getElements()) { + events.getEvents().add(buildEventView(e)); } return events; } @@ -80,35 +77,34 @@ public Events queryEvents(List conditionList) throws Except // Duration should be same for all conditions final Duration time = conditionList.get(0).getTime(); boolean isColdStage = time != null && time.isColdStage(); - MeasureQueryResponse resp = query(isColdStage, schema, TAGS, - Collections.emptySet(), getTimestampRange(time), buildQuery(conditionList)); + StreamQueryResponse resp = query(isColdStage, Event.INDEX_NAME, TAGS, getTimestampRange(time), buildQuery(conditionList)); Events events = new Events(); if (resp.size() == 0) { return events; } - for (final DataPoint dataPoint : resp.getDataPoints()) { - events.getEvents().add(buildEventView(dataPoint)); + for (final Element e : resp.getElements()) { + events.getEvents().add(buildEventView(e)); } return events; } - public QueryBuilder buildQuery(final List conditionList) { + public QueryBuilder buildQuery(final List conditionList) { EventQueryCondition condition = conditionList.get(0); final Order queryOrder = isNull(condition.getOrder()) ? Order.DES : condition.getOrder(); final PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging()); - return new QueryBuilder() { + return new QueryBuilder() { @Override - protected void apply(MeasureQuery query) { + protected void apply(StreamQuery query) { List eventsQueryConditions = new ArrayList<>(conditionList.size()); - query.limit(page.getLimit()); - query.offset(page.getFrom()); + query.setLimit(page.getLimit()); + query.setOffset(page.getFrom()); if (queryOrder == Order.ASC) { query.setOrderBy( - new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC)); + new AbstractQuery.OrderBy(AbstractQuery.Sort.ASC)); } else { query.setOrderBy( - new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC)); + new AbstractQuery.OrderBy(AbstractQuery.Sort.DESC)); } for (final EventQueryCondition condition : conditionList) { List> queryConditions = new ArrayList<>(); @@ -151,31 +147,31 @@ protected void apply(MeasureQuery query) { } protected org.apache.skywalking.oap.server.core.query.type.event.Event buildEventView( - final DataPoint dataPoint) { + final Element e) { final org.apache.skywalking.oap.server.core.query.type.event.Event event = new org.apache.skywalking.oap.server.core.query.type.event.Event(); - event.setUuid(dataPoint.getTagValue(Event.UUID)); + event.setUuid(e.getTagValue(Event.UUID)); - String service = getValueOrDefault(dataPoint, Event.SERVICE, ""); - String serviceInstance = getValueOrDefault(dataPoint, Event.SERVICE_INSTANCE, ""); - String endpoint = getValueOrDefault(dataPoint, Event.ENDPOINT, ""); + String service = getValueOrDefault(e, Event.SERVICE, ""); + String serviceInstance = getValueOrDefault(e, Event.SERVICE_INSTANCE, ""); + String endpoint = getValueOrDefault(e, Event.ENDPOINT, ""); event.setSource(new Source(service, serviceInstance, endpoint)); - event.setName(dataPoint.getTagValue(Event.NAME)); - event.setType(EventType.parse(dataPoint.getTagValue(Event.TYPE))); - event.setMessage(dataPoint.getTagValue(Event.MESSAGE)); - event.setParameters((String) dataPoint.getTagValue(Event.PARAMETERS)); - event.setStartTime(dataPoint.getTagValue(Event.START_TIME)); - event.setEndTime(dataPoint.getTagValue(Event.END_TIME)); - - event.setLayer(Layer.valueOf(((Number) dataPoint.getTagValue(Event.LAYER)).intValue()).name()); + event.setName(e.getTagValue(Event.NAME)); + event.setType(EventType.parse(e.getTagValue(Event.TYPE))); + event.setMessage(e.getTagValue(Event.MESSAGE)); + event.setParameters((String) e.getTagValue(Event.PARAMETERS)); + event.setStartTime(e.getTagValue(Event.START_TIME)); + event.setEndTime(e.getTagValue(Event.END_TIME)); + event.setTimestamp(e.getTagValue(Event.TIMESTAMP)); + event.setLayer(Layer.valueOf(((Number) e.getTagValue(Event.LAYER)).intValue()).name()); return event; } - private T getValueOrDefault(DataPoint dataPoint, String tagName, T defaultValue) { - T v = dataPoint.getTagValue(tagName); + private T getValueOrDefault(Element e, String tagName, T defaultValue) { + T v = e.getTagValue(tagName); return v == null ? defaultValue : v; } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java index f2f61681088d..89369b770d86 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ESEventQueryDAO.java @@ -26,7 +26,7 @@ import org.apache.skywalking.library.elasticsearch.response.search.SearchHit; import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse; import org.apache.skywalking.oap.server.core.analysis.Layer; -import org.apache.skywalking.oap.server.core.analysis.metrics.Event; +import org.apache.skywalking.oap.server.core.analysis.record.Event; import org.apache.skywalking.oap.server.core.query.PaginationUtils; import org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.input.Duration; @@ -79,7 +79,7 @@ private Events getEventsResultByCurrentBuilder(final SearchBuilder searchBuilder private void buildMustQueryListByCondition(final EventQueryCondition condition, final BoolQueryBuilder query) { if (IndexController.LogicIndicesRegister.isMergedTable(Event.INDEX_NAME)) { - query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, Event.INDEX_NAME)); + query.must(Query.term(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME, Event.INDEX_NAME)); } if (!isNullOrEmpty(condition.getUuid())) { @@ -113,10 +113,10 @@ private void buildMustQueryListByCondition(final EventQueryCondition condition, final Duration startTime = condition.getTime(); if (startTime != null) { if (startTime.getStartTimestamp() > 0) { - query.must(Query.range(Event.START_TIME).gt(startTime.getStartTimestamp())); + query.must(Query.range(Event.TIMESTAMP).gt(startTime.getStartTimestamp())); } if (startTime.getEndTimestamp() > 0) { - query.must(Query.range(Event.END_TIME).lt(startTime.getEndTimestamp())); + query.must(Query.range(Event.TIMESTAMP).lt(startTime.getEndTimestamp())); } } @@ -139,7 +139,7 @@ protected SearchBuilder buildQuery(final List conditionList return Search.builder().query(query) .sort( - Event.START_TIME, + Event.TIMESTAMP, Order.DES.equals(queryOrder) ? Sort.Order.DESC : Sort.Order.ASC ) .from(page.getFrom()) @@ -157,7 +157,7 @@ protected SearchBuilder buildQuery(final EventQueryCondition condition) { return Search.builder() .query(query) .sort( - Event.START_TIME, + Event.TIMESTAMP, Order.DES.equals(queryOrder) ? Sort.Order.DESC : Sort.Order.ASC ) .from(page.getFrom()) @@ -186,7 +186,7 @@ protected org.apache.skywalking.oap.server.core.query.type.event.Event parseSear if (!endTimeStr.isEmpty() && !Objects.equals(endTimeStr, "0")) { event.setEndTime(Long.parseLong(endTimeStr)); } - + event.setTimestamp(Long.parseLong(searchHit.getSource().get(Event.TIMESTAMP).toString())); event.setLayer(Layer.valueOf(Integer.parseInt(searchHit.getSource().get(Event.LAYER).toString())).name()); return event; diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCEventQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCEventQueryDAO.java index 45c7dacd324e..6fe70c14e008 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCEventQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCEventQueryDAO.java @@ -23,7 +23,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.Layer; -import org.apache.skywalking.oap.server.core.analysis.metrics.Event; +import org.apache.skywalking.oap.server.core.analysis.record.Event; import org.apache.skywalking.oap.server.core.query.PaginationUtils; import org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.input.Duration; @@ -69,9 +69,9 @@ public Events queryEvents(final EventQueryCondition condition) throws Exception for (String table : tables) { String sql = "select * from " + table + whereClause; if (Order.DES.equals(queryOrder)) { - sql += " order by " + Event.START_TIME + " desc"; + sql += " order by " + Event.TIMESTAMP + " desc"; } else { - sql += " order by " + Event.START_TIME + " asc"; + sql += " order by " + Event.TIMESTAMP + " asc"; } sql += " limit " + (page.getLimit() + page.getFrom()); if (log.isDebugEnabled()) { @@ -87,8 +87,8 @@ public Events queryEvents(final EventQueryCondition condition) throws Exception } final var comparator = Order.DES.equals(queryOrder) ? - comparing(org.apache.skywalking.oap.server.core.query.type.event.Event::getStartTime).reversed() : - comparing(org.apache.skywalking.oap.server.core.query.type.event.Event::getStartTime); + comparing(org.apache.skywalking.oap.server.core.query.type.event.Event::getTimestamp).reversed() : + comparing(org.apache.skywalking.oap.server.core.query.type.event.Event::getTimestamp); return new Events( events .stream() @@ -123,9 +123,9 @@ public Events queryEvents(List conditions) throws Exception for (String table : tables) { String sql = "select * from " + table + whereClause; if (Order.DES.equals(queryOrder)) { - sql += " order by " + Event.START_TIME + " desc"; + sql += " order by " + Event.TIMESTAMP + " desc"; } else { - sql += " order by " + Event.START_TIME + " asc"; + sql += " order by " + Event.TIMESTAMP + " asc"; } sql += " limit " + (page.getLimit() + page.getFrom()); if (log.isDebugEnabled()) { @@ -141,8 +141,8 @@ public Events queryEvents(List conditions) throws Exception } final var comparator = Order.DES.equals(queryOrder) ? - comparing(org.apache.skywalking.oap.server.core.query.type.event.Event::getStartTime).reversed() : - comparing(org.apache.skywalking.oap.server.core.query.type.event.Event::getStartTime); + comparing(org.apache.skywalking.oap.server.core.query.type.event.Event::getTimestamp).reversed() : + comparing(org.apache.skywalking.oap.server.core.query.type.event.Event::getTimestamp); return new Events( events .stream() @@ -169,6 +169,7 @@ protected org.apache.skywalking.oap.server.core.query.type.event.Event parseResu event.setParameters(resultSet.getString(Event.PARAMETERS)); event.setStartTime(resultSet.getLong(Event.START_TIME)); event.setEndTime(resultSet.getLong(Event.END_TIME)); + event.setTimestamp(resultSet.getLong(Event.TIMESTAMP)); event.setLayer(Layer.valueOf(resultSet.getInt(Event.LAYER)).name()); return event; } @@ -214,11 +215,11 @@ protected Tuple2, Stream> buildQuery(final EventQueryCond final Duration time = condition.getTime(); if (time != null) { if (time.getStartTimestamp() > 0) { - conditions.add(Event.START_TIME + ">?"); + conditions.add(Event.TIMESTAMP + ">?"); parameters.add(time.getStartTimestamp()); } if (time.getEndTimestamp() > 0) { - conditions.add(Event.END_TIME + "