Skip to content

Commit c75d56f

Browse files
authored
Optimize APIs, remove unnecessary ts parameter of trace model (#95)
1 parent 1ec27be commit c75d56f

File tree

7 files changed

+46
-76
lines changed

7 files changed

+46
-76
lines changed

src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package org.apache.skywalking.banyandb.v1.client;
2020

21-
import com.google.protobuf.Timestamp;
22-
2321
import java.util.Map;
2422
import java.util.Optional;
2523

@@ -33,30 +31,35 @@
3331

3432
public abstract class AbstractWrite<P extends com.google.protobuf.GeneratedMessageV3> {
3533
/**
36-
* Timestamp represents the time of current stream
37-
* in the timeunit of milliseconds.
34+
* Timestamp represents the time of the current data point, in milliseconds.
35+
* <p>
36+
* <b>When to set:</b>
37+
* <ul>
38+
* <li><b>Stream and Measure writes:</b> This field <i>must</i> be set to indicate the event time.</li>
39+
* <li><b>Trace writes:</b> This field is <i>not needed</i> and should be left unset; trace data does not require an explicit timestamp here.</li>
40+
* </ul>
3841
*/
3942
@Getter
40-
protected long timestamp;
43+
protected Optional<Long> timestamp;
4144

4245
protected final Object[] tags;
4346

4447
protected final MetadataCache.EntityMetadata entityMetadata;
4548

4649
public AbstractWrite(MetadataCache.EntityMetadata entityMetadata, long timestamp) {
47-
if (entityMetadata == null) {
48-
throw new IllegalArgumentException("metadata not found");
49-
}
50-
this.entityMetadata = entityMetadata;
51-
this.timestamp = timestamp;
52-
this.tags = new Object[this.entityMetadata.getTotalTags()];
50+
this(entityMetadata);
51+
this.timestamp = Optional.of(timestamp);
5352
}
5453

5554
/**
5655
* Build a write request without initial timestamp.
5756
*/
5857
AbstractWrite(MetadataCache.EntityMetadata entityMetadata) {
59-
this(entityMetadata, 0);
58+
if (entityMetadata == null) {
59+
throw new IllegalArgumentException("metadata not found");
60+
}
61+
this.entityMetadata = entityMetadata;
62+
this.tags = new Object[this.entityMetadata.getTotalTags()];
6063
}
6164

6265
public AbstractWrite<P> tag(String tagName, Serializable<BanyandbModel.TagValue> tagValue) throws BanyanDBException {
@@ -69,19 +72,13 @@ public AbstractWrite<P> tag(String tagName, Serializable<BanyandbModel.TagValue>
6972
}
7073

7174
P build() {
72-
if (timestamp <= 0) {
73-
throw new IllegalArgumentException("timestamp is invalid.");
74-
}
75-
7675
BanyandbCommon.Metadata metadata = BanyandbCommon.Metadata.newBuilder()
7776
.setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).setModRevision(entityMetadata.getModRevision()).build();
78-
Timestamp ts = Timestamp.newBuilder()
79-
.setSeconds(timestamp / 1000)
80-
.setNanos((int) (timestamp % 1000 * 1_000_000)).build();
81-
return build(metadata, ts);
77+
78+
return build(metadata);
8279
}
8380

84-
protected abstract P build(BanyandbCommon.Metadata metadata, Timestamp ts);
81+
protected abstract P build(BanyandbCommon.Metadata metadata);
8582

8683
@Override
8784
public String toString() {

src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -399,21 +399,6 @@ public StreamWrite createStreamWrite(String group, String name, final String ele
399399
return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId);
400400
}
401401

402-
/**
403-
* Build a StreamWrite request.
404-
*
405-
* @param group the group of the stream
406-
* @param name the name of the stream
407-
* @param elementId the primary key of the stream
408-
* @param timestamp the timestamp of the stream
409-
* @return the request to be built
410-
*/
411-
public StreamWrite createStreamWrite(String group, String name, final String elementId, long timestamp) throws BanyanDBException {
412-
Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
413-
Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
414-
return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId, timestamp);
415-
}
416-
417402
/**
418403
* Build a trace bulk write processor.
419404
*
@@ -430,20 +415,6 @@ public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flu
430415
return new TraceBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options);
431416
}
432417

433-
/**
434-
* Build a TraceWrite request.
435-
*
436-
* @param group the group of the trace
437-
* @param name the name of the trace
438-
* @param timestamp the timestamp of the trace
439-
* @return the request to be built
440-
*/
441-
public TraceWrite createTraceWrite(String group, String name, long timestamp) throws BanyanDBException {
442-
Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
443-
Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
444-
return new TraceWrite(this.metadataCache.findTraceMetadata(group, name), timestamp);
445-
}
446-
447418
/**
448419
* Build a TraceWrite request without initial timestamp.
449420
*

src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,14 @@ public MeasureWrite tag(String tagName, Serializable<BanyandbModel.TagValue> tag
5858
* @return {@link BanyandbMeasure.WriteRequest} for the bulk process.
5959
*/
6060
@Override
61-
protected BanyandbMeasure.WriteRequest build(BanyandbCommon.Metadata metadata, Timestamp ts) {
61+
protected BanyandbMeasure.WriteRequest build(BanyandbCommon.Metadata metadata) {
62+
if (!timestamp.isPresent() || timestamp.get() <= 0) {
63+
throw new IllegalArgumentException("Timestamp is required and must be greater than 0 for stream writes.");
64+
}
65+
Timestamp ts = Timestamp.newBuilder()
66+
.setSeconds(timestamp.get() / 1000)
67+
.setNanos((int) (timestamp.get() % 1000 * 1_000_000)).build();
68+
6269
final BanyandbMeasure.WriteRequest.Builder builder = BanyandbMeasure.WriteRequest.newBuilder();
6370
builder.setMetadata(metadata);
6471
final BanyandbMeasure.DataPointValue.Builder datapointValueBuilder = BanyandbMeasure.DataPointValue.newBuilder();

src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.google.protobuf.Timestamp;
2222
import java.util.Deque;
2323
import java.util.LinkedList;
24+
import java.util.Optional;
25+
2426
import lombok.Getter;
2527
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
2628
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
@@ -40,11 +42,6 @@ public class StreamWrite extends AbstractWrite<BanyandbStream.WriteRequest> {
4042
@Getter
4143
private final String elementId;
4244

43-
StreamWrite(MetadataCache.EntityMetadata entityMetadata, final String elementId, long timestamp) {
44-
super(entityMetadata, timestamp);
45-
this.elementId = elementId;
46-
}
47-
4845
/**
4946
* Create a StreamWrite without initial timestamp.
5047
*/
@@ -59,7 +56,7 @@ public StreamWrite tag(String tagName, Serializable<BanyandbModel.TagValue> tagV
5956
}
6057

6158
public void setTimestamp(long timestamp) {
62-
super.timestamp = timestamp;
59+
super.timestamp = Optional.of(timestamp);
6360
}
6461

6562
/**
@@ -68,7 +65,15 @@ public void setTimestamp(long timestamp) {
6865
* @return {@link BanyandbStream.WriteRequest} for the bulk process.
6966
*/
7067
@Override
71-
protected BanyandbStream.WriteRequest build(BanyandbCommon.Metadata metadata, Timestamp ts) {
68+
protected BanyandbStream.WriteRequest build(BanyandbCommon.Metadata metadata) {
69+
if (!timestamp.isPresent() || timestamp.get() <= 0) {
70+
throw new IllegalArgumentException("Timestamp is required and must be greater than 0 for stream writes.");
71+
}
72+
73+
Timestamp ts = Timestamp.newBuilder()
74+
.setSeconds(timestamp.get() / 1000)
75+
.setNanos((int) (timestamp.get() % 1000 * 1_000_000)).build();
76+
7277
final BanyandbStream.WriteRequest.Builder builder = BanyandbStream.WriteRequest.newBuilder();
7378
builder.setMetadata(metadata);
7479
final BanyandbStream.ElementValue.Builder elemValBuilder = BanyandbStream.ElementValue.newBuilder();

src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.skywalking.banyandb.v1.client;
2020

2121
import com.google.protobuf.ByteString;
22-
import com.google.protobuf.Timestamp;
2322

2423
import java.util.ArrayList;
2524
import java.util.List;
@@ -34,7 +33,7 @@
3433

3534
/**
3635
* TraceWrite represents a write operation, including necessary fields, for {@link
37-
* BanyanDBClient#buildTraceBulkWriteProcessor}.
36+
* BanyanDBClient#buildTraceWriteProcessor(int, int, int, int)}.
3837
*/
3938
public class TraceWrite extends AbstractWrite<BanyandbTrace.WriteRequest> {
4039
/**
@@ -49,12 +48,6 @@ public class TraceWrite extends AbstractWrite<BanyandbTrace.WriteRequest> {
4948
@Getter
5049
private long version;
5150

52-
TraceWrite(MetadataCache.EntityMetadata entityMetadata, long timestamp) {
53-
super(entityMetadata, timestamp);
54-
this.span = ByteString.EMPTY;
55-
this.version = 1L;
56-
}
57-
5851
/**
5952
* Create a TraceWrite without initial timestamp.
6053
*/
@@ -99,17 +92,13 @@ public TraceWrite version(long version) {
9992
return this;
10093
}
10194

102-
public void setTimestamp(long timestamp) {
103-
super.timestamp = timestamp;
104-
}
105-
10695
/**
10796
* Build a write request
10897
*
10998
* @return {@link BanyandbTrace.WriteRequest} for the bulk process.
11099
*/
111100
@Override
112-
protected BanyandbTrace.WriteRequest build(BanyandbCommon.Metadata metadata, Timestamp ts) {
101+
protected BanyandbTrace.WriteRequest build(BanyandbCommon.Metadata metadata) {
113102
final BanyandbTrace.WriteRequest.Builder builder = BanyandbTrace.WriteRequest.newBuilder();
114103
builder.setMetadata(metadata);
115104

src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void testStreamQuery_TraceID() throws BanyanDBException, ExecutionExcepti
9494
String dbType = "SQL";
9595
String dbInstance = "127.0.0.1:3306";
9696

97-
StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId, now.toEpochMilli())
97+
StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId)
9898
.tag("data_binary", Value.binaryTagValue(byteData))
9999
.tag("trace_id", Value.stringTagValue(traceId)) // 0
100100
.tag("state", Value.longTagValue(state)) // 1
@@ -109,6 +109,7 @@ public void testStreamQuery_TraceID() throws BanyanDBException, ExecutionExcepti
109109
.tag("mq.broker", Value.stringTagValue(broker)) // 10
110110
.tag("mq.topic", Value.stringTagValue(topic)) // 11
111111
.tag("mq.queue", Value.stringTagValue(queue)); // 12
112+
streamWrite.setTimestamp(now.toEpochMilli());
112113

113114
CompletableFuture<Void> f = processor.add(streamWrite);
114115
f.exceptionally(exp -> {

src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void testTraceQueryByTraceId() throws BanyanDBException, ExecutionExcepti
129129
byte[] spanData = "query-test-span-data".getBytes();
130130

131131
// Create and write trace data
132-
TraceWrite traceWrite = client.createTraceWrite(groupName, traceName, now.toEpochMilli())
132+
TraceWrite traceWrite = client.createTraceWrite(groupName, traceName)
133133
.tag("trace_id", Value.stringTagValue(traceId))
134134
.tag("span_id", Value.stringTagValue(spanId))
135135
.tag("service_name", Value.stringTagValue(serviceName))
@@ -183,23 +183,23 @@ public void testTraceQueryOrderByStartTime() throws BanyanDBException, Execution
183183
Instant baseTime = Instant.now().minusSeconds(60); // Start 1 minute ago
184184

185185
// Create 3 traces with different timestamps (1 minute apart)
186-
TraceWrite trace1 = client.createTraceWrite(groupName, traceName, baseTime.toEpochMilli())
186+
TraceWrite trace1 = client.createTraceWrite(groupName, traceName)
187187
.tag("trace_id", Value.stringTagValue(traceId + "1"))
188188
.tag("span_id", Value.stringTagValue("span-1"))
189189
.tag("service_name", Value.stringTagValue(serviceName))
190190
.tag("start_time", Value.timestampTagValue(baseTime.toEpochMilli()))
191191
.span("span-data-1".getBytes())
192192
.version(1L);
193193

194-
TraceWrite trace2 = client.createTraceWrite(groupName, traceName, baseTime.plusSeconds(60).toEpochMilli())
194+
TraceWrite trace2 = client.createTraceWrite(groupName, traceName)
195195
.tag("trace_id", Value.stringTagValue(traceId + "2"))
196196
.tag("span_id", Value.stringTagValue("span-2"))
197197
.tag("service_name", Value.stringTagValue(serviceName))
198198
.tag("start_time", Value.timestampTagValue(baseTime.plusSeconds(60).toEpochMilli()))
199199
.span("span-data-2".getBytes())
200200
.version(1L);
201201

202-
TraceWrite trace3 = client.createTraceWrite(groupName, traceName, baseTime.plusSeconds(120).toEpochMilli())
202+
TraceWrite trace3 = client.createTraceWrite(groupName, traceName)
203203
.tag("trace_id", Value.stringTagValue(traceId + "3"))
204204
.tag("span_id", Value.stringTagValue("span-3"))
205205
.tag("service_name", Value.stringTagValue(serviceName))

0 commit comments

Comments
 (0)