Skip to content

Commit ba6cc2e

Browse files
authored
fix: update Record for ordering key (#76)
1 parent 85a8c2d commit ba6cc2e

File tree

7 files changed

+38
-22
lines changed

7 files changed

+38
-22
lines changed

client/src/main/java/io/hstream/BufferedProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.io.Closeable;
44

5-
/** The interface for the HStream BuffetedProducer. */
5+
/** The interface for the HStream BufferedProducer. */
66
public interface BufferedProducer extends Producer, Closeable {
77

88
/** explicitly flush buffered records. */

client/src/main/java/io/hstream/BufferedProducerBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,24 @@ public interface BufferedProducerBuilder {
55

66
BufferedProducerBuilder stream(String streamName);
77

8+
/** @param recordCountLimit optional, default: 100, it MUST be greater than 0 */
89
BufferedProducerBuilder recordCountLimit(int recordCountLimit);
910

11+
/**
12+
* @param flushIntervalMs optional, default: 100(ms), disabled if flushIntervalMs {@literal <=} 0
13+
*/
1014
BufferedProducerBuilder flushIntervalMs(long flushIntervalMs);
1115

16+
/**
17+
* @param maxBytesSize optional, default: 4096(Bytes), disabled if maxBytesSize {@literal <=} 0
18+
*/
1219
BufferedProducerBuilder maxBytesSize(int maxBytesSize);
1320

21+
/**
22+
* @param throwExceptionIfFull optional, default: false, if throwExceptionIfFull is true, throw
23+
* HStreamDBClientException when buffer is full, otherwise, block thread and wait the buffer
24+
* to be flushed.
25+
*/
1426
BufferedProducerBuilder throwExceptionIfFull(boolean throwExceptionIfFull);
1527

1628
BufferedProducer build();

client/src/main/java/io/hstream/Producer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public interface Producer {
1111
* @param rawRecord raw format record
1212
* @return the {@link RecordId} wrapped in a {@link CompletableFuture}
1313
*/
14+
@Deprecated
1415
CompletableFuture<RecordId> write(byte[] rawRecord);
1516

1617
/**
@@ -19,6 +20,7 @@ public interface Producer {
1920
* @param hRecord {@link HRecord}
2021
* @return the {@link RecordId} wrapped in a {@link CompletableFuture}
2122
*/
23+
@Deprecated
2224
CompletableFuture<RecordId> write(HRecord hRecord);
2325

2426
/**

client/src/main/java/io/hstream/Record.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
public class Record {
66

7-
private String key;
7+
private String orderingKey;
88
private byte[] rawRecord;
99
private HRecord hRecord;
1010
private boolean isRawRecord;
@@ -13,20 +13,20 @@ public static Builder newBuilder() {
1313
return new Builder();
1414
}
1515

16-
private Record(String key, byte[] rawRecord) {
16+
private Record(String orderingKey, byte[] rawRecord) {
1717
isRawRecord = true;
18-
this.key = key;
18+
this.orderingKey = orderingKey;
1919
this.rawRecord = rawRecord;
2020
}
2121

22-
private Record(String key, HRecord hRecord) {
22+
private Record(String orderingKey, HRecord hRecord) {
2323
isRawRecord = false;
24-
this.key = key;
24+
this.orderingKey = orderingKey;
2525
this.hRecord = hRecord;
2626
}
2727

28-
public String getKey() {
29-
return key;
28+
public String getOrderingKey() {
29+
return orderingKey;
3030
}
3131

3232
public byte[] getRawRecord() {
@@ -44,12 +44,12 @@ public boolean isRawRecord() {
4444
}
4545

4646
public static class Builder {
47-
private String key;
47+
private String orderingKey;
4848
private byte[] rawRecord;
4949
private HRecord hRecord;
5050

51-
public Builder key(String key) {
52-
this.key = key;
51+
public Builder orderingKey(String key) {
52+
this.orderingKey = key;
5353
return this;
5454
}
5555

@@ -67,9 +67,9 @@ public Record build() {
6767
checkArgument(
6868
(rawRecord != null && hRecord == null) || (rawRecord == null && hRecord != null));
6969
if (rawRecord != null) {
70-
return new Record(key, rawRecord);
70+
return new Record(orderingKey, rawRecord);
7171
} else {
72-
return new Record(key, hRecord);
72+
return new Record(orderingKey, hRecord);
7373
}
7474
}
7575
}

client/src/main/java/io/hstream/util/RecordUtils.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,13 @@ public static HStreamRecord buildHStreamRecordFromRecord(Record record) {
5353
record.isRawRecord()
5454
? buildHStreamRecordFromRawRecord(record.getRawRecord())
5555
: buildHStreamRecordFromHRecord(record.getHRecord());
56-
if (record.getKey() == null) {
56+
if (record.getOrderingKey() == null) {
5757
return hStreamRecord;
5858
}
5959
HStreamRecordHeader newHeader =
60-
HStreamRecordHeader.newBuilder(hStreamRecord.getHeader()).setKey(record.getKey()).build();
60+
HStreamRecordHeader.newBuilder(hStreamRecord.getHeader())
61+
.setKey(record.getOrderingKey())
62+
.build();
6163
return HStreamRecord.newBuilder(hStreamRecord).setHeader(newHeader).build();
6264
}
6365

client/src/test/java/io/hstream/HStreamClientTest07.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ void writeTest() throws Exception {
4545
random.nextBytes(payload);
4646
Producer producer = client.newProducer().stream(streamName).build();
4747
CompletableFuture<RecordId> future =
48-
producer.write(Record.newBuilder().key("k1").rawRecord(payload).build());
48+
producer.write(Record.newBuilder().orderingKey("k1").rawRecord(payload).build());
4949
RecordId recordId = future.join();
5050
logger.info("write successfully, got recordId: " + recordId.toString());
5151
client.close();
@@ -72,7 +72,7 @@ void readTest() throws Exception {
7272
producer
7373
.write(
7474
Record.newBuilder()
75-
.key("k1")
75+
.orderingKey("k1")
7676
.rawRecord(("record-" + i).getBytes(StandardCharsets.UTF_8))
7777
.build())
7878
.join();
@@ -117,7 +117,7 @@ void readMultipleShardsTest() throws Exception {
117117
producer
118118
.write(
119119
Record.newBuilder()
120-
.key(orderingKey)
120+
.orderingKey(orderingKey)
121121
.rawRecord(("record-" + i).getBytes(StandardCharsets.UTF_8))
122122
.build())
123123
.join();
@@ -162,7 +162,7 @@ void consumerRebalenceTest() throws Exception {
162162
producer
163163
.write(
164164
Record.newBuilder()
165-
.key(orderingKey)
165+
.orderingKey(orderingKey)
166166
.rawRecord(("record-" + i).getBytes(StandardCharsets.UTF_8))
167167
.build())
168168
.join();
@@ -231,7 +231,7 @@ void consumerRebalenceTest() throws Exception {
231231
producer
232232
.write(
233233
Record.newBuilder()
234-
.key(orderingKey)
234+
.orderingKey(orderingKey)
235235
.rawRecord(("record-" + i).getBytes(StandardCharsets.UTF_8))
236236
.build())
237237
.join();

client/src/test/java/io/hstream/TestUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static ArrayList<String> doProduce(
5757
var xs = new CompletableFuture[recordsNums];
5858
for (int i = 0; i < recordsNums; i++) {
5959
rand.nextBytes(rRec);
60-
Record recordToWrite = Record.newBuilder().key(key).rawRecord(rRec).build();
60+
Record recordToWrite = Record.newBuilder().orderingKey(key).rawRecord(rRec).build();
6161
records.add(Arrays.toString(rRec));
6262
xs[i] = producer.write(recordToWrite);
6363
}
@@ -68,7 +68,7 @@ public static ArrayList<String> doProduce(
6868
public static RecordId produceIntegerAndGatherRid(Producer producer, int data, String key) {
6969
Record recordToWrite =
7070
Record.newBuilder()
71-
.key(key)
71+
.orderingKey(key)
7272
.rawRecord(Integer.toString(data).getBytes(StandardCharsets.UTF_8))
7373
.build();
7474
return producer.write(recordToWrite).join();

0 commit comments

Comments
 (0)