Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 68 additions & 20 deletions std/async/process.kk
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module std/async/process
import std/async/async
import uv/process
import uv/signal
import std/stream
import std/num/int32
import std/num/int64
import std/time/duration
Expand All @@ -12,7 +13,7 @@ import std/time/duration
// TODO: support passing in pipes, and using strings for input/output

// A command specification, which can be used to spawn a process
pub struct command
abstract struct command
exe: string
args: list<string>
input: child-input
Expand All @@ -28,18 +29,20 @@ pub fun command(
pub fun command/show(command: command)
"command("++command.exe.show++", "++command.args.show++")"

pub type child-input
type child-input
Ignore-input
Inherit-input

pub val input/default = Inherit-input
pub val input/inherit: child-input = Inherit-input
pub val input/ignore: child-input = Ignore-input

pub fun command/with-output(cmd: command, output: child-output) cmd(output = output)

type child-output-stream
Ignore-output
Inerhit-output
// Capture-output
Capture-output
// Output-to(pipe)

type child-output
Expand All @@ -50,7 +53,8 @@ type child-output
pub val output/default: child-output = Split(Inerhit-output, Inerhit-output)
pub val output/inherit: child-output = Split(Inerhit-output, Inerhit-output)
pub val output/ignore: child-output = Split(Ignore-output, Ignore-output)
// pub val const/capture-stdout: child-output = Split(Capture-output, Inerhit-output)
pub val const/capture-stdout: child-output = Split(Capture-output, Inerhit-output)
pub val const/capture-outputs: child-output = Split(Capture-output, Capture-output)
// pub val const/capture-merged: child-output = Capture-merged

// fun update/capture-stdout(o: child-output): child-output
Expand All @@ -63,9 +67,12 @@ fun update/ignore-stderr(o: child-output): child-output
// Capture-merged -> Split(Capture-output, Ignore-output)
Split(stdout, _) -> Split(stdout, Ignore-output)

abstract struct output-streams(stdout: maybe<io-stream>, stderr: maybe<io-stream>)

abstract struct process
internal: uv-process
exit-p: promise<exit>
output-streams: output-streams

pub type exit
Exit-code(code: int)
Expand Down Expand Up @@ -93,11 +100,13 @@ pub fun exit/(==)(a: exit, b: exit): div bool
(Exit-signal(a), Exit-signal(b)) -> a == b
_ -> False

fun stdio-stream-of-output(output: child-output-stream, inherit-fd)
fun stdio-stream-of-output(output: child-output-stream, inherit-fd): asyncx (stdio-stream, maybe<io-stream>)
match output
Ignore-output -> Stream-ignore
Inerhit-output -> Stream-fd(inherit-fd)
// Capture-output -> impossible("TODO")
Ignore-output -> (Stream-ignore, Nothing)
Inerhit-output -> (Stream-fd(inherit-fd), Nothing)
Capture-output ->
val (readable, writable) = mkpipe().untry
(Stream-uv(writable), Just(readable))

pub type kill-strategy
Signal-once(signal: int32)
Expand Down Expand Up @@ -145,16 +154,23 @@ fun finalize-process(p, strategy: kill-strategy)
p.kill(strategy)
()

pub fun only/run(
command: command,
// optional args
kill-strategy: kill-strategy = default
): <io,async|e> ()
run(
command,
kill-strategy=kill-strategy,
action=fn(_) ()
)
p.output-streams.stdout.foreach(stream/close)
p.output-streams.stderr.foreach(stream/close)

pub fun run-unit(command: command): <io,async|e> ()
run(command) fn(_) ()

pub fun run-output(cmd: command): <async,io> string
cmd.with-output(capture-stdout).run fn(p) p.output

pub fun run-outputs(cmd: command): <async,io> (string, string)
cmd.with-output(capture-outputs).run fn(p) p.outputs

// TODO: add to core/maybe?
fun foreach(m: maybe<a>, action: a -> e ()): e ()
match m
Just(x) -> action(x)
Nothing -> ()

pub fun run(
command: command,
Expand All @@ -167,7 +183,7 @@ pub fun run(
Ignore-input -> Stream-ignore
Inherit-input -> Stream-fd(0.int32)

val (uv-stdout, uv-stderr) = match command.output
val ((uv-stdout, stdout-stream), (uv-stderr, stderr-stream)) = match command.output
Split(out, err) ->
(
stdio-stream-of-output(out, 1.int32),
Expand All @@ -193,13 +209,18 @@ pub fun run(
stderr = uv-stderr
)
val uv-process = spawn(uv-cmd, on-exit)
val process = Process(uv-process, exit-p)
val process = Process(uv-process, exit-p, Output-streams(stdout-stream, stderr-stream))
with finally
// this finalizer will kill the process,
// but only if it hasn't already ended
// (which the success path will wait for)
finalize-process(process, kill-strategy)

println("TODO: close half of pipe belonging to child...")
// close the half of any pipes that are only used by the child
// close-owned(uv-stdout)
// close-owned(uv-stderr)

val result = action(process)

// if block succeeded, wait for process and check for failure
Expand All @@ -208,8 +229,35 @@ pub fun run(
exit/check-exit(exit)
result

fun close-owned(stream: stdio-stream): asyncx ()
match stream
Stream-uv(s) -> s.close()
_ -> ()

pub fun await(process: process): <io,async> exit
process.exit-p.await

pub fun pid(process: process): <io-noexn> int
process.internal.uv/process/pid.int

fun require(v: maybe<io-stream>, desc: string): exn io-stream
match v
Just(v) -> v
Nothing -> throw("Process has no " ++ desc ++ " stream")

pub fun output-stream(process: process): exn io-stream
process.output-streams.stdout.require("stdout")

pub fun stderr-stream(process: process): exn io-stream
process.output-streams.stderr.require("stderr")

pub fun output(process: process): <asyncx> string
process.output-stream.read-all()

pub fun stderr-output(process: process): <asyncx> string
process.stderr-stream.read-all()

pub fun outputs(process: process): <asyncx> (string, string)
val out = process.output-stream()
val err = process.stderr-stream()
interleaved({ out.read-all() }, { err.read-all() })
13 changes: 13 additions & 0 deletions std/stream.kk
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module std/stream

import uv/stream
import uv/event-loop
import std/async/async

pub alias io-stream = uv-stream

pub fun read-all(stream): <asyncx> string
"TODO: read-all implementation"

pub fun close(stream: io-stream): asyncx ()
await0(fn(cb) stream.uv-handle.event-loop/close(cb))
10 changes: 8 additions & 2 deletions test/std/async/process-test.kk
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ fun expect-process-finished(process)
fun suite()
effectful-test("success")
expect(())
command("true", []).run()
command("true", []).run-unit()
wait(0.2)

effectful-test("failure")
expect-error-message("Command failed (status:1)")
command("false", []).run()
command("false", []).run-unit()
wait(0.2)

effectful-test("allowable failure")
Expand All @@ -51,3 +51,9 @@ fun suite()
throw(errmsg)
expect-process-finished(process.unjust)
wait(0.2)

// effectful-test("capture output")
// expect("echo 1 2 words with many spaces\n") {
// val c = command("echo", ["1", "2", "words with many spaces"], output=capture-stdout)
// c.run(fn(p) p.output)
// }
40 changes: 39 additions & 1 deletion uv/inline/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ char* kk_string_cstr_alloc(kk_string_t str, kk_context_t* _ctx) {
// TODO: move into utils?
// Helper function to convert Koka list<string> to null-terminated C char** array.
static char** kk_list_string_to_nt_carray_borrow(kk_std_core_types__list list, kk_context_t* _ctx) {
kk_std_core_types__list_dup(list, _ctx);
list = kk_std_core_types__list_dup(list, _ctx);
kk_integer_t klist_len = kk_std_core_list_length(list, _ctx);
int list_len = kk_integer_clamp32(klist_len, _ctx);

Expand Down Expand Up @@ -85,6 +85,10 @@ static uv_stdio_container_t kk_convert_to_stdio_container(
result.flags = UV_INHERIT_FD;
struct kk_uv_process_Stream_fd* stream_fd = kk_uv_process__as_Stream_fd(stream, _ctx);
result.data.fd = stream_fd->fd;
} else if (kk_uv_process_is_stream_uv(stream, _ctx)) {
result.flags = UV_INHERIT_STREAM;
struct kk_uv_process_Stream_uv* stream_uv = kk_uv_process__as_Stream_uv(stream, _ctx);
result.data.stream = kk_owned_handle_to_uv_handle(uv_stream_t, stream_uv->value);
} else {
kk_fatal_error(EINVAL, "unknown stream type\n");
}
Expand Down Expand Up @@ -154,3 +158,37 @@ static kk_std_core_exn__error kk_uv_proc_signal(kk_uv_process__uv_process proces
int status = uv_process_kill(&wrapper->process, kk_signal);
kk_uv_check_return(status, kk_unit_box(kk_Unit));
}

static kk_std_core_exn__error kk_uv_proc_pipe(kk_context_t* _ctx) {
uv_file files[2];
int status = uv_pipe(files, 0, 0);
kk_uv_check_bail(status);

uv_pipe_t readable;
status = uv_pipe_init(uvloop(), &readable, 0);
kk_uv_check_bail(status);

status = uv_pipe_open(&readable, files[0]);
kk_uv_check_bail(status);

uv_pipe_t writable;
status = uv_pipe_init(uvloop(), &writable, 0);
kk_uv_check_bail(status);

status = uv_pipe_open(&writable, files[1]);
kk_uv_check_bail(status);

// everything has succeeded, do the mallocs and return
uv_pipe_t* readable_p = kk_malloc(sizeof(uv_pipe_t), _ctx);
memcpy(readable_p, &readable, sizeof(uv_pipe_t));

uv_pipe_t* writable_p = kk_malloc(sizeof(uv_pipe_t), _ctx);
memcpy(writable_p, &writable, sizeof(uv_pipe_t));

kk_std_core_types__tuple2 result = kk_std_core_types__new_Tuple2(
uv_handle_to_owned_kk_handle_box(readable_p, kk_free_fun, stream, stream),
uv_handle_to_owned_kk_handle_box(writable_p, kk_free_fun, stream, stream),
_ctx);

return kk_std_core_exn__new_Ok(kk_std_core_types__tuple2_box(result, _ctx), _ctx);
}
11 changes: 7 additions & 4 deletions uv/inline/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,17 @@ static inline kk_uv_buff_callback_t* kk_new_uv_buff_callback(kk_function_t cb, k
} \
return kk_uv_utils_int_fs_status_code(status, kk_context()); \

// Sometimes the return value is a file descriptor which is why this is a < UV_OK check instead of == UV_OK
#define kk_uv_check_return(err, result) \
// Check the uv status code and return a kk_std_core_exn__error Error if unsuccessful. Continue on success
#define kk_uv_check_bail(err) \
if (err < UV_OK) { \
return kk_async_error_from_errno(err, kk_context()); \
} else { \
return kk_std_core_exn__new_Ok(result, kk_context()); \
}

// Sometimes the return value is a file descriptor which is why this is a < UV_OK check instead of == UV_OK
#define kk_uv_check_return(err, result) \
kk_uv_check_bail(err); \
return kk_std_core_exn__new_Ok(result, kk_context());

// Typically used to clean up when an error occurs
#define kk_uv_check_return_err_drops(err, result, drops) \
if (err < UV_OK) { \
Expand Down
9 changes: 7 additions & 2 deletions uv/process.kk
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
module uv/process

import uv/utils
import uv/stream
import uv/event-loop
import std/num/int32
import std/num/int64

Expand All @@ -26,8 +28,8 @@ pub struct uv-command

pub type stdio-stream
Stream-ignore
// stream-mkpipe // TODO
Stream-fd(fd: int32)
Stream-uv(value: uv-stream)

// pub fun pipe-stdout(pipe): child-output
// Split(Output-to(pipe), Inerhit-output-stream)
Expand All @@ -42,7 +44,7 @@ pub fun spawn(command: uv-command, on-complete: uv-exit-cb): io uv-process
// https://docs.libuv.org/en/v1.x/process.html
// Command line arguments. args[0] should be the path to the program.
val raw-cmd = command(args = Cons(command.file, command.args))
untry(spawn-c(raw-cmd, on-complete))
spawn-c(raw-cmd, on-complete).untry

extern spawn-c(
command: uv-command,
Expand All @@ -58,3 +60,6 @@ pub fun signal(^process: uv-process, signal: int32): exn ()

extern try-signal(^process: uv-process, signal: int32): error<()>
c "kk_uv_proc_signal"

pub extern mkpipe(): error<(uv-stream, uv-stream)>
c "kk_uv_proc_pipe"