Skip to content

Commit 6e3101a

Browse files
ddustinrustyrussell
authored andcommitted
connectd: Implement sending of start_batch
Implement the sending of `start_batch` and `protocol_batch_element` from `channeld` to `connectd`. Each real peer wire message is prefixed with `protocol_batch_element` so connectd can know the size of the message that were batched together. `connectd` intercepts `protocol_batch_element` messages and eats them (doesn’t forward them to peer) to get individual messages out of the batch. It needs this to be able to encrypt them individiaully. Afterwards it recombines the now encrypted messages into a single message to send over the wire to the peer. `channeld` remains responsible for making `start_batch` the first message of the message bundle.
1 parent 28ecf25 commit 6e3101a

File tree

2 files changed

+147
-5
lines changed

2 files changed

+147
-5
lines changed

channeld/channeld.c

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,78 @@ static s64 sats_diff(struct amount_sat a, struct amount_sat b)
12321232
return (s64)a.satoshis - (s64)b.satoshis; /* Raw: splicing numbers can wrap! */
12331233
}
12341234

1235+
static void send_message_batch(struct peer *peer, u8 **msgs)
1236+
{
1237+
size_t size;
1238+
size_t hdr_size = tal_bytelen(towire_protocol_batch_element(tmpctx,
1239+
&peer->channel_id,
1240+
0));
1241+
u8 *batch_msg, *final_msg, *final_msg_ptr;
1242+
struct tlv_start_batch_tlvs *tlvs;
1243+
1244+
assert(tal_count(msgs) > 0);
1245+
1246+
/* When sending one message, no batching is required */
1247+
if (tal_count(msgs) == 1) {
1248+
peer_write(peer->pps, msgs[0]);
1249+
return;
1250+
}
1251+
1252+
/* We prefix each message with an interal wire type,
1253+
* protocol_batch_element. connectd will eat each message so they don't
1254+
* actually go out to the peer. It's just so connectd can chop up the
1255+
* message batch back out into individual messages. */
1256+
1257+
/* We start by calculating the total size */
1258+
size = 0;
1259+
1260+
/* Build the `start_batch` msg now so know it's size */
1261+
tlvs = tlv_start_batch_tlvs_new(tmpctx);
1262+
tlvs->batch_info = tal(tlvs, u16);
1263+
*tlvs->batch_info = WIRE_COMMITMENT_SIGNED;
1264+
batch_msg = towire_start_batch(tmpctx, &peer->channel_id,
1265+
tal_count(msgs), tlvs);
1266+
size += tal_bytelen(batch_msg) + hdr_size;
1267+
1268+
/* Count the size of all the messages in the batch */
1269+
for(u32 i = 0; i < tal_count(msgs); i++)
1270+
size += tal_bytelen(msgs[i]) + hdr_size;
1271+
1272+
/* Now we know the size of our `final_msg` so we allocate */
1273+
final_msg = tal_arr(tmpctx, u8, size);
1274+
final_msg_ptr = final_msg;
1275+
1276+
status_debug("proto_batch Building batch with %zu bytes, msgs: %zu",
1277+
size, tal_count(msgs));
1278+
1279+
/* Copy the bytes for `start_batch` prefix */
1280+
memcpy(final_msg_ptr,
1281+
towire_protocol_batch_element(tmpctx,
1282+
&peer->channel_id,
1283+
tal_bytelen(batch_msg)),
1284+
hdr_size);
1285+
final_msg_ptr += hdr_size;
1286+
1287+
memcpy(final_msg_ptr, batch_msg, tal_bytelen(batch_msg));
1288+
final_msg_ptr += tal_bytelen(batch_msg);
1289+
1290+
/* Now copy the bytes from all messages in `msgs` */
1291+
for(u32 i = 0; i < tal_count(msgs); i++) {
1292+
memcpy(final_msg_ptr,
1293+
towire_protocol_batch_element(tmpctx,
1294+
&peer->channel_id,
1295+
tal_bytelen(msgs[i])),
1296+
hdr_size);
1297+
final_msg_ptr += hdr_size;
1298+
1299+
memcpy(final_msg_ptr, msgs[i], tal_bytelen(msgs[i]));
1300+
final_msg_ptr += tal_bytelen(msgs[i]);
1301+
}
1302+
1303+
assert(final_msg + size == final_msg_ptr);
1304+
peer_write(peer->pps, take(final_msg));
1305+
}
1306+
12351307
static void send_commit(struct peer *peer)
12361308
{
12371309
const struct htlc **changed_htlcs;
@@ -1398,8 +1470,7 @@ static void send_commit(struct peer *peer)
13981470

13991471
peer->next_index[REMOTE]++;
14001472

1401-
for(u32 i = 0; i < tal_count(msgs); i++)
1402-
peer_write(peer->pps, take(msgs[i]));
1473+
send_message_batch(peer, msgs);
14031474

14041475
maybe_send_shutdown(peer);
14051476

@@ -5195,8 +5266,7 @@ static void resend_commitment(struct peer *peer, struct changed_htlc *last)
51955266
peer->splice_state->inflights[i]->remote_funding));
51965267
}
51975268

