Skip to content

Commit 78515f0

Browse files
committed
Working kinesis ingest
1 parent bdbe0b1 commit 78515f0

File tree

4 files changed

+274
-111
lines changed

4 files changed

+274
-111
lines changed

Dockerfile

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
FROM openresty/openresty:xenial
22

3-
LABEL author="Curtis Johnson <[email protected]>"
4-
LABEL maintainer="Curtis Johnson <[email protected]>"
5-
63
USER root
74

85
ENV HOME=/usr/src
@@ -20,5 +17,3 @@ RUN curl -L -o /tmp/luarocks-3.12.2-1.src.rock https://luarocks.org/luarocks-3.1
2017
rm /tmp/luarocks-3.12.2-1.src.rock
2118

2219
RUN /usr/local/openresty/luajit/bin/luarocks make ./lua_resty_netacea-0.2-2.rockspec
23-
24-
## https://github.com/luarocks/luarocks/issues/1797

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ services:
1313
volumes:
1414
- "./src/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf"
1515
- "./src/lua_resty_netacea.lua:/usr/local/openresty/site/lualib/lua_resty_netacea.lua"
16+
- "./src/kinesis_resty.lua:/usr/local/openresty/site/lualib/kinesis_resty.lua"
1617

1718
test:
1819
build:

src/kinesis_resty.lua

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
-- kinesis_resty.lua
2+
-- OpenResty-compatible AWS Kinesis client
3+
-- No external dependencies, fully thread-safe
4+
5+
local ffi = require "ffi"
6+
local http = require "resty.http"
7+
local cjson = require "cjson.safe"
8+
local sha256 = require "resty.sha256"
9+
local str = require "resty.string"
10+
local ngx = ngx
11+
12+
local Kinesis = {}
13+
Kinesis.__index = Kinesis
14+
15+
ngx.log(ngx.ERR, "*** kinesis_resty module loaded ***")
16+
17+
-- FFI-based HMAC-SHA256
18+
ffi.cdef[[
19+
unsigned char *HMAC(const void *evp_md,
20+
const void *key, int key_len,
21+
const unsigned char *d, size_t n,
22+
unsigned char *md, unsigned int *md_len);
23+
const void* EVP_sha256(void);
24+
]]
25+
26+
local function hmac_sha256(key, data)
27+
local md = ffi.new("unsigned char[32]")
28+
local md_len = ffi.new("unsigned int[1]")
29+
ffi.C.HMAC(ffi.C.EVP_sha256(),
30+
key, #key,
31+
data, #data,
32+
md, md_len)
33+
return ffi.string(md, md_len[0])
34+
end
35+
36+
-- SHA256 helper
37+
local function sha256_bin(data)
38+
local sha = sha256:new()
39+
sha:update(data)
40+
return sha:final()
41+
end
42+
43+
local function hex(bin)
44+
return str.to_hex(bin)
45+
end
46+
47+
-- Derive AWS signing key
48+
local function get_signing_key(secret_key, date, region, service)
49+
local kDate = hmac_sha256("AWS4"..secret_key, date)
50+
local kRegion = hmac_sha256(kDate, region)
51+
local kService= hmac_sha256(kRegion, service)
52+
local kSign = hmac_sha256(kService, "aws4_request")
53+
return kSign
54+
end
55+
56+
-- Constructor
57+
function Kinesis.new(stream_name, region, access_key, secret_key)
58+
local self = setmetatable({}, Kinesis)
59+
self.stream_name = stream_name
60+
self.region = region
61+
self.access_key = access_key
62+
self.secret_key = secret_key
63+
self.host = "kinesis."..region..".amazonaws.com"
64+
self.endpoint = "https://"..self.host.."/"
65+
return self
66+
end
67+
68+
-- Generate SigV4 headers
69+
function Kinesis:_sign_request(payload, target)
70+
local now = os.date("!%Y%m%dT%H%M%SZ") -- UTC time in ISO8601 basic
71+
local date = os.date("!%Y%m%d") -- YYYYMMDD for scope
72+
73+
local headers = {
74+
["Host"] = self.host,
75+
["Content-Type"] = "application/x-amz-json-1.1",
76+
["X-Amz-Date"] = now,
77+
["X-Amz-Target"] = target
78+
}
79+
80+
-- canonical headers
81+
local canonical_headers = ""
82+
local signed_headers = {}
83+
local keys = {}
84+
for k,_ in pairs(headers) do table.insert(keys,k) end
85+
table.sort(keys, function(a,b) return a:lower() < b:lower() end)
86+
for _,k in ipairs(keys) do
87+
canonical_headers = canonical_headers .. k:lower()..":"..headers[k].."\n"
88+
table.insert(signed_headers, k:lower())
89+
end
90+
local signed_headers_str = table.concat(signed_headers,";")
91+
92+
local payload_hash = hex(sha256_bin(payload))
93+
94+
local canonical_request = table.concat{
95+
"POST\n",
96+
"/\n",
97+
"\n",
98+
canonical_headers .. "\n",
99+
signed_headers_str .. "\n",
100+
payload_hash
101+
}
102+
103+
local canonical_request_hash = hex(sha256_bin(canonical_request))
104+
105+
local scope = date.."/"..self.region.."/kinesis/aws4_request"
106+
local string_to_sign = table.concat{
107+
"AWS4-HMAC-SHA256\n",
108+
now.."\n",
109+
scope.."\n",
110+
canonical_request_hash
111+
}
112+
113+
local signing_key = get_signing_key(self.secret_key, date, self.region, "kinesis")
114+
local signature = hex(hmac_sha256(signing_key, string_to_sign))
115+
116+
headers["Authorization"] = string.format(
117+
"AWS4-HMAC-SHA256 Credential=%s/%s, SignedHeaders=%s, Signature=%s",
118+
self.access_key, scope, signed_headers_str, signature
119+
)
120+
121+
headers["Content-Length"] = #payload
122+
123+
return headers
124+
end
125+
126+
-- Internal send
127+
function Kinesis:_send(target, payload)
128+
local httpc = http.new()
129+
httpc:set_timeout(5000)
130+
local headers = self:_sign_request(payload, target)
131+
ngx.log(ngx.ERR, "Kinesis Request Headers: ", cjson.encode(headers))
132+
local res, err = httpc:request_uri(self.endpoint, {
133+
method = "POST",
134+
body = payload,
135+
headers = headers,
136+
ssl_verify = true
137+
})
138+
return res, err
139+
end
140+
141+
-- PutRecord
142+
function Kinesis:put_record(partition_key, data)
143+
local payload = cjson.encode{
144+
StreamName = self.stream_name,
145+
PartitionKey = partition_key,
146+
Data = ngx.encode_base64(data)
147+
}
148+
return self:_send("Kinesis_20131202.PutRecord", payload)
149+
end
150+
151+
-- PutRecords
152+
function Kinesis:put_records(records)
153+
local recs = {}
154+
for _,r in ipairs(records) do
155+
table.insert(recs, {
156+
PartitionKey = r.partition_key,
157+
Data = ngx.encode_base64(r.data)
158+
})
159+
end
160+
local payload = cjson.encode{
161+
StreamName = self.stream_name,
162+
Records = recs
163+
}
164+
return self:_send("Kinesis_20131202.PutRecords", payload)
165+
end
166+
167+
return Kinesis

0 commit comments

Comments
 (0)