Skip to content

Commit 5b0ac96

Browse files
committed
Rewrite to websockets
1 parent 9d22129 commit 5b0ac96

File tree

2 files changed

+139
-200
lines changed

2 files changed

+139
-200
lines changed

computercraft/back.lua

Lines changed: 53 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -1,134 +1,69 @@
11
local genv = getfenv()
22
local temp = {}
3+
local event_sub = {}
34
genv.temp = temp
45
local url = 'http://127.0.0.1:4343/'
5-
local args = {...}
66
local tasks = {}
77

8-
-- https://www.lua.org/pil/11.4.html
9-
local Queue = {}
10-
function Queue.new()
11-
return {head = 0, tail = 0}
12-
end
13-
function Queue.push(q, value)
14-
q[q.head] = value
15-
q.head = q.head + 1
16-
end
17-
function Queue.pop(q)
18-
if q.tail >= q.head then return nil end
19-
local value = q[q.tail]
20-
q[q.tail] = nil
21-
q.tail = q.tail + 1
22-
return value
23-
end
24-
function Queue.length(q)
25-
return q.head - q.tail
26-
end
27-
local output = Queue.new()
28-
29-
local function is_special_task(task_id)
30-
return (task_id == '_fetch' or task_id == '_send')
31-
end
32-
33-
local function string_split(text)
34-
local pos = string.find(text, ';')
35-
return string.sub(text, 1, pos - 1), string.sub(text, pos + 1)
36-
end
37-
38-
local function inner_resume(task_id, ...)
39-
local r = {coroutine.resume(tasks[task_id].co, ...)}
40-
if coroutine.status(tasks[task_id].co) == 'dead' then
41-
if not is_special_task(task_id) then
42-
Queue.push(output, {task_id=task_id, result=r})
43-
end
44-
tasks[task_id] = nil
45-
return true
46-
end
47-
if r[1] then tasks[task_id].exp = r[2] end
48-
return false
49-
end
50-
51-
local function make_task(task_id, f, ...)
52-
tasks[task_id] = {co=coroutine.create(f)}
53-
return inner_resume(task_id, ...)
54-
end
55-
56-
local function resume_task(task_id, event, ...)
57-
local exp = tasks[task_id].exp
58-
if exp ~= nil and event ~= exp then return false end
59-
return inner_resume(task_id, event, ...)
60-
end
61-
62-
local function event_queue(task_id, event)
63-
while true do
64-
local estruct = {os.pullEvent(event)}
65-
Queue.push(output, {task_id=task_id, result=estruct})
66-
end
67-
end
68-
69-
local function fetch_fn()
70-
local r = http.post(url..'start/'..os.getComputerID()..'/'..args[1]..'/', '')
71-
if (r == nil or r.getResponseCode() ~= 200) then
72-
print('Failed to start program '..args[1])
73-
return
74-
end
75-
while true do
76-
r = http.post(url..'gettask/'..os.getComputerID()..'/', '')
77-
if (r == nil or r.getResponseCode() ~= 200) then
78-
print('Connection broken')
79-
return
80-
end
81-
local cmd = r.readAll()
82-
if cmd == 'END' then
83-
break
84-
elseif cmd ~= 'NOOP' then
85-
local cmd, text = string_split(cmd)
86-
if cmd == 'TASK' then
87-
local task_id, text = string_split(text)
88-
local f = loadstring(text)
89-
setfenv(f, genv)
90-
make_task(task_id, f)
91-
elseif cmd == 'STARTQUEUE' then
92-
local task_id, text = string_split(text)
93-
make_task(task_id, event_queue, task_id, text)
94-
elseif cmd == 'STOPQUEUE' then
95-
tasks[text] = nil
96-
end
97-
end
98-
end
99-
end
100-
101-
local function send_fn()
102-
while true do
103-
local r = Queue.pop(output)
104-
if r == nil then break end
105-
local answer = textutils.serializeJSON(r.result)
106-
http.post(url..'taskresult/'..os.getComputerID()..'/'..r.task_id..'/', answer)
107-
end
108-
end
109-
110-
if make_task('_fetch', fetch_fn) then return end
8+
ws = http.websocket(url..'ws/')
9+
ws.send(textutils.serializeJSON{
10+
action='run',
11+
computer=os.getComputerID(),
12+
args={...},
13+
})
11114

11215
while true do
11316
local event, p1, p2, p3, p4, p5 = os.pullEvent()
114-
if resume_task('_fetch', event, p1, p2, p3, p4, p5) then break end
11517

116-
-- Use of http API in user code is explicitly disallowed
117-
if event ~= 'http_success' and event ~= 'http_failure' then
118-
local local_task_ids = {}
119-
for task_id in pairs(tasks) do
120-
local_task_ids[task_id] = true
121-
end
122-
for task_id in pairs(local_task_ids) do
123-
if not is_special_task(task_id) then
124-
resume_task(task_id, event, p1, p2, p3, p4, p5)
18+
if event == 'websocket_message' then
19+
msg = textutils.unserializeJSON(p2)
20+
if msg.action == 'task' then
21+
local fn, err = loadstring(msg.code)
22+
if fn ~= nil then
23+
setfenv(fn, genv)
24+
tasks[msg.task_id] = coroutine.create(fn)
25+
else
26+
ws.send(textutils.serializeJSON{
27+
action='task_result',
28+
task_id=msg.task_id,
29+
result={false, err},
30+
})
12531
end
32+
elseif msg.action == 'sub' then
33+
event_sub[msg.event] = true
34+
elseif msg.action == 'unsub' then
35+
event_sub[msg.event] = nil
36+
elseif msg.action == 'close' then
37+
if msg.error ~= nil then
38+
print(msg.error)
39+
end
40+
break
41+
else
42+
print(msg)
12643
end
44+
elseif event_sub[event] == true then
45+
ws.send(textutils.serializeJSON{
46+
action='event',
47+
event=event,
48+
params={p1, p2, p3, p4, p5},
49+
})
12750
end
12851

129-
if tasks['_send'] ~= nil then
130-
resume_task('_send', event, p1, p2, p3, p4, p5)
131-
else
132-
if Queue.length(output) > 0 then make_task('_send', send_fn) end
52+
local del_tasks = {}
53+
for task_id in pairs(tasks) do
54+
local r = {coroutine.resume(tasks[task_id], event, p1, p2, p3, p4, p5)}
55+
if coroutine.status(tasks[task_id]) == 'dead' then
56+
ws.send(textutils.serializeJSON{
57+
action='task_result',
58+
task_id=task_id,
59+
result=r,
60+
})
61+
del_tasks[task_id] = true
62+
end
63+
end
64+
for task_id in pairs(del_tasks) do
65+
tasks[task_id] = nil
13366
end
13467
end
68+
69+
ws.close()

0 commit comments

Comments
 (0)