Skip to content

Commit 8382f50

Browse files
committed
Add mainthread api
1 parent 48a11e4 commit 8382f50

File tree

2 files changed

+135
-1
lines changed

2 files changed

+135
-1
lines changed

lualib/service.lua

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,22 @@ end
743743

744744
ltask.running = coroutine_running
745745

746+
do
747+
local enter = assert(ltask.mainthread_enter)
748+
local leave = assert(ltask.mainthread_leave)
749+
750+
function ltask.mainthread_run(f, ...)
751+
enter()
752+
continue_session()
753+
local ok, err = pcall(f, ...)
754+
leave()
755+
continue_session()
756+
if not ok then
757+
error(err)
758+
end
759+
end
760+
end
761+
746762
local yieldable_require; do
747763
local require = _G.require
748764
local loaded = package.loaded

src/ltask.c

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,43 @@ LUAMOD_API int luaopen_ltask_root(lua_State *L);
4444

4545
#endif
4646

47+
#define MAINTHREAD_STATUS_NONE 0
48+
#define MAINTHREAD_STATUS_READY 1
49+
#define MAINTHREAD_STATUS_YIELD 2
50+
#define MAINTHREAD_STATUS_FINISH 3
51+
#define MAINTHREAD_STATUS_INVALID 4
52+
53+
struct mainthread_session {
54+
service_id srv;
55+
struct cond ev;
56+
int status;
57+
};
58+
59+
static void
60+
mainthread_init(struct mainthread_session *mt) {
61+
mt->srv.id = 0;
62+
mt->status = MAINTHREAD_STATUS_NONE;
63+
cond_create(&mt->ev);
64+
}
65+
66+
static void
67+
mainthread_deinit(struct mainthread_session *mt) {
68+
assert(mt->status == MAINTHREAD_STATUS_NONE || mt->status == MAINTHREAD_STATUS_YIELD);
69+
mt->status = MAINTHREAD_STATUS_INVALID;
70+
cond_release(&mt->ev);
71+
}
72+
73+
static inline int
74+
mainthread_current(struct mainthread_session *mt, service_id id) {
75+
return mt->srv.id == id.id && mt->status == MAINTHREAD_STATUS_READY;
76+
}
77+
78+
static inline void
79+
mainthread_trigger(struct mainthread_session *mt) {
80+
cond_trigger_begin(&mt->ev);
81+
cond_trigger_end(&mt->ev, 1);
82+
}
83+
4784
struct ltask {
4885
const struct ltask_config *config;
4986
struct worker_thread *workers;
@@ -58,6 +95,7 @@ struct ltask {
5895
struct logqueue *lqueue;
5996
struct queue *external_message;
6097
struct message *external_last_message;
98+
struct mainthread_session mt;
6199
atomic_int schedule_owner;
62100
atomic_int active_worker;
63101
atomic_int thread_count;
@@ -170,7 +208,7 @@ collect_done_job(struct ltask *task, service_id done_job[]) {
170208
for (i=0;i<worker_n;i++) {
171209
struct worker_thread * w = &task->workers[i];
172210
service_id job = worker_done_job(w);
173-
if (job.id) {
211+
if (job.id && !mainthread_current(&task->mt, job)) {
174212
debug_printf(task->logger, "Service %x is done", job.id);
175213
done_job[done_job_n++] = job;
176214
}
@@ -654,6 +692,8 @@ thread_worker(void *ud) {
654692
} else {
655693
service_send_signal(P, id);
656694
}
695+
} else if (mainthread_current(&w->task->mt, id)) {
696+
mainthread_trigger(&w->task->mt);
657697
} else {
658698
service_status_set(P, id, SERVICE_STATUS_DONE);
659699
}
@@ -777,6 +817,8 @@ ltask_init(lua_State *L) {
777817
}
778818

779819
task->blocked_service = 0;
820+
821+
mainthread_init(&task->mt);
780822

781823
return 1;
782824
}
@@ -901,6 +943,7 @@ ltask_wait(lua_State *L) {
901943
}
902944
message_delete(ctx->task->external_last_message);
903945
queue_delete(ctx->task->external_message);
946+
mainthread_deinit(&ctx->task->mt);
904947
return 0;
905948
}
906949

@@ -1096,6 +1139,53 @@ ltask_log_sender(lua_State *L) {
10961139
return 2;
10971140
}
10981141

1142+
// run service in mainthread
1143+
1144+
static int
1145+
lmainthread_wait(lua_State *L) {
1146+
struct ltask *task = (struct ltask *)get_ptr(L, "LTASK_GLOBAL");
1147+
struct mainthread_session * mt = &task->mt;
1148+
if (mt->status != MAINTHREAD_STATUS_NONE && mt->status != MAINTHREAD_STATUS_READY)
1149+
return luaL_error(L, "mainthread in use");
1150+
struct service_pool * P = task->services;
1151+
int finish = 0;
1152+
do {
1153+
cond_wait_begin(&mt->ev);
1154+
cond_wait(&mt->ev);
1155+
cond_wait_end(&mt->ev);
1156+
if (mt->status != MAINTHREAD_STATUS_READY)
1157+
luaL_error(L, "mainthread not ready");
1158+
1159+
service_id id = mt->srv;
1160+
1161+
debug_printf(task->logger, "service %x run in mainthread", id.id);
1162+
if (service_resume(P, id)) {
1163+
// dead
1164+
debug_printf(task->logger, "service %x is dead in mainthread", id.id);
1165+
service_status_set(P, id, SERVICE_STATUS_DEAD);
1166+
} else {
1167+
service_status_set(P, id, SERVICE_STATUS_SCHEDULE);
1168+
switch (mt->status) {
1169+
case MAINTHREAD_STATUS_YIELD:
1170+
debug_printf(task->logger, "service %x yield from mainthread", id.id);
1171+
break;
1172+
case MAINTHREAD_STATUS_FINISH:
1173+
debug_printf(task->logger, "service %x finish in mainthread", id.id);
1174+
finish = 1;
1175+
break;
1176+
default:
1177+
debug_printf(task->logger, "service %x is dead in mainthread", id.id);
1178+
service_status_set(P, id, SERVICE_STATUS_DEAD);
1179+
schedule_back(task, id);
1180+
return luaL_error(L, "Can't yield in mainthread (status = %d), kill service (%d)", mt->status, id.id);
1181+
}
1182+
schedule_back(task, id);
1183+
}
1184+
mt->status = MAINTHREAD_STATUS_NONE;
1185+
} while (!finish);
1186+
return 0;
1187+
}
1188+
10991189
LUAMOD_API int
11001190
luaopen_ltask_bootstrap(lua_State *L) {
11011191
static atomic_int init = 0;
@@ -1120,6 +1210,7 @@ luaopen_ltask_bootstrap(lua_State *L) {
11201210
{ "unpack_remove", luaseri_unpack_remove },
11211211
{ "external_sender", ltask_external_sender },
11221212
{ "log_sender", ltask_log_sender },
1213+
{ "mainthread_wait", lmainthread_wait },
11231214
{ NULL, NULL },
11241215
};
11251216

@@ -1563,6 +1654,30 @@ ltask_debuglog(lua_State *L) {
15631654

15641655
#endif
15651656

1657+
static int
1658+
mainthread_change_status(lua_State *L, int status) {
1659+
const struct service_ud *S = getS(L);
1660+
struct ltask *task = S->task;
1661+
task->mt.srv = S->id;
1662+
task->mt.status = status;
1663+
return 0;
1664+
}
1665+
1666+
static int
1667+
lmainthread_enter(lua_State *L) {
1668+
return mainthread_change_status(L, MAINTHREAD_STATUS_READY);
1669+
}
1670+
1671+
static int
1672+
lmainthread_leave(lua_State *L) {
1673+
return mainthread_change_status(L, MAINTHREAD_STATUS_FINISH);
1674+
}
1675+
1676+
static int
1677+
lmainthread_yield(lua_State *L) {
1678+
return mainthread_change_status(L, MAINTHREAD_STATUS_YIELD);
1679+
}
1680+
15661681
LUAMOD_API int
15671682
luaopen_ltask(lua_State *L) {
15681683
luaL_checkversion(L);
@@ -1600,6 +1715,9 @@ luaopen_ltask(lua_State *L) {
16001715
{ "debuglog", ltask_debuglog },
16011716
{ "eventinit", ltask_eventinit },
16021717
{ "eventreset", ltask_eventreset },
1718+
{ "mainthread_enter", lmainthread_enter },
1719+
{ "mainthread_leave", lmainthread_leave },
1720+
{ "mainthread_yield", lmainthread_yield },
16031721
{ NULL, NULL },
16041722
};
16051723

0 commit comments

Comments
 (0)