Skip to content

Commit a950a30

Browse files
committed
add the rpc layer
1 parent 4f3c457 commit a950a30

File tree

6 files changed

+145
-1
lines changed

6 files changed

+145
-1
lines changed

fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
import org.apache.fluss.rpc.messages.ProduceLogResponse;
4343
import org.apache.fluss.rpc.messages.PutKvRequest;
4444
import org.apache.fluss.rpc.messages.PutKvResponse;
45+
import org.apache.fluss.rpc.messages.ScanKvRequest;
46+
import org.apache.fluss.rpc.messages.ScanKvResponse;
47+
import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest;
48+
import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse;
4549
import org.apache.fluss.rpc.messages.StopReplicaRequest;
4650
import org.apache.fluss.rpc.messages.StopReplicaResponse;
4751
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
@@ -172,4 +176,10 @@ CompletableFuture<NotifyKvSnapshotOffsetResponse> notifyKvSnapshotOffset(
172176
@RPC(api = ApiKeys.NOTIFY_LAKE_TABLE_OFFSET)
173177
CompletableFuture<NotifyLakeTableOffsetResponse> notifyLakeTableOffset(
174178
NotifyLakeTableOffsetRequest request);
179+
180+
@RPC(api = ApiKeys.SCAN_KV)
181+
CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request);
182+
183+
@RPC(api = ApiKeys.SCANNER_KEEP_ALIVE)
184+
CompletableFuture<ScannerKeepAliveResponse> scannerKeepAlive(ScannerKeepAliveRequest request);
175185
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ public enum ApiKeys {
8080
REBALANCE(1049, 0, 0, PUBLIC),
8181
LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC),
8282
CANCEL_REBALANCE(1051, 0, 0, PUBLIC),
83-
PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE);
83+
PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE),
84+
SCAN_KV(1053, 0, 0, PUBLIC),
85+
SCANNER_KEEP_ALIVE(1054, 0, 0, PUBLIC);
8486

8587
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
8688
Arrays.stream(ApiKeys.values())

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,93 @@ message LimitScanResponse{
272272
optional bytes records = 4;
273273
}
274274

275+
// A scan request. Initially, it should specify a scan. Later on, you
276+
// can use the scanner id returned to fetch result batches with a different
277+
// scan request.
278+
//
279+
// The scanner will remain open if there are more results, and it's not
280+
// asked to be closed explicitly.
281+
//
282+
// Clients may choose to retry scan requests that fail to complete (due to, for
283+
// example, a timeout or network error).
284+
//
285+
// You can fetch the results and ask the scanner to be closed to save
286+
// a trip if you are not interested in remaining results.
287+
message ScanKvRequest {
288+
// If continuing an existing scan, then you must set scanner_id.
289+
// Otherwise, you must set 'new_scan_request'.
290+
optional bytes scanner_id = 1;
291+
optional PbScanReqForBucket bucket_scan_req = 2;
292+
293+
// The sequence ID of this call. The sequence ID should start at 0
294+
// with the request for a new scanner, and after each successful request,
295+
// the client should increment it by 1. When retrying a request, the client
296+
// should _not_ increment this value. If the server detects that the client
297+
// missed a chunk of rows from the middle of a scan, it will respond with an
298+
// error.
299+
optional uint32 call_seq_id = 3;
300+
301+
// The maximum number of bytes to send in the response.
302+
optional uint32 batch_size_bytes = 4;
303+
304+
// If set, the server will close the scanner after responding to
305+
// this request, regardless of whether all rows have been delivered.
306+
optional bool close_scanner = 5;
307+
}
308+
309+
message PbScanReqForBucket {
310+
// The tablet to scan.
311+
required int64 table_id = 1;
312+
optional int64 partition_id = 2;
313+
required int32 bucket_id = 3;
314+
315+
// The maximum number of rows to scan with the new scanner.
316+
//
317+
// The scanner will automatically stop yielding results and close itself
318+
// after reaching this number of result rows.
319+
optional uint64 limit = 4;
320+
}
321+
322+
message ScanKvResponse {
323+
// The error, if an error occurred with this request.
324+
optional int32 error_code = 1;
325+
optional string error_message = 2;
326+
327+
// When a scanner is created, returns the scanner ID which may be used
328+
// to pull new rows from the scanner.
329+
optional bytes scanner_id = 3;
330+
331+
// Set to true to indicate that there may be further results to be fetched
332+
// from this scanner. If the scanner has no more results, then the scanner
333+
// ID will become invalid and cannot continue to be used.
334+
//
335+
// Note that if a scan returns no results, then the initial response from
336+
// the first RPC may return false in this flag, in which case there will
337+
// be no scanner ID assigned.
338+
optional bool has_more_results = 4;
339+
340+
// The block of returned rows.
341+
//
342+
// NOTE: the schema-related fields will not be present in this row block.
343+
// The schema will match the schema requested by the client when it created
344+
// the scanner.
345+
optional bytes records = 5;
346+
347+
// Returns the corresponding log offset at the time the scanner is created
348+
optional int64 log_offset = 6;
349+
}
350+
351+
// A scanner keep-alive request.
352+
// Updates the scanner access time, increasing its time-to-live.
353+
message ScannerKeepAliveRequest {
354+
required bytes scanner_id = 1;
355+
}
356+
357+
message ScannerKeepAliveResponse {
358+
// The error, if an error occurred with this request.
359+
optional int32 error_code = 1;
360+
optional string error_message = 2;
361+
}
275362

