Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions apisix/utils/log-util.lua
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,23 @@ local function gen_log_format(format)
end


local function get_custom_format_log(ctx, format)
local function get_custom_format_log(ctx, format, max_req_body_bytes)
local log_format = lru_log_format(format or "", nil, gen_log_format, format)
local entry = core.table.new(0, core.table.nkeys(log_format))
for k, var_attr in pairs(log_format) do
if var_attr[1] then
entry[k] = ctx.var[var_attr[2]]
local key = var_attr[2]
if key == "request_body" then
local max_req_body_bytes = max_req_body_bytes or MAX_REQ_BODY
local req_body, err = get_request_body(max_req_body_bytes)
if err then
core.log.error("fail to get request body: ", err)
else
entry[k] = req_body
end
else
entry[k] = ctx.var[var_attr[2]]
end
else
entry[k] = var_attr[2]
end
Expand Down Expand Up @@ -268,7 +279,8 @@ function _M.get_log_entry(plugin_name, conf, ctx)

if conf.log_format or has_meta_log_format then
customized = true
entry = get_custom_format_log(ctx, conf.log_format or metadata.value.log_format)
entry = get_custom_format_log(ctx, conf.log_format or metadata.value.log_format,
conf.max_req_body_bytes)
else
if is_http then
entry = get_full_log(ngx, conf)
Expand Down
77 changes: 77 additions & 0 deletions t/plugin/kafka-logger-large-body.t
Original file line number Diff line number Diff line change
Expand Up @@ -867,3 +867,80 @@ hello world
--- error_log eval
qr/send data to kafka/
--- wait: 2



=== TEST 26: add plugin metadata
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/plugin_metadata/kafka-logger',
ngx.HTTP_PUT,
[[{
"log_format": {
"request_body": "$request_body"
}
}]]
)
if code >=300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed



=== TEST 27: set route(meta_format = default, include_req_body = true)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins": {
"kafka-logger": {
"broker_list" : {
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1",
"timeout" : 1,
"batch_max_size": 1,
"max_req_body_bytes": 5
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed



=== TEST 28: hit route(meta_format = default, include_req_body = true)
--- request
GET /hello?ab=cd
abcdef
--- response_body
hello world
--- error_log_like eval
qr/"request_body": "abcde"/