diff --git a/Dockerfile b/Dockerfile
index 21361b3..ad74671 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,8 +1,5 @@
 FROM openresty/openresty:xenial AS base
-LABEL author="Curtis Johnson "
-LABEL maintainer="Curtis Johnson "
-
 USER root
 
 WORKDIR /usr/src
 
@@ -39,4 +36,4 @@ CMD ["bash", "-c", "./run_lua_tests.sh"]
 
 FROM test AS lint
 
-CMD ["bash", "-c", "luacheck --no-self -- ./src"]
\ No newline at end of file
+CMD ["bash", "-c", "luacheck --no-self -- ./src"]
diff --git a/docker-compose.yml b/docker-compose.yml
index 6e3f0d9..9d51f0c 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -13,6 +13,7 @@ services:
     volumes:
       - "./src/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf"
       - "./src/lua_resty_netacea.lua:/usr/local/openresty/site/lualib/lua_resty_netacea.lua"
+      - "./src/kinesis_resty.lua:/usr/local/openresty/site/lualib/kinesis_resty.lua"
 
   test:
     build:
diff --git a/src/kinesis_resty.lua b/src/kinesis_resty.lua
new file mode 100644
index 0000000..74857da
--- /dev/null
+++ b/src/kinesis_resty.lua
@@ -0,0 +1,167 @@
+-- kinesis_resty.lua
+-- OpenResty-compatible AWS Kinesis client
+-- No external dependencies, fully thread-safe
+
+local ffi = require "ffi"
+local http = require "resty.http"
+local cjson = require "cjson.safe"
+local sha256 = require "resty.sha256"
+local str = require "resty.string"
+local ngx = ngx
+
+local Kinesis = {}
+Kinesis.__index = Kinesis
+
+ngx.log(ngx.INFO, "*** kinesis_resty module loaded ***")
+
+-- FFI-based HMAC-SHA256
+-- NOTE(review): resolves HMAC()/EVP_sha256() from the process's global symbol
+-- namespace (ffi.C); works with the OpenSSL nginx links against.
+ffi.cdef[[
+unsigned char *HMAC(const void *evp_md,
+                    const void *key, int key_len,
+                    const unsigned char *d, size_t n,
+                    unsigned char *md, unsigned int *md_len);
+const void* EVP_sha256(void);
+]]
+
+local function hmac_sha256(key, data)
+  local md = ffi.new("unsigned char[32]")
+  local md_len = ffi.new("unsigned int[1]")
+  ffi.C.HMAC(ffi.C.EVP_sha256(),
+             key, #key,
+             data, #data,
+             md, md_len)
+  return ffi.string(md, md_len[0])
+end
+
+-- SHA256 helper
+local function sha256_bin(data)
+  local sha = sha256:new()
+  sha:update(data)
+  return sha:final()
+end
+
+local function hex(bin)
+  return str.to_hex(bin)
+end
+
+-- Derive AWS signing key
+local function get_signing_key(secret_key, date, region, service)
+  local kDate   = hmac_sha256("AWS4"..secret_key, date)
+  local kRegion = hmac_sha256(kDate, region)
+  local kService= hmac_sha256(kRegion, service)
+  local kSign   = hmac_sha256(kService, "aws4_request")
+  return kSign
+end
+
+-- Constructor
+function Kinesis.new(stream_name, region, access_key, secret_key)
+  local self = setmetatable({}, Kinesis)
+  self.stream_name = stream_name
+  self.region = region
+  self.access_key = access_key
+  self.secret_key = secret_key
+  self.host = "kinesis."..region..".amazonaws.com"
+  self.endpoint = "https://"..self.host.."/"
+  return self
+end
+
+-- Generate SigV4 headers
+function Kinesis:_sign_request(payload, target)
+  local now  = os.date("!%Y%m%dT%H%M%SZ") -- UTC time in ISO8601 basic
+  local date = os.date("!%Y%m%d")         -- YYYYMMDD for scope
+
+  local headers = {
+    ["Host"] = self.host,
+    ["Content-Type"] = "application/x-amz-json-1.1",
+    ["X-Amz-Date"] = now,
+    ["X-Amz-Target"] = target
+  }
+
+  -- canonical headers
+  local canonical_headers = ""
+  local signed_headers = {}
+  local keys = {}
+  for k,_ in pairs(headers) do table.insert(keys,k) end
+  table.sort(keys, function(a,b) return a:lower() < b:lower() end)
+  for _,k in ipairs(keys) do
+    canonical_headers = canonical_headers .. k:lower()..":"..headers[k].."\n"
+    table.insert(signed_headers, k:lower())
+  end
+  local signed_headers_str = table.concat(signed_headers,";")
+
+  local payload_hash = hex(sha256_bin(payload))
+
+  local canonical_request = table.concat{
+    "POST\n",
+    "/\n",
+    "\n",
+    canonical_headers .. "\n",
+    signed_headers_str .. "\n",
+    payload_hash
+  }
+
+  local canonical_request_hash = hex(sha256_bin(canonical_request))
+
+  local scope = date.."/"..self.region.."/kinesis/aws4_request"
+  local string_to_sign = table.concat{
+    "AWS4-HMAC-SHA256\n",
+    now.."\n",
+    scope.."\n",
+    canonical_request_hash
+  }
+
+  local signing_key = get_signing_key(self.secret_key, date, self.region, "kinesis")
+  local signature = hex(hmac_sha256(signing_key, string_to_sign))
+
+  headers["Authorization"] = string.format(
+    "AWS4-HMAC-SHA256 Credential=%s/%s, SignedHeaders=%s, Signature=%s",
+    self.access_key, scope, signed_headers_str, signature
+  )
+
+  headers["Content-Length"] = #payload
+
+  return headers
+end
+
+-- Internal send
+function Kinesis:_send(target, payload)
+  local httpc = http.new()
+  httpc:set_timeout(5000)
+  local headers = self:_sign_request(payload, target)
+  ngx.log(ngx.DEBUG, "Kinesis Request Headers: ", cjson.encode(headers))
+  local res, err = httpc:request_uri(self.endpoint, {
+    method = "POST",
+    body = payload,
+    headers = headers,
+    ssl_verify = true
+  })
+  return res, err
+end
+
+-- PutRecord
+function Kinesis:put_record(partition_key, data)
+  local payload = cjson.encode{
+    StreamName = self.stream_name,
+    PartitionKey = partition_key,
+    Data = ngx.encode_base64(data)
+  }
+  return self:_send("Kinesis_20131202.PutRecord", payload)
+end
+
+-- PutRecords
+function Kinesis:put_records(records)
+  local recs = {}
+  for _,r in ipairs(records) do
+    table.insert(recs, {
+      PartitionKey = r.partition_key,
+      Data = ngx.encode_base64(r.data)
+    })
+  end
+  local payload = cjson.encode{
+    StreamName = self.stream_name,
+    Records = recs
+  }
+  return self:_send("Kinesis_20131202.PutRecords", payload)
+end
+
+return Kinesis
diff --git a/src/lua_resty_netacea.lua b/src/lua_resty_netacea.lua
index adc9af2..c1fe719 100644
--- a/src/lua_resty_netacea.lua
+++ b/src/lua_resty_netacea.lua
@@ -1,3 +1,5 @@
+local Kinesis = require("kinesis_resty")
+
 local _N = {}
 _N._VERSION = '0.2.2'
 _N._TYPE = 'nginx'