276363
// notify bucket leader and isr request
277364
message NotifyLeaderAndIsrRequest {

fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@
7171
import org.apache.fluss.rpc.messages.ProduceLogResponse;
7272
import org.apache.fluss.rpc.messages.PutKvRequest;
7373
import org.apache.fluss.rpc.messages.PutKvResponse;
74+
import org.apache.fluss.rpc.messages.ScanKvRequest;
75+
import org.apache.fluss.rpc.messages.ScanKvResponse;
76+
import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest;
77+
import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse;
7478
import org.apache.fluss.rpc.messages.StopReplicaRequest;
7579
import org.apache.fluss.rpc.messages.StopReplicaResponse;
7680
import org.apache.fluss.rpc.messages.TableExistsRequest;
@@ -163,6 +167,17 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> notifyLakeTableOffset(
163167
return null;
164168
}
165169

170+
@Override
171+
public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
172+
return null;
173+
}
174+
175+
@Override
176+
public CompletableFuture<ScannerKeepAliveResponse> scannerKeepAlive(
177+
ScannerKeepAliveRequest request) {
178+
return null;
179+
}
180+
166181
@Override
167182
public CompletableFuture<ListDatabasesResponse> listDatabases(ListDatabasesRequest request) {
168183
return null;

fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@
5656
import org.apache.fluss.rpc.messages.ProduceLogResponse;
5757
import org.apache.fluss.rpc.messages.PutKvRequest;
5858
import org.apache.fluss.rpc.messages.PutKvResponse;
59+
import org.apache.fluss.rpc.messages.ScanKvRequest;
60+
import org.apache.fluss.rpc.messages.ScanKvResponse;
61+
import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest;
62+
import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse;
5963
import org.apache.fluss.rpc.messages.StopReplicaRequest;
6064
import org.apache.fluss.rpc.messages.StopReplicaResponse;
6165
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
@@ -386,6 +390,17 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> notifyLakeTableOffset(
386390
return response;
387391
}
388392

393+
@Override
394+
public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
395+
return null;
396+
}
397+
398+
@Override
399+
public CompletableFuture<ScannerKeepAliveResponse> scannerKeepAlive(
400+
ScannerKeepAliveRequest request) {
401+
return null;
402+
}
403+
389404
@Override
390405
public void authorizeTable(OperationType operationType, long tableId) {
391406
if (authorizer != null) {

fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@
8181
import org.apache.fluss.rpc.messages.ProduceLogResponse;
8282
import org.apache.fluss.rpc.messages.PutKvRequest;
8383
import org.apache.fluss.rpc.messages.PutKvResponse;
84+
import org.apache.fluss.rpc.messages.ScanKvRequest;
85+
import org.apache.fluss.rpc.messages.ScanKvResponse;
86+
import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest;
87+
import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse;
8488
import org.apache.fluss.rpc.messages.StopReplicaRequest;
8589
import org.apache.fluss.rpc.messages.StopReplicaResponse;
8690
import org.apache.fluss.rpc.messages.TableExistsRequest;
@@ -324,6 +328,17 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> notifyLakeTableOffset(
324328
throw new UnsupportedOperationException();
325329
}
326330

331+
@Override
332+
public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
333+
return null;
334+
}
335+
336+
@Override
337+
public CompletableFuture<ScannerKeepAliveResponse> scannerKeepAlive(
338+
ScannerKeepAliveRequest request) {
339+
return null;
340+
}
341+
327342
@Override
328343
public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request) {
329344
throw new UnsupportedOperationException();

0 commit comments

Comments
 (0)