Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/resty/kafka/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ local function _send(self, broker_conf, topic_partitions)
err = Errors[errcode] or Errors[-1]

-- set retries according to the error list
local retryable0 = retryable or err.retriable
local retryable0 = err.retriable

local index = sendbuffer:err(topic, partition_id, err.msg, retryable0)

Expand Down
44 changes: 44 additions & 0 deletions t/producer.t
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,47 @@ GET /t
.*offset.*
--- no_error_log
[error]



=== TEST 11: send over size message
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua '
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "$TEST_NGINX_KAFKA_HOST", port = $TEST_NGINX_KAFKA_PORT },
}

local error_handle = function(topic, partition_id, message_queue, index, err, retryable)
if retryable and err == "MESSAGE_TOO_LARGE" then
ngx.log(ngx.ERR, "retryable is expected to be false when MESSAGE_TOO_LARGE error occurs")
else
ngx.log(ngx.WARN, "kafka send err: topic=", topic, ", partition_id=", partition_id, ", index=", index, ", err=", err)
end
end

local producer_config =
{ producer_type = "async", max_retry = 1, batch_num = 1, error_handle = error_handle }

local p = producer:new(broker_list, producer_config)

-- Assuming the Kafka max.request.size is set to 1MB
local message = string.rep("a", 1024 * 1024)

local offset, err = p:send("test", nil, message)
if not offset then
ngx.say("send err:", err)
return
end

ngx.say("offset: ", tostring(offset))
';
}
--- request
GET /t
--- response_body_like
.*offset.*
--- no_error_log
[error]