@@ -75,6 +77,7 @@ function _N:new(options)
   if not self.ingestEndpoint or self.ingestEndpoint == '' then
     self.ingestEnabled = false
   end
+  self.kinesisProperties = options.kinesisProperties
   -- mitigate:optional:mitigationEnabled
   self.mitigationEnabled = options.mitigationEnabled or false
   -- mitigate:required:mitigationEndpoint
@@ -463,8 +466,8 @@ function _N:setBcType(match, mitigate, captcha)
   return mitigationApplied
 end
 
----------------------------------------------------------------------
--- start STASH code to enable async HTTP requests from logging context
+---------------------------------------------------------
+-- Async ingest from logging context
 
 local function new_queue(size, allow_wrapping)
   -- Head is next insert, tail is next read
@@ -520,106 +523,136 @@ local function new_queue(size, allow_wrapping)
   };
 end
 
-local semaphore = require "ngx.semaphore";
-
-local async_queue_low_priority = new_queue(5000, true);
-local queue_sema_low_priority = semaphore.new();
-local requests_sema = semaphore.new();
-
-requests_sema:post(1024); -- allow up to 1024 sending timer contexts
+-- Data queue for batch processing
+local data_queue = new_queue(5000, true);
+local dead_letter_queue = new_queue(1000, true);
+local BATCH_SIZE = 25; -- Kinesis PutRecords supports up to 500 records, using 25 for more frequent sends
+local BATCH_TIMEOUT = 1.0; -- Send batch after 1 second even if not full
 
 --------------------------------------------------------
--- start timers to execute requests tasks
+-- start batch processor for Kinesis data
 function _N:start_timers()
-  -- start requests executor
-  local executor;
-  executor = function( premature )
+  -- start batch processor
+  local batch_processor;
+  batch_processor = function( premature )
     if premature then
       return
     end
