Skip to content

Commit d7d5812

Browse files
authored
Fix the query time range in the metadata API. (#13332)
1 parent ff8cdcb commit d7d5812

File tree

4 files changed

+49
-16
lines changed
  • docs/en/changes
  • oap-server/server-storage-plugin
    • storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure
    • storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query
    • storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao

4 files changed

+49
-16
lines changed

docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* refactor: implement OTEL handler with SPI for extensibility.
3636
* chore: add `toString` implementation for `StorageID`.
3737
* chore: add a warning log when connecting to ES takes too long.
38+
* Fix the query time range in the metadata API.
3839

3940
#### UI
4041

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ protected void apply(MeasureQuery query) {
114114
public List<ServiceInstance> listInstances(Duration duration, String serviceId) throws IOException {
115115
TimestampRange timestampRange = null;
116116
if (duration != null) {
117+
// The data time should <= endTimeBucket.
118+
// It's equals to the condition `query.and(lte(InstanceTraffic.TIME_BUCKET, endTimeBucket))`
117119
timestampRange = new TimestampRange(0, duration.getEndTimestamp());
118120
}
119121
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
@@ -127,8 +129,8 @@ protected void apply(MeasureQuery query) {
127129
if (StringUtil.isNotEmpty(serviceId)) {
128130
query.and(eq(InstanceTraffic.SERVICE_ID, serviceId));
129131
}
130-
final var minuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp());
131-
query.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket));
132+
final var startTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp());
133+
query.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, startTimeBucket));
132134
query.limit(limit);
133135
}
134136
});
@@ -184,9 +186,16 @@ protected void apply(MeasureQuery query) {
184186
@Override
185187
public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit, Duration duration) throws IOException {
186188
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EndpointTraffic.INDEX_NAME, DownSampling.Minute);
189+
TimestampRange timestampRange = null;
190+
if (duration != null) {
191+
// The data time should <= endTimeBucket.
192+
// It's equals to the condition `query.and(lte(EndpointTraffic.TIME_BUCKET, endTimeBucket))`
193+
timestampRange = new TimestampRange(0, duration.getEndTimestamp());
194+
}
187195
MeasureQueryResponse resp = query(false, schema,
188196
ENDPOINT_TRAFFIC_TAGS,
189197
Collections.emptySet(),
198+
timestampRange,
190199
new QueryBuilder<MeasureQuery>() {
191200
@Override
192201
protected void apply(MeasureQuery query) {
@@ -201,9 +210,7 @@ protected void apply(MeasureQuery query) {
201210
}
202211
if (duration != null) {
203212
final var startTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp());
204-
final var endTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp());
205213
query.and(gte(EndpointTraffic.LAST_PING_TIME_BUCKET, startTimeBucket));
206-
query.and(lte(EndpointTraffic.LAST_PING_TIME_BUCKET, endTimeBucket));
207214
}
208215
query.setOrderBy(new AbstractQuery.OrderBy(AbstractQuery.Sort.DESC));
209216
query.limit(limit);
@@ -220,19 +227,23 @@ protected void apply(MeasureQuery query) {
220227
@Override
221228
public List<Process> listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
222229
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
230+
TimestampRange timestampRange = null;
231+
if (lastPingEndTimeBucket > 0) {
232+
// The data time should <= endTimeBucket.
233+
// It's equals to the condition `query.and(lte(ProcessTraffic.TIME_BUCKET, endTimeBucket))`
234+
timestampRange = new TimestampRange(0, TimeBucket.getTimestamp(lastPingEndTimeBucket));
235+
}
223236
MeasureQueryResponse resp = query(false, schema,
224237
PROCESS_TRAFFIC_TAGS,
225238
Collections.emptySet(),
239+
timestampRange,
226240
new QueryBuilder<MeasureQuery>() {
227241
@Override
228242
protected void apply(MeasureQuery query) {
229243
query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
230244
if (lastPingStartTimeBucket > 0) {
231245
query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
232246
}
233-
if (lastPingEndTimeBucket > 0) {
234-
query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket));
235-
}
236247
if (supportStatus != null) {
237248
query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value()));
238249
}
@@ -252,10 +263,17 @@ protected void apply(MeasureQuery query) {
252263
@Override
253264
public List<Process> listProcesses(String serviceInstanceId, Duration duration, boolean includeVirtual) throws IOException {
254265
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
266+
TimestampRange timestampRange = null;
267+
if (duration != null) {
268+
// The data time should <= endTimeBucket.
269+
// It's equals to the condition `query.and(lte(ProcessTraffic.TIME_BUCKET, endTimeBucket))`
270+
timestampRange = new TimestampRange(0, duration.getEndTimestamp());
271+
}
255272
long lastPingStartTimeBucket = duration.getStartTimeBucket();
256273
MeasureQueryResponse resp = query(false, schema,
257274
PROCESS_TRAFFIC_TAGS,
258275
Collections.emptySet(),
276+
timestampRange,
259277
new QueryBuilder<MeasureQuery>() {
260278
@Override
261279
protected void apply(MeasureQuery query) {
@@ -279,15 +297,21 @@ protected void apply(MeasureQuery query) {
279297
@Override
280298
public List<Process> listProcesses(String agentId, long startPingTimeBucket, long endPingTimeBucket) throws IOException {
281299
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute);
300+
TimestampRange timestampRange = null;
301+
if (endPingTimeBucket > 0) {
302+
// The data time should <= endTimeBucket.
303+
// It's equals to the condition `query.and(lte(ProcessTraffic.TIME_BUCKET, endTimeBucket))`
304+
timestampRange = new TimestampRange(0, TimeBucket.getTimestamp(endPingTimeBucket));
305+
}
282306
MeasureQueryResponse resp = query(false, schema,
283307
PROCESS_TRAFFIC_TAGS,
284308
Collections.emptySet(),
309+
timestampRange,
285310
new QueryBuilder<MeasureQuery>() {
286311
@Override
287312
protected void apply(MeasureQuery query) {
288313
query.and(eq(ProcessTraffic.AGENT_ID, agentId));
289314
query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, startPingTimeBucket));
290-
query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, endPingTimeBucket));
291315
query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
292316
query.limit(limit);
293317
}

oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.commons.lang3.StringUtils;
3232
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
3333
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
34-
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
3534
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
3635
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
3736
import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
@@ -161,7 +160,7 @@ public List<ServiceInstance> listInstances(@Nullable Duration duration,
161160
final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp());
162161
final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp());
163162
query.must(Query.range(InstanceTraffic.LAST_PING_TIME_BUCKET).gte(startMinuteTimeBucket))
164-
.must(Query.range(InstanceTraffic.TIME_BUCKET).lt(endMinuteTimeBucket));
163+
.must(Query.range(InstanceTraffic.TIME_BUCKET).lte(endMinuteTimeBucket));
165164
}
166165
if (IndexController.LogicIndicesRegister.isMergedTable(InstanceTraffic.INDEX_NAME)) {
167166
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, InstanceTraffic.INDEX_NAME));
@@ -237,7 +236,8 @@ public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit,
237236
if (duration != null) {
238237
final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp());
239238
final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp());
240-
query.must(Query.range(EndpointTraffic.LAST_PING_TIME_BUCKET).gte(startMinuteTimeBucket).lte(endMinuteTimeBucket));
239+
query.must(Query.range(EndpointTraffic.LAST_PING_TIME_BUCKET).gte(startMinuteTimeBucket))
240+
.must(Query.range(EndpointTraffic.TIME_BUCKET).lte(endMinuteTimeBucket));
241241
}
242242

