@@ -934,17 +934,15 @@ local function get_range_end(key)
934934end
935935
936936
937- local function watch ( self , key , attr )
937+ local function create_watch_request ( key , attr )
938938 -- verify key
939939 if # key == 0 then
940940 key = str_char (0 )
941941 end
942942
943- key = encode_base64 (key )
944-
945943 local range_end
946944 if attr .range_end then
947- range_end = encode_base64 ( attr .range_end )
945+ range_end = attr .range_end
948946 end
949947
950948 local prev_kv
@@ -977,23 +975,91 @@ local function watch(self, key, attr)
977975 filters = attr .filters and attr .filters or 0
978976 end
979977
978+ local create_request = {
979+ key = key ,
980+ range_end = range_end ,
981+ prev_kv = prev_kv ,
982+ start_revision = start_revision ,
983+ watch_id = watch_id ,
984+ progress_notify = progress_notify ,
985+ fragment = fragment ,
986+ filters = filters ,
987+ }
988+
989+ return create_request
990+ end
991+
992+
993+ function _grpc_M .create_grpc_watch_stream (self , key , attr , opts )
994+ key = utils .get_real_key (self .key_prefix , key )
995+ attr .range_end = get_range_end (key )
996+
997+ local req = {
998+ create_request = create_watch_request (key , attr ),
999+ }
1000+
1001+ local conn = self .conn
1002+ if opts then
1003+ self .call_opts .timeout = opts .timeout and opts .timeout * 1000
1004+ end
1005+ if not self .call_opts .timeout then
1006+ self .call_opts .timeout = self .timeout * 1000
1007+ end
1008+
1009+ local st , err = conn :new_server_stream (" etcdserverpb.Watch" , " Watch" , req , self .call_opts )
1010+ if not st then
1011+ return nil , err
1012+ end
1013+
1014+ local res , err = st :recv ()
1015+ if not res then
1016+ return nil , err
1017+ end
1018+
1019+ return st
1020+ end
1021+
1022+
1023+ function _grpc_M .read_grpc_watch_stream (self , watching_stream )
1024+ local res , err = watching_stream :recv ()
1025+ if not res then
1026+ return nil , err
1027+ end
1028+
1029+ if res .events then
1030+ for _ , event in ipairs (res .events ) do
1031+ if event .kv .value then -- DELETE not have value
1032+ event .kv .value = self .serializer .deserialize (event .kv .value )
1033+ end
1034+ if event .prev_kv then
1035+ event .prev_kv .value = self .serializer .deserialize (event .prev_kv .value )
1036+ end
1037+ end
1038+ end
1039+
1040+ local wrapped_res = {
1041+ result = res ,
1042+ }
1043+ return wrapped_res
1044+ end
1045+
1046+
1047+ local function watch (self , key , attr )
9801048 local need_cancel
9811049 if attr .need_cancel then
9821050 need_cancel = attr .need_cancel and true or false
9831051 end
9841052
1053+ local create_request = create_watch_request (key , attr )
1054+ create_request .key = encode_base64 (key )
1055+
1056+ if attr .range_end then
1057+ create_request .range_end = encode_base64 (attr .range_end )
1058+ end
1059+
9851060 local opts = {
9861061 body = {
987- create_request = {
988- key = key ,
989- range_end = range_end ,
990- prev_kv = prev_kv ,
991- start_revision = start_revision ,
992- watch_id = watch_id ,
993- progress_notify = progress_notify ,
994- fragment = fragment ,
995- filters = filters ,
996- }
1062+ create_request = create_request ,
9971063 },
9981064 need_cancel = need_cancel ,
9991065 }
@@ -1010,6 +1076,7 @@ local function watch(self, key, attr)
10101076 return callback_fun
10111077end
10121078
1079+
10131080function _grpc_M .convert_grpc_to_http_res (self , res )
10141081 if res == nil then
10151082 return nil
0 commit comments