Skip to content

Commit 3be98cd

Browse files
committed
add fair semaphore
1 parent 8d1f30f commit 3be98cd

File tree

5 files changed

+226
-0
lines changed

5 files changed

+226
-0
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
-- Fair distributed semaphore using a token pool: acquire one token.
--
-- KEYS[1]: tokens_key  (LIST of available tokens)
-- KEYS[2]: holders_key (SET of current holder instance IDs)
-- KEYS[3]: holder_key  (individual holder TTL key for this instance)
-- ARGV[1]: instance_id
-- ARGV[2]: capacity (max concurrent holders)
-- ARGV[3]: ttl_seconds
-- ARGV[4]: timeout_seconds (passed to BRPOP)
--
-- Returns: {exit_code, status, token, current_count}
--   exit_code:     0 if acquired, 255 if timeout/failed
--   status:        'acquired' or 'timeout'
--   token:         the token popped from the pool ('' on timeout)
--   current_count: SCARD of holders_key at return time

local tokens_key = KEYS[1]
local holders_key = KEYS[2]
local holder_key = KEYS[3]

local instance_id = ARGV[1]
local capacity = tonumber(ARGV[2])
local ttl_seconds = tonumber(ARGV[3])
local timeout_seconds = tonumber(ARGV[4])

-- Step 1: (Re)build the token pool when the list key is absent.
-- A missing list does NOT imply first use: Redis removes a list key as soon
-- as its last element is popped, and the list also carries an expiry. Seeding
-- a full `capacity` in those cases would hand out more tokens than the
-- semaphore allows, so only the slots not already accounted for by a
-- registered holder are seeded.
if redis.call('EXISTS', tokens_key) == 0 then
  local held = redis.call('SCARD', holders_key)
  local to_seed = capacity - held
  for i = 1, to_seed do
    redis.call('LPUSH', tokens_key, 'token_' .. i)
  end
  if to_seed > 0 then
    -- Expire the pool so an abandoned semaphore does not live forever.
    redis.call('EXPIRE', tokens_key, ttl_seconds * 10)
  end
end

-- Step 2: Take a token from the tail of the list.
-- NOTE(review): Redis documents that blocking commands issued from inside a
-- script do not block; BRPOP here is expected to return immediately when the
-- list is empty regardless of timeout_seconds. Confirm that callers retry.
local token_result = redis.call('BRPOP', tokens_key, timeout_seconds)

if token_result == false or token_result == nil then
  -- No token available
  return {255, 'timeout', '', redis.call('SCARD', holders_key)}
end

local token = token_result[2] -- BRPOP returns {key, value}

-- Step 3: Register as holder; the per-instance key remembers which token
-- this instance holds and expires after ttl_seconds unless renewed.
redis.call('SADD', holders_key, instance_id)
redis.call('SETEX', holder_key, ttl_seconds, token)

-- Step 4: Keep the holders set from living forever
redis.call('EXPIRE', holders_key, ttl_seconds * 10)

return {0, 'acquired', token, redis.call('SCARD', holders_key)}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
-- Recover semaphore tokens held by holders whose TTL key has disappeared.
--
-- KEYS[1]: tokens_key    (LIST of available tokens)
-- KEYS[2]: holders_key   (SET of current holders)
-- KEYS[3]: holder_prefix (prefix for holder keys, e.g. "semaphores:holders:key:")
-- ARGV[1]: capacity      (total semaphore capacity)
--
-- Returns: {recovered_tokens, current_holders, available_tokens, total_cleaned}
-- Intended to be run periodically.

local tokens_key, holders_key, holder_prefix = KEYS[1], KEYS[2], KEYS[3]
local capacity = tonumber(ARGV[1])

local recovered = 0 -- tokens pushed back (orphaned + missing)
local cleaned = 0   -- holders removed from the set

-- Pass 1: drop every holder whose TTL key is gone and give its slot back.
for _, holder_id in ipairs(redis.call('SMEMBERS', holders_key)) do
  if redis.call('EXISTS', holder_prefix .. holder_id) == 0 then
    redis.call('SREM', holders_key, holder_id)
    redis.call('LPUSH', tokens_key, 'token_recovered_' .. holder_id)
    recovered = recovered + 1
    cleaned = cleaned + 1
  end
end

-- Pass 2: reconcile holders + available tokens against capacity.
local holders_now = redis.call('SCARD', holders_key)
local tokens_now = redis.call('LLEN', tokens_key)
local total = holders_now + tokens_now

