Skip to content

Commit c479906

Browse files
authored
feature: watch cancel (#73)
* [WIP] feature: implement operation 'compact' * [WIP] feat: watch cancel * [WIP] feat: watch cancel * cancel watch by close http connection * fix lint * retest * retest * return consistency * show error clearly
1 parent ab9acc9 commit c479906

File tree

3 files changed

+92
-9
lines changed

3 files changed

+92
-9
lines changed

lib/resty/etcd/v3.lua

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,9 @@ local function request_chunk(self, method, host, port, path, opts, timeout)
513513
return nil, "failed to watch data, response code: " .. res.status
514514
end
515515

516-
return function()
517-
body, err = res.body_reader()
516+
local function read_watch()
517+
local body, err = res.body_reader()
518+
518519
if not body then
519520
return nil, err
520521
end
@@ -541,6 +542,12 @@ local function request_chunk(self, method, host, port, path, opts, timeout)
541542

542543
return body
543544
end
545+
546+
if opts.need_cancel == true then
547+
return read_watch, nil, http_cli
548+
else
549+
return read_watch
550+
end
544551
end
545552

546553

@@ -602,6 +609,11 @@ local function watch(self, key, attr)
602609
filters = attr.filters and attr.filters or 0
603610
end
604611

612+
local need_cancel
613+
if attr.need_cancel then
614+
need_cancel = attr.need_cancel and true or false
615+
end
616+
605617
local opts = {
606618
body = {
607619
create_request = {
@@ -614,19 +626,23 @@ local function watch(self, key, attr)
614626
fragment = fragment,
615627
filters = filters,
616628
}
617-
}
629+
},
630+
need_cancel = need_cancel,
618631
}
619632

620633
local endpoint = choose_endpoint(self)
621-
local callback_fun, err = request_chunk(self, 'POST',
634+
635+
local callback_fun, err, http_cli = request_chunk(self, 'POST',
622636
endpoint.host,
623637
endpoint.port,
624638
endpoint.api_prefix .. '/watch', opts,
625639
attr.timeout or self.timeout)
626640
if not callback_fun then
627641
return nil, err
628642
end
629-
643+
if opts.need_cancel == true then
644+
return callback_fun, nil, http_cli
645+
end
630646
return callback_fun
631647
end
632648

@@ -658,10 +674,16 @@ function _M.watch(self, key, opts)
658674
attr.prev_kv = opts and opts.prev_kv
659675
attr.watch_id = opts and opts.watch_id
660676
attr.fragment = opts and opts.fragment
677+
attr.need_cancel = opts and opts.need_cancel
661678

662679
return watch(self, key, attr)
663680
end
664681

682+
function _M.watchcancel(self, http_cli)
683+
local res, err = http_cli:close()
684+
return res, err
685+
end
686+
665687
function _M.readdir(self, key, opts)
666688

667689
clear_tab(attr)
@@ -693,6 +715,7 @@ function _M.watchdir(self, key, opts)
693715
attr.prev_kv = opts and opts.prev_kv
694716
attr.watch_id = opts and opts.watch_id
695717
attr.fragment = opts and opts.fragment
718+
attr.need_cancel = opts and opts.need_cancel
696719

697720
return watch(self, key, attr)
698721
end

t/v3/key.t

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,66 @@ timeout/
163163
164164
165165
166-
=== TEST 4: watchdir(key)
166+
=== TEST 4: watch and watchcancel(key)
167+
--- http_config eval: $::HttpConfig
168+
--- config
169+
location /t {
170+
content_by_lua_block {
171+
local etcd, err = require("resty.etcd").new({protocol = "v3"})
172+
check_res(etcd, err)
173+
174+
local res, err = etcd:set("/test", "abc")
175+
check_res(res, err)
176+
177+
ngx.timer.at(0.1, function ()
178+
etcd:set("/test", "bcd3")
179+
end)
180+
181+
ngx.timer.at(0.2, function ()
182+
etcd:set("/test", "bcd4")
183+
end)
184+
185+
local cur_time = ngx.now()
186+
local body_chunk_fun, err, http_cli = etcd:watch("/test", {timeout = 0.5, need_cancel = true})
187+
188+
if type(http_cli) ~= "table" then
189+
ngx.say("need_cancel failed")
190+
end
191+
192+
if not body_chunk_fun then
193+
ngx.say("failed to watch: ", err)
194+
end
195+
196+
local chunk, err = body_chunk_fun()
197+
ngx.say("created: ", chunk.result.created)
198+
local chunk, err = body_chunk_fun()
199+
ngx.say("value: ", chunk.result.events[1].kv.value)
200+
201+
local res, err = etcd:watchcancel(http_cli)
202+
if not res then
203+
ngx.say("failed to cancel: ", err)
204+
end
205+
206+
local chunk, err = body_chunk_fun()
207+
ngx.say(err)
208+
209+
ngx.say("ok")
210+
}
211+
}
212+
--- request
213+
GET /t
214+
--- no_error_log
215+
[error]
216+
--- response_body
217+
created: true
218+
value: bcd3
219+
closed
220+
ok
221+
--- timeout: 5
222+
223+
224+
225+
=== TEST 5: watchdir(key)
167226
--- http_config eval: $::HttpConfig
168227
--- config
169228
location /t {
@@ -289,7 +348,7 @@ timeout/
289348
290349
291350
292-
=== TEST 5: setx(key, val) failed
351+
=== TEST 6: setx(key, val) failed
293352
--- http_config eval: $::HttpConfig
294353
--- config
295354
location /t {
@@ -310,7 +369,7 @@ GET /t
310369
311370
312371
313-
=== TEST 6: setx(key, val) success
372+
=== TEST 7: setx(key, val) success
314373
--- http_config eval: $::HttpConfig
315374
--- config
316375
location /t {
@@ -336,7 +395,7 @@ checked val as expect: abd
336395
337396
338397
339-
=== TEST 7: setnx(key, val)
398+
=== TEST 8: setnx(key, val)
340399
--- http_config eval: $::HttpConfig
341400
--- config
342401
location /t {

t/v3/lease3.4.t

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ __DATA__
6565
local data, err = etcd:leases()
6666
if data.body.leases[1].ID ~= res.body.ID then
6767
ngx.say("leases not working")
68+
ngx.say("result: ", require("cjson").encode(data.body))
6869
end
6970
7071
local data, err = etcd:set("/test", "abc", {prev_kv = true, lease = res.body.ID})

0 commit comments

Comments
 (0)