Skip to content
This repository was archived by the owner on Jun 17, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions modules/async/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/target
.gdb_history
vgcore.*
Binary file added modules/async/a.exe
Binary file not shown.
Binary file added modules/async/crashyboy.zip
Binary file not shown.
97 changes: 97 additions & 0 deletions modules/async/src/lib.zz
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using time;
using buffer;
using err;
using slice;
using <stdio.h>::{printf};
using <string.h>::{memset};
using log;


inline using "os.h" as os;
inline using "os.h"::{
(struct io_uring_cqe) as io_uring_cqe,
};
//using unix;
//using poll;
//using win32;

export enum State {
Invalid,
Later,
Ready,
Error,
Canceled,
}

export struct Future {
State state;
os::async_os_Future os;
Driver mut * driver;
}

export fn future(Future mut new*self, Driver mut * driver)
{
self->driver = driver;
}

export struct Driver+ {
os::async_os_Driver os;
u8 todo[];
}

/// construct a default eventloop driver
export fn system(Driver+maxq mut new * self, err::Err+et mut *e)
where err::checked(*e)
if #(os::ZZ_ASYNC_URING)
{
memset(self, 0, sizeof(Driver));
int r = unsafe<int>(os::io_uring_queue_init(maxq, &self->os.ring, 0));
if r < 0 {
e->fail_with_system_error((0-r), "io_uring_queue_init(%d)", maxq);
return;
}
}
else if #(os::ZZ_ASYNC_WIN32)
{
memset(self, 0, sizeof(Driver));
}

/// wait for futures to complete
///
/// blocks the current thread until either a future completed or a deadline expired
export fn wait(Driver mut * self, err::Err+et mut* e)
where err::checked(*e)
if #(os::ZZ_ASYNC_URING)
{

//TODO call here or in wait?
unsafe { os::io_uring_submit(&self->os.ring); }

io_uring_cqe mut * mut cqe = 0;
int r = unsafe<int>(os::io_uring_wait_cqe(&self->os.ring, &cqe));
if r < 0 {
e->fail_with_system_error((0-r), "io_uring_queue_init");
return;
}

if unsafe<bool>(cqe->res < 0) {
return;
}

Future mut *fi = (Future mut*)os::io_uring_cqe_get_data(cqe);
err::assert_safe(fi);
fi->state = State::Ready;
unsafe { fi->os.cqe = cqe; }

}


export fn take(Future mut *self, err::Err+et mut *e) -> int
if #(os::ZZ_ASYNC_URING)
{
self->state = State::Invalid;
int r = unsafe<int>(self->os.cqe->res);
unsafe { os::io_uring_cqe_seen(&self->driver->os.ring, self->os.cqe); }
return r;
}

103 changes: 103 additions & 0 deletions modules/async/src/main.zz
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using async;
using time;
using buffer;
using err;
using log;

export fn main() -> int {
new+1000 e = err::make();
new+64 driver = async::system(&e);
e.abort();

new input = async::stdio::stdin(&e);
e.abort();
new output = async::stdio::stdout(&e);
e.abort();

new read = async::future(&driver);
new write = async::future(&driver);

new+1000 mem = buffer::make();
input.read(&e, mem.as_mut_slice(), &read);
e.abort();

for (;;) {
log::info(".");

if read.state == async::State::Ready {
mem.at = (usize)read.take(&e);
e.abort();
static_attest(buffer::integrity(&mem, 1000));

output.write(&e, mem.as_slice(), &write);
e.abort();
}

if write.state == async::State::Ready {
write.take(&e);
e.abort();
input.read(&e, mem.as_mut_slice(), &read);
e.abort();
}

driver.wait(&e);
e.abort();
}


/*

new i = async::stdin(&e, &driver);
e.abort();

fprintf(stderr, "timeout will happen after 1 seconds, then every 3 second.\ninterval must remain uneffected from stdin input\n\n");


new t = async::tick(&driver, time::from_seconds(1));
e.abort();


bool mut is_3_second = false;
let mut last_wakeup = time::tick();

for (;;) {
log::info(".");

string::String+2 mut buf;
buf.clear();

switch i.read_string(&e, &buf) {
async::Result::Ready => {
printf(">%s<\n", buf.mem);
}
async::Result::Later => {}
async::Result::Eof => {
e.abort();
i.close(&e);
e.abort();
return 0;
}
async::Result::Error => {
e.abort();
}
}
e.abort();

if t.expired() {
let now = time::tick();
fprintf(stderr, "tick after %u ms\n", now.to_millis() - last_wakeup.to_millis());
last_wakeup = now;

if !is_3_second {
async::tick(&t, &driver, time::from_seconds(3));
}
}

driver.wait(&e);
e.abort();
}
return 0;
*/

}

