Skip to content

Commit e842c96

Browse files
authored
feat: add 'drain' method (#46)
1 parent 7296908 commit e842c96

File tree

5 files changed

+304
-60
lines changed

5 files changed

+304
-60
lines changed

lib/resty/apisix/stream/xrpc/socket.lua

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,28 +65,24 @@ local function get_tcp_socket(cosocket)
6565
end
6666

6767

68-
-- read the given length of data to a buffer in C land and return the buffer address
69-
-- return error if the read data is less than given length
70-
local function read_buf(cosocket, len)
71-
if len <= 0 then
72-
error("bad length: length of data should be positive, got " .. len, 2)
73-
end
74-
75-
if len > 4 * 1024 * 1024 then
76-
error("bad length: length of data too big, got " .. len, 2)
77-
end
78-
68+
local function _read(cosocket, len, single_buf)
7969
local r = get_request()
8070
if not r then
8171
error("no request found", 2)
8272
end
8373

8474
local u = get_tcp_socket(cosocket)
75+
76+
local buf
77+
if single_buf then
78+
buf = resbuf
79+
end
80+
8581
local errbuf = get_string_buf(ERR_BUF_SIZE)
8682
local errbuf_size = get_size_ptr()
8783
errbuf_size[0] = ERR_BUF_SIZE
8884

89-
local rc = socket_tcp_read(r, u, resbuf, len, errbuf, errbuf_size)
85+
local rc = socket_tcp_read(r, u, buf, len, errbuf, errbuf_size)
9086
if rc == FFI_DONE then
9187
error(ffi_str(errbuf, errbuf_size[0]), 2)
9288
end
@@ -97,7 +93,11 @@ local function read_buf(cosocket, len)
9793
end
9894

9995
if rc >= 0 then
100-
return resbuf[0]
96+
if single_buf then
97+
return resbuf[0]
98+
end
99+
100+
return true
101101
end
102102

103103
assert(rc == FFI_AGAIN)
@@ -107,8 +107,35 @@ local function read_buf(cosocket, len)
107107
errbuf = get_string_buf(ERR_BUF_SIZE)
108108
errbuf_size = get_size_ptr()
109109
errbuf_size[0] = ERR_BUF_SIZE
110-
rc = socket_tcp_get_read_result(r, u, resbuf, len, errbuf, errbuf_size)
110+
rc = socket_tcp_get_read_result(r, u, buf, len, errbuf, errbuf_size)
111+
end
112+
end
113+
114+
115+
-- read the given length of data to a buffer in C land and return the buffer address
116+
-- return error if the read data is less than given length
117+
local function read(cosocket, len)
118+
if len <= 0 then
119+
error("bad length: length of data should be positive, got " .. len, 2)
120+
end
121+
122+
if len > 4 * 1024 * 1024 then
123+
error("bad length: length of data too big, got " .. len, 2)
111124
end
125+
126+
return _read(cosocket, len, true)
127+
end
128+
129+
130+
-- work like `read` but don't return the buffer address and don't guarantee all the data is
131+
-- in the same buffer.
132+
-- return error if the read data is less than given length
133+
local function drain(cosocket, len)
134+
if len <= 0 then
135+
error("bad length: length of data should be positive, got " .. len, 2)
136+
end
137+
138+
return _read(cosocket, len, false)
112139
end
113140

114141

@@ -172,7 +199,8 @@ local function patch_methods(sk)
172199
copy.receiveany = nil
173200
copy.receiveuntil = nil
174201

175-
copy.read = read_buf
202+
copy.read = read
203+
copy.drain = drain
176204
copy.move = move
177205

178206
return {__index = copy}

patch/1.19.9/ngx_stream_lua-xrpc.patch

Lines changed: 68 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git src/ngx_stream_lua_socket_tcp.c src/ngx_stream_lua_socket_tcp.c
2-
index 7fcfb45..3178588 100644
2+
index 7fcfb45..13e942d 100644
33
--- src/ngx_stream_lua_socket_tcp.c
44
+++ src/ngx_stream_lua_socket_tcp.c
55
@@ -234,6 +234,41 @@ enum {
@@ -44,7 +44,7 @@ index 7fcfb45..3178588 100644
4444

4545
static char ngx_stream_lua_raw_req_socket_metatable_key;
4646
static char ngx_stream_lua_tcp_socket_metatable_key;
47-
@@ -6005,6 +6040,582 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
47+
@@ -6005,6 +6040,605 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
4848
}
4949

5050

@@ -213,7 +213,10 @@ index 7fcfb45..3178588 100644
213213
+ return NGX_ERROR;
214214
+ }
215215
+
216-
+ ngx_stream_lua_ffi_socket_push_res(r, ctx, u, buf, len);
216+
+ if (buf != NULL) {
217+
+ ngx_stream_lua_ffi_socket_push_res(r, ctx, u, buf, len);
218+
+ }
219+
+
217220
+ return NGX_OK;
218221
+}
219222
+
@@ -263,63 +266,83 @@ index 7fcfb45..3178588 100644
263266
+
264267
+ lctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
265268
+
266-
+ if (u->bufs_in == NULL) {
267-
+ size_t buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size;
269+
+ if (buf != NULL) {
270+
+ if (u->bufs_in == NULL) {
271+
+ size_t buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size;
272+
+
273+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
274+
+ "stream lua tcp socket allocate new new buf of size %uz",
275+
+ buf_len);
276+
+
277+
+ u->bufs_in =
278+
+ ngx_stream_lua_chain_get_free_buf(r->connection->log,
279+
+ r->pool,
280+
+ &lctx->free_recv_bufs,
281+
+ buf_len);
282+
+
283+
+ if (u->bufs_in == NULL) {
284+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf;
285+
+ return NGX_DONE;
286+
+ }
287+
+
288+
+ u->buf_in = u->bufs_in;
289+
+ u->buffer = *u->buf_in->buf;
290+
+
291+
+ } else {
292+
+ size_t remain_space = u->buffer.end - u->buffer.pos;
293+
+ size_t remain_data = u->buffer.last - u->buffer.pos;
294+
+ size_t buf_len;
295+
+ u_char *pos;
296+
+ ngx_chain_t *cl, *tmp_cl;
297+
+
298+
+ if (remain_space < len) {
299+
+ buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size;
300+
+
301+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
302+
+ "stream lua tcp socket allocate new new buf of size %uz",
303+
+ buf_len);
304+
+
305+
+ cl = ngx_stream_lua_chain_get_free_buf(r->connection->log,
306+
+ r->pool,
307+
+ &lctx->free_recv_bufs,
308+
+ buf_len);
309+
+ if (cl == NULL) {
310+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf;
311+
+ return NGX_DONE;
312+
+ }
313+
+
314+
+ for (tmp_cl = u->bufs_in; tmp_cl->next; tmp_cl = tmp_cl->next) {}
315+
+ tmp_cl->next = cl;
316+
+ u->buf_in = cl;
317+
+
318+
+ u->buffer.last = u->buffer.pos;
319+
+ pos = u->buffer.pos;
320+
+ u->buffer = *cl->buf;
268321
+
322+
+ if (remain_data > 0) {
323+
+ u->buffer.last = ngx_copy(u->buffer.last, pos, remain_data);
324+
+ }
325+
+ }
326+
+ }
327+
+
328+
+ } else if (u->bufs_in == NULL) {
269329
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
270330
+ "stream lua tcp socket allocate new new buf of size %uz",
271-
+ buf_len);
331+
+ u->conf->buffer_size);
272332
+
273333
+ u->bufs_in =
274334
+ ngx_stream_lua_chain_get_free_buf(r->connection->log,
275335
+ r->pool,
276336
+ &lctx->free_recv_bufs,
277-
+ buf_len);
337+
+ u->conf->buffer_size);
278338
+
279339
+ if (u->bufs_in == NULL) {
280-
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory")
281-
+ - errbuf;
340+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf;
282341
+ return NGX_DONE;
283342
+ }
284343
+
285344
+ u->buf_in = u->bufs_in;
286345
+ u->buffer = *u->buf_in->buf;
287-
+
288-
+ } else {
289-
+ size_t remain_space = u->buffer.end - u->buffer.pos;
290-
+ size_t remain_data = u->buffer.last - u->buffer.pos;
291-
+ size_t buf_len;
292-
+ u_char *pos;
293-
+ ngx_chain_t *cl, *tmp_cl;
294-
+
295-
+ if (remain_space < len) {
296-
+ buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size;
297-
+
298-
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
299-
+ "stream lua tcp socket allocate new new buf of size %uz",
300-
+ buf_len);
301-
+
302-
+ cl = ngx_stream_lua_chain_get_free_buf(r->connection->log,
303-
+ r->pool,
304-
+ &lctx->free_recv_bufs,
305-
+ buf_len);
306-
+ if (cl == NULL) {
307-
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf;
308-
+ return NGX_DONE;
309-
+ }
310-
+
311-
+ for (tmp_cl = u->bufs_in; tmp_cl->next; tmp_cl = tmp_cl->next) {}
312-
+ tmp_cl->next = cl;
313-
+ u->buf_in = cl;
314-
+
315-
+ u->buffer.last = u->buffer.pos;
316-
+ pos = u->buffer.pos;
317-
+ u->buffer = *cl->buf;
318-
+
319-
+ if (remain_data > 0) {
320-
+ u->buffer.last = ngx_copy(u->buffer.last, pos, remain_data);
321-
+ }
322-
+ }
323346
+ }
324347
+
325348
+ dd("tcp receive: buf_in: %p, bufs_in: %p", u->buf_in, u->bufs_in);

