Skip to content

Commit 6a39d92

Browse files
committed
mainthread_wait should acquire schedule
1 parent a8ab4b6 commit 6a39d92

File tree

1 file changed

+48
-10
lines changed

1 file changed

+48
-10
lines changed

src/ltask.c

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ LUAMOD_API int luaopen_ltask_bootstrap(lua_State *L);
3333
LUAMOD_API int luaopen_ltask_root(lua_State *L);
3434

3535
#define THREAD_NONE -1
36+
#define THREAD_MAINTHREAD -2
3637
#define THREAD_WORKER(n) (n)
3738

3839
#ifndef DEBUGLOG
@@ -53,13 +54,15 @@ LUAMOD_API int luaopen_ltask_root(lua_State *L);
5354

5455
struct mainthread_session {
5556
atomic_int srv;
57+
atomic_int ready;
5658
struct sem ev;
5759
int status;
5860
};
5961

6062
static void
6163
mainthread_init(struct mainthread_session *mt) {
6264
atomic_int_init(&mt->srv, 0);
65+
atomic_int_init(&mt->ready, 0);
6366
mt->status = MAINTHREAD_STATUS_NONE;
6467
sem_init(&mt->ev);
6568
}
@@ -68,6 +71,7 @@ static void
6871
mainthread_deinit(struct mainthread_session *mt) {
6972
assert(mt->status == MAINTHREAD_STATUS_NONE || mt->status == MAINTHREAD_STATUS_YIELD);
7073
atomic_int_store(&mt->srv, 0);
74+
atomic_int_store(&mt->ready, 0);
7175
mt->status = MAINTHREAD_STATUS_INVALID;
7276
sem_deinit(&mt->ev);
7377
}
@@ -471,35 +475,43 @@ dispatch_external_messages(struct ltask *task) {
471475

472476
static void
473477
schedule_dispatch(struct ltask *task) {
474-
// Step 0 : dispatch external messsages
478+
// Step 0 : check mainthread service id
479+
struct mainthread_session * mt = &task->mt;
480+
service_id id = { atomic_int_load(&mt->ready) };
481+
if (id.id != 0) {
482+
atomic_int_store(&mt->ready, 0);
483+
schedule_back(task, id);
484+
}
485+
486+
// Step 1 : dispatch external messsages
475487

476488
if (task->external_message) {
477489
dispatch_external_messages(task);
478490
}
479491

480-
// Step 1 : Collect service_done
492+
// Step 2 : Collect service_done
481493
service_id jobs[MAX_WORKER];
482494

483495
int done_job_n = collect_done_job(task, jobs);
484496

485-
// Step 2: Dispatch out message by service_done
497+
// Step 3: Dispatch out message by service_done
486498
dispath_out_messages(task, jobs, done_job_n);
487499

488-
// Step 3: get pending jobs
500+
// Step 4: get pending jobs
489501
int job_n = get_pending_jobs(task, jobs);
490502

491-
// Step 4: Assign queue task
503+
// Step 5: Assign queue task
492504
int free_slot = count_freeslot(task);
493505

494506
assert(free_slot >= job_n);
495507

496-
// Step 5: Assign task to workers
508+
// Step 6: Assign task to workers
497509
int prepare_n = prepare_task(task, jobs, free_slot - job_n, job_n);
498510

499-
// Step 6
511+
// Step 7
500512
assign_prepare_task(task, jobs, prepare_n);
501513

502-
// Step 7
514+
// Step 8
503515
trigger_blocked_workers(task);
504516
}
505517

@@ -529,6 +541,20 @@ release_scheduler(struct worker_thread * worker) {
529541
#endif
530542
}
531543

544+
static int
545+
acquire_scheduler_mainthread(struct ltask *task) {
546+
if (atomic_int_cas(&task->schedule_owner, THREAD_NONE, THREAD_MAINTHREAD)) {
547+
return 0;
548+
}
549+
return 1;
550+
}
551+
552+
static void
553+
release_scheduler_mainthread(struct ltask *task) {
554+
assert(atomic_int_load(&task->schedule_owner) == THREAD_MAINTHREAD);
555+
atomic_int_store(&task->schedule_owner, THREAD_NONE);
556+
}
557+
532558
static service_id
533559
steal_job(struct worker_thread * worker) {
534560
int i;
@@ -1146,6 +1172,18 @@ ltask_log_sender(lua_State *L) {
11461172

11471173
// run service in mainthread
11481174

1175+
static void
1176+
mainthread_service_back(struct ltask *task, service_id id) {
1177+
struct mainthread_session * mt = &task->mt;
1178+
atomic_int_store(&mt->ready, id.id);
1179+
while (atomic_int_load(&mt->ready) == id.id) {
1180+
if (!acquire_scheduler_mainthread(task)) {
1181+
schedule_dispatch(task);
1182+
release_scheduler_mainthread(task);
1183+
}
1184+
}
1185+
}
1186+
11491187
static int
11501188
lmainthread_wait(lua_State *L) {
11511189
struct ltask *task = (struct ltask *)get_ptr(L, "LTASK_GLOBAL");
@@ -1191,11 +1229,11 @@ lmainthread_wait(lua_State *L) {
11911229
if (id.id != SERVICE_ID_ROOT) {
11921230
service_send_signal(P, id);
11931231
}
1194-
schedule_back(task, id);
1232+
mainthread_service_back(task, id);
11951233
return luaL_error(L, "Can't yield in mainthread (status = %d), kill service (%d)", mt->status, id.id);
11961234
}
11971235
}
1198-
schedule_back(task, id);
1236+
mainthread_service_back(task, id);
11991237
mt->status = MAINTHREAD_STATUS_NONE;
12001238
} while (!finish);
12011239
atomic_int_store(&mt->srv, 0);

0 commit comments

Comments
 (0)