diff --git a/common/Makefile b/common/Makefile index dec98e099b26..cd68eefd8e09 100644 --- a/common/Makefile +++ b/common/Makefile @@ -62,6 +62,7 @@ COMMON_SRC_NOGEN := \ common/json_parse.c \ common/json_parse_simple.c \ common/json_stream.c \ + common/jsonrpc_io.c \ common/key_derive.c \ common/keyset.c \ common/lease_rates.c \ diff --git a/common/json_parse.c b/common/json_parse.c index 260e9e1b5c89..7bf9bc496c83 100644 --- a/common/json_parse.c +++ b/common/json_parse.c @@ -676,3 +676,13 @@ json_tok_channel_id(const char *buffer, const jsmntok_t *tok, return hex_decode(buffer + tok->start, tok->end - tok->start, cid, sizeof(*cid)); } + +void json_dup_contents(const tal_t *ctx, + const char *buffer, + const jsmntok_t *tok, + const char **new_buffer, + const jsmntok_t **new_toks) +{ + *new_buffer = tal_dup_arr(ctx, char, buffer, tok->end, 0); + *new_toks = tal_dup_arr(ctx, jsmntok_t, tok, json_next(tok) - tok, 0); +} diff --git a/common/json_parse.h b/common/json_parse.h index 4bd23cbddcd7..7e19fdfe4883 100644 --- a/common/json_parse.h +++ b/common/json_parse.h @@ -137,6 +137,13 @@ const char *json_scan(const tal_t *ctx, const char *guide, ...); +/* Duplicate the tok(s) and buffer required (don't assume they're tal objects!) */ +void json_dup_contents(const tal_t *ctx, + const char *buffer, + const jsmntok_t *tok, + const char **new_buffer, + const jsmntok_t **new_toks); + /* eg. JSON_SCAN(json_to_bool, &boolvar) */ #define JSON_SCAN(fmt, var) \ json_scan, \ diff --git a/common/jsonrpc_io.c b/common/jsonrpc_io.c new file mode 100644 index 000000000000..2bb5ac55e7b8 --- /dev/null +++ b/common/jsonrpc_io.c @@ -0,0 +1,163 @@ +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include + +#define READ_CHUNKSIZE 64 + +struct jsonrpc_io { + MEMBUF(char) membuf; + jsmn_parser parser; + jsmntok_t *toks; + + /* Amount of unparsed JSON from previous reads */ + size_t bytes_unparsed; + /* Amount just read by io_read_partial */ + size_t bytes_read; +}; + +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx) +{ + struct jsonrpc_io *json_in; + + json_in = tal(ctx, struct jsonrpc_io); + json_in->bytes_unparsed = 0; + json_in->bytes_read = 0; + + membuf_init(&json_in->membuf, + tal_arr(json_in, char, READ_CHUNKSIZE), + READ_CHUNKSIZE, membuf_tal_resize); + json_in->toks = toks_alloc(json_in); + jsmn_init(&json_in->parser); + + return json_in; +} + +/* Empty new bytes read into our unparsed buffer */ +static void add_newly_read(struct jsonrpc_io *json_in) +{ + /* Now added it to our ubparsed buffer */ + membuf_added(&json_in->membuf, json_in->bytes_read); + json_in->bytes_unparsed += json_in->bytes_read; + json_in->bytes_read = 0; +} + +const char *jsonrpc_newly_read(struct jsonrpc_io *json_in, + size_t *len) +{ + const char *ret; + + ret = membuf_elems(&json_in->membuf) + json_in->bytes_unparsed; + *len = json_in->bytes_read; + + add_newly_read(json_in); + return ret; +} + +const char *jsonrpc_io_parse(const tal_t *ctx, + struct jsonrpc_io *json_in, + const jsmntok_t **toks, + const char **buf) +{ + bool complete; + + /* If we're read any more, add that */ + add_newly_read(json_in); + *toks = NULL; + *buf = NULL; + + /* Our JSON parser is pretty good at incremental parsing, but + * `getrawblock` gives a giant 2MB token, which forces it to re-parse + * every time until we have all of it. However, we can't complete a + * JSON object without a '}', so we do a cheaper check here. + */ + if (!memchr(membuf_elems(&json_in->membuf), '}', + membuf_num_elems(&json_in->membuf))) + return NULL; + + if (!json_parse_input(&json_in->parser, &json_in->toks, + membuf_elems(&json_in->membuf), + membuf_num_elems(&json_in->membuf), + &complete)) { + return tal_fmt(ctx, "Failed to parse RPC JSON response '%.*s'", + (int)membuf_num_elems(&json_in->membuf), + membuf_elems(&json_in->membuf)); + } + + if (!complete) + return NULL; + + /* Must have jsonrpc to be valid! */ + if (!json_get_member(membuf_elems(&json_in->membuf), + json_in->toks, + "jsonrpc")) { + return tal_fmt(ctx, + "JSON-RPC message does not contain \"jsonrpc\" field: '%.*s'", + (int)membuf_num_elems(&json_in->membuf), + membuf_elems(&json_in->membuf)); + } + + *toks = json_in->toks; + *buf = membuf_elems(&json_in->membuf); + return NULL; +} + +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in) +{ + size_t bytes_parsed = json_in->toks[0].end; + json_in->bytes_unparsed -= bytes_parsed; + membuf_consume(&json_in->membuf, bytes_parsed); + + jsmn_init(&json_in->parser); + toks_reset(json_in->toks); +} + +struct io_plan *jsonrpc_io_read_(struct io_conn *conn, + struct jsonrpc_io *json_in, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg) +{ + /* Make sure there's more room */ + membuf_prepare_space(&json_in->membuf, READ_CHUNKSIZE); + + /* Try to read more. */ + json_in->bytes_read = 0; + return io_read_partial(conn, + membuf_elems(&json_in->membuf) + + json_in->bytes_unparsed, + membuf_num_elems(&json_in->membuf) + - json_in->bytes_unparsed + + membuf_num_space(&json_in->membuf), + &json_in->bytes_read, + next, arg); +} + +bool jsonrpc_sync_read(struct jsonrpc_io *json_in, int infd) +{ + int r; + + /* Make sure there's more room */ + membuf_prepare_space(&json_in->membuf, READ_CHUNKSIZE); + + /* Try to read more. */ + r = read(infd, + membuf_elems(&json_in->membuf) + + json_in->bytes_unparsed, + membuf_num_elems(&json_in->membuf) + - json_in->bytes_unparsed + + membuf_num_space(&json_in->membuf)); + if (r < 0) + return false; + if (r == 0) { + errno = 0; + return false; + } + json_in->bytes_read = r; + return true; +} diff --git a/common/jsonrpc_io.h b/common/jsonrpc_io.h new file mode 100644 index 000000000000..fd0babb9d375 --- /dev/null +++ b/common/jsonrpc_io.h @@ -0,0 +1,80 @@ +/* Low-level helper library for C plugins using ccan/io and jsonrpc socket. */ +#ifndef LIGHTNING_COMMON_JSONRPC_IO_H +#define LIGHTNING_COMMON_JSONRPC_IO_H +#include "config.h" +#include +#include +#include + +struct io_conn; +struct plugin; + +/** + * jsonrpc_io_new: allocate a fresh jsonrpc_io + */ +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx); + + +/** + * jsonrpc_io_read: set io_plan for reading more into buffer. + * @conn: the io_conn to read. + * @json_in: the jsonrpc_io. + * @next: the callback once a read is done. + * @arg: the argument for @next (typesafe). + */ +struct io_plan *jsonrpc_io_read_(struct io_conn *conn, + struct jsonrpc_io *json_in, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg); +#define jsonrpc_io_read(ctx, json_in, next, arg) \ + jsonrpc_io_read_((ctx), (json_in), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), \ + struct io_conn *), \ + (arg)) + +/** + * jsonrpc_newly_read: how much did we read into the buffer? + * + * Returns the buffer and sets *len to the bytes just read. After + * that it will return *len == 0. + */ +const char *jsonrpc_newly_read(struct jsonrpc_io *json_in, + size_t *len); + +/** + * jsonrpc_sync_read: read from fd into buffer. + * @json_in: buffer to read into. + * @infd: file descriptort to read. + * + * Returns false on error or EOF; for EOF errno will be 0. + */ +bool jsonrpc_sync_read(struct jsonrpc_io *json_in, int infd); + +/** + * jsonrpc_io_parse: try to parse more of the buffer. + * @ctx: context to allocate error message off. + * @json_in: json_in after jsonrpc_io_read. + * @toks: returned non-NULL if there's a whole valid json object. + * @buf: returned non-NULL as above. + * + * On error, a message is returned. On incomplete, *@toks and *@buf + * are NULL. Usually you call this, the use the result and call + * jsonrpc_io_parse_done(), then call it again. + */ +const char *jsonrpc_io_parse(const tal_t *ctx, + struct jsonrpc_io *json_in, + const jsmntok_t **toks, + const char **buf); + +/** + * jsonrpc_io_parse_done: call aftr using toks from jsonrpc_io_parse. + * @json_in: json_in after jsonrpc_io_parse. + * + * You must call this if jsonrpc_io_parse() sets *toks non-NULL + * (i.e. complete, and no error). + */ +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in); + +#endif /* LIGHTNING_COMMON_JSONRPC_IO_H */ diff --git a/common/msg_queue.c b/common/msg_queue.c index ba702f373053..b63a60be0231 100644 --- a/common/msg_queue.c +++ b/common/msg_queue.c @@ -36,21 +36,11 @@ static void destroy_msg_queue(struct msg_queue *q) } } -/* Realloc helper for tal membufs */ -static void *membuf_tal_realloc(struct membuf *mb, void *rawelems, - size_t newsize) -{ - char *p = rawelems; - - tal_resize(&p, newsize); - return p; -} - struct msg_queue *msg_queue_new(const tal_t *ctx, bool fd_passing) { struct msg_queue *q = tal(ctx, struct msg_queue); q->fd_passing = fd_passing; - membuf_init(&q->mb, tal_arr(q, const u8 *, 0), 0, membuf_tal_realloc); + membuf_init(&q->mb, tal_arr(q, const u8 *, 0), 0, membuf_tal_resize); if (q->fd_passing) tal_add_destructor(q, destroy_msg_queue); diff --git a/common/test/Makefile b/common/test/Makefile index db0fbcaab70e..6352beaa39c6 100644 --- a/common/test/Makefile +++ b/common/test/Makefile @@ -130,4 +130,6 @@ common/test/run-shutdown_scriptpubkey: wire/towire.o wire/fromwire.o common/test/run-wireaddr: wire/towire.o wire/fromwire.o +common/test/run-jsonrpc_io: common/json_parse_simple.o + check-units: $(COMMON_TEST_PROGRAMS:%=unittest/%) diff --git a/common/test/run-jsonrpc_io.c b/common/test/run-jsonrpc_io.c new file mode 100644 index 000000000000..4d5e604bb1f7 --- /dev/null +++ b/common/test/run-jsonrpc_io.c @@ -0,0 +1,317 @@ +/* Body of tests written by ChatGPT 5 */ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include + +#undef io_read_partial +#define io_read_partial io_read_partial_test + +struct jsonrpc_io; + +static struct io_plan *io_read_partial_test(struct io_conn *conn, + void *data, + size_t maxlen, + size_t *lenp, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg); + + +#include "../jsonrpc_io.c" + +/* AUTOGENERATED MOCKS START */ +/* Generated stub for amount_asset_is_main */ +bool amount_asset_is_main(struct amount_asset *asset UNNEEDED) +{ fprintf(stderr, "amount_asset_is_main called!\n"); abort(); } +/* Generated stub for amount_asset_to_sat */ +struct amount_sat amount_asset_to_sat(struct amount_asset *asset UNNEEDED) +{ fprintf(stderr, "amount_asset_to_sat called!\n"); abort(); } +/* Generated stub for amount_feerate */ + bool amount_feerate(u32 *feerate UNNEEDED, struct amount_sat fee UNNEEDED, size_t weight UNNEEDED) +{ fprintf(stderr, "amount_feerate called!\n"); abort(); } +/* Generated stub for amount_sat */ +struct amount_sat amount_sat(u64 satoshis UNNEEDED) +{ fprintf(stderr, "amount_sat called!\n"); abort(); } +/* Generated stub for amount_sat_add */ + bool amount_sat_add(struct amount_sat *val UNNEEDED, + struct amount_sat a UNNEEDED, + struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_add called!\n"); abort(); } +/* Generated stub for amount_sat_eq */ +bool amount_sat_eq(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_eq called!\n"); abort(); } +/* Generated stub for amount_sat_greater_eq */ +bool amount_sat_greater_eq(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_greater_eq called!\n"); abort(); } +/* Generated stub for amount_sat_sub */ + bool amount_sat_sub(struct amount_sat *val UNNEEDED, + struct amount_sat a UNNEEDED, + struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_sub called!\n"); abort(); } +/* Generated stub for amount_sat_to_asset */ +struct amount_asset amount_sat_to_asset(struct amount_sat *sat UNNEEDED, const u8 *asset UNNEEDED) +{ fprintf(stderr, "amount_sat_to_asset called!\n"); abort(); } +/* Generated stub for amount_tx_fee */ +struct amount_sat amount_tx_fee(u32 fee_per_kw UNNEEDED, size_t weight UNNEEDED) +{ fprintf(stderr, "amount_tx_fee called!\n"); abort(); } +/* Generated stub for fromwire */ +const u8 *fromwire(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, void *copy UNNEEDED, size_t n UNNEEDED) +{ fprintf(stderr, "fromwire called!\n"); abort(); } +/* Generated stub for fromwire_bool */ +bool fromwire_bool(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_bool called!\n"); abort(); } +/* Generated stub for fromwire_fail */ +void *fromwire_fail(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_fail called!\n"); abort(); } +/* Generated stub for fromwire_secp256k1_ecdsa_signature */ +void fromwire_secp256k1_ecdsa_signature(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, + secp256k1_ecdsa_signature *signature UNNEEDED) +{ fprintf(stderr, "fromwire_secp256k1_ecdsa_signature called!\n"); abort(); } +/* Generated stub for fromwire_sha256 */ +void fromwire_sha256(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct sha256 *sha256 UNNEEDED) +{ fprintf(stderr, "fromwire_sha256 called!\n"); abort(); } +/* Generated stub for fromwire_tal_arrn */ +u8 *fromwire_tal_arrn(const tal_t *ctx UNNEEDED, + const u8 **cursor UNNEEDED, size_t *max UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "fromwire_tal_arrn called!\n"); abort(); } +/* Generated stub for fromwire_u32 */ +u32 fromwire_u32(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u32 called!\n"); abort(); } +/* Generated stub for fromwire_u64 */ +u64 fromwire_u64(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u64 called!\n"); abort(); } +/* Generated stub for fromwire_u8 */ +u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u8 called!\n"); abort(); } +/* Generated stub for fromwire_u8_array */ +void fromwire_u8_array(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, u8 *arr UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "fromwire_u8_array called!\n"); abort(); } +/* Generated stub for towire */ +void towire(u8 **pptr UNNEEDED, const void *data UNNEEDED, size_t len UNNEEDED) +{ fprintf(stderr, "towire called!\n"); abort(); } +/* Generated stub for towire_bool */ +void towire_bool(u8 **pptr UNNEEDED, bool v UNNEEDED) +{ fprintf(stderr, "towire_bool called!\n"); abort(); } +/* Generated stub for towire_secp256k1_ecdsa_signature */ +void towire_secp256k1_ecdsa_signature(u8 **pptr UNNEEDED, + const secp256k1_ecdsa_signature *signature UNNEEDED) +{ fprintf(stderr, "towire_secp256k1_ecdsa_signature called!\n"); abort(); } +/* Generated stub for towire_sha256 */ +void towire_sha256(u8 **pptr UNNEEDED, const struct sha256 *sha256 UNNEEDED) +{ fprintf(stderr, "towire_sha256 called!\n"); abort(); } +/* Generated stub for towire_u32 */ +void towire_u32(u8 **pptr UNNEEDED, u32 v UNNEEDED) +{ fprintf(stderr, "towire_u32 called!\n"); abort(); } +/* Generated stub for towire_u64 */ +void towire_u64(u8 **pptr UNNEEDED, u64 v UNNEEDED) +{ fprintf(stderr, "towire_u64 called!\n"); abort(); } +/* Generated stub for towire_u8 */ +void towire_u8(u8 **pptr UNNEEDED, u8 v UNNEEDED) +{ fprintf(stderr, "towire_u8 called!\n"); abort(); } +/* Generated stub for towire_u8_array */ +void towire_u8_array(u8 **pptr UNNEEDED, const u8 *arr UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "towire_u8_array called!\n"); abort(); } +/* AUTOGENERATED MOCKS END */ + +struct test_feed { + const char *data; + size_t len, off; + size_t max_chunk; /* 0 => no artificial limit */ + unsigned calls_to_io_read; +}; +static struct test_feed FEED; + +static void feed_set(const char *s, size_t max_chunk) +{ + FEED.data = s; + FEED.len = strlen(s); + FEED.off = 0; + FEED.max_chunk = max_chunk; + FEED.calls_to_io_read = 0; +} + +static size_t feed_next_chunk(size_t want) +{ + size_t remain = FEED.len - FEED.off; + size_t cap = (FEED.max_chunk && FEED.max_chunk < want) ? FEED.max_chunk : want; + return (remain < cap) ? remain : cap; +} + +static struct io_plan *io_read_partial_test(struct io_conn *conn, + void *data, + size_t maxlen, + size_t *lenp, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg) +{ + char *out = (char *)data; + size_t n = feed_next_chunk(maxlen); + + FEED.calls_to_io_read++; + if (n) { + memcpy(out, FEED.data + FEED.off, n); + FEED.off += n; + } + *lenp = n; + + /* No more input -> end the chain */ + if (n == 0) + return NULL; + + return next(conn, arg); +} + +/* ---------- minimal “handler” to count parsed messages ---------- */ + +struct handler_ctx { + unsigned called; + char last_buf[512]; + size_t last_len; +}; + +static void record_message(const char *buf, const jsmntok_t *toks, struct handler_ctx *hc) +{ + size_t obj_len = (size_t)(toks[0].end - toks[0].start); + if (obj_len > sizeof(hc->last_buf)) obj_len = sizeof(hc->last_buf); + memcpy(hc->last_buf, buf + toks[0].start, obj_len); + hc->last_len = obj_len; + hc->called++; +} + +/* ---------- pump that drives read -> parse -> (maybe) read again ---------- */ + +struct pump_ctx { + struct jsonrpc_io *jin; + struct handler_ctx *hc; +}; + +static struct io_plan *pump_next(struct io_conn *conn, struct pump_ctx *pc) +{ + for (;;) { + const jsmntok_t *toks; + const char *buf; + const char *err = jsonrpc_io_parse(tmpctx, pc->jin, &toks, &buf); + + assert(!err); + if (!toks) { + /* Need more bytes */ + return jsonrpc_io_read(conn, pc->jin, pump_next, pc); + } + + /* Got a full JSON-RPC message */ + record_message(buf, toks, pc->hc); + jsonrpc_io_parse_done(pc->jin); + + /* Loop to consume any additional buffered messages + * without asking for more input yet. */ + } +} + +/* ---------- helpers ---------- */ + +static struct jsonrpc_io *mk_reader(const tal_t *ctx) +{ + return jsonrpc_io_new(ctx); +} + +static void run_once(struct jsonrpc_io *jin, struct handler_ctx *hc) +{ + struct pump_ctx pc = { .jin = jin, .hc = hc }; + jsonrpc_io_read(NULL, jin, pump_next, &pc); +} + +/* ---------- tests ---------- */ + +static size_t test_single_message_chunked(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msg = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":true}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msg, chunksize); + run_once(jin, &hc); + + assert(hc.called == 1); + assert(FEED.off == FEED.len); + assert(hc.last_len > 0 && hc.last_buf[0] == '{' && hc.last_buf[hc.last_len-1] == '}'); + + tal_free(jin); + return strlen(msg); +} + +static size_t test_two_messages_back_to_back(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msgs = + "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"ping\"}\n" + "{\"jsonrpc\":\"2.0\",\"id\":2,\"result\":42}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msgs, chunksize); + run_once(jin, &hc); + + assert(hc.called == 2); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(msgs); +} + +static size_t test_whitespace_only(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *ws = " \t \n \r\n "; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(ws, chunksize); + run_once(jin, &hc); + + assert(hc.called == 0); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(ws); +} + +static size_t test_message_then_whitespace_then_message(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msgs = + "{\"jsonrpc\":\"2.0\",\"id\":7,\"result\":true}\n" + " \n \t" + "{\"jsonrpc\":\"2.0\",\"id\":8,\"result\":false}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msgs, chunksize); + run_once(jin, &hc); + + assert(hc.called == 2); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(msgs); +} + +/* ---------- main ---------- */ + +int main(int argc, char *argv[]) +{ + size_t max = 1; + + common_setup(argv[0]); + + for (size_t i = 0; i < max + 10; i++) { + max = max_u64(max, test_single_message_chunked(i)); + max = max_u64(max, test_two_messages_back_to_back(i)); + max = max_u64(max, test_whitespace_only(i)); + max = max_u64(max, test_message_then_whitespace_then_message(i)); + } + + common_shutdown(); + return 0; +} diff --git a/common/utils.c b/common/utils.c index f51094f41eeb..b383f63c3df4 100644 --- a/common/utils.c +++ b/common/utils.c @@ -191,3 +191,13 @@ char *str_lowering(const void *ctx, const char *string TAKES) for (char *p = ret; *p; p++) *p = tolower(*p); return ret; } + +/* Realloc helper for tal membufs */ +void *membuf_tal_resize(struct membuf *mb, void *rawelems, size_t newsize) +{ + char *p = rawelems; + + tal_resize(&p, newsize); + return p; +} + diff --git a/common/utils.h b/common/utils.h index 26992fdb06b0..a4041320cadb 100644 --- a/common/utils.h +++ b/common/utils.h @@ -9,8 +9,8 @@ #include #include +struct membuf; extern secp256k1_context *secp256k1_ctx; - extern const struct chainparams *chainparams; /* Unsigned min/max macros: BUILD_ASSERT make sure types are unsigned */ @@ -164,6 +164,9 @@ extern const tal_t *wally_tal_ctx; * Returns created temporary path name at *created if successful. */ int tmpdir_mkstemp(const tal_t *ctx, const char *template TAKES, char **created); +/* For use with membuf_init */ +void *membuf_tal_resize(struct membuf *mb, void *rawelems, size_t newsize); + /** * tal_strlowering - return the same string by in lower case. * @ctx: the context to tal from (often NULL) diff --git a/lightningd/Makefile b/lightningd/Makefile index fbf1d10f47a1..4ca2ddcc4d33 100644 --- a/lightningd/Makefile +++ b/lightningd/Makefile @@ -125,6 +125,7 @@ LIGHTNINGD_COMMON_OBJS := \ common/json_parse.o \ common/json_parse_simple.o \ common/json_stream.o \ + common/jsonrpc_io.o \ common/lease_rates.o \ common/memleak.o \ common/msg_queue.o \ diff --git a/lightningd/jsonrpc.c b/lightningd/jsonrpc.c index 7370ad73ac5b..88ac751ea7e0 100644 --- a/lightningd/jsonrpc.c +++ b/lightningd/jsonrpc.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -85,18 +86,8 @@ struct json_connection { /* Logging for this json connection. */ struct logger *log; - /* The buffer (required to interpret tokens). */ - char *buffer; - - /* Internal state: */ - /* How much is already filled. */ - size_t used; - /* How much has just been filled. */ - size_t len_read; - - /* JSON parsing state. */ - jsmn_parser input_parser; - jsmntok_t *input_toks; + /* The buffer and state reading in the JSON commands */ + struct jsonrpc_io *json_in; /* Local deprecated support? */ bool deprecated_ok; @@ -998,8 +989,8 @@ rpc_command_hook_callback(struct rpc_command_hook_payload *p, if (tok) { /* We need to make copies here, as buffer and tokens * can be reused. */ - p->custom_replace = json_tok_copy(p, tok); - p->custom_buffer = tal_dup_talarr(p, char, buffer); + json_dup_contents(p, buffer, tok, + &p->custom_buffer, &p->custom_replace); return true; } @@ -1048,7 +1039,9 @@ REGISTER_PLUGIN_HOOK(rpc_command, /* We return struct command_result so command_fail return value has a natural * sink; we don't actually use the result. */ static struct command_result * -parse_request(struct json_connection *jcon, const jsmntok_t tok[]) +parse_request(struct json_connection *jcon, + const char *buffer, + const jsmntok_t tok[]) { const jsmntok_t *method, *id, *params, *filter, *jsonrpc; struct command *c; @@ -1061,10 +1054,10 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) return NULL; } - method = json_get_member(jcon->buffer, tok, "method"); - params = json_get_member(jcon->buffer, tok, "params"); - filter = json_get_member(jcon->buffer, tok, "filter"); - id = json_get_member(jcon->buffer, tok, "id"); + method = json_get_member(buffer, tok, "method"); + params = json_get_member(buffer, tok, "params"); + filter = json_get_member(buffer, tok, "filter"); + id = json_get_member(buffer, tok, "id"); if (!id) { json_command_malformed(jcon, "null", "No id"); @@ -1077,8 +1070,8 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) return NULL; } - jsonrpc = json_get_member(jcon->buffer, tok, "jsonrpc"); - if (!jsonrpc || jsonrpc->type != JSMN_STRING || !json_tok_streq(jcon->buffer, jsonrpc, "2.0")) { + jsonrpc = json_get_member(buffer, tok, "jsonrpc"); + if (!jsonrpc || jsonrpc->type != JSMN_STRING || !json_tok_streq(buffer, jsonrpc, "2.0")) { json_command_malformed(jcon, "null", "jsonrpc: \"2.0\" must be specified in the request"); return NULL; } @@ -1094,7 +1087,7 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) c->id_is_string = (id->type == JSMN_STRING); /* Include "" around string */ c->id = tal_strndup(c, - json_tok_full(jcon->buffer, id), + json_tok_full(buffer, id), json_tok_full_len(id)); c->mode = CMD_NORMAL; c->filter = NULL; @@ -1113,7 +1106,7 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) if (filter) { struct command_result *ret; - ret = parse_filter(c, "filter", jcon->buffer, filter); + ret = parse_filter(c, "filter", buffer, filter); if (ret) return ret; } @@ -1122,11 +1115,11 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) * actually just logging the id */ log_io(jcon->log, LOG_IO_IN, NULL, c->id, NULL, 0); - c->json_cmd = find_cmd(jcon->ld->jsonrpc, jcon->buffer, method); + c->json_cmd = find_cmd(jcon->ld->jsonrpc, buffer, method); if (!c->json_cmd) { return command_fail( c, JSONRPC2_METHOD_NOT_FOUND, "Unknown command '%.*s'", - method->end - method->start, jcon->buffer + method->start); + method->end - method->start, buffer + method->start); } if (!command_deprecated_in_ok(c, NULL, c->json_cmd->depr_start, @@ -1134,20 +1127,21 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) return command_fail(c, JSONRPC2_METHOD_NOT_FOUND, "Command %.*s is deprecated", json_tok_full_len(method), - json_tok_full(jcon->buffer, method)); + json_tok_full(buffer, method)); } if (c->json_cmd->dev_only && !jcon->ld->developer) { return command_fail(c, JSONRPC2_METHOD_NOT_FOUND, "Command %.*s is developer-only", json_tok_full_len(method), - json_tok_full(jcon->buffer, method)); + json_tok_full(buffer, method)); } rpc_hook = tal(c, struct rpc_command_hook_payload); rpc_hook->cmd = c; /* Duplicate since we might outlive the connection */ - rpc_hook->buffer = tal_dup_talarr(rpc_hook, char, jcon->buffer); - rpc_hook->request = tal_dup_talarr(rpc_hook, jsmntok_t, tok); + json_dup_contents(rpc_hook, buffer, tok, + &rpc_hook->buffer, + &rpc_hook->request); /* NULL the custom_ values for the hooks */ rpc_hook->custom_result = NULL; @@ -1214,93 +1208,61 @@ static struct io_plan *stream_out_complete(struct io_conn *conn, static struct io_plan *read_json(struct io_conn *conn, struct json_connection *jcon) { - bool complete; bool in_transaction = false; struct timemono start_time = time_mono(); + size_t len_read; + const jsmntok_t *toks; + const char *buffer, *error; - if (jcon->len_read) - log_io(jcon->log, LOG_IO_IN, NULL, "", - jcon->buffer + jcon->used, jcon->len_read); - - /* Resize larger if we're full. */ - jcon->used += jcon->len_read; - if (jcon->used == tal_count(jcon->buffer)) - tal_resize(&jcon->buffer, jcon->used * 2); + buffer = jsonrpc_newly_read(jcon->json_in, &len_read); + if (len_read) + log_io(jcon->log, LOG_IO_IN, NULL, "", buffer, len_read); /* We wait for pending output to be consumed, to avoid DoS */ if (tal_count(jcon->js_arr) != 0) { - jcon->len_read = 0; return io_wait(conn, conn, read_json, jcon); } again: - if (!json_parse_input(&jcon->input_parser, &jcon->input_toks, - jcon->buffer, jcon->used, - &complete)) { - json_command_malformed( - jcon, "null", - tal_fmt(tmpctx, "Invalid token in json input: '%s'", - tal_hexstr(tmpctx, jcon->buffer, jcon->used))); + error = jsonrpc_io_parse(tmpctx, jcon->json_in, &toks, &buffer); + if (error) { + json_command_malformed(jcon, "null", error); if (in_transaction) db_commit_transaction(jcon->ld->wallet->db); return io_halfclose(conn); } - if (!complete) + if (!toks) goto read_more; - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(jcon->input_toks) == 1) { - jcon->used = 0; - - /* Reset parser. */ - jsmn_init(&jcon->input_parser); - toks_reset(jcon->input_toks); - goto read_more; - } - if (!in_transaction) { db_begin_transaction(jcon->ld->wallet->db); in_transaction = true; } - parse_request(jcon, jcon->input_toks); - - /* Remove first {}. */ - memmove(jcon->buffer, jcon->buffer + jcon->input_toks[0].end, - tal_count(jcon->buffer) - jcon->input_toks[0].end); - jcon->used -= jcon->input_toks[0].end; - - /* Reset parser. */ - jsmn_init(&jcon->input_parser); - toks_reset(jcon->input_toks); + parse_request(jcon, buffer, toks); + jsonrpc_io_parse_done(jcon->json_in); - /* Do we have more already read? */ - if (jcon->used) { - if (!jcon->db_batching) { + if (!jcon->db_batching) { + db_commit_transaction(jcon->ld->wallet->db); + in_transaction = false; + } else { + /* FIXME: io_always() should interleave with + * real IO, and then we should rotate order we + * service fds in, to avoid starvation. */ + if (time_greater(timemono_between(time_mono(), + start_time), + time_from_msec(250))) { db_commit_transaction(jcon->ld->wallet->db); - in_transaction = false; - } else { - /* FIXME: io_always() should interleave with - * real IO, and then we should rotate order we - * service fds in, to avoid starvation. */ - if (time_greater(timemono_between(time_mono(), - start_time), - time_from_msec(250))) { - db_commit_transaction(jcon->ld->wallet->db); - /* Call us back, as if we read nothing new */ - jcon->len_read = 0; - return io_always(conn, read_json, jcon); - } + /* Call us back, as if we read nothing new */ + return io_always(conn, read_json, jcon); } - goto again; } + goto again; read_more: if (in_transaction) db_commit_transaction(jcon->ld->wallet->db); - return io_read_partial(conn, jcon->buffer + jcon->used, - tal_count(jcon->buffer) - jcon->used, - &jcon->len_read, read_json, jcon); + return jsonrpc_io_read(conn, jcon->json_in, read_json, jcon); } static struct io_plan *jcon_connected(struct io_conn *conn, @@ -1312,12 +1274,8 @@ static struct io_plan *jcon_connected(struct io_conn *conn, jcon = notleak(tal(conn, struct json_connection)); jcon->conn = conn; jcon->ld = ld; - jcon->used = 0; - jcon->buffer = tal_arr(jcon, char, 64); jcon->js_arr = tal_arr(jcon, struct json_stream *, 0); - jcon->len_read = 0; - jsmn_init(&jcon->input_parser); - jcon->input_toks = toks_alloc(jcon); + jcon->json_in = jsonrpc_io_new(jcon); jcon->notifications_enabled = false; jcon->db_batching = false; jcon->deprecated_ok = ld->deprecated_ok; diff --git a/lightningd/plugin.c b/lightningd/plugin.c index 5d031e73af9a..289ff25530c2 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -226,36 +227,36 @@ static bool request_add(const char *reqid, struct jsonrpc_request *req, } /* FIXME: reorder */ -static const char *plugin_read_json_one(struct plugin *plugin, - bool want_transaction, - bool *complete, - bool *destroyed); +static void plugin_response_handle(struct plugin *plugin, + const char *buffer, + const jsmntok_t *toks, + const jsmntok_t *idtok); /* We act as if the plugin itself said "I'm dead!" */ static void plugin_terminated_fail_req(struct plugin *plugin, struct jsonrpc_request *req) { - bool complete, destroyed; - const char *err; - - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - tal_free(plugin->buffer); - plugin->buffer = tal_fmt(plugin, - "{\"jsonrpc\": \"2.0\"," - "\"id\": %s," - "\"error\":" - " {\"code\":%i, \"message\":\"%s\"}" - "}\n\n", - req->id, - PLUGIN_TERMINATED, - "Plugin terminated before replying to RPC call."); - plugin->used = strlen(plugin->buffer); - - /* We're already in a transaction, don't do it again! */ - err = plugin_read_json_one(plugin, false, &complete, &destroyed); - assert(!err); + jsmntok_t *toks = toks_alloc(plugin); + const jsmntok_t *idtok; + const char *buf; + jsmn_parser parser; + bool complete; + + buf = tal_fmt(plugin, + "{\"jsonrpc\": \"2.0\"," + "\"id\": %s," + "\"error\":" + " {\"code\":%i, \"message\":\"%s\"}" + "}\n\n", + req->id, + PLUGIN_TERMINATED, + "Plugin terminated before replying to RPC call."); + jsmn_init(&parser); + if (!json_parse_input(&parser, &toks, buf, strlen(buf), &complete)) + abort(); assert(complete); + idtok = json_get_member(buf, toks, "id"); + plugin_response_handle(plugin, buf, toks, idtok); } static void destroy_plugin(struct plugin *p) @@ -376,7 +377,6 @@ struct plugin *plugin_register(struct plugins *plugins, const char* path TAKES, p->plugin_state = UNCONFIGURED; p->js_arr = tal_arr(p, struct json_stream *, 0); - p->used = 0; p->notification_topics = tal_arr(p, const char *, 0); p->subscriptions = NULL; p->dynamic = false; @@ -485,16 +485,18 @@ static void plugin_send(struct plugin *plugin, struct json_stream *stream) /* Returns the error string, or NULL */ static const char *plugin_log_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *paramstok) WARN_UNUSED_RESULT; static const char *plugin_log_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *paramstok) { const jsmntok_t *msgtok, *leveltok; enum log_level level; bool call_notifier; - msgtok = json_get_member(plugin->buffer, paramstok, "message"); - leveltok = json_get_member(plugin->buffer, paramstok, "level"); + msgtok = json_get_member(buffer, paramstok, "message"); + leveltok = json_get_member(buffer, paramstok, "level"); if (!msgtok || msgtok->type != JSMN_STRING) { return tal_fmt(plugin, "Log notification from plugin doesn't have " @@ -503,7 +505,7 @@ static const char *plugin_log_handle(struct plugin *plugin, if (!leveltok) level = LOG_INFORM; - else if (!log_level_parse(plugin->buffer + leveltok->start, + else if (!log_level_parse(buffer + leveltok->start, leveltok->end - leveltok->start, &level) /* FIXME: Allow io logging? */ @@ -513,15 +515,15 @@ static const char *plugin_log_handle(struct plugin *plugin, "Unknown log-level %.*s, valid values are " "\"trace\", \"debug\", \"info\", \"warn\", or \"error\".", json_tok_full_len(leveltok), - json_tok_full(plugin->buffer, leveltok)); + json_tok_full(buffer, leveltok)); } call_notifier = (level == LOG_BROKEN || level == LOG_UNUSUAL)? true : false; /* Only bother unescaping and splitting if it has \ */ - if (memchr(plugin->buffer + msgtok->start, '\\', msgtok->end - msgtok->start)) { + if (memchr(buffer + msgtok->start, '\\', msgtok->end - msgtok->start)) { const char *log_msg = json_escape_unescape_len(tmpctx, - plugin->buffer + msgtok->start, + buffer + msgtok->start, msgtok->end - msgtok->start); char **lines; @@ -539,13 +541,14 @@ static const char *plugin_log_handle(struct plugin *plugin, print_raw: log_(plugin->log, level, NULL, call_notifier, "%.*s", msgtok->end - msgtok->start, - plugin->buffer + msgtok->start); + buffer + msgtok->start); } return NULL; } static const char *plugin_notify_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *methodtok, const jsmntok_t *paramstok) { @@ -553,7 +556,7 @@ static const char *plugin_notify_handle(struct plugin *plugin, struct jsonrpc_request *request; /* id inside params tells us which id to redirect to. */ - idtok = json_get_member(plugin->buffer, paramstok, "id"); + idtok = json_get_member(buffer, paramstok, "id"); if (!idtok) { return tal_fmt(plugin, "JSON-RPC notify \"id\"-field is not present"); @@ -561,7 +564,7 @@ static const char *plugin_notify_handle(struct plugin *plugin, /* Include any "" in id */ request = strmap_getn(&plugin->pending_requests, - json_tok_full(plugin->buffer, idtok), + json_tok_full(buffer, idtok), json_tok_full_len(idtok)); if (!request) { return NULL; @@ -569,7 +572,7 @@ static const char *plugin_notify_handle(struct plugin *plugin, /* Ignore if they don't have a callback */ if (request->notify_cb) - request->notify_cb(plugin->buffer, methodtok, paramstok, idtok, + request->notify_cb(buffer, methodtok, paramstok, idtok, request->response_cb_arg); return NULL; } @@ -588,38 +591,40 @@ static bool plugin_notification_allowed(const struct plugin *plugin, const char /* Returns the error string, or NULL */ static const char *plugin_notification_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *toks) WARN_UNUSED_RESULT; static const char *plugin_notification_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *toks) { const jsmntok_t *methtok, *paramstok; const char *methname; struct jsonrpc_notification *n; - methtok = json_get_member(plugin->buffer, toks, "method"); - paramstok = json_get_member(plugin->buffer, toks, "params"); + methtok = json_get_member(buffer, toks, "method"); + paramstok = json_get_member(buffer, toks, "params"); if (!methtok || !paramstok) { return tal_fmt(plugin, "Malformed JSON-RPC notification missing " "\"method\" or \"params\": %.*s", toks->end - toks->start, - plugin->buffer + toks->start); + buffer + toks->start); } /* Dispatch incoming notifications. This is currently limited * to just a few method types, should this ever become * unwieldy we can switch to the AUTODATA construction to * register notification handlers in a variety of places. */ - if (json_tok_streq(plugin->buffer, methtok, "log")) { - return plugin_log_handle(plugin, paramstok); - } else if (json_tok_streq(plugin->buffer, methtok, "message") - || json_tok_streq(plugin->buffer, methtok, "progress")) { - return plugin_notify_handle(plugin, methtok, paramstok); + if (json_tok_streq(buffer, methtok, "log")) { + return plugin_log_handle(plugin, buffer, paramstok); + } else if (json_tok_streq(buffer, methtok, "message") + || json_tok_streq(buffer, methtok, "progress")) { + return plugin_notify_handle(plugin, buffer, methtok, paramstok); } - methname = json_strdup(tmpctx, plugin->buffer, methtok); + methname = json_strdup(tmpctx, buffer, methtok); if (!plugin_notification_allowed(plugin, methname)) { log_unusual(plugin->log, @@ -630,7 +635,7 @@ static const char *plugin_notification_handle(struct plugin *plugin, } else if (notifications_have_topic(plugin->plugins, methname)) { n = jsonrpc_notification_start_noparams(NULL, methname); json_add_string(n->stream, "origin", plugin->shortname); - json_add_tok(n->stream, "params", paramstok, plugin->buffer); + json_add_tok(n->stream, "params", paramstok, buffer); jsonrpc_notification_end_noparams(n); plugins_notify(plugin->plugins, take(n)); @@ -675,6 +680,7 @@ static void destroy_request(struct jsonrpc_request *req, } static void plugin_response_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *toks, const jsmntok_t *idtok) { @@ -682,7 +688,7 @@ static void plugin_response_handle(struct plugin *plugin, const tal_t *ctx; request = strmap_getn(&plugin->pending_requests, - json_tok_full(plugin->buffer, idtok), + json_tok_full(buffer, idtok), json_tok_full_len(idtok)); /* Can happen if request was freed before plugin responded */ if (!request) { @@ -696,190 +702,116 @@ static void plugin_response_handle(struct plugin *plugin, /* Don't keep track of this request; we will terminate it */ tal_del_destructor2(request, destroy_request, plugin); destroy_request(request, plugin); - request->response_cb(plugin->buffer, toks, idtok, request->response_cb_arg); + request->response_cb(buffer, toks, idtok, request->response_cb_arg); tal_free(ctx); } -/** - * Try to parse a complete message from the plugin's buffer. - * - * Returns NULL if there was no error. - * If it can parse a JSON message, sets *@complete, and returns any error - * from the callback. - * - * If @destroyed was set, it means the plugin called plugin stop on itself. - */ -static const char *plugin_read_json_one(struct plugin *plugin, - bool want_transaction, - bool *complete, - bool *destroyed) -{ - const jsmntok_t *jrtok, *idtok; - struct plugin_destroyed *pd; - const char *err; - struct wallet *wallet = plugin->plugins->ld->wallet; - - *destroyed = false; - /* Note that in the case of 'plugin stop' this can free request (since - * plugin is parent), so detect that case */ - - if (!json_parse_input(&plugin->parser, &plugin->toks, - plugin->buffer, plugin->used, - complete)) { - return tal_fmt(plugin, - "Failed to parse JSON response '%.*s'", - (int)plugin->used, plugin->buffer); - } - - if (!*complete) { - /* We need more. */ - return NULL; - } - - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(plugin->toks) == 1) { - plugin->used = 0; - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - /* We need more. */ - *complete = false; - return NULL; - } - - if (plugin->toks->type != JSMN_OBJECT) - return tal_fmt( - plugin, - "JSON-RPC message is not a valid JSON object type"); - - jrtok = json_get_member(plugin->buffer, plugin->toks, "jsonrpc"); - idtok = json_get_member(plugin->buffer, plugin->toks, "id"); - - if (!jrtok) { - return tal_fmt( - plugin, - "JSON-RPC message does not contain \"jsonrpc\" field"); - } - - /* We can be called extremely early, or as db hook, or for - * fake "terminated" request. */ - if (want_transaction) - db_begin_transaction(wallet->db); - - pd = plugin_detect_destruction(plugin); - if (!idtok) { - /* A Notification is a Request object without an "id" - * member. A Request object that is a Notification - * signifies the Client's lack of interest in the - * corresponding Response object, and as such no - * Response object needs to be returned to the - * client. The Server MUST NOT reply to a - * Notification, including those that are within a - * batch request. - * - * https://www.jsonrpc.org/specification#notification - */ - err = plugin_notification_handle(plugin, plugin->toks); - - } else { - /* When a rpc call is made, the Server MUST reply with - * a Response, except for in the case of - * Notifications. The Response is expressed as a - * single JSON Object, with the following members: - * - * - jsonrpc: A String specifying the version of the - * JSON-RPC protocol. MUST be exactly "2.0". - * - * - result: This member is REQUIRED on success. This - * member MUST NOT exist if there was an error - * invoking the method. The value of this member is - * determined by the method invoked on the Server. - * - * - error: This member is REQUIRED on error. This - * member MUST NOT exist if there was no error - * triggered during invocation. - * - * - id: This member is REQUIRED. It MUST be the same - * as the value of the id member in the Request - * Object. If there was an error in detecting the id - * in the Request object (e.g. Parse error/Invalid - * Request), it MUST be Null. Either the result - * member or error member MUST be included, but both - * members MUST NOT be included. - * - * https://www.jsonrpc.org/specification#response_object - */ - plugin_response_handle(plugin, plugin->toks, idtok); - err = NULL; - } - if (want_transaction) - db_commit_transaction(wallet->db); - - /* Corner case: rpc_command hook can destroy plugin for 'plugin - * stop'! */ - if (was_plugin_destroyed(pd)) { - *destroyed = true; - } else { - /* Move this object out of the buffer */ - memmove(plugin->buffer, plugin->buffer + plugin->toks[0].end, - tal_count(plugin->buffer) - plugin->toks[0].end); - plugin->used -= plugin->toks[0].end; - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - } - return err; -} - +/* Try to parse complete messages from the plugin's buffer. */ static struct io_plan *plugin_read_json(struct io_conn *conn, struct plugin *plugin) { - bool success; - bool have_full; + struct wallet *wallet = plugin->plugins->ld->wallet; + const char *new_bytes, *buffer; + const jsmntok_t *toks; + size_t new_bytes_len; /* wallet is NULL in really early code */ bool want_transaction = (plugin->plugins->want_db_transaction - && plugin->plugins->ld->wallet != NULL); - - log_io(plugin->log, LOG_IO_IN, NULL, "", - plugin->buffer + plugin->used, plugin->len_read); - - /* Our JSON parser is pretty good at incremental parsing, but - * `getrawblock` gives a giant 2MB token, which forces it to re-parse - * every time until we have all of it. However, we can't complete a - * JSON object without a '}', so we do a cheaper check here. - */ - have_full = memchr(plugin->buffer + plugin->used, '}', - plugin->len_read); - - plugin->used += plugin->len_read; - if (plugin->used == tal_count(plugin->buffer)) - tal_resize(&plugin->buffer, plugin->used * 2); - - /* Read and process all messages from the connection */ - if (have_full) { - do { - bool destroyed; - const char *err; - - err = plugin_read_json_one(plugin, want_transaction, - &success, &destroyed); - - /* If it's destroyed, conn is already freed! */ - if (destroyed) - return io_close(NULL); - - if (err) { - plugin_kill(plugin, LOG_UNUSUAL, - "%s", err); - /* plugin_kill frees plugin */ - return io_close(NULL); - } - } while (success); + && wallet != NULL); + + new_bytes = jsonrpc_newly_read(plugin->json_in, &new_bytes_len); + if (new_bytes_len) { + log_io(plugin->log, LOG_IO_IN, NULL, "", + new_bytes, new_bytes_len); + } + + /* Parse until we get incomplete JSON */ + for (;;) { + const char *error; + const jsmntok_t *idtok; + struct plugin_destroyed *pd; + + error = jsonrpc_io_parse(tmpctx, plugin->json_in, + &toks, &buffer); + if (error) { + plugin_kill(plugin, LOG_UNUSUAL, "%s", error); + /* plugin_kill frees plugin */ + return io_close(NULL); + } + /* Incomplete? */ + if (!toks) + break; + + idtok = json_get_member(buffer, toks, "id"); + + /* We can be called extremely early, or as db hook, or for + * fake "terminated" request. */ + if (want_transaction) + db_begin_transaction(wallet->db); + + pd = plugin_detect_destruction(plugin); + if (!idtok) { + /* A Notification is a Request object without an "id" + * member. A Request object that is a Notification + * signifies the Client's lack of interest in the + * corresponding Response object, and as such no + * Response object needs to be returned to the + * client. The Server MUST NOT reply to a + * Notification, including those that are within a + * batch request. + * + * https://www.jsonrpc.org/specification#notification + */ + error = plugin_notification_handle(plugin, buffer, toks); + } else { + /* When a rpc call is made, the Server MUST reply with + * a Response, except for in the case of + * Notifications. The Response is expressed as a + * single JSON Object, with the following members: + * + * - jsonrpc: A String specifying the version of the + * JSON-RPC protocol. MUST be exactly "2.0". + * + * - result: This member is REQUIRED on success. This + * member MUST NOT exist if there was an error + * invoking the method. The value of this member is + * determined by the method invoked on the Server. + * + * - error: This member is REQUIRED on error. This + * member MUST NOT exist if there was no error + * triggered during invocation. + * + * - id: This member is REQUIRED. It MUST be the same + * as the value of the id member in the Request + * Object. If there was an error in detecting the id + * in the Request object (e.g. Parse error/Invalid + * Request), it MUST be Null. Either the result + * member or error member MUST be included, but both + * members MUST NOT be included. + * + * https://www.jsonrpc.org/specification#response_object + */ + plugin_response_handle(plugin, buffer, toks, idtok); + error = NULL; + } + if (want_transaction) + db_commit_transaction(wallet->db); + + /* If it's destroyed, conn is already freed! */ + if (was_plugin_destroyed(pd)) + return io_close(NULL); + + if (error) { + plugin_kill(plugin, LOG_UNUSUAL, "%s", error); + /* plugin_kill frees plugin */ + return io_close(NULL); + } + + jsonrpc_io_parse_done(plugin->json_in); } /* Now read more from the connection */ - return io_read_partial(plugin->stdout_conn, - plugin->buffer + plugin->used, - tal_count(plugin->buffer) - plugin->used, - &plugin->len_read, plugin_read_json, plugin); + return jsonrpc_io_read(plugin->stdout_conn, plugin->json_in, + plugin_read_json, plugin); } /* Mutual recursion */ @@ -934,6 +866,7 @@ static void plugin_conn_finish(struct io_conn *conn, struct plugin *plugin) struct io_plan *plugin_stdin_conn_init(struct io_conn *conn, struct plugin *plugin) { + plugin->stdin_conn = conn; /* We write to their stdin */ /* We don't have anything queued yet, wait for notification */ return io_wait(conn, plugin, plugin_write_json, plugin); @@ -943,10 +876,9 @@ struct io_plan *plugin_stdout_conn_init(struct io_conn *conn, struct plugin *plugin) { /* We read from their stdout */ + plugin->stdout_conn = conn; io_set_finish(conn, plugin_conn_finish, plugin); - return io_read_partial(conn, plugin->buffer, - tal_bytelen(plugin->buffer), &plugin->len_read, - plugin_read_json, plugin); + return plugin_read_json(conn, plugin); } static char *plugin_opt_check(struct plugin_opt *popt) @@ -1527,7 +1459,7 @@ static const char *plugin_subscriptions_add(struct plugin *plugin, * manifest, without checking that they exist, since * later plugins may also emit notifications of custom * types that we don't know about yet. */ - sub.topic = json_strdup(plugin, plugin->buffer, s); + sub.topic = json_strdup(plugin, buffer, s); sub.owner = plugin; tal_arr_expand(&plugin->subscriptions, sub); } @@ -1568,7 +1500,7 @@ static const char *plugin_hooks_add(struct plugin *plugin, const char *buffer, aftertok = json_get_member(buffer, t, "after"); } else { /* FIXME: deprecate in 3 releases after v0.9.2! */ - name = json_strdup(tmpctx, plugin->buffer, t); + name = json_strdup(tmpctx, buffer, t); beforetok = aftertok = NULL; } @@ -2042,8 +1974,8 @@ static void plugin_set_timeout(struct plugin *p) time_from_sec(PLUGIN_STARTUP_TIMEOUT), plugin_manifest_timeout, p); } -} +} const char *plugin_send_getmanifest(struct plugin *p, const char *cmd_id) { char **cmd; @@ -2062,14 +1994,12 @@ const char *plugin_send_getmanifest(struct plugin *p, const char *cmd_id) return tal_fmt(p, "opening pipe: %s", strerror(errno)); log_debug(p->plugins->log, "started(%u) %s", p->pid, p->cmd); - p->buffer = tal_arr(p, char, 64); - jsmn_init(&p->parser); - p->toks = toks_alloc(p); + p->json_in = jsonrpc_io_new(p); /* Create two connections, one read-only on top of p->stdout, and one * write-only on p->stdin */ - p->stdout_conn = io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p); - p->stdin_conn = io_new_conn(p, stdinfd, plugin_stdin_conn_init, p); + io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p); + io_new_conn(p, stdinfd, plugin_stdin_conn_init, p); req = jsonrpc_request_start(p, "getmanifest", cmd_id, p->log, NULL, plugin_manifest_cb, p); json_add_bool(req->stream, "allow-deprecated-apis", diff --git a/lightningd/plugin.h b/lightningd/plugin.h index 3481d35655b1..fb16d4b252e4 100644 --- a/lightningd/plugin.h +++ b/lightningd/plugin.h @@ -76,10 +76,7 @@ struct plugin { bool dynamic; /* Stuff we read */ - char *buffer; - size_t used, len_read; - jsmn_parser parser; - jsmntok_t *toks; + struct jsonrpc_io *json_in; /* Our json_streams. Since multiple streams could start * returning data at once, we always service these in order, diff --git a/lightningd/test/run-jsonrpc.c b/lightningd/test/run-jsonrpc.c index cf064ca891a4..35c4efea7b34 100644 --- a/lightningd/test/run-jsonrpc.c +++ b/lightningd/test/run-jsonrpc.c @@ -51,6 +51,13 @@ char *hsm_secret_arg(const tal_t *ctx UNNEEDED, const char *arg UNNEEDED, const u8 **hsm_secret UNNEEDED) { fprintf(stderr, "hsm_secret_arg called!\n"); abort(); } +/* Generated stub for json_dup_contents */ +void json_dup_contents(const tal_t *ctx UNNEEDED, + const char *buffer UNNEEDED, + const jsmntok_t *tok UNNEEDED, + const char **new_buffer UNNEEDED, + const jsmntok_t **new_toks UNNEEDED) +{ fprintf(stderr, "json_dup_contents called!\n"); abort(); } /* Generated stub for json_to_jsonrpc_errcode */ bool json_to_jsonrpc_errcode(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED, enum jsonrpc_errcode *errcode UNNEEDED) @@ -59,6 +66,29 @@ bool json_to_jsonrpc_errcode(const char *buffer UNNEEDED, const jsmntok_t *tok U bool json_to_number(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED, unsigned int *num UNNEEDED) { fprintf(stderr, "json_to_number called!\n"); abort(); } +/* Generated stub for jsonrpc_io_new */ +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_new called!\n"); abort(); } +/* Generated stub for jsonrpc_io_parse */ +const char *jsonrpc_io_parse(const tal_t *ctx UNNEEDED, + struct jsonrpc_io *json_in UNNEEDED, + const jsmntok_t **toks UNNEEDED, + const char **buf UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_parse called!\n"); abort(); } +/* Generated stub for jsonrpc_io_parse_done */ +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_parse_done called!\n"); abort(); } +/* Generated stub for jsonrpc_io_read_ */ +struct io_plan *jsonrpc_io_read_(struct io_conn *conn UNNEEDED, + struct jsonrpc_io *json_in UNNEEDED, + struct io_plan *(*next)(struct io_conn * UNNEEDED, + void *) UNNEEDED, + void *arg UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_read_ called!\n"); abort(); } +/* Generated stub for jsonrpc_newly_read */ +const char *jsonrpc_newly_read(struct jsonrpc_io *json_in UNNEEDED, + size_t *len UNNEEDED) +{ fprintf(stderr, "jsonrpc_newly_read called!\n"); abort(); } /* Generated stub for lightningd_deprecated_in_ok */ bool lightningd_deprecated_in_ok(struct lightningd *ld UNNEEDED, struct logger *log UNNEEDED, diff --git a/plugins/Makefile b/plugins/Makefile index 96147d8b9041..6347b2ffcf6c 100644 --- a/plugins/Makefile +++ b/plugins/Makefile @@ -191,6 +191,7 @@ PLUGIN_COMMON_OBJS := \ common/json_parse_simple.o \ common/json_filter.o \ common/json_stream.o \ + common/jsonrpc_io.o \ common/lease_rates.o \ common/memleak.o \ common/node_id.o \ diff --git a/plugins/bkpr/test/run-sql.c b/plugins/bkpr/test/run-sql.c index 8be5859adad7..2fc758adad17 100644 --- a/plugins/bkpr/test/run-sql.c +++ b/plugins/bkpr/test/run-sql.c @@ -69,6 +69,28 @@ bool json_filter_ok(const struct json_filter *filter UNNEEDED, const char *membe /* Generated stub for json_filter_up */ bool json_filter_up(struct json_filter **filter UNNEEDED) { fprintf(stderr, "json_filter_up called!\n"); abort(); } +/* Generated stub for jsonrpc_io_new */ +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_new called!\n"); abort(); } +/* Generated stub for jsonrpc_io_parse */ +const char *jsonrpc_io_parse(const tal_t *ctx UNNEEDED, + struct jsonrpc_io *json_in UNNEEDED, + const jsmntok_t **toks UNNEEDED, + const char **buf UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_parse called!\n"); abort(); } +/* Generated stub for jsonrpc_io_parse_done */ +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_parse_done called!\n"); abort(); } +/* Generated stub for jsonrpc_io_read_ */ +struct io_plan *jsonrpc_io_read_(struct io_conn *conn UNNEEDED, + struct jsonrpc_io *json_in UNNEEDED, + struct io_plan *(*next)(struct io_conn * UNNEEDED, + void *) UNNEEDED, + void *arg UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_read_ called!\n"); abort(); } +/* Generated stub for jsonrpc_sync_read */ +bool jsonrpc_sync_read(struct jsonrpc_io *json_in UNNEEDED, int infd UNNEEDED) +{ fprintf(stderr, "jsonrpc_sync_read called!\n"); abort(); } /* Generated stub for last_fee_state */ enum htlc_state last_fee_state(enum side opener UNNEEDED) { fprintf(stderr, "last_fee_state called!\n"); abort(); } diff --git a/plugins/keysend.c b/plugins/keysend.c index 6104bf607ccd..030d62c37e16 100644 --- a/plugins/keysend.c +++ b/plugins/keysend.c @@ -221,8 +221,6 @@ static struct command_result *json_keysend(struct command *cmd, const char *buf, p = payment_new(cmd, cmd, NULL /* No parent */, global_hints, pay_mods); p->local_id = &my_id; - p->json_buffer = tal_dup_talarr(p, const char, buf); - p->json_toks = params; p->route_destination = tal_steal(p, destination); p->pay_destination = p->route_destination; p->payment_secret = NULL; diff --git a/plugins/libplugin-pay.h b/plugins/libplugin-pay.h index bc1ec51b5dc9..88ccd7d7e72b 100644 --- a/plugins/libplugin-pay.h +++ b/plugins/libplugin-pay.h @@ -151,9 +151,6 @@ struct payment { struct plugin *plugin; struct node_id *local_id; - const char *json_buffer; - const jsmntok_t *json_toks; - /* The current phase we are in. */ enum payment_step step; diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 167d13621099..44c64bf6c2d2 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -31,11 +32,6 @@ struct plugin_timer { void *cb_arg; }; -struct rpc_conn { - int fd; - MEMBUF(char) mb; -}; - /* We can have more than one of these pending at once. */ struct jstream { struct list_node list; @@ -88,27 +84,23 @@ struct plugin { const char **beglist; /* To read from lightningd */ - char *buffer; - size_t used, len_read; - jsmn_parser parser; - jsmntok_t *toks; + struct jsonrpc_io *lightningd_in; /* To write to lightningd */ struct list_head js_list; - /* Asynchronous RPC interaction */ + /* Asynchronous RPC interaction. */ struct io_conn *io_rpc_conn; struct list_head rpc_js_list; - char *rpc_buffer; - size_t rpc_used, rpc_len_read, rpc_read_offset; - jsmn_parser rpc_parser; - jsmntok_t *rpc_toks; + struct jsonrpc_io *jsonrpc_in; + /* Tracking async RPC requests */ STRMAP(struct out_req *) out_reqs; u64 next_outreq_id; - /* Synchronous RPC interaction */ - struct rpc_conn *rpc_conn; + /* Synchronous RPC interaction: sync_io is NULL if they didn't want it. */ + int sync_fd; + struct jsonrpc_io *sync_io; /* Plugin information details */ enum plugin_restartability restartability; @@ -539,42 +531,6 @@ struct json_out *json_out_obj(const tal_t *ctx, return jout; } -/* Realloc helper for tal membufs */ -static void *membuf_tal_realloc(struct membuf *mb, void *rawelems, - size_t newsize) -{ - char *p = rawelems; - - tal_resize(&p, newsize); - return p; -} - -static int read_json_from_rpc(struct plugin *p) -{ - char *end; - - /* We rely on the double-\n marker which only terminates JSON top - * levels. Thanks lightningd! */ - while ((end = memmem(membuf_elems(&p->rpc_conn->mb), - membuf_num_elems(&p->rpc_conn->mb), "\n\n", 2)) - == NULL) { - ssize_t r; - - /* Make sure we've room for at least READ_CHUNKSIZE. */ - membuf_prepare_space(&p->rpc_conn->mb, READ_CHUNKSIZE); - r = read(p->rpc_conn->fd, membuf_space(&p->rpc_conn->mb), - membuf_num_space(&p->rpc_conn->mb)); - /* lightningd goes away, we go away. */ - if (r == 0) - exit(0); - if (r < 0) - plugin_err(p, "Reading JSON input: %s", strerror(errno)); - membuf_added(&p->rpc_conn->mb, r); - } - - return end + 2 - membuf_elems(&p->rpc_conn->mb); -} - /* This closes a JSON response and writes it out. */ static void finish_and_send_json(int fd, struct json_out *jout) { @@ -734,40 +690,63 @@ void command_set_usage(struct command *cmd, const char *usage TAKES) cmd->methodname); } -/* Reads rpc reply and returns tokens, setting contents to 'error' or -- * 'result' (depending on *error). */ -static jsmntok_t *read_rpc_reply(const tal_t *ctx, - struct plugin *plugin, - const jsmntok_t **contents, - bool *error, - int *reqlen) +static const char *read_one_json_sync(struct plugin *p, const jsmntok_t **toks) { - jsmntok_t *toks; + for (;;) { + const char *buf, *error; - do { - *reqlen = read_json_from_rpc(plugin); + error = jsonrpc_io_parse(tmpctx, p->sync_io, toks, &buf); + if (error) + plugin_err(p, "Parsing sync lightningd: %s", error); + if (*toks) + return buf; - toks = json_parse_simple(ctx, - membuf_elems(&plugin->rpc_conn->mb), - *reqlen); - if (!toks) - plugin_err(plugin, "Malformed JSON reply '%.*s'", - *reqlen, membuf_elems(&plugin->rpc_conn->mb)); + /* lightningd goes away, we go away. */ + if (!jsonrpc_sync_read(p->sync_io, p->sync_fd)) { + if (errno == 0) + exit(0); + else + plugin_err(p, "Reading sync lightningd: %s", + strerror(errno)); + } + } +} + +/* Reads rpc reply and returns result tokens */ +static const jsmntok_t *read_sync_rpc_reply(const tal_t *ctx, + struct plugin *plugin, + const char *method, + const char **final_buffer) +{ + const jsmntok_t *errtok, *resulttok, *toks; + const char *buffer; + + for (;;) { + buffer = read_one_json_sync(plugin, &toks); /* FIXME: Don't simply ignore notifications here! */ - } while (!json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, - "id")); + if (json_get_member(buffer, toks, "id")) + break; + jsonrpc_io_parse_done(plugin->sync_io); + } - *contents = json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, "error"); - if (*contents) - *error = true; - else { - *contents = json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, - "result"); - if (!*contents) - plugin_err(plugin, "JSON reply with no 'result' nor 'error'? '%.*s'", - *reqlen, membuf_elems(&plugin->rpc_conn->mb)); - *error = false; + errtok = json_get_member(buffer, toks, "error"); + if (errtok) { + plugin_err(plugin, "Got error result to %s: '%.*s'", + method, + json_tok_full_len(toks), + json_tok_full(buffer, toks)); + } + resulttok = json_get_member(buffer, toks, "result"); + if (!resulttok) { + plugin_err(plugin, "JSON reply with no 'result' nor 'error'? '%.*s'", + json_tok_full_len(toks), + json_tok_full(buffer, toks)); } + + /* Make the returned pointers valid tal object */ + json_dup_contents(ctx, buffer, resulttok, final_buffer, &toks); + jsonrpc_io_parse_done(plugin->sync_io); + return toks; } @@ -778,13 +757,8 @@ static const jsmntok_t *sync_req(const tal_t *ctx, const struct json_out *params TAKES, const char **resp) { - bool error; - jsmntok_t *toks; - const jsmntok_t *contents; - int reqlen; struct json_out *jout = json_out_new(tmpctx); const char *id = json_id(tmpctx, plugin, "init/", method); - size_t num_toks; json_out_start(jout, NULL, '{'); json_out_addstr(jout, "jsonrpc", "2.0"); @@ -802,23 +776,15 @@ static const jsmntok_t *sync_req(const tal_t *ctx, /* If we're past init, we may need a new fd (the old one * is being used for async comms). */ - if (plugin->rpc_conn->fd == -1) - plugin->rpc_conn->fd = rpc_open(plugin); - - finish_and_send_json(plugin->rpc_conn->fd, jout); - - toks = read_rpc_reply(ctx, plugin, &contents, &error, &reqlen); - if (error) - plugin_err(plugin, "Got error reply to %s: '%.*s'", - method, reqlen, membuf_elems(&plugin->rpc_conn->mb)); + if (plugin->sync_fd == -1) { + plugin->sync_fd = rpc_open(plugin); + if (!plugin->sync_io) + plugin->sync_io = jsonrpc_io_new(plugin); + } - *resp = membuf_consume(&plugin->rpc_conn->mb, reqlen); + finish_and_send_json(plugin->sync_fd, jout); - /* Make the returned pointer the valid tal object of minimal length */ - num_toks = json_next(contents) - contents; - memmove(toks, contents, num_toks * sizeof(*toks)); - tal_resize(&toks, num_toks); - return toks; + return read_sync_rpc_reply(ctx, plugin, method, resp); } const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx, @@ -827,7 +793,6 @@ const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx, const struct json_out *params TAKES, const char **resp) { - return sync_req(ctx, cmd->plugin, method, params, resp); } @@ -1376,73 +1341,33 @@ static void rpc_conn_finished(struct io_conn *conn, plugin_err(plugin, "Lost connection to the RPC socket."); } -static bool rpc_read_response_one(struct plugin *plugin) -{ - const jsmntok_t *jrtok; - bool complete; - - if (!json_parse_input(&plugin->rpc_parser, &plugin->rpc_toks, - plugin->rpc_buffer + plugin->rpc_read_offset, - plugin->rpc_used - plugin->rpc_read_offset, - &complete)) { - plugin_err(plugin, "Failed to parse RPC JSON response '%.*s'", - (int)(plugin->rpc_used - plugin->rpc_read_offset), - plugin->rpc_buffer + plugin->rpc_read_offset); - } - - if (!complete) { - /* We need more. */ - goto compact; - } - - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(plugin->rpc_toks) == 1) { - jsmn_init(&plugin->rpc_parser); - toks_reset(plugin->rpc_toks); - goto compact; - } - - jrtok = json_get_member(plugin->rpc_buffer + plugin->rpc_read_offset, - plugin->rpc_toks, "jsonrpc"); - if (!jrtok) { - plugin_err(plugin, "JSON-RPC message does not contain \"jsonrpc\" field: '%.*s'", - (int)(plugin->rpc_used - plugin->rpc_read_offset), - plugin->rpc_buffer + plugin->rpc_read_offset); - } - - handle_rpc_reply(plugin, plugin->rpc_buffer + plugin->rpc_read_offset, plugin->rpc_toks); - - /* Move this object out of the buffer */ - plugin->rpc_read_offset += plugin->rpc_toks[0].end; - jsmn_init(&plugin->rpc_parser); - toks_reset(plugin->rpc_toks); - return true; - -compact: - memmove(plugin->rpc_buffer, plugin->rpc_buffer + plugin->rpc_read_offset, - plugin->rpc_used - plugin->rpc_read_offset); - plugin->rpc_used -= plugin->rpc_read_offset; - plugin->rpc_read_offset = 0; - return false; -} static struct io_plan *rpc_conn_read_response(struct io_conn *conn, struct plugin *plugin) { - plugin->rpc_used += plugin->rpc_len_read; - if (plugin->rpc_used == tal_count(plugin->rpc_buffer)) - tal_resize(&plugin->rpc_buffer, plugin->rpc_used * 2); + /* Gather an parse any new bytes */ + for (;;) { + const jsmntok_t *toks; + const char *buf; + const char *err; + + err = jsonrpc_io_parse(tmpctx, + plugin->jsonrpc_in, + &toks, &buf); + if (err) + plugin_err(plugin, "%s", err); - /* Read and process all messages from the connection */ - while (rpc_read_response_one(plugin)) - ; + if (!toks) + break; + + handle_rpc_reply(plugin, buf, toks); + jsonrpc_io_parse_done(plugin->jsonrpc_in); + } - /* Read more, if there is. */ - return io_read_partial(plugin->io_rpc_conn, - plugin->rpc_buffer + plugin->rpc_used, - tal_bytelen(plugin->rpc_buffer) - plugin->rpc_used, - &plugin->rpc_len_read, - rpc_conn_read_response, plugin); + /* Read more */ + return jsonrpc_io_read(conn, plugin->jsonrpc_in, + rpc_conn_read_response, + plugin); } static struct io_plan *rpc_conn_write_request(struct io_conn *conn, @@ -1563,7 +1488,6 @@ static struct command_result *handle_init(struct command *cmd, size_t i; char *dir, *network; struct plugin *p = cmd->plugin; - bool with_rpc; const char *err; configtok = json_get_member(buf, params, "configuration"); @@ -1590,17 +1514,10 @@ static struct command_result *handle_init(struct command *cmd, /* Only attempt to connect if the plugin has configured the rpc_conn * already, if that's not the case we were told to run without an RPC * connection, so don't even log an error. */ - if (p->rpc_conn != NULL) { - p->rpc_conn->fd = rpc_open(p); - if (p->rpc_conn->fd == -1) - with_rpc = false; - else - with_rpc = true; - - membuf_init(&p->rpc_conn->mb, tal_arr(p, char, READ_CHUNKSIZE), - READ_CHUNKSIZE, membuf_tal_realloc); - } else - with_rpc = false; + if (p->sync_io) + p->sync_fd = rpc_open(p); + else + p->sync_fd = -1; opttok = json_get_member(buf, params, "options"); json_for_each_obj(i, t, opttok) { @@ -1624,11 +1541,12 @@ static struct command_result *handle_init(struct command *cmd, disable)); } - if (with_rpc) { + /* Now set up async. */ + if (p->sync_fd != -1) { struct out_req *req; struct command *aux_cmd = aux_command(cmd); - io_new_conn(p, p->rpc_conn->fd, rpc_conn_init, p); + io_new_conn(p, p->sync_fd, rpc_conn_init, p); /* In case they intercept rpc_command, we can't do this sync. */ req = jsonrpc_request_start(aux_cmd, "listconfigs", get_beglist, plugin_broken_cb, NULL); @@ -1636,7 +1554,7 @@ static struct command_result *handle_init(struct command *cmd, send_outreq(req); /* We will open a new one if we want to be sync. */ - p->rpc_conn->fd = -1; + p->sync_fd = -1; } return command_success(cmd, json_out_obj(cmd, NULL, NULL)); @@ -1996,6 +1914,7 @@ void NORETURN plugin_errv(struct plugin *p, const char *fmt, va_list ap) plugin_logv(p, LOG_BROKEN, fmt, ap); vfprintf(stderr, fmt, ap2); + fprintf(stderr, "\n"); plugin_exit(p, 1); va_end(ap2); } @@ -2076,6 +1995,7 @@ static struct command_result *param_tok(struct command *cmd, const char *name, } static void ld_command_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *toks) { const jsmntok_t *methtok, *paramstok, *filtertok; @@ -2084,18 +2004,18 @@ static void ld_command_handle(struct plugin *plugin, const char *id; enum command_type type; - methtok = json_get_member(plugin->buffer, toks, "method"); - paramstok = json_get_member(plugin->buffer, toks, "params"); - filtertok = json_get_member(plugin->buffer, toks, "filter"); + methtok = json_get_member(buffer, toks, "method"); + paramstok = json_get_member(buffer, toks, "params"); + filtertok = json_get_member(buffer, toks, "filter"); if (!methtok || !paramstok) plugin_err(plugin, "Malformed JSON-RPC notification missing " "\"method\" or \"params\": %.*s", json_tok_full_len(toks), - json_tok_full(plugin->buffer, toks)); + json_tok_full(buffer, toks)); - methodname = json_strdup(NULL, plugin->buffer, methtok); - id = json_get_id(tmpctx, plugin->buffer, toks); + methodname = json_strdup(NULL, buffer, methtok); + id = json_get_id(tmpctx, buffer, toks); if (!id) type = COMMAND_TYPE_NOTIFICATION; @@ -2111,7 +2031,7 @@ static void ld_command_handle(struct plugin *plugin, if (!plugin->manifested) { if (streq(cmd->methodname, "getmanifest")) { - handle_getmanifest(cmd, plugin->buffer, paramstok); + handle_getmanifest(cmd, buffer, paramstok); plugin->manifested = true; return; } @@ -2121,7 +2041,7 @@ static void ld_command_handle(struct plugin *plugin, if (!plugin->initialized) { if (streq(cmd->methodname, "init")) { - handle_init(cmd, plugin->buffer, paramstok); + handle_init(cmd, buffer, paramstok); plugin->initialized = true; return; } @@ -2139,7 +2059,7 @@ static void ld_command_handle(struct plugin *plugin, const char *err; plugin->deprecated_ok_override = tal(plugin, bool); - err = json_scan(tmpctx, plugin->buffer, paramstok, + err = json_scan(tmpctx, buffer, paramstok, "{deprecated_oneshot:{deprecated_ok:%}}", JSON_SCAN(json_to_bool, plugin->deprecated_ok_override)); @@ -2153,7 +2073,7 @@ static void ld_command_handle(struct plugin *plugin, || is_asterix_notification(cmd->methodname, plugin->notif_subs[i].name)) { plugin->notif_subs[i].handle(cmd, - plugin->buffer, + buffer, paramstok); return; } @@ -2165,14 +2085,14 @@ static void ld_command_handle(struct plugin *plugin, plugin_err(plugin, "Unregistered notification %.*s", json_tok_full_len(methtok), - json_tok_full(plugin->buffer, methtok)); + json_tok_full(buffer, methtok)); } for (size_t i = 0; i < plugin->num_hook_subs; i++) { if (streq(cmd->methodname, plugin->hook_subs[i].name)) { cmd->type = COMMAND_TYPE_HOOK; plugin->hook_subs[i].handle(cmd, - plugin->buffer, + buffer, paramstok); return; } @@ -2180,7 +2100,7 @@ static void ld_command_handle(struct plugin *plugin, if (filtertok) { /* On error, this fails cmd */ - if (parse_filter(cmd, "filter", plugin->buffer, filtertok) + if (parse_filter(cmd, "filter", buffer, filtertok) != NULL) return; } @@ -2192,17 +2112,17 @@ static void ld_command_handle(struct plugin *plugin, /* We're going to mangle it, so make a copy */ mod_params = json_tok_copy(cmd, paramstok); - if (!param_check(cmd, plugin->buffer, mod_params, + if (!param_check(cmd, buffer, mod_params, p_req("command_to_check", param_tok, &method), p_opt_any(), NULL)) { plugin_err(plugin, "lightningd check without command_to_check: %.*s", json_tok_full_len(toks), - json_tok_full(plugin->buffer, toks)); + json_tok_full(buffer, toks)); } tal_free(cmd->methodname); - cmd->methodname = json_strdup(cmd, plugin->buffer, method); + cmd->methodname = json_strdup(cmd, buffer, method); /* Point method to the name, not the value */ if (mod_params->type == JSMN_OBJECT) @@ -2215,7 +2135,7 @@ static void ld_command_handle(struct plugin *plugin, for (size_t i = 0; i < plugin->num_commands; i++) { if (streq(cmd->methodname, plugin->commands[i].name)) { plugin->commands[i].handle(cmd, - plugin->buffer, + buffer, paramstok); /* Reset this */ plugin->deprecated_ok_override @@ -2232,8 +2152,8 @@ static void ld_command_handle(struct plugin *plugin, struct command_result *ret; bool check_only; - config = json_strdup(tmpctx, plugin->buffer, - json_get_member(plugin->buffer, paramstok, "config")); + config = json_strdup(tmpctx, buffer, + json_get_member(buffer, paramstok, "config")); popt = find_opt(plugin, config); if (!popt) { plugin_err(plugin, @@ -2249,9 +2169,9 @@ static void ld_command_handle(struct plugin *plugin, check_only = command_check_only(cmd); plugin_log(plugin, LOG_DBG, "setconfig %s check_only=%i", config, check_only); - valtok = json_get_member(plugin->buffer, paramstok, "val"); + valtok = json_get_member(buffer, paramstok, "val"); if (valtok) - val = json_strdup(tmpctx, plugin->buffer, valtok); + val = json_strdup(tmpctx, buffer, valtok); else val = "true"; @@ -2272,65 +2192,31 @@ static void ld_command_handle(struct plugin *plugin, plugin_err(plugin, "Unknown command '%s'", cmd->methodname); } -/** - * Try to parse a complete message from lightningd's buffer, and return true - * if we could handle it. - */ -static bool ld_read_json_one(struct plugin *plugin) +static struct io_plan *ld_read_json(struct io_conn *conn, + struct plugin *plugin) { - bool complete; + /* Gather an parse any new bytes */ + for (;;) { + const jsmntok_t *toks; + const char *buf; + const char *err; - if (!json_parse_input(&plugin->parser, &plugin->toks, - plugin->buffer, plugin->used, - &complete)) { - plugin_err(plugin, "Failed to parse JSON response '%.*s'", - (int)plugin->used, plugin->buffer); - return false; - } + err = jsonrpc_io_parse(tmpctx, + plugin->lightningd_in, + &toks, &buf); + if (err) + plugin_err(plugin, "%s", err); - if (!complete) { - /* We need more. */ - return false; - } + if (!toks) + break; - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(plugin->toks) == 1) { - toks_reset(plugin->toks); - jsmn_init(&plugin->parser); - plugin->used = 0; - return false; + ld_command_handle(plugin, buf, toks); + jsonrpc_io_parse_done(plugin->lightningd_in); } - /* FIXME: Spark doesn't create proper jsonrpc 2.0! So we don't - * check for "jsonrpc" here. */ - ld_command_handle(plugin, plugin->toks); - - /* Move this object out of the buffer */ - memmove(plugin->buffer, plugin->buffer + plugin->toks[0].end, - tal_count(plugin->buffer) - plugin->toks[0].end); - plugin->used -= plugin->toks[0].end; - toks_reset(plugin->toks); - jsmn_init(&plugin->parser); - - return true; -} - -static struct io_plan *ld_read_json(struct io_conn *conn, - struct plugin *plugin) -{ - plugin->used += plugin->len_read; - if (plugin->used && plugin->used == tal_count(plugin->buffer)) - tal_resize(&plugin->buffer, plugin->used * 2); - - /* Read and process all messages from the connection */ - while (ld_read_json_one(plugin)) - ; - - /* Now read more from the connection */ - return io_read_partial(plugin->stdin_conn, - plugin->buffer + plugin->used, - tal_count(plugin->buffer) - plugin->used, - &plugin->len_read, ld_read_json, plugin); + /* Read more */ + return jsonrpc_io_read(conn, plugin->lightningd_in, + ld_read_json, plugin); } static struct io_plan *ld_write_json(struct io_conn *conn, @@ -2376,9 +2262,7 @@ static struct io_plan *stdin_conn_init(struct io_conn *conn, { plugin->stdin_conn = conn; io_set_finish(conn, ld_conn_finish, plugin); - return io_read_partial(plugin->stdin_conn, plugin->buffer, - tal_bytelen(plugin->buffer), &plugin->len_read, - ld_read_json, plugin); + return ld_read_json(conn, plugin); } /* lightningd reads from our stdout */ @@ -2419,33 +2303,21 @@ static struct plugin *new_plugin(const tal_t *ctx, p->id = name; p->developer = developer; p->deprecated_ok_override = NULL; - p->buffer = tal_arr(p, char, 64); + p->lightningd_in = jsonrpc_io_new(p); list_head_init(&p->js_list); - p->used = 0; - p->len_read = 0; - jsmn_init(&p->parser); - p->toks = toks_alloc(p); /* Async RPC */ - p->rpc_buffer = tal_arr(p, char, 64); + p->jsonrpc_in = jsonrpc_io_new(p); list_head_init(&p->rpc_js_list); p->io_rpc_conn = NULL; - p->rpc_used = 0; - p->rpc_read_offset = 0; - p->rpc_len_read = 0; - jsmn_init(&p->rpc_parser); - p->rpc_toks = toks_alloc(p); p->next_outreq_id = 0; strmap_init(&p->out_reqs); p->beglist = NULL; p->desired_features = tal_steal(p, features); - if (init_rpc) { - /* Sync RPC FIXME: maybe go full async ? */ - p->rpc_conn = tal(p, struct rpc_conn); - } else { - p->rpc_conn = NULL; - } - + if (init_rpc) + p->sync_io = jsonrpc_io_new(p); + else + p->sync_io = NULL; p->init = init; p->manifested = p->initialized = p->exiting = false; p->restartability = restartability; diff --git a/plugins/libplugin.h b/plugins/libplugin.h index 550429950995..20254b46983d 100644 --- a/plugins/libplugin.h +++ b/plugins/libplugin.h @@ -5,7 +5,6 @@ #include #include -#include #include #include #include diff --git a/plugins/pay.c b/plugins/pay.c index 7deaf85bc518..e16a6bf001b4 100644 --- a/plugins/pay.c +++ b/plugins/pay.c @@ -1475,8 +1475,6 @@ static struct command_result *json_pay(struct command *cmd, } p->local_id = &my_id; - p->json_buffer = buf; - p->json_toks = params; p->why = "Initial attempt"; p->constraints.cltv_budget = *maxdelay; tal_free(maxdelay); diff --git a/plugins/test/Makefile b/plugins/test/Makefile index 3b5712d5ac09..db5b40c37cbe 100644 --- a/plugins/test/Makefile +++ b/plugins/test/Makefile @@ -33,6 +33,6 @@ plugins/test/run-route-calc: \ $(PLUGIN_TEST_PROGRAMS): $(BITCOIN_OBJS) $(WIRE_OBJS) $(PLUGIN_TEST_COMMON_OBJS) -$(PLUGIN_TEST_OBJS): $(PLUGIN_FUNDER_HEADER) $(PLUGIN_FUNDER_SRC) +$(PLUGIN_TEST_OBJS): $(PLUGIN_ALL_HEADER) $(PLUGIN_ALL_SRC) check-units: $(PLUGIN_TEST_PROGRAMS:%=unittest/%) diff --git a/tests/test_misc.py b/tests/test_misc.py index a4d355b12b72..dc5e0a9fcc1a 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -951,11 +951,14 @@ def test_malformed_rpc(node_factory): obj, _ = l1.rpc._readobj(sock, b'') assert obj['error']['code'] == -32600 - # Complete crap - sock.sendall(b'[]') + # Complete crap: needs } to even try parsing, and also this makes it hang up! + sock.sendall(b'[]}') obj, _ = l1.rpc._readobj(sock, b'') assert obj['error']['code'] == -32600 + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(l1.rpc.socket_path) + # Bad ID sock.sendall(b'{"id":{}, "jsonrpc":"2.0","method":"getinfo","params":[]}') obj, _ = l1.rpc._readobj(sock, b'')