Cooperative Concurrency (and Parallelism!) #3031
jjtolton
started this conversation in
Show and tell
Replies: 2 comments
-
The |
Beta Was this translation helpful? Give feedback.
0 replies
-
Full file::- use_module(library(lists)).
:- use_module(library(debug)).
:- use_module(library(time)).
:- use_module(library(format)).
:- use_module(library(charsio)).
:- use_module(library(process)).
:- use_module(library(clpz)).
:- use_module(library(reif)).
:- use_module(library(lambda)).
run_task_q(Q) :-
taskqcall_deque(Q, Nq),
run_task_q(Nq).
run_task_q([]).
taskqcall_deque(Q, Nq) :-
q_deque_last(Q,Q0,T),
call(T,Q0,Nq).
item_q_enque(X, [], [X]).
item_q_enque(X, [Y|Ys], [X,Y|Ys]).
q_deque_last(Xs, Ys, Item) :-
reverse(Xs,[Item|Sy]),
reverse(Sy,Ys).
que_task_enque(Q,T,Nq) :-
item_q_enque(T,Q,Nq).
write_task(Message,Q,Q) :-
write(Message),
nl.
format_task(Format,Message,Q,Q) :-
format(Format,Message).
writem_task([Message|Messages],Q,Nq) :-
que_task_enque(Q,write_task(Message),Nq0),
que_task_enque(Nq0,writem_task(Messages),Nq).
writem_task([],Q,Q).
countable_task(N,Max,Q,Nq) :-
if_(clpz_t(N #>= Max),
Q=Nq,
( que_task_enque(Q,write_task(N),Nq0),
N1#=N+1,
que_task_enque(Nq0,countable_task(N1,Max),Nq)
)
).
wait(S) :-
phrase(format_("sleep ~q", [S]), Sleep),
process_create("bash", ["-c", Sleep], [process(P)]),
process_wait(P, exit(0)).
process_task(proc(Name, Args, Opts), Q, Nq) :-
process_create(Name,Args,Opts),
memberchk(process(Proc), Opts),
que_task_enque(Q,process_task(wait(Proc,Opts)),Nq).
process_task(wait(Proc,Opts),Q,Nq) :-
process_wait(Proc, Status, [timeout(0)]),
if_(Status=exit(0),
( memberchk(stdout(pipe(Stream)), Opts),
get_n_chars(Stream,_,Chars),
que_task_enque(Q,format_task("~s", [Chars]),Nq)
),
( wait(0.01),
que_task_enque(Q,process_task(wait(Proc,Opts)),Nq)
)
).
random_sleep_write_task(Proc) :-
Program="N=3; rand=$(( (RANDOM % N) + 1 )); sleep $rand; echo $rand",
Proc=process_task(proc("bash", ["-c", Program], [stdout(pipe(_Stream)), process(_P)])).
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
?- Nq+\(
que_task_enque([], write_task(hello), Nq0),
que_task_enque(Nq0, write_task(cruel), Nq1),
que_task_enque(Nq1, write_task(world), Nq),
run_task_q(Nq)
).
?- run_task_q([countable_task(0,5), writem_task([a,b,c]), writem_task([x,y,z])]).
?- time(_+\(length(Tasks,16),
maplist(random_sleep_write_task, Tasks),
run_task_q(Tasks)
)
).
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
|
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
(Tested on
v0.9.4-548-g97a7e37b
)I have been dabbling with ways to work towards making Torbjorn's Prolog Actors a reality in Scryer, and I stumbled across an interesting technique for cooperative concurrency in Prolog.
(Note: this work is somewhat superseded by Part 2)
I am certainly not the first person to figure this out (big shout out to @bakaq for many long conversations on this), but I was actually surprised how elegant it was to implement in Prolog when I finally figured it out!
I could use some help with a few parts, particularly the queueing, if anyone has any ideas.
The bulk of it can be expressed in a few lines:
I could not figure out how to use
library(queue)
(the behavior ofqueue_last/3
was very strange), so I made my own constructs:Here are some example concurrent tasks:
They can be used like so:
Most excitingly, thanks to @Skgland's contribution with
library(process)
, we can combine cooperative concurrency with processes to achieve ad-hoc parallelism in Scryer!I'm sure there are plenty of mistakes in here, but I was excited with the progress and I think there is room to push forward.
For instance, the fact that I have to use
once
is an indication there is a serious problem. If I do not useonce
, the query does not terminate after the first leaf answer.Edit: Oh, interesting. Putting
length/2
aftermaplist/2
to constrainTasks
does not seem to work.That caught me by surprise!
This then works as expected:
Beta Was this translation helpful? Give feedback.
All reactions