Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 92 additions & 42 deletions apisix/plugins/ai-proxy-multi.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ local endpoint_regex = "^(https?)://([^:/]+):?(%d*)/?.*$"

local pickers = {}
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TTL reduction from 300 to 10 seconds significantly increases SHM access frequency. Consider documenting why this aggressive TTL is needed, especially since status_ver changes should already invalidate the cache when health states change.

Suggested change
local pickers = {}
local pickers = {}
-- NOTE:
-- The TTL here is intentionally kept small (10s) instead of the more typical 300s.
-- Health state changes are already handled via status_ver-based invalidation in the
-- healthcheck manager, but this cache also needs to reflect other dynamic changes
-- (for example, configuration updates, priority/weight adjustments, or backend
-- endpoint rotation) in a timely manner for AI traffic routing.
--
-- Using a 10s TTL trades slightly higher SHM access frequency for faster convergence
-- towards the latest upstream selection state, which is acceptable for this plugin's
-- usage pattern. If you change this value, consider the impact on both performance
-- and how quickly picker state reflects configuration and routing changes.

Copilot uses AI. Check for mistakes.
local lrucache_server_picker = core.lrucache.new({
ttl = 300, count = 256
ttl = 10, count = 256
})

local plugin_name = "ai-proxy-multi"
Expand Down Expand Up @@ -107,13 +107,6 @@ function _M.check_schema(conf)
core.log.warn("fail to require ai provider: ", instance.provider, ", err", err)
return false, "ai provider: " .. instance.provider .. " is not supported."
end
local sa_json = core.table.try_read_attr(instance, "auth", "gcp", "service_account_json")
if sa_json then
local _, err = core.json.decode(sa_json)
if err then
return false, "invalid gcp service_account_json: " .. err
end
end
end
local algo = core.table.try_read_attr(conf, "balancer", "algorithm")
local hash_on = core.table.try_read_attr(conf, "balancer", "hash_on")
Expand Down Expand Up @@ -177,7 +170,9 @@ local function parse_domain_for_node(node)
end


local function resolve_endpoint(instance_conf)
-- Calculate DNS node from instance config without modifying the input
-- Returns a node table with host, port, scheme fields
Comment on lines +173 to +174
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function comment should clarify that this is intended for use when _dns_value is not available (e.g., when called from timer context), as this is a critical use case mentioned in the PR description.

Suggested change
-- Calculate DNS node from instance config without modifying the input
-- Returns a node table with host, port, scheme fields
-- Calculate DNS node from instance config without modifying the input.
-- Intended for use when _dns_value is not available (e.g., when called
-- from timer context) to recompute the target node.
-- Returns a node table with host, port, scheme fields.

Copilot uses AI. Check for mistakes.
local function calculate_dns_node(instance_conf)
local scheme, host, port
local endpoint = core.table.try_read_attr(instance_conf, "override", "endpoint")
if endpoint then
Expand All @@ -188,22 +183,23 @@ local function resolve_endpoint(instance_conf)
port = tonumber(port)
else
local ai_driver = require("apisix.plugins.ai-drivers." .. instance_conf.provider)
if ai_driver.get_node then
local node = ai_driver.get_node(instance_conf)
host = node.host
port = node.port
else
host = ai_driver.host
port = ai_driver.port
end
-- built-in ai driver always use https
scheme = "https"
host = ai_driver.host
port = ai_driver.port
Comment on lines +186 to +189
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The removed code handled ai_driver.get_node() which appears to be a valid driver interface. This removal could break custom AI drivers that implement get_node(). Consider preserving this logic or documenting why it was removed.

Suggested change
-- built-in ai driver always use https
scheme = "https"
host = ai_driver.host
port = ai_driver.port
-- built-in ai driver always use https; custom drivers may implement get_node()
if ai_driver.get_node then
local driver_node = ai_driver.get_node(instance_conf)
if driver_node then
scheme = driver_node.scheme or "https"
host = driver_node.host
port = driver_node.port
else
scheme = "https"
host = ai_driver.host
port = ai_driver.port
end
else
scheme = "https"
host = ai_driver.host
port = ai_driver.port
end