t/stream/xrpc/downstream.t

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,46 @@ qr/stream lua tcp socket allocate new new buf of size \d+/
256256
--- grep_error_log_out
257257
stream lua tcp socket allocate new new buf of size 128
258258
stream lua tcp socket allocate new new buf of size 144
259+
260+
261+
262+
=== TEST 10: drain & move
263+
--- stream_config
264+
server {
265+
listen 1995;
266+
content_by_lua_block {
267+
local sk = ngx.req.socket(true)
268+
local exp = {
269+
"hell",
270+
"o wo",
271+
"r",
272+
}
273+
for i = 1, 3 do
274+
local data = sk:receiveany(128)
275+
if data ~= exp[i] then
276+
ngx.log(ngx.ERR, "actual: ", data, ", expected: ", exp[i])
277+
end
278+
end
279+
}
280+
}
281+
--- stream_server_config
282+
content_by_lua_block {
283+
local ffi = require("ffi")
284+
local ds = require("resty.apisix.stream.xrpc.socket").downstream.socket()
285+
ds:settimeout(5)
286+
287+
local us = require("resty.apisix.stream.xrpc.socket").upstream.socket()
288+
us:settimeout(50)
289+
assert(us:connect("127.0.0.1", 1995))
290+
291+
assert(ds:drain(4))
292+
assert(us:move(ds))
293+
ngx.sleep(0.01)
294+
assert(ds:drain(4))
295+
assert(us:move(ds))
296+
ngx.sleep(0.01)
297+
assert(ds:drain(1))
298+
assert(us:move(ds))
299+
}
300+
--- stream_request
301+
hello world