-- Too few tokens in circulation: top the pool up.
for n = 1, capacity - total do
  redis.call('LPUSH', tokens_key, 'token_missing_' .. n)
  recovered = recovered + 1
end

-- Too many tokens in circulation: trim from the tail.
for _ = 1, total - capacity do
  redis.call('RPOP', tokens_key)
end

-- Pass 3: re-read the final sizes and refresh the one-hour expiry on
-- whichever structures are non-empty.
local holders_left = redis.call('SCARD', holders_key)
local tokens_left = redis.call('LLEN', tokens_key)

if holders_left > 0 then
  redis.call('EXPIRE', holders_key, 3600)
end
if tokens_left > 0 then
  redis.call('EXPIRE', tokens_key, 3600)
end

return {recovered, holders_left, tokens_left, cleaned}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
-- Report semaphore occupancy for the token pool design.
--
-- KEYS[1]: holders_key (SET of current holders)
-- KEYS[2]: tokens_key  (LIST of available tokens)
-- ARGV[1]: capacity    (total semaphore capacity)
--
-- Returns: {current_holders, available_tokens, total_capacity}

local capacity = tonumber(ARGV[1])

return {
  redis.call('SCARD', KEYS[1]), -- members of the holders set
  redis.call('LLEN', KEYS[2]),  -- tokens still in the pool
  capacity,                     -- echoed back as supplied by the caller
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
-- Release a fair-semaphore slot and hand its token back to the pool.
--
-- KEYS[1]: tokens_key  (LIST of available tokens)
-- KEYS[2]: holders_key (SET of current holders)
-- KEYS[3]: holder_key  (individual holder TTL key for this instance)
-- ARGV[1]: instance_id
--
-- Returns: {exit_code, status, current_count}
--   exit_code: 0 if released, 255 if failed
--   status:    'released', 'not_held', or 'already_expired'

local tokens_key, holders_key, holder_key = KEYS[1], KEYS[2], KEYS[3]
local instance_id = ARGV[1]

-- Size of the holders set at the moment of the call.
local function holder_count()
  return redis.call('SCARD', holders_key)
end

-- Guard: the caller is not registered in the holders set.
if redis.call('SISMEMBER', holders_key, instance_id) == 0 then
  local status = 'not_held'
  if redis.call('EXISTS', holder_key) == 1 then
    -- A holder key without set membership is stale; remove it.
    redis.call('DEL', holder_key)
    status = 'already_expired'
  end
  return {255, status, holder_count()}
end

-- Read the held token before the holder key is deleted; fall back to a
-- placeholder name when the key is already gone.
local token = redis.call('GET', holder_key) or 'token_default'

-- Unregister the holder.
redis.call('SREM', holders_key, instance_id)
redis.call('DEL', holder_key)

-- Put the token back at the head of the pool.
redis.call('LPUSH', tokens_key, token)

return {0, 'released', holder_count()}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
-- Renew semaphore holder TTL (token pool design).
--
-- KEYS[1]: holders_key (SET of current holders)
-- KEYS[2]: holder_key  (individual holder TTL key for this instance)
-- ARGV[1]: instance_id
-- ARGV[2]: ttl_seconds
--
-- Returns: {exit_code, status, current_count}
--   exit_code:     0 if renewed, 255 if failed
--   status:        'renewed', 'not_held', or 'expired'
--   current_count: SCARD of holders_key at return time

local holders_key = KEYS[1]
local holder_key = KEYS[2]

local instance_id = ARGV[1]
local ttl_seconds = tonumber(ARGV[2])

-- Step 1: The caller must be registered in the holders set.
if redis.call('SISMEMBER', holders_key, instance_id) == 0 then
  return {255, 'not_held', redis.call('SCARD', holders_key)}
end

-- Step 2: Fetch the stored token. A missing key (GET yields false) means the
-- holder key expired, so the holder is dropped from the set and renewal fails.
local token = redis.call('GET', holder_key)
if not token then
  redis.call('SREM', holders_key, instance_id)
  return {255, 'expired', redis.call('SCARD', holders_key)}
end

-- Step 3: Rewrite the holder key with a fresh TTL, keeping the same token.
redis.call('SETEX', holder_key, ttl_seconds, token)

-- Step 4: Refresh the holders set expiry as well. The acquire script gives
-- the set a ttl_seconds * 10 lifetime; without a refresh here a long-held
-- slot with no new acquisitions would lose its set membership and later
-- renewals would report 'not_held' while the holder key is still alive.
redis.call('EXPIRE', holders_key, ttl_seconds * 10)

return {0, 'renewed', redis.call('SCARD', holders_key)}

0 commit comments

Comments
 (0)