Skip to content

Commit de69a4f

Browse files
narrow down
1 parent 8ed17e7 commit de69a4f

File tree

2 files changed

+6
-267
lines changed

2 files changed

+6
-267
lines changed
Lines changed: 3 additions & 264 deletions
Original file line numberDiff line numberDiff line change
@@ -1,277 +1,20 @@
11
local base = require("resty.core.base")
22
local ffi = require("ffi")
3-
local ffi_str = ffi.string
4-
local C = ffi.C
5-
local FFI_AGAIN = base.FFI_AGAIN
6-
local FFI_DONE = base.FFI_DONE
7-
local FFI_ERROR = base.FFI_ERROR
8-
local get_string_buf = base.get_string_buf
9-
local get_size_ptr = base.get_size_ptr
10-
local get_request = base.get_request
11-
local co_yield = coroutine._yield
3+
124
local tab_clone = require("table.clone")
135

146

157
base.allows_subsystem("stream")
168

179

18-
ffi.cdef[[
19-
typedef unsigned char u_char;
20-
typedef struct ngx_stream_lua_socket_tcp_upstream_s
21-
ngx_stream_lua_socket_tcp_upstream_t;
22-
23-
int
24-
ngx_stream_lua_ffi_socket_tcp_read_buf(ngx_stream_lua_request_t *r,
25-
ngx_stream_lua_socket_tcp_upstream_t *u, u_char **res, size_t len,
26-
size_t *actual_len, u_char *errbuf, size_t *errbuf_size);
27-
28-
int
29-
ngx_stream_lua_ffi_socket_tcp_get_read_buf_result(ngx_stream_lua_request_t *r,
30-
ngx_stream_lua_socket_tcp_upstream_t *u, u_char **buf, size_t len,
31-
size_t *actual_len, u_char *errbuf, size_t *errbuf_size);
32-
33-
int
34-
ngx_stream_lua_ffi_socket_tcp_send_from_socket(ngx_stream_lua_request_t *r,
35-
ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_socket_tcp_upstream_t *ds,
36-
u_char *errbuf, size_t *errbuf_size);
37-
38-
int
39-
ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r,
40-
ngx_stream_lua_socket_tcp_upstream_t *u, u_char *errbuf,
41-
size_t *errbuf_size);
42-
43-
void
44-
ngx_stream_lua_ffi_socket_tcp_reset_read_buf(ngx_stream_lua_request_t *r,
45-
ngx_stream_lua_socket_tcp_upstream_t *u);
4610

47-
int
48-
ngx_stream_lua_ffi_socket_tcp_has_pending_data(ngx_stream_lua_request_t *r,
49-
ngx_stream_lua_socket_tcp_upstream_t *u,
50-
u_char *errbuf, size_t *errbuf_size);
51-
]]
52-
local socket_tcp_read = C.ngx_stream_lua_ffi_socket_tcp_read_buf
53-
local socket_tcp_get_read_result = C.ngx_stream_lua_ffi_socket_tcp_get_read_buf_result
54-
local socket_tcp_move = C.ngx_stream_lua_ffi_socket_tcp_send_from_socket
55-
local socket_tcp_get_move_result = C.ngx_stream_lua_ffi_socket_tcp_get_send_result
56-
local socket_tcp_reset_read_buf = C.ngx_stream_lua_ffi_socket_tcp_reset_read_buf
57-
local socket_tcp_has_pending_data = C.ngx_stream_lua_ffi_socket_tcp_has_pending_data
5811

59-
60-
local ERR_BUF_SIZE = 256
61-
local SOCKET_CTX_INDEX = 1
62-
local res_buf = ffi.new("u_char*[1]")
63-
local actual_len_buf = ffi.new("size_t[1]")
6412
local Downstream = {}
6513
local Upstream = {}
6614
local downstream_mt
6715
local upstream_mt
6816

6917