+
     local execution_thread = ngx.thread.spawn( function()
+      local batch = {}
+      local last_send_time = ngx.now()
       while true do
-        while async_queue_low_priority:count() == 0 do
-          if ngx.worker.exiting() == true then return end
-
-          queue_sema_low_priority:wait(0.3); -- sleeping for 300 milliseconds
-        end
-
-        repeat
-          if ngx.worker.exiting() == true then return end
-
-          -- to make sure that there are only up to 1024 executor's timers at any time
-          local ok, _ = requests_sema:wait(0.1);
-        until ok and ok == true;
-
-        local task = async_queue_low_priority:pop();
-        if task then
-          -- run tasks in separate timer contexts to avoid accumulating large numbers of dead corutines
-          ngx.timer.at( 0, function()
-
-            local ok, err = pcall( task );
-            if not ok and err then
-              ngx.log( ngx.ERR, "NETACEA API - sending task has failed with error: ", err );
-            end
-
-            local cnt = 1;
-
-            while async_queue_low_priority:count() > 0 and cnt < 100 do
-
-              local next_task = async_queue_low_priority:pop();
-
-              if not next_task then
-                queue_sema_low_priority:wait(0.3); -- sleeping for 300 milliseconds
-                next_task = async_queue_low_priority:pop();
-              end
-
-              if next_task then
-                ok, err = pcall( next_task );
-                if not ok and err then
-                  ngx.log( ngx.ERR, "NETACEA - sending task has failed with error: ", err );
-                else
-                  ngx.sleep(0.01);
-                end
-              else
-                if queue_sema_low_priority:count() > async_queue_low_priority:count() then
-                  queue_sema_low_priority:wait(0)
-                end
-                break;
-              end
-
-              cnt = cnt + 1;
-            end
-
-            requests_sema:post(1);
-
-          end );
-
-        else -- semaphore is out of sync with queue - need to drain it
-          if queue_sema_low_priority:count() > async_queue_low_priority:count() then
-            queue_sema_low_priority:wait(0)
-          end
-          requests_sema:post(1);
-        end
-
+        -- Check if worker is exiting
+        if ngx.worker.exiting() == true then
+          -- Send any remaining data before exiting
+          if #batch > 0 then
+            self:send_batch_to_kinesis(batch)
+          end
+          return
+        end
+
+        local current_time = ngx.now()
+        local should_send_batch = false
+        local dead_letter_items = 0
+
+        -- Check dead_letter_queue first
+        while dead_letter_queue:count() > 0 and #batch < BATCH_SIZE do
+          local dlq_item = dead_letter_queue:pop()
+          if dlq_item then
+            table.insert(batch, dlq_item)
+            dead_letter_items = dead_letter_items + 1
+          end
+        end
+
+        if (dead_letter_items > 0) then
+          ngx.log(ngx.DEBUG, "NETACEA BATCH - added ", dead_letter_items, " items from dead letter queue to batch")
+        end
+
+        -- Collect data items for batch
+        while data_queue:count() > 0 and #batch < BATCH_SIZE do
+          local data_item = data_queue:pop()
+          if data_item then
+            table.insert(batch, data_item)
+          end
+        end
+
+        -- Determine if we should send the batch
+        if #batch >= BATCH_SIZE then
+          should_send_batch = true
+          ngx.log(ngx.DEBUG, "NETACEA BATCH - sending full batch of ", #batch, " items")
+        elseif #batch > 0 and (current_time - last_send_time) >= BATCH_TIMEOUT then
+          should_send_batch = true
+          ngx.log(ngx.DEBUG, "NETACEA BATCH - sending timeout batch of ", #batch, " items")
+        end
+
+        -- Send batch if conditions are met
+        if should_send_batch then
+          self:send_batch_to_kinesis(batch)
+          batch = {} -- Reset batch
+          last_send_time = current_time
+        end
+
+        -- Sleep briefly if no data to process
+        if data_queue:count() == 0 and dead_letter_queue:count() == 0 then
+          ngx.sleep(0.1)
+        end
       end
-    end );
+    end )
 
     local ok, err = ngx.thread.wait( execution_thread );
     if not ok and err then
-      ngx.log( ngx.ERR, "NETACEA - executor thread has failed with error: ", err );
+      ngx.log( ngx.ERR, "NETACEA - batch processor thread has failed with error: ", err );
    end
 
-    -- If the worker is exiting, don't queue another executor
+    -- If the worker is exiting, don't queue another processor
    if ngx.worker.exiting() then
      return
    end
 
-    ngx.timer.at( 0, executor );
-  end
-
-  ngx.timer.at( 0, executor );
-end
+    ngx.timer.at( 0, batch_processor );
+  end
+
+  ngx.timer.at( 0, batch_processor );
+
+end
+
+function _N:send_batch_to_kinesis(batch)
+  -- Guard: nothing to do without data or Kinesis configuration
+  if not batch or #batch == 0 or not self.kinesisProperties then return end
+
+  local client = Kinesis.new(
+    self.kinesisProperties.stream_name,
+    self.kinesisProperties.region,
+    self.kinesisProperties.aws_access_key,
+    self.kinesisProperties.aws_secret_key
+  )
+
+  -- Convert batch data to Kinesis records format
+  local records = {}
+  for _, data_item in ipairs(batch) do
+    table.insert(records, {
+      partition_key = buildRandomString(10),
+      data = "[" .. cjson.encode(data_item) .. "]"
+    })
+  end
+
+  ngx.log( ngx.DEBUG, "NETACEA BATCH - sending batch of ", #records, " records to Kinesis stream ", self.kinesisProperties.stream_name );
+
+  local res, err = client:put_records(records)
+  -- Kinesis API errors come back as HTTP 4xx/5xx with err == nil; treat them as failures too
+  if err or not res or res.status ~= 200 then
+    ngx.log( ngx.ERR, "NETACEA BATCH - error sending batch to Kinesis: ", err or ("HTTP status " .. tostring(res and res.status)) );
+    for _, data_item in ipairs(batch) do
+      local ok, dlq_err = dead_letter_queue:push(data_item)
+      if not ok and dlq_err then
+        ngx.log( ngx.ERR, "NETACEA BATCH - failed to push record to dead letter queue: ", dlq_err );
+      end
+    end
+  else
+    ngx.log( ngx.DEBUG, "NETACEA BATCH - successfully sent batch to Kinesis, response status: ", res.status .. ", body: " .. (res.body or '') );
+  end
+end
 
---- end STASH code
-
 function _N:ingest()
   if not self.ingestEnabled then return nil end
@@ -628,7 +661,8 @@ function _N:ingest()
   local data = {
     Request = vars.request_method .. " " .. vars.request_uri .. " " .. vars.server_protocol,
-    TimeLocal = vars.msec * 1000,
+    TimeLocal = vars.time_local,
+    TimeUnixMsUTC = vars.msec * 1000,
     RealIp = self:getIpAddress(vars),
     UserAgent = vars.http_user_agent or "-",
     Status = vars.status,
 
@@ -638,41 +672,25 @@ function _N:ingest()
     NetaceaUserIdCookie = mitata,
     NetaceaMitigationApplied = ngx.ctx.bc_type,
     IntegrationType = self._MODULE_TYPE,
-    IntegrationVersion = self._MODULE_VERSION
+    IntegrationVersion = self._MODULE_VERSION,
+    Query = vars.query_string or "",
+    RequestHost = vars.host or "-",
+    RequestId = vars.request_id or "-",
+    ProtectionMode = self.mitigationType or "ERROR",
+    -- TODO
+    BytesReceived = vars.bytes_received or 0, -- Doesn't seem to work
+    NetaceaUserIdCookieStatus = 1,
+    Optional = {}
   }
 
-  -- start STASH code
-  local request_params = {};
-
-  request_params.body = cjson.encode(data);
-  request_params.method = "POST";
-  request_params.headers = {
-    ["Content-Length"] = #request_params.body,
-    ["Content-Type"] = "application/json",
-    ["x-netacea-api-key"] = self.apiKey;
-  };
-
-  local request_task = function()
-    local hc = http:new();
-
-    local res, err = hc:request_uri( self.ingestEndpoint, request_params );
-
-    if not res and err then
-      ngx.log( ngx.ERR, "Netacea ingest - failed API request - error: ", err );
-      return;
-    else
-      if res.status ~= 200 and res.status ~= 201 then
-        ngx.log( ngx.ERR, "Netacea ingest - failed API request - status: ", res.status );
-        return;
-      end
-    end
+  -- Add data directly to the queue for batch processing
+  local ok, err = data_queue:push(data)
+
+  if not ok and err then
+    ngx.log(ngx.WARN, "NETACEA INGEST - failed to queue data: ", err)
+  else
+    ngx.log(ngx.DEBUG, "NETACEA INGEST - queued data item, queue size: ", data_queue:count())
   end
-
-  -- request_params are not going to get deallocated as long as function stays in the queue
-  local ok, _ = async_queue_low_priority:push( request_task );
-  if ok then queue_sema_low_priority:post(1) end
-
-  -- end STASH code
 end
 
 _N['idTypesText'] = {}