Skip to content

Commit 00618b0

Browse files
dongjiang1989jiangdong
authored andcommitted
add zookeeper discovery
Signed-off-by: dongjiang1989 <[email protected]>
1 parent 7816e92 commit 00618b0

File tree

12 files changed

+1113
-3
lines changed

12 files changed

+1113
-3
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,15 @@ install: runtime
281281

282282
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery
283283
$(ENV_INSTALL) apisix/discovery/*.lua $(ENV_INST_LUADIR)/apisix/discovery/
284-
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul,consul_kv,dns,eureka,nacos,kubernetes,tars}
284+
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul,consul_kv,dns,eureka,nacos,kubernetes,tars,zookeeper}
285285
$(ENV_INSTALL) apisix/discovery/consul/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul
286286
$(ENV_INSTALL) apisix/discovery/consul_kv/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul_kv
287287
$(ENV_INSTALL) apisix/discovery/dns/*.lua $(ENV_INST_LUADIR)/apisix/discovery/dns
288288
$(ENV_INSTALL) apisix/discovery/eureka/*.lua $(ENV_INST_LUADIR)/apisix/discovery/eureka
289289
$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
290290
$(ENV_INSTALL) apisix/discovery/nacos/*.lua $(ENV_INST_LUADIR)/apisix/discovery/nacos
291291
$(ENV_INSTALL) apisix/discovery/tars/*.lua $(ENV_INST_LUADIR)/apisix/discovery/tars
292+
$(ENV_INSTALL) apisix/discovery/zookeeper/*.lua $(ENV_INST_LUADIR)/apisix/discovery/zookeeper
292293

293294
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
294295
$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/

apisix-master-0.rockspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ dependencies = {
6363
"api7-dkjson = 0.1.1",
6464
"resty-redis-cluster = 1.05-1",
6565
"lua-resty-expr = 1.3.2",
66+
"lua-resty-zookeeper = 0.2.5",
6667
"graphql = 0.0.2",
6768
"argparse = 0.7.1-1",
6869
"luasocket = 3.1.0-1",
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
18+
local core = require("apisix.core")
19+
local utils = require("apisix.discovery.zookeeper.utils")
20+
local lrucache = require("resty.lrucache")
21+
local table = require("apisix.core.table")
22+
local local_conf = require("apisix.core.config_local").local_conf()
23+
local ngx = ngx
24+
local ipairs = ipairs
25+
local log = core.log
26+
27+
local _M = {
28+
version = 0.1,
29+
priority = 1000,
30+
name = "zookeeper",
31+
}
32+
33+
-- Global Configuration
34+
local zookeeper_conf
35+
-- Service Instance Cache(service_name -> {nodes, expire_time})
36+
local instance_cache = lrucache.new(1024)
37+
38+
-- Timer Identifier
39+
local fetch_timer
40+
41+
-- The instance list of a single service from ZooKeeper
42+
local function fetch_service_instances(conf, service_name)
43+
-- 1. Init connect
44+
local client, err = utils.new_zk_client(conf)
45+
if not client then
46+
return nil, err
47+
end
48+
49+
-- 2. Create path
50+
local service_path = conf.root_path .. "/" .. service_name
51+
local ok, err = utils.create_zk_path(client, service_path)
52+
if not ok then
53+
utils.close_zk_client(client)
54+
return nil, err
55+
end
56+
57+
-- 3. All instance nodes under a service
58+
local children, err = client:get_children(service_path)
59+
if not children then
60+
utils.close_zk_client(client)
61+
if err == "not exists" then
62+
log.warn("service path not exists: ", service_path)
63+
return {}
64+
end
65+
log.error("get zk children failed: ", err)
66+
return nil, err
67+
end
68+
69+
-- 4. Parse the data of each instance node one by one
70+
local instances = {}
71+
for _, child in ipairs(children) do
72+
local instance_path = service_path .. "/" .. child
73+
local data, stat, err = client:get_data(instance_path)
74+
do
75+
if not data then
76+
log.error("get instance data failed: ", instance_path, " stat:", stat, " err: ", err)
77+
break
78+
end
79+
80+
-- Parse instance data
81+
local instance = utils.parse_instance_data(data)
82+
if instance then
83+
table.insert(instances, instance)
84+
end
85+
end
86+
end
87+
88+
-- 5. Close connects
89+
utils.close_zk_client(client)
90+
91+
log.debug("fetch service instances: ", service_name, " count: ", #instances)
92+
return instances
93+
end
94+
95+
-- Scheduled fetch of all service instances (full cache update))
96+
local function fetch_all_services()
97+
if not zookeeper_conf then
98+
log.warn("zookeeper discovery config not loaded")
99+
return
100+
end
101+
102+
-- 1. Init Zookeeper client
103+
local client, err = utils.new_zk_client(zookeeper_conf)
104+
if not client then
105+
log.error("init zk client failed: ", err)
106+
return
107+
end
108+
109+
-- 2. Check instance root path exist
110+
local ex, err = client:exists(zookeeper_conf.root_path)
111+
if ex == false then
112+
local created_parent, cerr = client:create(zookeeper_conf.root_path, "", "persistent", false)
113+
if not created_parent then
114+
utils.close_zk_client(client)
115+
log.error("failed to create parent: ", cerr)
116+
end
117+
elseif err then
118+
utils.close_zk_client(client)
119+
log.error("exist zk root failed: ", err)
120+
return
121+
end
122+
123+
-- 3. All instance nodes under a service
124+
local services, err = client:get_children(zookeeper_conf.root_path)
125+
if not services then
126+
utils.close_zk_client(client)
127+
log.error("get zk root children failed: ", err)
128+
return
129+
end
130+
131+
-- 4. Fetch the instances of each service and update the cache
132+
local now = ngx.time()
133+
for _, service in ipairs(services) do
134+
local instances, err = fetch_service_instances(zookeeper_conf, service)
135+
if instances then
136+
instance_cache:set(service, {
137+
nodes = instances,
138+
}, now + zookeeper_conf.cache_ttl)
139+
else
140+
log.error("fetch service instances failed: ", service, " err: ", err)
141+
end
142+
end
143+
144+
-- 5. Close connects
145+
utils.close_zk_client(client)
146+
end
147+
148+
function _M.nodes(service_name)
149+
if not service_name or service_name == "" then
150+
log.error("service name is empty")
151+
return nil
152+
end
153+
154+
-- 1. Check instance_cache initialized
155+
if not instance_cache then
156+
log.error("instance_cache not initialized")
157+
return nil
158+
end
159+
160+
-- 2. Get from cache
161+
local cache, _, _ = instance_cache:get(service_name)
162+
local now = ngx.time()
163+
164+
-- 3. If the cache is missed or expired, actively pull (the data))
165+
if not cache or cache.expire_time < now then
166+
log.debug("cache miss or expired, fetch from zk: ", service_name)
167+
local instances, err = fetch_service_instances(zookeeper_conf, service_name)
168+
if not instances then
169+
log.error("fetch instances failed: ", service_name, " err: ", err)
170+
-- Fallback: Return the old cache (if available))
171+
if cache then
172+
return cache.nodes
173+
end
174+
return nil
175+
end
176+
177+
-- Update the cache
178+
cache = {
179+
nodes = instances,
180+
}
181+
182+
instance_cache:set(service_name, cache, now + zookeeper_conf.cache_ttl)
183+
end
184+
185+
return cache.nodes
186+
end
187+
188+
function _M.init_worker()
189+
-- Load configuration
190+
zookeeper_conf = local_conf.discovery and local_conf.discovery.zookeeper or {}
191+
192+
-- The default values
193+
zookeeper_conf.connect_string = zookeeper_conf.connect_string or "127.0.0.1:2181"
194+
zookeeper_conf.fetch_interval = zookeeper_conf.fetch_interval or 10
195+
zookeeper_conf.session_timeout = zookeeper_conf.session_timeout or 30000
196+
zookeeper_conf.connect_timeout = zookeeper_conf.connect_timeout or 5000
197+
zookeeper_conf.cache_ttl = zookeeper_conf.cache_ttl or 30
198+
zookeeper_conf.root_path = zookeeper_conf.root_path or "/apisix/discovery/zk"
199+
zookeeper_conf.auth = zookeeper_conf.auth or {type = "digest", creds = "",}
200+
zookeeper_conf.weight = zookeeper_conf.weight or 100
201+
202+
log.info("zookeeper_conf:", core.json.encode(zookeeper_conf))
203+
-- Start the timer
204+
if not fetch_timer then
205+
fetch_timer = ngx.timer.every(zookeeper_conf.fetch_interval, fetch_all_services)
206+
log.info("zk discovery fetch timer started, interval: ", zookeeper_conf.fetch_interval, "s")
207+
end
208+
209+
-- Manually execute a full pull immediately
210+
ngx.timer.at(0, fetch_all_services)
211+
end
212+
213+
function _M.dump_data()
214+
local keys = instance_cache:get_keys(0)
215+
local applications = {}
216+
for _, key in ipairs(keys) do
217+
local value, _, _ = instance_cache:get(key)
218+
if value then
219+
local nodes = core.json.decode(value)
220+
if nodes then
221+
applications[key] = {
222+
nodes = nodes,
223+
}
224+
end
225+
end
226+
end
227+
return {services = applications or {}}
228+
end
229+
230+
function _M.destroy()
231+
if fetch_timer then
232+
fetch_timer = nil
233+
end
234+
instance_cache:flush_all()
235+
log.info("zookeeper discovery destroyed")
236+
end
237+
238+
return _M
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
18+
local core = require("apisix.core")
19+
20+
local schema = {
21+
type = "object",
22+
properties = {
23+
-- ZooKeeper Cluster Addresses (separated by commas for multiple addresses)
24+
connect_string = {
25+
type = "string",
26+
default = "127.0.0.1:2181"
27+
},
28+
-- ZooKeeper Session Timeout (milliseconds)
29+
session_timeout = {
30+
type = "integer",
31+
minimum = 1000,
32+
default = 30000
33+
},
34+
-- ZooKeeper Connect Timeout (milliseconds)
35+
connect_timeout = {
36+
type = "integer",
37+
minimum = 1000,
38+
default = 5000
39+
},
40+
-- Service Discovery Root Path
41+
root_path = {
42+
type = "string",
43+
default = "/apisix/discovery/zk"
44+
},
45+
-- Instance Fetch Interval (seconds)
46+
fetch_interval = {
47+
type = "integer",
48+
minimum = 1,
49+
default = 10
50+
},
51+
-- The default weight value for service instances that do not specify a weight in ZooKeeper.
52+
-- It is used for load balancing (higher weight means more traffic).
53+
-- Default value is 100, and the value range is 1-500.
54+
weight = {
55+
type = "integer",
56+
minimum = 1,
57+
default = 100
58+
},
59+
-- ZooKeeper Authentication Information (digest: username:password):
60+
-- Digest authentication credentials for accessing ZooKeeper cluster.
61+
-- Format requirement: "digest:{username}:{password}".
62+
-- Leave empty to disable authentication (not recommended for production).
63+
auth = {
64+
type = "object",
65+
properties = {
66+
type = {type = "string", enum = {"digest"}, default = "digest"},
67+
creds = {type = "string"} -- digest: username:password
68+
}
69+
},
70+
-- Cache Expiration Time (seconds):
71+
-- The time after which service instance cache becomes expired.
72+
-- Default value is 60 seconds
73+
cache_ttl = {
74+
type = "integer",
75+
minimum = 1,
76+
default = 60
77+
}
78+
},
79+
required = {},
80+
additionalProperties = false
81+
}
82+
83+
local _M = {
84+
schema = schema
85+
}
86+
87+
function _M.check(conf)
88+
return core.schema.check(schema, conf)
89+
end
90+
91+
return _M

0 commit comments

Comments
 (0)