Skip to content

Commit bce6ffe

Browse files
authored
Make internal fibers reloadable (#28)
1 parent 4c97bb4 commit bce6ffe

File tree

1 file changed

+44
-5
lines changed

1 file changed

+44
-5
lines changed

xqueue.lua

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -851,12 +851,25 @@ function M.upgrade(space,opts,depth)
851851
fiber.create(rw_fiber_f, function(space,xq)
852852
local fname = space.name .. '.xq.wrk' .. tostring(i)
853853
---@diagnostic disable-next-line: undefined-field
854-
if package.reload then fname = fname .. '.' .. package.reload.count end
854+
local start_gen = 0
855+
if package.reload then
856+
start_gen = package.reload.count
857+
end
858+
fname = fname .. '.' .. start_gen
859+
860+
local was_reload = function()
861+
local curr_gen = 0
862+
if package.reload then
863+
curr_gen = package.reload.count
864+
end
865+
return curr_gen ~= start_gen
866+
end
867+
855868
fiber.name(string.sub(fname,1,32))
856869
repeat fiber.sleep(0.001) until space.xq
857870
if xq.ready then xq.ready:get() end
858-
log.info("I am worker %s",i)
859-
while box.space[space.name] and space.xq == xq and not box.info.ro do
871+
log.info("I am worker %s in gen %s",i, start_gen)
872+
while box.space[space.name] and space.xq == xq and not box.info.ro and not was_reload() do
860873
if xq.ready then xq.ready:get() end
861874
local task = space:take(1)
862875
if task then
@@ -880,6 +893,12 @@ function M.upgrade(space,opts,depth)
880893
log.info("Shutting down on ro instance")
881894
return
882895
end
896+
897+
if was_reload() then
898+
log.info("Shutting down after package.reload")
899+
return
900+
end
901+
883902
log.info("worker %s ended", i)
884903
end,space,self)
885904
end
@@ -889,7 +908,21 @@ function M.upgrade(space,opts,depth)
889908
self.runat_chan = fiber.channel(0)
890909
self.runat = fiber.create(rw_fiber_f, function(space,xq,runat_index)
891910
local fname = space.name .. '.xq'
892-
if package.reload then fname = fname .. '.' .. package.reload.count end
911+
local start_gen = 0
912+
913+
if package.reload then
914+
start_gen = package.reload.count
915+
end
916+
fname = fname .. '.' .. start_gen
917+
918+
local was_reload = function()
919+
local curr_gen = 0
920+
if package.reload then
921+
curr_gen = package.reload.count
922+
end
923+
return curr_gen ~= start_gen
924+
end
925+
893926
fiber.name(string.sub(fname,1,32))
894927
repeat fiber.sleep(0.001) until space.xq
895928
if xq.ready then xq.ready:get() end
@@ -898,7 +931,7 @@ function M.upgrade(space,opts,depth)
898931
local maxrun = 1000
899932
local curwait
900933
local collect = {}
901-
while box.space[space.name] and space.xq == xq and not box.info.ro do
934+
while box.space[space.name] and space.xq == xq and not box.info.ro and not was_reload() do
902935
local r,e = pcall(function()
903936
-- print("runat loop 2 ",box.time64())
904937
local remaining
@@ -979,6 +1012,12 @@ function M.upgrade(space,opts,depth)
9791012
log.info("Shutting down on ro instance")
9801013
return
9811014
end
1015+
1016+
if was_reload() then
1017+
log.info("Shutting down after package.reload")
1018+
return
1019+
end
1020+
9821021
log.info("Runat ended")
9831022
end,space,self,runat_index)
9841023
end

0 commit comments

Comments
 (0)