Skip to content

Commit 0775c0f

Browse files
authored
feat: add 'move' method (#43)
1 parent a3d937d commit 0775c0f

File tree

5 files changed

+660
-9
lines changed

5 files changed

+660
-9
lines changed

lib/resty/apisix/stream/xrpc/socket.lua

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,21 @@ int
2929
ngx_stream_lua_ffi_socket_tcp_get_read_buf_result(ngx_stream_lua_request_t *r,
3030
ngx_stream_lua_socket_tcp_upstream_t *u, u_char **res, size_t len,
3131
u_char *errbuf, size_t *errbuf_size);
32+
33+
int
34+
ngx_stream_lua_ffi_socket_tcp_send_from_socket(ngx_stream_lua_request_t *r,
35+
ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_socket_tcp_upstream_t *ds,
36+
u_char *errbuf, size_t *errbuf_size);
37+
38+
int
39+
ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r,
40+
ngx_stream_lua_socket_tcp_upstream_t *u, u_char *errbuf,
41+
size_t *errbuf_size);
3242
]]
3343
local socket_tcp_read = C.ngx_stream_lua_ffi_socket_tcp_read_buf
3444
local socket_tcp_get_read_result = C.ngx_stream_lua_ffi_socket_tcp_get_read_buf_result
45+
local socket_tcp_move = C.ngx_stream_lua_ffi_socket_tcp_send_from_socket
46+
local socket_tcp_get_move_result = C.ngx_stream_lua_ffi_socket_tcp_get_send_result
3547

3648

3749
local ERR_BUF_SIZE = 256
@@ -100,6 +112,58 @@ local function read_buf(cosocket, len)
100112
end
101113

102114

115+
-- move the buffers from src cosocket to dst cosocket. The buffers are from previous one or multiple
116+
-- read calls. It is equal to send multiple read buffer in the src cosocket to the dst cosocket.
117+
local function move(dst, src)
118+
local r = get_request()
119+
if not r then
120+
error("no request found", 2)
121+
end
122+
123+
if src == dst then
124+
error("can't move buffer in the same socket", 2)
125+
end
126+
127+
if not src then
128+
error("no source socket found", 2)
129+
end
130+
131+
local dst_sk = get_tcp_socket(dst)
132+
local src_sk = get_tcp_socket(src)
133+
if not src_sk then
134+
error("no source socket found", 2)
135+
end
136+
137+
local errbuf = get_string_buf(ERR_BUF_SIZE)
138+
local errbuf_size = get_size_ptr()
139+
errbuf_size[0] = ERR_BUF_SIZE
140+
141+
local rc = socket_tcp_move(r, dst_sk, src_sk, errbuf, errbuf_size)
142+
if rc == FFI_DONE then
143+
error(ffi_str(errbuf, errbuf_size[0]), 2)
144+
end
145+
146+
while true do
147+
if rc == FFI_ERROR then
148+
return nil, ffi_str(errbuf, errbuf_size[0])
149+
end
150+
151+
if rc >= 0 then
152+
return true
153+
end
154+
155+
assert(rc == FFI_AGAIN)
156+
157+
co_yield()
158+
159+
errbuf = get_string_buf(ERR_BUF_SIZE)
160+
errbuf_size = get_size_ptr()
161+
errbuf_size[0] = ERR_BUF_SIZE
162+
rc = socket_tcp_get_move_result(r, dst_sk, errbuf, errbuf_size)
163+
end
164+
end
165+
166+
103167
local function patch_methods(sk)
104168
local methods = getmetatable(sk).__index
105169
local copy = tab_clone(methods)
@@ -109,6 +173,7 @@ local function patch_methods(sk)
109173
copy.receiveuntil = nil
110174

111175
copy.read = read_buf
176+
copy.move = move
112177

113178
return {__index = copy}
114179
end

patch/1.19.9/ngx_stream_lua-xrpc.patch