5198-
for(i = 0; i < tal_count(msgs); i++)
5199-
peer_write(peer->pps, take(msgs[i]));
5269+
send_message_batch(peer, msgs);
52005270

52015271
/* If we have already received the revocation for the previous, the
52025272
* other side shouldn't be asking for a retransmit! */

connectd/multiplex.c

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,69 @@ static struct io_plan *io_sock_shutdown_cb(struct io_conn *conn, struct peer *un
399399
return io_sock_shutdown(conn);
400400
}
401401

402+
/* Process and eat protocol_batch_element messages, encrypt each element message
403+
* and return the encrypted messages as one long byte array. */
404+
static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
405+
{
406+
u8 *ret = tal_arr(peer, u8, 0);
407+
size_t ret_size = 0;
408+
const u8 *cursor = msg;
409+
size_t plen = tal_count(msg);
410+
411+
status_debug("Processing batch elements of %zu bytes. %s", plen,
412+
tal_hex(tmpctx, msg));
413+
414+
do {
415+
u8 *element_bytes;
416+
u16 element_size;
417+
struct channel_id channel_id;
418+
u8 *enc_msg;
419+
420+
if (fromwire_u16(&cursor, &plen) != WIRE_PROTOCOL_BATCH_ELEMENT) {
421+
status_broken("process_batch_elements on msg that is"
422+
" not WIRE_PROTOCOL_BATCH_ELEMENT. %s",
423+
tal_hexstr(tmpctx, cursor, plen));
424+
return tal_free(ret);
425+
}
426+
427+
fromwire_channel_id(&cursor, &plen, &channel_id);
428+
429+
element_size = fromwire_u16(&cursor, &plen);
430+
if (!element_size) {
431+
status_broken("process_batch_elements cannot have zero"
432+
" length elements. %s",
433+
tal_hexstr(tmpctx, cursor, plen));
434+
return tal_free(ret);
435+
}
436+
437+
element_bytes = fromwire_tal_arrn(NULL, &cursor, &plen,
438+
element_size);
439+
if (!element_bytes) {
440+
status_broken("process_batch_elements fromwire_tal_arrn"
441+
" %s",
442+
tal_hexstr(tmpctx, cursor, plen));
443+
return tal_free(ret);
444+
}
445+
446+
status_debug("Processing batch extracted item %s. %s",
447+
peer_wire_name(fromwire_peektype(element_bytes)),
448+
tal_hex(tmpctx, element_bytes));
449+
450+
enc_msg = cryptomsg_encrypt_msg(tmpctx, &peer->cs,
451+
take(element_bytes));
452+
453+
tal_resize(&ret, ret_size + tal_bytelen(enc_msg));
454+
memcpy(&ret[ret_size], enc_msg, tal_bytelen(enc_msg));
455+
ret_size += tal_bytelen(enc_msg);
456+
457+
} while(plen);
458+
459+
if (taken(msg))
460+
tal_free(msg);
461+
462+
return ret;
463+
}
464+
402465
static struct io_plan *encrypt_and_send(struct peer *peer,
403466
const u8 *msg TAKES,
404467
struct io_plan *(*next)
@@ -442,8 +505,17 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
442505

443506
set_urgent_flag(peer, is_urgent(type));
444507

508+
/* Special message type directing us to process batch items. */
509+
if (type == WIRE_PROTOCOL_BATCH_ELEMENT) {
510+
peer->sent_to_peer = process_batch_elements(peer, msg);
511+
if (!peer->sent_to_peer)
512+
return io_close(peer->to_peer);
513+
}
514+
else {
515+
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
516+
}
445517
/* We free this and the encrypted version in next write_to_peer */
446-
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
518+
447519
return io_write(peer->to_peer,
448520
peer->sent_to_peer,
449521
tal_bytelen(peer->sent_to_peer),

0 commit comments

Comments
 (0)