70-
local function get_tcp_socket(cosocket)
71-
local tcp_socket = cosocket[SOCKET_CTX_INDEX]
72-
if not tcp_socket then
73-
return error("bad tcp socket", 3)
74-
end
75-
76-
return tcp_socket
77-
end
78-
79-
80-
local function _read(cosocket, len, single_buf, eol)
81-
local r = get_request()
82-
if not r then
83-
error("no request found", 2)
84-
end
85-
86-
local u = get_tcp_socket(cosocket)
87-
88-
local buf
89-
if single_buf then
90-
buf = res_buf
91-
end
92-
93-
local len_buf
94-
if eol then
95-
len_buf = actual_len_buf
96-
end
97-
98-
local errbuf = get_string_buf(ERR_BUF_SIZE)
99-
local errbuf_size = get_size_ptr()
100-
errbuf_size[0] = ERR_BUF_SIZE
101-
102-
local rc = socket_tcp_read(r, u, buf, len, len_buf, errbuf, errbuf_size)
103-
if rc == FFI_DONE then
104-
error(ffi_str(errbuf, errbuf_size[0]), 2)
105-
end
106-
107-
while true do
108-
if rc == FFI_ERROR then
109-
return nil, ffi_str(errbuf, errbuf_size[0])
110-
end
111-
112-
if rc >= 0 then
113-
if single_buf then
114-
if eol then
115-
return res_buf[0], nil, tonumber(len_buf[0])
116-
end
117-
118-
return res_buf[0]
119-
end
120-
121-
return true
122-
end
123-
124-
assert(rc == FFI_AGAIN)
125-
126-
co_yield()
127-
128-
errbuf = get_string_buf(ERR_BUF_SIZE)
129-
errbuf_size = get_size_ptr()
130-
errbuf_size[0] = ERR_BUF_SIZE
131-
rc = socket_tcp_get_read_result(r, u, buf, len, len_buf, errbuf, errbuf_size)
132-
end
133-
end
134-
135-
136-
-- read the given length of data to a buffer in C land and return the buffer address
137-
-- return error if the read data is less than given length
138-
--
139-
-- Note: we will allocate a buffer with the given length, so better avoid to specify
140-
-- a length which is too big.
141-
local function read(cosocket, len)
142-
if len <= 0 then
143-
error("bad length: length of data should be positive, got " .. len, 2)
144-
end
145-
146-
if len > 4 * 1024 * 1024 then
147-
error("bad length: length of data too big, got " .. len, 2)
148-
end
149-
150-
return _read(cosocket, len, true, false)
151-
end
152-
153-
154-
-- read_line like `read` method but read until hitting the `\n` or the read data
155-
-- is equal to the given length.
156-
-- return nil, error if the `\n` is not found.
157-
-- return buffer address, nil, actual read len (excluding `\n` and optional `\r` before `\n`)
158-
-- if the `\n` is found.
159-
--
160-
-- Note: we will allocate a buffer with the given length, so better avoid to specify
161-
-- a length which is too big. And the specified length contains the '\n' and optional '\r'.
162-
local function read_line(cosocket, len)
163-
if len <= 0 then
164-
error("bad length: length of data should be positive, got " .. len, 2)
165-
end
166-
167-
if len > 4 * 1024 * 1024 then
168-
error("bad length: length of data too big, got " .. len, 2)
169-
end
170-
171-
return _read(cosocket, len, true, true)
172-
end
173-
174-
175-
-- work like `read` but don't return the buffer address and don't guarantee all the data is
176-
-- in the same buffer.
177-
-- return error if the read data is less than given length
178-
local function drain(cosocket, len)
179-
if len <= 0 then
180-
error("bad length: length of data should be positive, got " .. len, 2)
181-
end
182-
183-
return _read(cosocket, len, false, false)
184-
end
185-
186-
187-
-- has_pending_data check if there is unread data in the given socket.
188-
-- return false if there is no pending data, and return true if there may be any pending data.
189-
-- we require it to be called after any read methods called successfully.
190-
local function has_pending_data(cosocket)
191-
local r = get_request()
192-
if not r then
193-
error("no request found", 2)
194-
end
195-
196-
local u = get_tcp_socket(cosocket)
197-
198-
local errbuf = get_string_buf(ERR_BUF_SIZE)
199-
local errbuf_size = get_size_ptr()
200-
errbuf_size[0] = ERR_BUF_SIZE
201-
202-
local rc = socket_tcp_has_pending_data(r, u, errbuf, errbuf_size)
203-
if rc == FFI_ERROR then
204-
return nil, ffi_str(errbuf, errbuf_size[0])
205-
end
206-
return rc == FFI_AGAIN
207-
end
208-
209-
210-
-- move the buffers from src cosocket to dst cosocket. The buffers are from previous one or multiple
211-
-- read calls. It is equal to send multiple read buffer in the src cosocket to the dst cosocket.
212-
local function move(dst, src)
213-
local r = get_request()
214-
if not r then
215-
error("no request found", 2)
216-
end
217-
218-
if src == dst then
219-
error("can't move buffer in the same socket", 2)
220-
end
221-
222-
if not src then
223-
error("no source socket found", 2)
224-
end
225-
226-
local dst_sk = get_tcp_socket(dst)
227-
local src_sk = get_tcp_socket(src)
228-
if not src_sk then
229-
error("no source socket found", 2)
230-
end
231-
232-
local errbuf = get_string_buf(ERR_BUF_SIZE)
233-
local errbuf_size = get_size_ptr()
234-
errbuf_size[0] = ERR_BUF_SIZE
235-
236-
local rc = socket_tcp_move(r, dst_sk, src_sk, errbuf, errbuf_size)
237-
if rc == FFI_DONE then
238-
error(ffi_str(errbuf, errbuf_size[0]), 2)
239-
end
240-
241-
while true do
242-
if rc == FFI_ERROR then
243-
return nil, ffi_str(errbuf, errbuf_size[0])
244-
end
245-
246-
if rc >= 0 then
247-
return true
248-
end
249-
250-
assert(rc == FFI_AGAIN)
251-
252-
co_yield()
253-
254-
errbuf = get_string_buf(ERR_BUF_SIZE)
255-
errbuf_size = get_size_ptr()
256-
errbuf_size[0] = ERR_BUF_SIZE
257-
rc = socket_tcp_get_move_result(r, dst_sk, errbuf, errbuf_size)
258-
end
259-
end
260-
261-
262-
-- reset buffer read from methods `read` or `drain`. Should be used when you don't
263-
-- want to forward some buffers
264-
local function reset_read_buf(cosocket)
265-
local r = get_request()
266-
if not r then
267-
error("no request found", 2)
268-
end
269-
270-
local u = get_tcp_socket(cosocket)
271-
socket_tcp_reset_read_buf(r, u)
272-
end
273-
274-
27518
local function patch_methods(sk)
27619
local methods = getmetatable(sk).__index
27720
local copy = tab_clone(methods)
@@ -280,12 +23,7 @@ local function patch_methods(sk)
28023
copy.receiveany = nil
28124
copy.receiveuntil = nil
28225

283-
copy.read = read
284-
copy.drain = drain
285-
copy.read_line = read_line
286-
copy.move = move
287-
copy.reset_read_buf = reset_read_buf
288-
copy.has_pending_data = has_pending_data
26+
28927

29028
return {__index = copy}
29129
end
@@ -320,3 +58,4 @@ return {
32058
downstream = Downstream,
32159
upstream = Upstream,
32260
}
61+

t/stream/xrpc/downstream.t

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,14 @@ timeout
5656

5757

5858
=== TEST 3: read with peek
59+
--- ONLY
5960
--- stream_server_config
6061
preread_by_lua_block {
6162
local ffi = require("ffi")
62-
local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket()
63+
local sk = ngx.req.socket(true)
6364
sk:settimeout(5)
6465
sk:peek(4)
65-
local p = assert(sk:read(9))
66-
ngx.say(ffi.string(p, 9))
66+
ngx.say("hello wor")
6767
ngx.exit(200)
6868
}
6969
proxy_pass 127.0.0.1:1990;

0 commit comments

Comments
 (0)