Skip to content

Commit 93793c1

Browse files
authored
feat: add createdTime(publishTime) into ReceivedRecord (#159)
1 parent 421b610 commit 93793c1

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

client/src/main/java/io/hstream/ReceivedRecord.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package io.hstream;
22

3+
import java.time.Instant;
4+
35
public class ReceivedRecord {
46
String recordId;
57
Record record;
8+
Instant createdTime;
69

7-
public ReceivedRecord(String recordId, Record record) {
10+
public ReceivedRecord(String recordId, Record record, Instant createdTime) {
811
this.recordId = recordId;
912
this.record = record;
13+
this.createdTime = createdTime;
1014
}
1115

1216
public String getRecordId() {
@@ -24,4 +28,8 @@ public Record getRecord() {
2428
public void setRecord(Record record) {
2529
this.record = record;
2630
}
31+
32+
public Instant getCreatedTime() {
33+
return createdTime;
34+
}
2735
}

client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import kotlinx.coroutines.Dispatchers
1616
import kotlinx.coroutines.launch
1717
import kotlinx.coroutines.runBlocking
1818
import org.slf4j.LoggerFactory
19+
import java.time.Instant
1920
import java.util.concurrent.CompletableFuture
2021

2122
class ReaderKtImpl(
@@ -55,14 +56,16 @@ class ReaderKtImpl(
5556
RecordUtils.decompress(it).map { receivedHStreamRecord ->
5657
val hStreamRecord = receivedHStreamRecord.record
5758
val header = RecordUtils.parseRecordHeaderFromHStreamRecord(hStreamRecord)
59+
val publishTime = it.record.publishTime
60+
val createdTime = Instant.ofEpochSecond(publishTime.seconds, publishTime.nanos.toLong())
5861
val record = if (RecordUtils.isRawRecord(hStreamRecord)) {
5962
val rawRecord = RecordUtils.parseRawRecordFromHStreamRecord(hStreamRecord)
6063
Record.newBuilder().rawRecord(rawRecord).partitionKey(header.partitionKey).build()
6164
} else {
6265
val hRecord = RecordUtils.parseHRecordFromHStreamRecord(hStreamRecord)
6366
Record.newBuilder().hRecord(hRecord).partitionKey(header.partitionKey).build()
6467
}
65-
ReceivedRecord(GrpcUtils.recordIdFromGrpc(receivedHStreamRecord.recordId), record)
68+
ReceivedRecord(GrpcUtils.recordIdFromGrpc(receivedHStreamRecord.recordId), record, createdTime)
6669
}
6770
}
6871
readFuture.complete(res as MutableList<ReceivedRecord>?)

0 commit comments

Comments
 (0)