Lines changed: 255 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git src/ngx_stream_lua_socket_tcp.c src/ngx_stream_lua_socket_tcp.c
2-
index 7fcfb45..601673d 100644
2+
index 7fcfb45..8fc96cf 100644
33
--- src/ngx_stream_lua_socket_tcp.c
44
+++ src/ngx_stream_lua_socket_tcp.c
55
@@ -234,6 +234,41 @@ enum {
@@ -44,10 +44,39 @@ index 7fcfb45..601673d 100644
4444

4545
static char ngx_stream_lua_raw_req_socket_metatable_key;
4646
static char ngx_stream_lua_tcp_socket_metatable_key;
47-
@@ -6005,6 +6040,329 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
47+
@@ -6005,6 +6040,576 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
4848
}
4949

5050

51+
+static void
52+
+ngx_stream_lua_ffi_socket_reset_buf(ngx_stream_lua_ctx_t *ctx,
53+
+ ngx_stream_lua_socket_tcp_upstream_t *u)
54+
+{
55+
+ ngx_chain_t *cl = u->bufs_in;
56+
+ ngx_chain_t **ll = NULL;
57+
+
58+
+ if (cl->next) {
59+
+ ll = &cl->next;
60+
+ }
61+
+
62+
+ if (ll) {
63+
+ *ll = ctx->free_recv_bufs;
64+
+ ctx->free_recv_bufs = u->bufs_in;
65+
+ u->bufs_in = u->buf_in;
66+
+ }
67+
+
68+
+ if (u->buffer.pos == u->buffer.last) {
69+
+ u->buffer.pos = u->buffer.start;
70+
+ u->buffer.last = u->buffer.pos;
71+
+ }
72+
+
73+
+ if (u->bufs_in) {
74+
+ u->buf_in->buf->last = u->buffer.pos;
75+
+ u->buf_in->buf->pos = u->buffer.pos;
76+
+ }
77+
+}
78+
+
79+
+
5180
+static int
5281
+ngx_stream_lua_socket_tcp_dummy_retval_handler(ngx_stream_lua_request_t *r,
5382
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
@@ -257,11 +286,13 @@ index 7fcfb45..601673d 100644
257286
+ u->buffer = *u->buf_in->buf;
258287
+
259288
+ } else {
260-
+ size_t remain = u->buffer.end - u->buffer.pos;
289+
+ size_t remain_space = u->buffer.end - u->buffer.pos;
290+
+ size_t remain_data = u->buffer.last - u->buffer.pos;
261291
+ size_t buf_len;
262-
+ ngx_chain_t *cl;
292+
+ u_char *pos;
293+
+ ngx_chain_t *cl, *tmp_cl;
263294
+
264-
+ if (remain < len) {
295+
+ if (remain_space < len) {
265296
+ buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size;
266297
+
267298
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
@@ -277,13 +308,17 @@ index 7fcfb45..601673d 100644
277308
+ return NGX_DONE;
278309
+ }
279310
+
280-
+ u->buf_in->next = cl;
311+
+ for (tmp_cl = u->bufs_in; tmp_cl->next; tmp_cl = tmp_cl->next) {}
312+
+ tmp_cl->next = cl;
281313
+ u->buf_in = cl;
282314
+
283-
+ cl->buf->last = ngx_copy(cl->buf->last, u->buffer.pos, remain);
284315
+ u->buffer.last = u->buffer.pos;
285-
+
316+
+ pos = u->buffer.pos;
286317
+ u->buffer = *cl->buf;
318+
+
319+
+ if (remain_data > 0) {
320+
+ u->buffer.last = ngx_copy(u->buffer.last, pos, remain_data);
321+
+ }
287322
+ }
288323
+ }
289324
+
@@ -370,6 +405,218 @@ index 7fcfb45..601673d 100644
370405
+ errbuf, errbuf_size);
371406
+}
372407
+
408+
+
409+
+static void
410+
+ngx_stream_lua_ffi_socket_write_error_retval_handler(
411+
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
412+
+ u_char *errbuf, size_t *errbuf_size)
413+
+{
414+
+ ngx_uint_t ft_type;
415+
+
416+
+ if (u->write_co_ctx) {
417+
+ u->write_co_ctx->cleanup = NULL;
418+
+ }
419+
+
420+
+ ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 0);
421+
+
422+
+ ft_type = u->ft_type;
423+
+ u->ft_type = 0;
424+
+
425+
+ ngx_stream_lua_ffi_socket_prepare_error_retvals(r, u, ft_type,
426+
+ errbuf, errbuf_size);
427+
+}
428+
+
429+
+
430+
+int
431+
+ngx_stream_lua_ffi_socket_tcp_send_from_socket(ngx_stream_lua_request_t *r,
432+
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_socket_tcp_upstream_t *src,
433+
+ u_char *errbuf, size_t *errbuf_size)
434+
+{
435+
+ size_t len = 0;
436+
+ ngx_int_t rc;
437+
+ ngx_chain_t *cl, *in_cl;
438+
+ ngx_stream_lua_ctx_t *ctx;
439+
+ int tcp_nodelay;
440+
+ ngx_buf_t *b;
441+
+ ngx_connection_t *c;
442+
+ ngx_stream_lua_loc_conf_t *llcf;
443+
+ ngx_stream_core_srv_conf_t *clcf;
444+
+ ngx_stream_lua_co_ctx_t *coctx;
445+
+
446+
+ if (u == NULL || u->peer.connection == NULL || u->write_closed) {
447+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
448+
+
449+
+ if (llcf->log_socket_errors) {
450+
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
451+
+ "attempt to send data on a closed socket: u:%p, "
452+
+ "c:%p, ft:%d eof:%d",
453+
+ u, u ? u->peer.connection : NULL,
454+
+ u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
455+
+ }
456+
+
457+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "closed") - errbuf;
458+
+ return NGX_ERROR;
459+
+ }
460+
+
461+
+ if (u->request != r) {
462+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "bad request")
463+
+ - errbuf;
464+
+ return NGX_DONE;
465+
+ }
466+
+
467+
+ ngx_stream_lua_ffi_socket_check_busy_connecting(r, u, errbuf, errbuf_size);
468+
+ ngx_stream_lua_ffi_socket_check_busy_writing(r, u, errbuf, errbuf_size);
469+
+
470+
+ if (u->body_downstream) {
471+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size,
472+
+ "attempt to write to request sockets")
473+
+ - errbuf;
474+
+ return NGX_DONE;
475+
+ }
476+
+
477+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
478+
+ "lua tcp socket send timeout: %M", u->send_timeout);
479+
+
480+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
481+
+
482+
+ for (in_cl = src->bufs_in; in_cl; in_cl = in_cl->next) {
483+
+ b = in_cl->buf;
484+
+
485+
+ len += b->last - b->pos;
486+
+
487+
+ ngx_log_debug3(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
488+
+ "lua tcp socket move cl:%p buf %p, len: %d",
489+
+ in_cl, b, b->last - b->pos);
490+
+
491+
+ }
492+
+
493+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
494+
+ "lua tcp socket move total buf %p, len: %d",
495+
+ src->bufs_in, len);
496+
+
497+
+ if (len == 0) {
498+
+ return NGX_OK;
499+
+ }
500+
+
501+
+ cl = ngx_stream_lua_chain_get_free_buf(r->connection->log, r->pool,
502+
+ &ctx->free_bufs, len);
503+
+
504+
+ if (cl == NULL) {
505+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf;
506+
+ return NGX_DONE;
507+
+ }
508+
+
509+
+ /* TODO: avoid copying (it requires to modify the way cosocket sends data) */
510+
+ for (in_cl = src->bufs_in; in_cl; in_cl = in_cl->next) {
511+
+ b = in_cl->buf;
512+
+ cl->buf->last = ngx_copy(cl->buf->last, b->pos, b->last - b->pos);
513+
+ }
514+
+
515+
+ ngx_stream_lua_ffi_socket_reset_buf(ctx, src);
516+
+
517+
+ u->request_bufs = cl;
518+
+
519+
+ u->request_len = len;
520+
+
521+
+ /* mimic ngx_stream_upstream_init_request here */
522+
+
523+
+ clcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_core_module);
524+
+ c = u->peer.connection;
525+
+
526+
+ if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
527+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
528+
+ "lua socket tcp_nodelay");
529+
+
530+
+ tcp_nodelay = 1;
531+
+
532+
+ if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
533+
+ (const void *) &tcp_nodelay, sizeof(int))
534+
+ == -1)
535+
+ {
536+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
537+
+ if (llcf->log_socket_errors) {
538+
+ ngx_connection_error(c, ngx_socket_errno,
539+
+ "setsockopt(TCP_NODELAY) "
540+
+ "failed");
541+
+ }
542+
+
543+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size,
544+
+ "setsocketopt tcp_nodelay failed")
545+
+ - errbuf;
546+
+ return NGX_ERROR;
547+
+ }
548+
+
549+
+ c->tcp_nodelay = NGX_TCP_NODELAY_SET;
550+
+ }
551+
+
552+
+ u->write_waiting = 0;
553+
+ u->write_co_ctx = NULL;
554+
+
555+
+ ngx_stream_lua_probe_socket_tcp_send_start(r, u, b->pos, len);
556+
+
557+
+ rc = ngx_stream_lua_socket_send(r, u);
558+
+
559+
+ dd("socket send returned %d", (int) rc);
560+
+
561+
+ if (rc == NGX_ERROR) {
562+
+ ngx_stream_lua_ffi_socket_write_error_retval_handler(r, u, errbuf,
563+
+ errbuf_size);
564+
+ return NGX_ERROR;
565+
+ }
566+
+
567+
+ if (rc == NGX_OK) {
568+
+ return rc;
569+
+ }
570+
+
571+
+ /* rc == NGX_AGAIN */
572+
+
573+
+ coctx = ctx->cur_co_ctx;
574+
+
575+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
576+
+ coctx->cleanup = ngx_stream_lua_coctx_cleanup;
577+
+ coctx->data = u;
578+
+
579+
+ if (u->raw_downstream) {
580+
+ ctx->writing_raw_req_socket = 1;
581+
+ }
582+
+
583+
+ if (ctx->entered_content_phase) {
584+
+ r->write_event_handler = ngx_stream_lua_content_wev_handler;
585+
+
586+
+ } else {
587+
+ r->write_event_handler = ngx_stream_lua_core_run_phases;
588+
+ }
589+
+
590+
+ u->write_co_ctx = coctx;
591+
+ u->write_waiting = 1;
592+
+ u->write_prepare_retvals = ngx_stream_lua_socket_tcp_dummy_retval_handler;
593+
+
594+
+ dd("setting data to %p", u);
595+
+
596+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
597+
+ "lua socket send yield, u: %p", u);
598+
+
599+
+ return NGX_AGAIN;
600+
+}
601+
+
602+
+
603+
+int
604+
+ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r,
605+
+ ngx_stream_lua_socket_tcp_upstream_t *u, u_char *errbuf,
606+
+ size_t *errbuf_size)
607+
+{
608+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
609+
+ "lua tcp socket get send result");
610+
+
611+
+ if (u->ft_type) {
612+
+ ngx_stream_lua_ffi_socket_write_error_retval_handler(r, u, errbuf,
613+
+ errbuf_size);
614+
+ return NGX_ERROR;
615+
+ }
616+
+
617+
+ return NGX_OK;
618+
+}
619+
+
373620
+
374621
static ngx_int_t
375622
ngx_stream_lua_socket_tcp_conn_op_resume(ngx_stream_lua_request_t *r)

0 commit comments

Comments
 (0)