Skip to content

Commit ed9dbfc

Browse files
authored
encapsulate classes generated by grpc (#21)
1 parent 0fdaf3d commit ed9dbfc

16 files changed

+284
-31
lines changed

src/main/java/io/hstream/ConsumerBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.hstream;
22

33
import io.hstream.impl.ConsumerImpl;
4+
import io.hstream.internal.HStreamApiGrpc;
45

56
/** used to construct a consumer */
67
public class ConsumerBuilder {

src/main/java/io/hstream/HStreamClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.hstream;
22

3+
import io.hstream.internal.Stream;
34
import java.util.List;
45

56
/** HstreamClient implement the hstream client, user can use it to interact with server */

src/main/java/io/hstream/ProducerBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.hstream;
22

33
import io.hstream.impl.ProducerImpl;
4+
import io.hstream.internal.HStreamApiGrpc;
45

56
/** used to construct a producer */
67
public class ProducerBuilder {

src/main/java/io/hstream/QueryerBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.hstream;
22

33
import io.hstream.impl.QueryerImpl;
4+
import io.hstream.internal.HStreamApiGrpc;
45

56
/** Builder used to configure and construct a {@link Queryer} instance. */
67
public class QueryerBuilder {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.hstream;
2+
3+
import java.util.Objects;
4+
5+
public class RecordId {
6+
7+
private long batchId;
8+
private int batchIndex;
9+
10+
public RecordId(long batchId, int batchIndex) {
11+
this.batchId = batchId;
12+
this.batchIndex = batchIndex;
13+
}
14+
15+
public long getBatchId() {
16+
return batchId;
17+
}
18+
19+
public int getBatchIndex() {
20+
return batchIndex;
21+
}
22+
23+
public void setBatchId(long batchId) {
24+
this.batchId = batchId;
25+
}
26+
27+
public void setBatchIndex(int batchIndex) {
28+
this.batchIndex = batchIndex;
29+
}
30+
31+
@Override
32+
public boolean equals(Object o) {
33+
if (this == o) {
34+
return true;
35+
}
36+
if (o == null || getClass() != o.getClass()) {
37+
return false;
38+
}
39+
RecordId recordId = (RecordId) o;
40+
return batchId == recordId.batchId && batchIndex == recordId.batchIndex;
41+
}
42+
43+
@Override
44+
public int hashCode() {
45+
return Objects.hash(batchId, batchIndex);
46+
}
47+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package io.hstream;
2+
3+
import java.util.Objects;
4+
5+
public class Subscription {
6+
7+
private String subscriptionId;
8+
private String streamName;
9+
private SubscriptionOffset subscriptionOffset;
10+
11+
public Subscription(
12+
String subscriptionId, String streamName, SubscriptionOffset subscriptionOffset) {
13+
this.subscriptionId = subscriptionId;
14+
this.streamName = streamName;
15+
this.subscriptionOffset = subscriptionOffset;
16+
}
17+
18+
public String getSubscriptionId() {
19+
return subscriptionId;
20+
}
21+
22+
public String getStreamName() {
23+
return streamName;
24+
}
25+
26+
public SubscriptionOffset getSubscriptionOffset() {
27+
return subscriptionOffset;
28+
}
29+
30+
public void setSubscriptionId(String subscriptionId) {
31+
this.subscriptionId = subscriptionId;
32+
}
33+
34+
public void setStreamName(String streamName) {
35+
this.streamName = streamName;
36+
}
37+
38+
public void setSubscriptionOffset(SubscriptionOffset subscriptionOffset) {
39+
this.subscriptionOffset = subscriptionOffset;
40+
}
41+
42+
@Override
43+
public boolean equals(Object o) {
44+
if (this == o) {
45+
return true;
46+
}
47+
if (o == null || getClass() != o.getClass()) {
48+
return false;
49+
}
50+
Subscription that = (Subscription) o;
51+
return Objects.equals(subscriptionId, that.subscriptionId)
52+
&& Objects.equals(streamName, that.streamName)
53+
&& Objects.equals(subscriptionOffset, that.subscriptionOffset);
54+
}
55+
56+
@Override
57+
public int hashCode() {
58+
return Objects.hash(subscriptionId, streamName, subscriptionOffset);
59+
}
60+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.hstream;
2+
3+
import java.util.Objects;
4+
5+
public class SubscriptionOffset {
6+
public enum SpecialOffset {
7+
EARLIEST,
8+
LATEST,
9+
UNRECOGNIZED;
10+
}
11+
12+
private SpecialOffset specialOffset;
13+
14+
public SubscriptionOffset(SpecialOffset specialOffset) {
15+
this.specialOffset = specialOffset;
16+
}
17+
18+
public SpecialOffset getSpecialOffset() {
19+
return specialOffset;
20+
}
21+
22+
public void setSpecialOffset(SpecialOffset specialOffset) {
23+
this.specialOffset = specialOffset;
24+
}
25+
26+
@Override
27+
public boolean equals(Object o) {
28+
if (this == o) {
29+
return true;
30+
}
31+
if (o == null || getClass() != o.getClass()) {
32+
return false;
33+
}
34+
SubscriptionOffset that = (SubscriptionOffset) o;
35+
return specialOffset == that.specialOffset;
36+
}
37+
38+
@Override
39+
public int hashCode() {
40+
return Objects.hash(specialOffset);
41+
}
42+
}

src/main/java/io/hstream/impl/ClientImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,15 @@
44
import io.grpc.ManagedChannel;
55
import io.grpc.ManagedChannelBuilder;
66
import io.hstream.*;
7-
import io.hstream.HStreamApiGrpc;
7+
import io.hstream.internal.DeleteStreamRequest;
8+
import io.hstream.internal.DeleteSubscriptionRequest;
9+
import io.hstream.internal.HStreamApiGrpc;
10+
import io.hstream.internal.ListStreamsResponse;
11+
import io.hstream.internal.Stream;
12+
import io.hstream.util.GrpcUtils;
813
import java.util.List;
914
import java.util.concurrent.TimeUnit;
15+
import java.util.stream.Collectors;
1016
import org.slf4j.Logger;
1117
import org.slf4j.LoggerFactory;
1218

@@ -64,12 +70,14 @@ public List<Stream> listStreams() {
6470

6571
@Override
6672
public void createSubscription(Subscription subscription) {
67-
blockingStub.createSubscription(subscription);
73+
blockingStub.createSubscription(GrpcUtils.subscriptionToGrpc(subscription));
6874
}
6975

7076
@Override
7177
public List<Subscription> listSubscriptions() {
72-
return blockingStub.listSubscriptions(Empty.newBuilder().build()).getSubscriptionList();
78+
return blockingStub.listSubscriptions(Empty.newBuilder().build()).getSubscriptionList().stream()
79+
.map(GrpcUtils::subscriptionFromGrpc)
80+
.collect(Collectors.toList());
7381
}
7482

7583
@Override

src/main/java/io/hstream/impl/ConsumerImpl.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import io.grpc.StatusRuntimeException;
77
import io.grpc.stub.StreamObserver;
88
import io.hstream.*;
9+
import io.hstream.internal.*;
10+
import io.hstream.util.GrpcUtils;
911
import io.hstream.util.RecordUtils;
1012
import java.util.concurrent.*;
1113
import org.slf4j.Logger;
@@ -113,7 +115,9 @@ public void onNext(SubscribeResponse response) {
113115
rawRecordReceiver.processRawRecord(
114116
toReceivedRawRecord(receivedRecord),
115117
new ResponderImpl(
116-
grpcBlockingStub, subscriptionId, receivedRecord.getRecordId()));
118+
grpcBlockingStub,
119+
subscriptionId,
120+
GrpcUtils.recordIdFromGrpc(receivedRecord.getRecordId())));
117121
} catch (Exception e) {
118122
logger.error("process rawRecord error", e);
119123
}
@@ -123,7 +127,9 @@ public void onNext(SubscribeResponse response) {
123127
hRecordReceiver.processHRecord(
124128
toReceivedHRecord(receivedRecord),
125129
new ResponderImpl(
126-
grpcBlockingStub, subscriptionId, receivedRecord.getRecordId()));
130+
grpcBlockingStub,
131+
subscriptionId,
132+
GrpcUtils.recordIdFromGrpc(receivedRecord.getRecordId())));
127133

128134
} catch (Exception e) {
129135
logger.error("process hrecord error", e);
@@ -191,7 +197,8 @@ private static ReceivedRawRecord toReceivedRawRecord(ReceivedRecord receivedReco
191197
HStreamRecord hStreamRecord = HStreamRecord.parseFrom(receivedRecord.getRecord());
192198
byte[] rawRecord = RecordUtils.parseRawRecordFromHStreamRecord(hStreamRecord);
193199
ReceivedRawRecord receivedRawRecord =
194-
new ReceivedRawRecord(receivedRecord.getRecordId(), rawRecord);
200+
new ReceivedRawRecord(
201+
GrpcUtils.recordIdFromGrpc(receivedRecord.getRecordId()), rawRecord);
195202
return receivedRawRecord;
196203
} catch (InvalidProtocolBufferException e) {
197204
throw new HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e);
@@ -202,7 +209,8 @@ private static ReceivedHRecord toReceivedHRecord(ReceivedRecord receivedRecord)
202209
try {
203210
HStreamRecord hStreamRecord = HStreamRecord.parseFrom(receivedRecord.getRecord());
204211
HRecord hRecord = RecordUtils.parseHRecordFromHStreamRecord(hStreamRecord);
205-
ReceivedHRecord receivedHRecord = new ReceivedHRecord(receivedRecord.getRecordId(), hRecord);
212+
ReceivedHRecord receivedHRecord =
213+
new ReceivedHRecord(GrpcUtils.recordIdFromGrpc(receivedRecord.getRecordId()), hRecord);
206214
return receivedHRecord;
207215
} catch (InvalidProtocolBufferException e) {
208216
throw new HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e);

src/main/java/io/hstream/impl/ProducerImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
import io.grpc.stub.StreamObserver;
44
import io.hstream.*;
5+
import io.hstream.internal.AppendRequest;
6+
import io.hstream.internal.AppendResponse;
7+
import io.hstream.internal.HStreamApiGrpc;
8+
import io.hstream.util.GrpcUtils;
59
import io.hstream.util.RecordUtils;
610
import java.util.ArrayList;
711
import java.util.List;
@@ -140,7 +144,10 @@ private CompletableFuture<List<RecordId>> writeRawRecordsAsync(List<byte[]> rawR
140144
new StreamObserver<>() {
141145
@Override
142146
public void onNext(AppendResponse appendResponse) {
143-
completableFuture.complete(appendResponse.getRecordIdsList());
147+
completableFuture.complete(
148+
appendResponse.getRecordIdsList().stream()
149+
.map(GrpcUtils::recordIdFromGrpc)
150+
.collect(Collectors.toList()));
144151
}
145152

146153
@Override
@@ -173,7 +180,10 @@ private CompletableFuture<List<RecordId>> writeHRecordsAsync(List<HRecord> hReco
173180
new StreamObserver<>() {
174181
@Override
175182
public void onNext(AppendResponse appendResponse) {
176-
completableFuture.complete(appendResponse.getRecordIdsList());
183+
completableFuture.complete(
184+
appendResponse.getRecordIdsList().stream()
185+
.map(GrpcUtils::recordIdFromGrpc)
186+
.collect(Collectors.toList()));
177187
}
178188

179189
@Override

0 commit comments

Comments
 (0)