t/stream/xrpc/fuzzing.t

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,47 @@ __DATA__
173173
--- stream_request
174174
--- stream_response eval
175175
"123456789" x 128
176+
177+
178+
179+
=== TEST 5: move (read & drain)
180+
--- stream_config
181+
lua_socket_buffer_size 128;
182+
server {
183+
listen 1995;
184+
content_by_lua_block {
185+
local s = ("123456789"):rep(1280)
186+
ngx.say(s)
187+
}
188+
}
189+
--- stream_server_config
190+
lua_socket_buffer_size 128;
191+
content_by_lua_block {
192+
math.randomseed(ngx.time())
193+
194+
local ffi = require("ffi")
195+
local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket()
196+
local dsk = require("resty.apisix.stream.xrpc.socket").downstream.socket()
197+
assert(sk:connect("127.0.0.1", 1995))
198+
sk:settimeout(500)
199+
local total = 9 * 1280
200+
while total > 0 do
201+
local len = math.random(1, 512)
202+
if len > total then
203+
len = total
204+
end
205+
total = total - len
206+
if math.random(1, 2) == 1 then
207+
assert(sk:read(len))
208+
else
209+
assert(sk:drain(len))
210+
end
211+
if total % 2 == 0 then
212+
dsk:move(sk)
213+
end
214+
end
215+
}
216+
--- stream_request
217+
--- stream_response eval
218+
"123456789" x 1280
219+
--- timeout: 5

0 commit comments

Comments
 (0)