243243
final var search = Search.builder().query(query).size(limit).sort(
@@ -389,9 +389,10 @@ private void appendProcessWhereQuery(BoolQueryBuilder query, String serviceId, S
389389
query.must(Query.term(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value()));
390390
}
391391
if (lastPingStartTimeBucket > 0) {
392-
final RangeQueryBuilder rangeQuery = Query.range(ProcessTraffic.LAST_PING_TIME_BUCKET);
393-
rangeQuery.gte(lastPingStartTimeBucket);
394-
query.must(rangeQuery);
392+
query.must(Query.range(ProcessTraffic.LAST_PING_TIME_BUCKET).gte(lastPingStartTimeBucket));
393+
}
394+
if (lastPingEndTimeBucket > 0) {
395+
query.must(Query.range(ProcessTraffic.TIME_BUCKET).lte(lastPingEndTimeBucket));
395396
}
396397
if (!includeVirtual) {
397398
query.mustNot(Query.term(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));

oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ protected SQLAndParameters buildSQLForListInstances(String serviceId, Duration d
139139
final var endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp());
140140
sql.append(" and ").append(InstanceTraffic.LAST_PING_TIME_BUCKET).append(" >= ?");
141141
parameters.add(startMinuteTimeBucket);
142-
sql.append(" and ").append(InstanceTraffic.TIME_BUCKET).append(" < ?");
142+
sql.append(" and ").append(InstanceTraffic.TIME_BUCKET).append(" <= ?");
143143
parameters.add(endMinuteTimeBucket);
144144
}
145145
sql.append(" and ").append(InstanceTraffic.SERVICE_ID).append("=?");
@@ -228,7 +228,7 @@ public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit,
228228
final var endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp());
229229
sql.append(" and ").append(EndpointTraffic.LAST_PING_TIME_BUCKET).append(" >= ?");
230230
condition.add(startMinuteTimeBucket);
231-
sql.append(" and ").append(EndpointTraffic.LAST_PING_TIME_BUCKET).append(" <= ?");
231+
sql.append(" and ").append(EndpointTraffic.TIME_BUCKET).append(" <= ?");
232232
condition.add(endMinuteTimeBucket);
233233
}
234234
sql.append(" order by ").append(EndpointTraffic.TIME_BUCKET).append(" desc");
@@ -493,6 +493,13 @@ private void appendProcessWhereQuery(StringBuilder sql, List<Object> condition,
493493
sql.append(ProcessTraffic.LAST_PING_TIME_BUCKET).append(">=?");
494494
condition.add(lastPingStartTimeBucket);
495495
}
496+
if (lastPingEndTimeBucket > 0) {
497+
if (!condition.isEmpty()) {
498+
sql.append(" and ");
499+
}
500+
sql.append(ProcessTraffic.TIME_BUCKET).append("<=?");
501+
condition.add(lastPingEndTimeBucket);
502+
}
496503
if (!includeVirtual) {
497504
if (!condition.isEmpty()) {
498505
sql.append(" and ");

0 commit comments

Comments
 (0)