Skip to content

Commit 808277b

Browse files
authored
Make internal fibers reloadable v2 (#30)
1 parent bce6ffe commit 808277b

File tree

1 file changed

+31
-32
lines changed

1 file changed

+31
-32
lines changed

xqueue.lua

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -821,8 +821,26 @@ function M.upgrade(space,opts,depth)
821821

822822
self.ready = fiber.channel(0)
823823

824+
local function was_reload_func()
825+
local start_gen = 0
826+
if package.reload then
827+
start_gen = package.reload.count
828+
end
829+
830+
return start_gen, function()
831+
local curr_gen = 0
832+
if package.reload then
833+
curr_gen = package.reload.count
834+
end
835+
return curr_gen ~= start_gen
836+
end
837+
end
838+
824839
local function rw_fiber_f(func, ...)
825840
local xq = self
841+
842+
local _, was_reload = was_reload_func()
843+
826844
repeat
827845
if box.info.ro then
828846
log.verbose("awaiting rw")
@@ -832,7 +850,12 @@ function M.upgrade(space,opts,depth)
832850
else
833851
fiber.sleep(0.001)
834852
end
835-
until not box.info.ro
853+
until not box.info.ro or was_reload()
854+
end
855+
856+
if was_reload() then
857+
log.info('shutting down rw fiber')
858+
return
836859
end
837860

838861
local ok, err = pcall(func, ...)
@@ -849,21 +872,9 @@ function M.upgrade(space,opts,depth)
849872
local worker = opts.worker
850873
for i = 1,workers do
851874
fiber.create(rw_fiber_f, function(space,xq)
852-
local fname = space.name .. '.xq.wrk' .. tostring(i)
875+
local start_gen, was_reload = was_reload_func()
853876
---@diagnostic disable-next-line: undefined-field
854-
local start_gen = 0
855-
if package.reload then
856-
start_gen = package.reload.count
857-
end
858-
fname = fname .. '.' .. start_gen
859-
860-
local was_reload = function()
861-
local curr_gen = 0
862-
if package.reload then
863-
curr_gen = package.reload.count
864-
end
865-
return curr_gen ~= start_gen
866-
end
877+
local fname = space.name .. '.xq.wrk' .. tostring(i) .. '.' .. start_gen
867878

868879
fiber.name(string.sub(fname,1,32))
869880
repeat fiber.sleep(0.001) until space.xq
@@ -907,21 +918,8 @@ function M.upgrade(space,opts,depth)
907918
if have_runat then
908919
self.runat_chan = fiber.channel(0)
909920
self.runat = fiber.create(rw_fiber_f, function(space,xq,runat_index)
910-
local fname = space.name .. '.xq'
911-
local start_gen = 0
912-
913-
if package.reload then
914-
start_gen = package.reload.count
915-
end
916-
fname = fname .. '.' .. start_gen
917-
918-
local was_reload = function()
919-
local curr_gen = 0
920-
if package.reload then
921-
curr_gen = package.reload.count
922-
end
923-
return curr_gen ~= start_gen
924-
end
921+
local start_gen, was_reload = was_reload_func()
922+
local fname = space.name .. '.xq.' .. start_gen
925923

926924
fiber.name(string.sub(fname,1,32))
927925
repeat fiber.sleep(0.001) until space.xq
@@ -1755,7 +1753,7 @@ function methods:touch(key, attr)
17551753

17561754
local status = t[ xq.fields.status ]
17571755
if status == 'T' then
1758-
xq:check_owner(key)
1756+
xq:check_owner(key)
17591757
end
17601758

17611759
-- delayed or ttl or default ttl
@@ -1768,7 +1766,8 @@ function methods:touch(key, attr)
17681766
if xq.have_runat then
17691767
xq.runat_chan:put(true,0)
17701768
end
1771-
log.info("Touch: {%s} run_at +%s seconds from %s/sid=%s/fid=%s", key, attr.increment, box.session.storage.peer, box.session.id(), fiber.id())
1769+
log.info("Touch: {%s} run_at +%s seconds from %s/sid=%s/fid=%s",
1770+
key, attr.increment, box.session.storage.peer, box.session.id(), fiber.id())
17721771
end)
17731772
end
17741773

0 commit comments

Comments
 (0)