1
1
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
2
3
+ use core:: ops:: Range ;
4
+ use std:: { str:: FromStr , sync:: Arc , u32} ;
5
+
6
+ use slog:: { Drain , Logger } ;
3
7
use tikv_client_common:: Error ;
8
+ use tikv_client_proto:: metapb;
4
9
5
10
use crate :: {
6
11
backoff:: DEFAULT_REGION_BACKOFF ,
7
12
config:: Config ,
8
- pd:: PdRpcClient ,
13
+ pd:: { PdClient , PdRpcClient } ,
9
14
raw:: lowering:: * ,
10
15
request:: { Collect , CollectSingle , Plan } ,
11
16
BoundRange , ColumnFamily , Key , KvPair , Result , Value ,
12
17
} ;
13
- use slog:: { Drain , Logger } ;
14
- use std:: { sync:: Arc , u32} ;
15
18
16
19
const MAX_RAW_KV_SCAN_LIMIT : u32 = 10240 ;
17
20
@@ -23,15 +26,15 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
23
26
/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
24
27
/// awaited to execute.
25
28
#[ derive( Clone ) ]
26
- pub struct Client {
27
- rpc : Arc < PdRpcClient > ,
29
+ pub struct Client < PdC : PdClient = PdRpcClient > {
30
+ rpc : Arc < PdC > ,
28
31
cf : Option < ColumnFamily > ,
29
32
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
30
33
atomic : bool ,
31
34
logger : Logger ,
32
35
}
33
36
34
- impl Client {
37
+ impl Client < PdRpcClient > {
35
38
/// Create a raw [`Client`] and connect to the TiKV cluster.
36
39
///
37
40
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -50,7 +53,7 @@ impl Client {
50
53
pub async fn new < S : Into < String > > (
51
54
pd_endpoints : Vec < S > ,
52
55
logger : Option < Logger > ,
53
- ) -> Result < Client > {
56
+ ) -> Result < Self > {
54
57
Self :: new_with_config ( pd_endpoints, Config :: default ( ) , logger) . await
55
58
}
56
59
@@ -78,7 +81,7 @@ impl Client {
78
81
pd_endpoints : Vec < S > ,
79
82
config : Config ,
80
83
optional_logger : Option < Logger > ,
81
- ) -> Result < Client > {
84
+ ) -> Result < Self > {
82
85
let logger = optional_logger. unwrap_or_else ( || {
83
86
let plain = slog_term:: PlainSyncDecorator :: new ( std:: io:: stdout ( ) ) ;
84
87
Logger :: root (
@@ -125,7 +128,7 @@ impl Client {
125
128
/// let get_request = client.get("foo".to_owned());
126
129
/// # });
127
130
/// ```
128
- pub fn with_cf ( & self , cf : ColumnFamily ) -> Client {
131
+ pub fn with_cf ( & self , cf : ColumnFamily ) -> Self {
129
132
Client {
130
133
rpc : self . rpc . clone ( ) ,
131
134
cf : Some ( cf) ,
@@ -141,15 +144,17 @@ impl Client {
141
144
/// the atomicity of CAS, write operations like [`put`](Client::put) or
142
145
/// [`delete`](Client::delete) in atomic mode are more expensive. Some
143
146
/// operations are not supported in the mode.
144
- pub fn with_atomic_for_cas ( & self ) -> Client {
147
+ pub fn with_atomic_for_cas ( & self ) -> Self {
145
148
Client {
146
149
rpc : self . rpc . clone ( ) ,
147
150
cf : self . cf . clone ( ) ,
148
151
atomic : true ,
149
152
logger : self . logger . clone ( ) ,
150
153
}
151
154
}
155
+ }
152
156
157
+ impl < PdC : PdClient > Client < PdC > {
153
158
/// Create a new 'get' request.
154
159
///
155
160
/// Once resolved this request will result in the fetching of the value associated with the
@@ -517,6 +522,29 @@ impl Client {
517
522
plan. execute ( ) . await
518
523
}
519
524
525
+ pub async fn coprocessor (
526
+ & self ,
527
+ copr_name : impl Into < String > ,
528
+ copr_version_req : impl Into < String > ,
529
+ ranges : impl IntoIterator < Item = impl Into < BoundRange > > ,
530
+ request_builder : impl Fn ( metapb:: Region , Vec < Range < Key > > ) -> Vec < u8 > + Send + Sync + ' static ,
531
+ ) -> Result < Vec < ( Vec < u8 > , Vec < Range < Key > > ) > > {
532
+ let copr_version_req = copr_version_req. into ( ) ;
533
+ semver:: VersionReq :: from_str ( & copr_version_req) ?;
534
+ let req = new_raw_coprocessor_request (
535
+ copr_name. into ( ) ,
536
+ copr_version_req,
537
+ ranges. into_iter ( ) . map ( Into :: into) ,
538
+ request_builder,
539
+ ) ;
540
+ let plan = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , req)
541
+ . preserve_shard ( )
542
+ . retry_multi_region ( DEFAULT_REGION_BACKOFF )
543
+ . post_process_default ( )
544
+ . plan ( ) ;
545
+ plan. execute ( ) . await
546
+ }
547
+
520
548
async fn scan_inner (
521
549
& self ,
522
550
range : impl Into < BoundRange > ,
@@ -576,3 +604,80 @@ impl Client {
576
604
self . atomic . then ( || ( ) ) . ok_or ( Error :: UnsupportedMode )
577
605
}
578
606
}
607
+
608
+ #[ cfg( test) ]
609
+ mod tests {
610
+ use super :: * ;
611
+ use crate :: {
612
+ mock:: { MockKvClient , MockPdClient } ,
613
+ Result ,
614
+ } ;
615
+ use std:: { any:: Any , sync:: Arc } ;
616
+ use tikv_client_proto:: kvrpcpb;
617
+
618
+ #[ tokio:: test]
619
+ async fn test_raw_coprocessor ( ) -> Result < ( ) > {
620
+ let plain = slog_term:: PlainSyncDecorator :: new ( std:: io:: stdout ( ) ) ;
621
+ let logger = Logger :: root (
622
+ slog_term:: FullFormat :: new ( plain)
623
+ . build ( )
624
+ . filter_level ( slog:: Level :: Info )
625
+ . fuse ( ) ,
626
+ o ! ( ) ,
627
+ ) ;
628
+ let pd_client = Arc :: new ( MockPdClient :: new ( MockKvClient :: with_dispatch_hook (
629
+ move |req : & dyn Any | {
630
+ if let Some ( req) = req. downcast_ref :: < kvrpcpb:: RawCoprocessorRequest > ( ) {
631
+ assert_eq ! ( req. copr_name, "example" ) ;
632
+ assert_eq ! ( req. copr_version_req, "0.1.0" ) ;
633
+ let resp = kvrpcpb:: RawCoprocessorResponse {
634
+ data : req. data . clone ( ) ,
635
+ ..Default :: default ( )
636
+ } ;
637
+ Ok ( Box :: new ( resp) as Box < dyn Any > )
638
+ } else {
639
+ unreachable ! ( )
640
+ }
641
+ } ,
642
+ ) ) ) ;
643
+ let client = Client {
644
+ rpc : pd_client,
645
+ cf : Some ( ColumnFamily :: Default ) ,
646
+ atomic : false ,
647
+ logger,
648
+ } ;
649
+ let resps = client
650
+ . coprocessor (
651
+ "example" ,
652
+ "0.1.0" ,
653
+ vec ! [ vec![ 5 ] ..vec![ 15 ] , vec![ 20 ] ..vec![ ] ] ,
654
+ |region, ranges| format ! ( "{:?}:{:?}" , region. id, ranges) . into_bytes ( ) ,
655
+ )
656
+ . await ?;
657
+ let resps: Vec < _ > = resps
658
+ . into_iter ( )
659
+ . map ( |( data, ranges) | ( String :: from_utf8 ( data) . unwrap ( ) , ranges) )
660
+ . collect ( ) ;
661
+ assert_eq ! (
662
+ resps,
663
+ vec![
664
+ (
665
+ "1:[Key(05)..Key(0A)]" . to_string( ) ,
666
+ vec![ Key :: from( vec![ 5 ] ) ..Key :: from( vec![ 10 ] ) ]
667
+ ) ,
668
+ (
669
+ "2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]" . to_string( ) ,
670
+ vec![
671
+ Key :: from( vec![ 10 ] ) ..Key :: from( vec![ 15 ] ) ,
672
+ Key :: from( vec![ 20 ] ) ..Key :: from( vec![ 250 , 250 ] )
673
+ ]
674
+ ) ,
675
+ (
676
+ "3:[Key(FAFA)..Key()]" . to_string( ) ,
677
+ vec![ Key :: from( vec![ 250 , 250 ] ) ..Key :: from( vec![ ] ) ]
678
+ )
679
+ ]
680
+ ) ;
681
+ Ok ( ( ) )
682
+ }
683
+ }
0 commit comments