Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/json_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,14 @@ void json_add_timeabs(struct json_stream *result, const char *fieldname,
(u64)t.ts.tv_sec, (u64)t.ts.tv_nsec);
}

void json_add_timerel(struct json_stream *result, const char *fieldname,
struct timerel t)
{
json_add_primitive_fmt(result, fieldname,
"%" PRIu64 ".%09" PRIu64,
(u64)t.ts.tv_sec, (u64)t.ts.tv_nsec);
}

void json_add_timestr(struct json_stream *result, const char *fieldname,
struct timespec ts)
{
Expand Down
5 changes: 5 additions & 0 deletions common/json_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,14 @@ void json_add_hex_talarr(struct json_stream *result,
const char *fieldname,
const tal_t *data);

/* '"fieldname" : 1749785122.000000001 */
void json_add_timeabs(struct json_stream *result, const char *fieldname,
struct timeabs t);

/* '"fieldname" : 1.000000001 */
void json_add_timerel(struct json_stream *result, const char *fieldname,
struct timerel t);

/* used in log.c and notification.c*/
void json_add_timestr(struct json_stream *result, const char *fieldname,
struct timespec ts);
Expand Down
86 changes: 86 additions & 0 deletions doc/developers-guide/plugin-development/event-notifications.md
Original file line number Diff line number Diff line change
Expand Up @@ -567,3 +567,89 @@ Where:
- `plugin_name`: The short name of the plugin.
- `plugin_path`: The full file path to the plugin executable.
- `methods`: An array of RPC method names that the plugin registered.


### `pay_part_start` (v25.09 onward)

Emitted by `xpay` when part of a payment begins. `payment_hash` and
`groupid` uniquely identify this xpay invocation, and `partid` then identifies
this particular attempt to pay part of that.

`total_payment_msat` is the total amount (usually the invoice amount),
which will be the same across all parts, adn `attempt_msat` is the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
which will be the same across all parts, adn `attempt_msat` is the
which will be the same across all parts, and `attempt_msat` is the

amount being delivered to the destination by this part.

Each element in `hops` shows the amount going into the node (i.e. with
fees, `channel_in_msat`) and the amount we're telling it to send
to the other end (`channel_out_msat`). The `channel_out_msat` will
be equal to the next `channel_in_msat. The final
`channel_out_msat` will be equal to the `attempt_msat`.

The example shows a payment from this node via 103x1x0 (direction 1) to 022d223620a359a47ff7f7ac447c85c46c923da53389221a0054c11c1e3ca31d59, then via 103x2x0 (direction 0) to 035d2b1192dfba134e10e540875d366ebc8bc353d5aa766b80c090b39c3a5d885d.

```json
{
"jsonrpc": "2.0",
"method": "pay_part_start",
"params": {
"origin": "cln-xpay",
"payload": {
"payment_hash": "651b28004d41cf0dc8e39a0b3d905651a7b012d03d81199fde09314700cb5a62",
"groupid": 5793910575598463611,
"partid": 1,
"total_payment_msat": 5000000,
"attempt_msat": 5000000,
"hops": [
{
"next_node": "022d223620a359a47ff7f7ac447c85c46c923da53389221a0054c11c1e3ca31d59",
"short_channel_id": "103x1x0",
"direction": 1,
"channel_in_msat": 5000051,
"channel_out_msat": 5000051
},
{
"next_node": "035d2b1192dfba134e10e540875d366ebc8bc353d5aa766b80c090b39c3a5d885d",
"short_channel_id": "103x2x0",
"direction": 0,
"channel_in_msat": 5000051,
"channel_out_msat": 5000000
}
]
}
}
}
```

### `pay_part_end` (v25.09 onward)

Emitted by `xpay` when part of a payment ends. `payment_hash`, `groupid` and `partid`
will match a previous `pay_part_start`.

`status` will be "success" or "failure". `duration` will be a number of seconds, with 9 decimal places. This is the time between `xpay` telling lightningd to send the onion, to when `xpay` processes the response.

If `status` is "failure", there will always be an `error_message`: the other fields below
will be missing in the unusual case where the error onion is corrupted.

`failed_node_id`: If it's a non-local error, the source of the error.
`failed_short_channel_id`: if it's not the final node, the channel it's complaining about.
`failed_direction`: if it's not the final node, the channel direction.
`failed_msg`: the decrypted onion message, in hex, if it was valid.
`error_code`: the error code returned (present unless onion was corrupted).
`error_message`: always present: if `failed_node_id` is present it's just the name of the `error_code`, but otherwise it can be a more informative error from our own node.

```json
{
"jsonrpc": "2.0",
"method": "pay_part_end",
"params": {
"origin": "cln-xpay",
"payload": {
"status": "success",
"duration": 0.220209189,
"payment_hash": "651b28004d41cf0dc8e39a0b3d905651a7b012d03d81199fde09314700cb5a62",
"groupid": 5793910575598463611,
"partid": 1
}
}
}
```
6 changes: 3 additions & 3 deletions plugins/libplugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1859,10 +1859,10 @@ void plugin_gossmap_logcb(struct plugin *plugin,
va_end(ap);
}

struct json_stream *plugin_notification_start(struct plugin *plugin,
struct json_stream *plugin_notification_start(const tal_t *ctx,
const char *method)
{
struct json_stream *js = new_json_stream(plugin, NULL, NULL);
struct json_stream *js = new_json_stream(ctx, NULL, NULL);

json_object_start(js, NULL);
json_add_string(js, "jsonrpc", "2.0");
Expand All @@ -1873,7 +1873,7 @@ struct json_stream *plugin_notification_start(struct plugin *plugin,
}

void plugin_notification_end(struct plugin *plugin,
struct json_stream *stream)
struct json_stream *stream STEALS)
{
json_object_end(stream);
jsonrpc_finish_and_send(plugin, stream);
Expand Down
4 changes: 2 additions & 2 deletions plugins/libplugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,10 @@ void plugin_notify_end(struct command *cmd, struct json_stream *js);

/* Send a notification for a custom notification topic. These are sent
* to lightningd and distributed to subscribing plugins. */
struct json_stream *plugin_notification_start(struct plugin *plugins,
struct json_stream *plugin_notification_start(const tal_t *ctx,
const char *method);
void plugin_notification_end(struct plugin *plugin,
struct json_stream *stream TAKES);
struct json_stream *stream STEALS);

/* Convenience wrapper for notify "message" */
void plugin_notify_message(struct command *cmd,
Expand Down
4 changes: 2 additions & 2 deletions plugins/test/run-route-calc.c
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,10 @@ void plugin_log(struct plugin *p UNNEEDED, enum log_level l UNNEEDED, const char
{ fprintf(stderr, "plugin_log called!\n"); abort(); }
/* Generated stub for plugin_notification_end */
void plugin_notification_end(struct plugin *plugin UNNEEDED,
struct json_stream *stream TAKES UNNEEDED)
struct json_stream *stream STEALS UNNEEDED)
{ fprintf(stderr, "plugin_notification_end called!\n"); abort(); }
/* Generated stub for plugin_notification_start */
struct json_stream *plugin_notification_start(struct plugin *plugins UNNEEDED,
struct json_stream *plugin_notification_start(const tal_t *ctx UNNEEDED,
const char *method UNNEEDED)
{ fprintf(stderr, "plugin_notification_start called!\n"); abort(); }
/* Generated stub for plugin_notify_message */
Expand Down
4 changes: 2 additions & 2 deletions plugins/test/run-route-overlong.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,10 @@ void plugin_log(struct plugin *p UNNEEDED, enum log_level l UNNEEDED, const char
{ fprintf(stderr, "plugin_log called!\n"); abort(); }
/* Generated stub for plugin_notification_end */
void plugin_notification_end(struct plugin *plugin UNNEEDED,
struct json_stream *stream TAKES UNNEEDED)
struct json_stream *stream STEALS UNNEEDED)
{ fprintf(stderr, "plugin_notification_end called!\n"); abort(); }
/* Generated stub for plugin_notification_start */
struct json_stream *plugin_notification_start(struct plugin *plugins UNNEEDED,
struct json_stream *plugin_notification_start(const tal_t *ctx UNNEEDED,
const char *method UNNEEDED)
{ fprintf(stderr, "plugin_notification_start called!\n"); abort(); }
/* Generated stub for plugin_notify_message */
Expand Down
83 changes: 82 additions & 1 deletion plugins/xpay/xpay.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ struct attempt {

struct payment *payment;
struct amount_msat delivers;
struct timemono start_time;

/* Path we tried, so we can unreserve, and tell askrene the results */
const struct hop *hops;
Expand Down Expand Up @@ -536,6 +537,73 @@ static struct amount_msat total_delivered(const struct payment *payment)
return sum;
}

/* We can notify others of what the details are, so they can do their own
* layer heuristics. */
static void json_add_attempt_fields(struct json_stream *js,
const struct attempt *attempt)
{
/* These three uniquely identify this attempt */
json_add_sha256(js, "payment_hash", &attempt->payment->payment_hash);
json_add_u64(js, "groupid", attempt->payment->group_id);
json_add_u64(js, "partid", attempt->partid);
}

static void outgoing_notify_start(const struct attempt *attempt)
{
struct json_stream *js = plugin_notification_start(NULL, "pay_part_start");
json_add_attempt_fields(js, attempt);
json_add_amount_msat(js, "total_payment_msat", attempt->payment->amount);
json_add_amount_msat(js, "attempt_msat", attempt->delivers);
json_array_start(js, "hops");
for (size_t i = 0; i < tal_count(attempt->hops); i++) {
const struct hop *hop = &attempt->hops[i];
json_object_start(js, NULL);
json_add_pubkey(js, "next_node", &hop->next_node);
json_add_short_channel_id(js, "short_channel_id", hop->scidd.scid);
json_add_u32(js, "direction", hop->scidd.dir);
json_add_amount_msat(js, "channel_in_msat", hop->amount_in);
json_add_amount_msat(js, "channel_out_msat", hop->amount_out);
json_object_end(js);
}
json_array_end(js);
plugin_notification_end(attempt->payment->plugin, js);
}

static void outgoing_notify_success(const struct attempt *attempt)
{
struct json_stream *js = plugin_notification_start(NULL, "pay_part_end");
json_add_string(js, "status", "success");
json_add_timerel(js, "duration", timemono_between(time_mono(), attempt->start_time));
json_add_attempt_fields(js, attempt);
plugin_notification_end(attempt->payment->plugin, js);
}

static void outgoing_notify_failure(const struct attempt *attempt,
int failindex, int errcode,
const u8 *replymsg,
const char *errstr)
{
struct json_stream *js = plugin_notification_start(NULL, "pay_part_end");
json_add_string(js, "status", "failure");
json_add_attempt_fields(js, attempt);
if (replymsg)
json_add_hex_talarr(js, "failed_msg", replymsg);
json_add_timerel(js, "duration", timemono_between(time_mono(), attempt->start_time));
if (failindex != -1) {
if (failindex != 0)
json_add_pubkey(js, "failed_node_id", &attempt->hops[failindex-1].next_node);
if (failindex != tal_count(attempt->hops)) {
const struct hop *hop = &attempt->hops[failindex];
json_add_short_channel_id(js, "failed_short_channel_id", hop->scidd.scid);
json_add_u32(js, "failed_direction", hop->scidd.dir);
}
}
if (errcode != -1)
json_add_u32(js, "error_code", errcode);
json_add_string(js, "error_message", errstr);
plugin_notification_end(attempt->payment->plugin, js);
}

static void update_knowledge_from_error(struct command *aux_cmd,
const char *buf,
const jsmntok_t *error,
Expand Down Expand Up @@ -590,6 +658,7 @@ static void update_knowledge_from_error(struct command *aux_cmd,

/* Garbled? Blame random hop. */
if (!replymsg) {
outgoing_notify_failure(attempt, -1, -1, replymsg, "Garbled error message");
index = pseudorand(tal_count(attempt->hops));
description = "Garbled error message";
add_result_summary(attempt, LOG_UNUSUAL,
Expand Down Expand Up @@ -627,6 +696,7 @@ static void update_knowledge_from_error(struct command *aux_cmd,
} else
errmsg = failcode_name;

outgoing_notify_failure(attempt, index, failcode, replymsg, errmsg);
description = tal_fmt(tmpctx,
"Error %s for path %s, from %s",
errmsg,
Expand Down Expand Up @@ -881,6 +951,8 @@ static struct command_result *injectpaymentonion_succeeded(struct command *aux_c
plugin_err(aux_cmd->plugin, "Invalid injectpaymentonion result '%.*s'",
json_tok_full_len(result), json_tok_full(buf, result));

outgoing_notify_success(attempt);

/* Move from current_attempts to past_attempts */
list_del_from(&payment->current_attempts, &attempt->list);
list_add(&payment->past_attempts, &attempt->list);
Expand Down Expand Up @@ -1008,6 +1080,9 @@ static struct command_result *do_inject(struct command *aux_cmd,
return command_still_pending(aux_cmd);
}

outgoing_notify_start(attempt);
attempt->start_time = time_mono();

req = jsonrpc_request_start(aux_cmd,
"injectpaymentonion",
injectpaymentonion_succeeded,
Expand Down Expand Up @@ -2118,6 +2193,12 @@ static const struct plugin_hook hooks[] = {
},
};

/* Notifications for each payment part we attempt */
static const char *outgoing_notifications[] = {
"pay_part_start",
"pay_part_end",
};

int main(int argc, char *argv[])
{
struct xpay *xpay;
Expand All @@ -2131,7 +2212,7 @@ int main(int argc, char *argv[])
commands, ARRAY_SIZE(commands),
notifications, ARRAY_SIZE(notifications),
hooks, ARRAY_SIZE(hooks),
NULL, 0,
outgoing_notifications, ARRAY_SIZE(outgoing_notifications),
plugin_option_dynamic("xpay-handle-pay", "bool",
"Make xpay take over pay commands it can handle.",
bool_option, bool_jsonfmt, &xpay->take_over_pay),
Expand Down
10 changes: 10 additions & 0 deletions tests/plugins/custom_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ def on_pay_success(origin, payload, **kwargs):
)


@plugin.subscribe("pay_part_start")
def on_pay_part_start(origin, payload, **kwargs):
plugin.log("Got pay_part_start: {}".format(payload))


@plugin.subscribe("pay_part_end")
def on_pay_part_end(origin, payload, **kwargs):
plugin.log("Got pay_part_end: {}".format(payload))


@plugin.subscribe("ididntannouncethis")
def on_faulty_emit(origin, payload, **kwargs):
"""We should never receive this as it gets dropped.
Expand Down
50 changes: 50 additions & 0 deletions tests/test_xpay.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,3 +831,53 @@ def test_xpay_twohop_bug(node_factory, bitcoind):
# This doesn't!
l1.rpc.xpay(inv)
l1.daemon.wait_for_log(f'Adding HTLC 1 amount=15002msat cltv={110 + 1 + 100 + 200 + 400}')


def test_attempt_notifications(node_factory):
plugin_path = os.path.join(os.getcwd(), 'tests/plugins/custom_notifications.py')
l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True,
opts=[{"plugin": plugin_path}, {}, {}])

scid12 = only_one(l1.rpc.listpeerchannels(l2.info['id'])['channels'])['short_channel_id']
scid12_dir = only_one(l1.rpc.listpeerchannels(l2.info['id'])['channels'])['direction']
scid23 = only_one(l2.rpc.listpeerchannels(l3.info['id'])['channels'])['short_channel_id']
scid23_dir = only_one(l2.rpc.listpeerchannels(l3.info['id'])['channels'])['direction']
inv1 = l3.rpc.invoice(5000000, 'test_attempt_notifications1', 'test_attempt_notifications1')
l1.rpc.xpay(inv1['bolt11'])

line = l1.daemon.wait_for_log("plugin-custom_notifications.py: Got pay_part_start: ")
regex = r".*Got pay_part_start: \{'payment_hash': '" + inv1['payment_hash'] + r"', 'groupid': [0-9]*, 'partid': 1, 'total_payment_msat': 5000000, 'attempt_msat': 5000000, 'hops': \[\{'next_node': '" + l2.info['id'] + r"', 'short_channel_id': '" + scid12 + r"', 'direction': " + str(scid12_dir) + r", 'channel_in_msat': 5000051, 'channel_out_msat': 5000051\}, \{'next_node': '" + l3.info['id'] + r"', 'short_channel_id': '" + scid23 + r"', 'direction': " + str(scid23_dir) + r", 'channel_in_msat': 5000051, 'channel_out_msat': 5000000\}\]\}"
assert re.match(regex, line)

# Note, duration always has 9 decimals, EXCEPT that the python code interprets it, so if the last digit is a 0 it will only print 8.
line = l1.daemon.wait_for_log("plugin-custom_notifications.py: Got pay_part_end: ")
regex = r".*Got pay_part_end: \{'status': 'success', 'duration': [0-9]*\.[0-9]*, 'payment_hash': '" + inv1['payment_hash'] + r"', 'groupid': [0-9]*, 'partid': 1\}"
assert re.match(regex, line)

inv2 = l3.rpc.invoice(10000000, 'test_attempt_notifications2', 'test_attempt_notifications2')
l3.rpc.delinvoice('test_attempt_notifications2', "unpaid")

# Final node failure
with pytest.raises(RpcError, match=r"Destination said it doesn't know invoice: incorrect_or_unknown_payment_details"):
l1.rpc.xpay(inv2['bolt11'])

line = l1.daemon.wait_for_log("plugin-custom_notifications.py: Got pay_part_start: ")
regex = r".*Got pay_part_start: \{'payment_hash': '" + inv2['payment_hash'] + r"', 'groupid': [0-9]*, 'partid': 1, 'total_payment_msat': 10000000, 'attempt_msat': 10000000, 'hops': \[\{'next_node': '" + l2.info['id'] + r"', 'short_channel_id': '" + scid12 + r"', 'direction': " + str(scid12_dir) + r", 'channel_in_msat': 10000101, 'channel_out_msat': 10000101\}, \{'next_node': '" + l3.info['id'] + r"', 'short_channel_id': '" + scid23 + r"', 'direction': " + str(scid23_dir) + r", 'channel_in_msat': 10000101, 'channel_out_msat': 10000000\}\]\}"
assert re.match(regex, line)

line = l1.daemon.wait_for_log("plugin-custom_notifications.py: Got pay_part_end: ")
regex = r".*Got pay_part_end: \{'status': 'failure', 'payment_hash': '" + inv2['payment_hash'] + r"', 'groupid': [0-9]*, 'partid': 1, 'failed_msg': '400f00000000009896800000006c', 'duration': [0-9]*\.[0-9]*, 'failed_node_id': '" + l3.info['id'] + r"', 'error_code': 16399, 'error_message': 'incorrect_or_unknown_payment_details'\}"
assert re.match(regex, line)

# Intermediary node failure
l3.stop()
with pytest.raises(RpcError, match=r"Failed after 1 attempts"):
l1.rpc.xpay(inv2['bolt11'])

line = l1.daemon.wait_for_log("plugin-custom_notifications.py: Got pay_part_start: ")
regex = r".*Got pay_part_start: \{'payment_hash': '" + inv2['payment_hash'] + r"', 'groupid': [0-9]*, 'partid': 1, 'total_payment_msat': 10000000, 'attempt_msat': 10000000, 'hops': \[\{'next_node': '" + l2.info['id'] + r"', 'short_channel_id': '" + scid12 + r"', 'direction': " + str(scid12_dir) + r", 'channel_in_msat': 10000101, 'channel_out_msat': 10000101\}, \{'next_node': '" + l3.info['id'] + r"', 'short_channel_id': '" + scid23 + r"', 'direction': " + str(scid23_dir) + r", 'channel_in_msat': 10000101, 'channel_out_msat': 10000000\}\]\}"
assert re.match(regex, line)

line = l1.daemon.wait_for_log("plugin-custom_notifications.py: Got pay_part_end: ")
regex = r".*Got pay_part_end: \{'status': 'failure', 'payment_hash': '" + inv2['payment_hash'] + r"', 'groupid': [0-9]*, 'partid': 1, 'failed_msg': '1007[a-f0-9]*', 'duration': [0-9]*\.[0-9]*, 'failed_node_id': '" + l2.info['id'] + r"', 'failed_short_channel_id': '" + scid23 + r"', 'failed_direction': " + str(scid23_dir) + r", 'error_code': 4103, 'error_message': 'temporary_channel_failure'\}"
assert re.match(regex, line)
Loading