Skip to content

Commit 664e6af

Browse files
authored
feat: add query and view (#153)
1 parent 84803c2 commit 664e6af

File tree

7 files changed

+277
-1
lines changed

7 files changed

+277
-1
lines changed

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,20 @@ static HStreamClientBuilder builder() {
126126
void deleteSubscription(String subscriptionId, boolean force);
127127

128128
Cluster describeCluster();
129+
130+
Query createQuery(String sql);
131+
132+
List<Query> listQueries();
133+
134+
Query getQuery(String id);
135+
136+
void deleteQuery(String id);
137+
138+
void createView(String sql);
139+
140+
List<View> listViews();
141+
142+
View getView(String name);
143+
144+
void deleteView(String name);
129145
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package io.hstream;
2+
3+
public class Query {
4+
String id;
5+
TaskStatus status;
6+
long createdTime;
7+
String queryText;
8+
9+
public String getId() {
10+
return id;
11+
}
12+
13+
public TaskStatus getStatus() {
14+
return status;
15+
}
16+
17+
public long getCreatedTime() {
18+
return createdTime;
19+
}
20+
21+
public String getQueryText() {
22+
return queryText;
23+
}
24+
25+
public static final class Builder {
26+
private String id;
27+
private TaskStatus status;
28+
private long createdTime;
29+
private String queryText;
30+
31+
public Builder id(String id) {
32+
this.id = id;
33+
return this;
34+
}
35+
36+
public Builder status(TaskStatus status) {
37+
this.status = status;
38+
return this;
39+
}
40+
41+
public Builder createdTime(long createdTime) {
42+
this.createdTime = createdTime;
43+
return this;
44+
}
45+
46+
public Builder queryText(String queryText) {
47+
this.queryText = queryText;
48+
return this;
49+
}
50+
51+
public Query build() {
52+
Query query = new Query();
53+
query.queryText = this.queryText;
54+
query.id = this.id;
55+
query.createdTime = this.createdTime;
56+
query.status = this.status;
57+
return query;
58+
}
59+
}
60+
61+
public static Builder newBuilder() {
62+
return new Builder();
63+
}
64+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.hstream;
2+
3+
public enum TaskStatus {
4+
TASK_CREATING,
5+
TASK_CREATED,
6+
TASK_RUNNING,
7+
TASK_CREATION_ABORT,
8+
TASK_CONNECTION_ABORT,
9+
TASK_TERMINATED,
10+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package io.hstream;
2+
3+
import java.util.List;
4+
5+
public class View {
6+
String name;
7+
TaskStatus status;
8+
long createdTime;
9+
String sql;
10+
List<String> schema;
11+
12+
public static Builder newBuilder() {
13+
return new Builder();
14+
}
15+
16+
public String getName() {
17+
return name;
18+
}
19+
20+
public TaskStatus getStatus() {
21+
return status;
22+
}
23+
24+
public long getCreatedTime() {
25+
return createdTime;
26+
}
27+
28+
public String getSql() {
29+
return sql;
30+
}
31+
32+
public List<String> getSchema() {
33+
return schema;
34+
}
35+
36+
public static final class Builder {
37+
private String name;
38+
private TaskStatus status;
39+
private long createdTime;
40+
private String sql;
41+
private List<String> schema;
42+
43+
public Builder name(String name) {
44+
this.name = name;
45+
return this;
46+
}
47+
48+
public Builder status(TaskStatus status) {
49+
this.status = status;
50+
return this;
51+
}
52+
53+
public Builder createdTime(long createdTime) {
54+
this.createdTime = createdTime;
55+
return this;
56+
}
57+
58+
public Builder sql(String sql) {
59+
this.sql = sql;
60+
return this;
61+
}
62+
63+
public Builder schema(List<String> schema) {
64+
this.schema = schema;
65+
return this;
66+
}
67+
68+
public View build() {
69+
View view = new View();
70+
view.createdTime = this.createdTime;
71+
view.schema = this.schema;
72+
view.status = this.status;
73+
view.name = this.name;
74+
view.sql = this.sql;
75+
return view;
76+
}
77+
}
78+
}

client/src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.hstream.*;
66
import io.hstream.internal.RecordId;
77
import io.hstream.internal.SpecialOffset;
8+
import io.hstream.internal.TaskStatusPB;
89

910
/**
1011
* A class of utility functions to convert between the GRPC generated classes and the custom classes
@@ -154,4 +155,42 @@ public static io.hstream.CompressionType compressionTypeFromInternal(
154155
throw new IllegalArgumentException("Unknown compressionType: " + compressionType);
155156
}
156157
}
158+
159+
public static Query queryFromInternal(io.hstream.internal.Query query) {
160+
return Query.newBuilder()
161+
.id(query.getId())
162+
.status(taskStatusFromInternal(query.getStatus()))
163+
.createdTime(query.getCreatedTime())
164+
.queryText(query.getQueryText())
165+
.build();
166+
}
167+
168+
public static View viewFromInternal(io.hstream.internal.View view) {
169+
return View.newBuilder()
170+
.name(view.getViewId())
171+
.status(taskStatusFromInternal(view.getStatus()))
172+
.sql(view.getSql())
173+
.createdTime(view.getCreatedTime())
174+
.schema(view.getSchemaList())
175+
.build();
176+
}
177+
178+
public static TaskStatus taskStatusFromInternal(TaskStatusPB statusPB) {
179+
switch (statusPB) {
180+
case TASK_CREATING:
181+
return TaskStatus.TASK_CREATING;
182+
case TASK_CREATED:
183+
return TaskStatus.TASK_CREATED;
184+
case TASK_RUNNING:
185+
return TaskStatus.TASK_RUNNING;
186+
case TASK_CONNECTION_ABORT:
187+
return TaskStatus.TASK_CONNECTION_ABORT;
188+
case TASK_CREATION_ABORT:
189+
return TaskStatus.TASK_CREATION_ABORT;
190+
case TASK_TERMINATED:
191+
return TaskStatus.TASK_TERMINATED;
192+
default:
193+
throw new IllegalArgumentException("Unknown task status: " + statusPB);
194+
}
195+
}
157196
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,34 @@ import io.hstream.Cluster
77
import io.hstream.ConsumerBuilder
88
import io.hstream.HStreamClient
99
import io.hstream.ProducerBuilder
10+
import io.hstream.Query
1011
import io.hstream.QueryerBuilder
1112
import io.hstream.ReaderBuilder
1213
import io.hstream.Shard
1314
import io.hstream.Stream
1415
import io.hstream.Subscription
16+
import io.hstream.View
17+
import io.hstream.internal.CommandQuery
18+
import io.hstream.internal.CreateQueryRequest
19+
import io.hstream.internal.DeleteQueryRequest
1520
import io.hstream.internal.DeleteStreamRequest
1621
import io.hstream.internal.DeleteSubscriptionRequest
22+
import io.hstream.internal.DeleteViewRequest
23+
import io.hstream.internal.GetQueryRequest
24+
import io.hstream.internal.GetViewRequest
1725
import io.hstream.internal.HStreamApiGrpcKt
26+
import io.hstream.internal.ListQueriesRequest
1827
import io.hstream.internal.ListShardsRequest
1928
import io.hstream.internal.ListStreamsRequest
2029
import io.hstream.internal.ListSubscriptionsRequest
30+
import io.hstream.internal.ListViewsRequest
2131
import io.hstream.internal.LookupSubscriptionRequest
2232
import io.hstream.util.GrpcUtils
2333
import kotlinx.coroutines.runBlocking
2434
import org.slf4j.LoggerFactory
2535
import java.util.concurrent.CompletableFuture
2636
import java.util.concurrent.atomic.AtomicReference
37+
import kotlin.streams.toList
2738

2839
class HStreamClientKtImpl(bootstrapServerUrls: List<String>, credentials: ChannelCredentials? = null) : HStreamClient {
2940

@@ -193,6 +204,64 @@ class HStreamClientKtImpl(bootstrapServerUrls: List<String>, credentials: Channe
193204
}
194205
}
195206

207+
override fun createQuery(sql: String?): Query? {
208+
checkNotNull(sql)
209+
return unaryCallBlocked {
210+
val query = it.createQuery(CreateQueryRequest.newBuilder().setSql(sql).build())
211+
GrpcUtils.queryFromInternal(query)
212+
}
213+
}
214+
215+
override fun listQueries(): List<Query> {
216+
return unaryCallBlocked {
217+
val result = it.listQueries(ListQueriesRequest.getDefaultInstance())
218+
result.queriesList.stream()
219+
.map(GrpcUtils::queryFromInternal)
220+
.toList()
221+
}
222+
}
223+
224+
override fun getQuery(id: String?): Query {
225+
return unaryCallBlocked {
226+
val result = it.getQuery(GetQueryRequest.newBuilder().setId(id).build())
227+
GrpcUtils.queryFromInternal(result)
228+
}
229+
}
230+
231+
override fun deleteQuery(id: String?) {
232+
unaryCallBlocked {
233+
it.deleteQuery(DeleteQueryRequest.newBuilder().setId(id).build())
234+
}
235+
}
236+
237+
override fun createView(sql: String?) {
238+
unaryCallBlocked {
239+
it.executeQuery(CommandQuery.newBuilder().setStmtText(sql).build())
240+
}
241+
}
242+
243+
override fun listViews(): List<View> {
244+
return unaryCallBlocked {
245+
it.listViews(ListViewsRequest.getDefaultInstance())
246+
.viewsList.stream()
247+
.map(GrpcUtils::viewFromInternal)
248+
.toList()
249+
}
250+
}
251+
252+
override fun getView(name: String?): View {
253+
return unaryCallBlocked {
254+
val result = it.getView(GetViewRequest.newBuilder().setViewId(name).build())
255+
GrpcUtils.viewFromInternal(result)
256+
}
257+
}
258+
259+
override fun deleteView(name: String?) {
260+
unaryCallBlocked {
261+
it.deleteView(DeleteViewRequest.newBuilder().setViewId(name).build())
262+
}
263+
}
264+
196265
private final suspend fun lookupSubscriptionServerUrl(subscriptionId: String?): String {
197266
return unaryCallCoroutine {
198267
val req: LookupSubscriptionRequest =

client/src/main/proto

Submodule proto updated 1 file

0 commit comments

Comments
 (0)