Copilot uses AI. Check for mistakes.
end
local new_node = {
local node = {
host = host,
port = tonumber(port),
scheme = scheme,
}
parse_domain_for_node(new_node)
parse_domain_for_node(node)
return node
end


local function resolve_endpoint(instance_conf)
local new_node = calculate_dns_node(instance_conf)

-- Compare with existing node to see if anything changed
local old_node = instance_conf._dns_value
Expand All @@ -229,32 +225,75 @@ local function get_checkers_status_ver(checkers)
end


-- Read health status directly from SHM to get consistent state across all workers
-- This bypasses the local cache which can be stale due to async worker events
-- Read an instance's health state straight from the healthcheck shared dict,
-- bypassing the per-worker cache (which can lag behind async worker events).
-- The key layout follows lua-resty-healthcheck's internal state keys.
-- Returns (healthy:boolean, raw_state|nil); when no state has been written
-- yet (checker not created), the instance is treated as healthy.
local function fetch_health_status_from_shm(shm, checker_name, ip, port, hostname, instance_name)
    local lookup_hostname = hostname or ip
    local state_key = string.format("lua-resty-healthcheck:%s:state:%s:%s:%s",
                                    checker_name, ip, port, lookup_hostname)

    core.log.info("[SHM-DIRECT] instance=", instance_name, " key=", state_key)

    local state = shm:get(state_key)
    if not state then
        -- No state in SHM yet: the first health check has not run, so do not
        -- exclude the instance from selection.
        core.log.warn("[SHM-DIRECT] state not found for instance=", instance_name,
                      ", defaulting to healthy")
        return true, nil
    end

    -- State values: 1=healthy, 2=unhealthy, 3=mostly_healthy, 4=mostly_unhealthy
    local healthy = (state == 1 or state == 3)
    core.log.info("[SHM-DIRECT] instance=", instance_name, " state=", state,
                  " ok=", healthy)
    return healthy, state
end


-- Get SHM and checker_name for an instance, returns (shm, checker_name)
-- Resolve the shared dict and checker name used to look up an instance's
-- health state.
-- Prefers the instance's own checker; otherwise borrows the SHM of any
-- existing checker (all checkers for the plugin share one shared dict —
-- TODO confirm against healthcheck_manager) and reconstructs this
-- instance's checker name from its position in the plugin config.
-- Returns (shm, checker_name), or (nil, nil) when no SHM is reachable.
local function get_shm_info(checkers, conf, i, ins)
    local checker = checkers and checkers[ins.name]

    -- Prefer the instance's own checker
    if checker and checker.shm then
        core.log.info("[SHM-DEBUG] instance=", ins.name, " using own checker")
        return checker.shm, checker.name
    end

    -- Fallback: use another checker's SHM and construct the checker_name.
    -- BUGFIX: next() returns (key, value) and `and` truncates multiple return
    -- values to one, so `checkers and next(checkers)` yielded the instance-name
    -- KEY (a string) — its `.shm` was always nil and this branch never ran.
    -- (`local _, ref = checkers and next(checkers)` has the same truncation
    -- problem; next() must be called in its own statement.)
    local checker_ref
    if checkers then
        local _, first_checker = next(checkers)
        checker_ref = first_checker
    end
    if checker_ref and checker_ref.shm then
        -- (i - 1): checker names appear to use 0-based instance indexes, so
        -- convert from Lua's 1-based loop index — confirm against the naming
        -- used when the health checker is created.
        local checker_name = "upstream#" .. conf._meta.parent.resource_key
            .. "#plugins['ai-proxy-multi'].instances[" .. (i - 1) .. "]"
        core.log.info("[SHM-DEBUG] instance=", ins.name,
                      " using fallback checker_ref, checker_name=", checker_name)
        return checker_ref.shm, checker_name
    end

    core.log.warn("[SHM-DEBUG] instance=", ins.name,
                  " checkers=", checkers and "exists" or "nil",
                  " checker_ref=", checker_ref and "exists" or "nil")
    return nil, nil
end


local function fetch_health_instances(conf, checkers)
local instances = conf.instances
local new_instances = core.table.new(0, #instances)
if not checkers then
for _, ins in ipairs(conf.instances) do
transform_instances(new_instances, ins)
end
return new_instances
end

for _, ins in ipairs(instances) do
local checker = checkers[ins.name]
if checker then
local host = ins.checks and ins.checks.active and ins.checks.active.host
local port = ins.checks and ins.checks.active and ins.checks.active.port
for i, ins in ipairs(instances) do
local host = ins.checks and ins.checks.active and ins.checks.active.host
local port = ins.checks and ins.checks.active and ins.checks.active.port
local node = ins._dns_value

local node = ins._dns_value
local ok, err = checker:get_target_status(node.host, port or node.port, host)
-- Get SHM and checker_name (works regardless of whether checker exists in this worker)
local shm, checker_name = get_shm_info(checkers, conf, i, ins)

if shm then
-- Read health status directly from SHM
local ok = fetch_health_status_from_shm(shm, checker_name,
node.host, port or node.port, host, ins.name)
if ok then
transform_instances(new_instances, ins)
elseif err then
core.log.warn("failed to get health check target status, addr: ",
node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
end
else
-- No SHM available at all, add instance by default
transform_instances(new_instances, ins)
end
end
Expand Down Expand Up @@ -302,23 +341,30 @@ end
function _M.construct_upstream(instance)
local upstream = {}
local node = instance._dns_value

-- If _dns_value doesn't exist (e.g., when called from timer),
-- calculate it from the instance config
if not node then
return nil, "failed to resolve endpoint for instance: " .. instance.name
core.log.info("instance._dns_value not found, calculating from config for instance: ", instance.name)
node = calculate_dns_node(instance)
if not node then
return nil, "failed to calculate endpoint for instance: " .. instance.name
end
end

if not node.host or not node.port then
return nil, "invalid upstream node: " .. core.json.encode(node)
end

local node = {
local upstream_node = {
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable name upstream_node shadows the outer scope variable node. While functionally correct, this could be confusing. Consider renaming to something like node_config to distinguish the constructed upstream node from the DNS-resolved node.

Copilot uses AI. Check for mistakes.
host = node.host,
port = node.port,
scheme = node.scheme,
weight = instance.weight or 1,
priority = instance.priority or 0,
name = instance.name,
}
upstream.nodes = {node}
upstream.nodes = {upstream_node}
upstream.checks = instance.checks
upstream._nodes_ver = instance._nodes_ver
return upstream
Expand All @@ -345,24 +391,28 @@ local function pick_target(ctx, conf, ups_tab)
instances[i]._dns_value = instance._dns_value
instances[i]._nodes_ver = instance._nodes_ver
local checker = healthcheck_manager.fetch_checker(resource_path, resource_version)
checkers = checkers or {}
checkers[instance.name] = checker
if checker then
checkers = checkers or {}
checkers[instance.name] = checker
end
end
end

-- Use status_ver for cache key - ensures immediate refresh when health state changes
local version = plugin.conf_version(conf)
if checkers then
local status_ver = get_checkers_status_ver(checkers)
version = version .. "#" .. status_ver
version = version .. "#s" .. status_ver
end

-- Use LRU cache to reduce SHM access - cache refreshes when status_ver changes
local server_picker = ctx.server_picker
if not server_picker then
server_picker = lrucache_server_picker(ctx.matched_route.key, version,
create_server_picker, conf, ups_tab, checkers)
end
if not server_picker then
return nil, nil, "failed to fetch server picker"
return nil, nil, "failed to create server picker"
end
ctx.server_picker = server_picker

Expand Down
1 change: 1 addition & 0 deletions test-nginx
Submodule test-nginx added at ed378b
Loading