60 changes: 60 additions & 0 deletions modules/async/src/os.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#if defined(__linux__)
#define ZZ_ASYNC_UNIX 1
#define ZZ_ASYNC_URING 1
#elif defined(__APPLE__)
#define ZZ_ASYNC_UNIX 1
#define ZZ_ASYNC_POLL 1
#elif _WIN32
#define ZZ_ASYNC_WIN32 1
#endif


#if ZZ_ASYNC_UNIX
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

typedef struct {
int fd;
} async_os_Io;
#endif

#if ZZ_ASYNC_URING
#include <liburing.h>
typedef struct {
struct io_uring ring;
} async_os_Driver;

typedef struct {
struct io_uring_sqe * sqe;
struct io_uring_cqe * cqe;
} async_os_Future;

#endif


#if ZZ_ASYNC_WIN32
#define WIN32_LEAN_AND_MEAN 1
#include <windows.h>
#include <stdint.h>
#include <stdbool.h>
#include <winsock2.h>

typedef struct {
int fd;
} async_os_Io;

typedef struct {
HANDLE completion_port;

bool has_deadline;
uint64_t deadline;
} async_os_Driver;

typedef struct {
} async_os_Future;

#endif
94 changes: 94 additions & 0 deletions modules/async/src/stdio.zz
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using err;
using async;
using slice;

inline using "os.h" as os;

export struct Stdio
{
os::async_os_Io os;
}

export fn stdin(Stdio mut new * self, err::Err+et mut* e)
where err::checked(*e)
if #(os::ZZ_ASYNC_UNIX)
{
unsafe {
self->os.fd = os::fileno(os::stdin);
}
make_async(unsafe<int>(self->os.fd), e);
}

export fn stdout(Stdio mut new * self, err::Err+et mut* e)
where err::checked(*e)
if #(os::ZZ_ASYNC_UNIX)
{
unsafe {
self->os.fd = os::fileno(os::stdout);
}
make_async(unsafe<int>(self->os.fd), e);
}

pub fn make_async(int fd, err::Err+et mut* e)
where err::checked(*e)
if #(os::defined(os::ZZ_ASYNC_UNIX))
{
int mut flags = unsafe<int>(os::fcntl(fd, os::F_GETFL, 0));
if (flags == -1) {
err::fail_with_errno(e, "F_GETFL");
return;
}
flags = flags | unsafe<int>(os::O_NONBLOCK);

if unsafe<int>(os::fcntl(fd, os::F_SETFL, flags)) != 0 {
err::fail_with_errno(e, "F_SETFL");
}
}

pub fn read(
Stdio mut * self,
err::Err+et mut* e,
slice::MutSlice to,
async::Future new mut *future,
)
where err::checked(*e)
if #(os::defined(os::ZZ_ASYNC_URING))
{
unsafe {
future->os.sqe = os::io_uring_get_sqe(&future->driver->os.ring);
}
if !unsafe<bool>(future->os.sqe) {
e->fail(err::OutOfTail, "maxq");
return;
}

unsafe {
os::io_uring_prep_read(future->os.sqe, self->os.fd, (u8 mut *)to.mem, to.size, *to.at);
os::io_uring_sqe_set_data(future->os.sqe, future);
}

//FIXME how to store and advance to->at ?
}

pub fn write(
Stdio mut * self,
err::Err+et mut* e,
slice::Slice to,
async::Future new mut *future,
)
where err::checked(*e)
if #(os::defined(os::ZZ_ASYNC_URING))
{
unsafe {
future->os.sqe = os::io_uring_get_sqe(&future->driver->os.ring);
}
if !unsafe<bool>(future->os.sqe) {
e->fail(err::OutOfTail, "maxq");
return;
}

unsafe {
os::io_uring_prep_write(future->os.sqe, self->os.fd, (u8 *)to.mem, to.size, 0);
os::io_uring_sqe_set_data(future->os.sqe, future);
}
}
Loading