Skip to content
55 changes: 55 additions & 0 deletions server/src/_luaScriptsV2/incrementCusEntBalance.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-- ============================================================================
-- INCREMENT CUSTOMER ENTITLEMENT BALANCE
-- Atomically increments a cusEnt's balance in the cached FullCustomer JSON
-- using JSON.NUMINCRBY (relative delta, safe with concurrent deductions).
-- ============================================================================
-- KEYS[1] = fullCustomer cache key
-- ARGV[1] = JSON: { cus_ent_id: string, delta: number }
-- Returns: JSON: { ok: true, new_balance: number } | { ok: false, error: string }
-- ============================================================================

local cache_key = KEYS[1]
local params = cjson.decode(ARGV[1])

local cus_ent_id = params.cus_ent_id
local delta = tonumber(params.delta)

if not cus_ent_id or not delta then
return cjson.encode({ ok = false, error = "missing cus_ent_id or delta" })
end

-- Read the full customer to find the entitlement indices
local raw = redis.call('JSON.GET', cache_key, '.')
if not raw then
return cjson.encode({ ok = false, error = "cache_miss" })
end

local full_customer = cjson.decode(raw)
local cus_ent, cus_product, ce_idx, cp_idx = find_entitlement(full_customer, cus_ent_id)

if not cus_ent then
return cjson.encode({ ok = false, error = "cus_ent_not_found" })
end

-- Build the JSON path to the balance field
local base_path
if cp_idx then
-- Lua arrays are 1-indexed, RedisJSON is 0-indexed
base_path = '$.customer_products[' .. (cp_idx - 1) .. '].customer_entitlements[' .. (ce_idx - 1) .. ']'
else
base_path = '$.extra_customer_entitlements[' .. (ce_idx - 1) .. ']'
end

local balance_path = base_path .. '.balance'

-- Atomic relative increment (JSON.NUMINCRBY with $ path returns a JSON array e.g. "[105]")
local result = redis.call('JSON.NUMINCRBY', cache_key, balance_path, delta)
local new_balance = cjson.decode(result)[1]

-- Bump cache_version so sync_balances_v2 conflict detection stays in sync
-- with CusEntService.increment (which bumps Postgres cache_version atomically).
local version_path = base_path .. '.cache_version'
local version_result = redis.call('JSON.NUMINCRBY', cache_key, version_path, 1)
local new_cache_version = cjson.decode(version_result)[1]

return cjson.encode({ ok = true, new_balance = new_balance, new_cache_version = new_cache_version })
29 changes: 29 additions & 0 deletions server/src/_luaScriptsV2/luaScriptsV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,35 @@ const updateMainScript = readFileSync(
export const UPDATE_CUSTOMER_ENTITLEMENTS_SCRIPT = `${LUA_UTILS}
${updateMainScript}`;

// ============================================================================
// INCREMENT CUSTOMER ENTITLEMENT BALANCE SCRIPT
// ============================================================================

const incrementMainScript = readFileSync(
join(__dirname, "incrementCusEntBalance.lua"),
"utf-8",
);

/**
* Lua script for atomically incrementing a cusEnt balance in the cached
* FullCustomer via JSON.NUMINCRBY. Safe with concurrent deductions.
*/
export const INCREMENT_CUS_ENT_BALANCE_SCRIPT = `${LUA_UTILS}
${incrementMainScript}`;

// ============================================================================
// UPDATE CUSTOMER PRODUCT OPTIONS SCRIPT
// ============================================================================

/**
* Lua script for atomically incrementing a cusProduct's options[].quantity
* in the cached FullCustomer via JSON.NUMINCRBY.
*/
export const UPDATE_CUS_PRODUCT_OPTIONS_SCRIPT = readFileSync(
join(__dirname, "updateCusProductOptions.lua"),
"utf-8",
);

// ============================================================================
// UPDATE CUSTOMER DATA SCRIPT (top-level customer fields)
// ============================================================================
Expand Down
61 changes: 61 additions & 0 deletions server/src/_luaScriptsV2/updateCusProductOptions.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-- ============================================================================
-- UPDATE CUSTOMER PRODUCT OPTIONS QUANTITY
-- Atomically increments a cusProduct's options[].quantity in the cached
-- FullCustomer JSON using JSON.NUMINCRBY.
-- ============================================================================
-- KEYS[1] = fullCustomer cache key
-- ARGV[1] = JSON: { internal_feature_id: string, feature_id: string, delta: number }
-- Returns: JSON: { ok: true, new_quantity: number } | { ok: false, error: string }
-- ============================================================================

local cache_key = KEYS[1]
local params = cjson.decode(ARGV[1])

local internal_feature_id = params.internal_feature_id
local feature_id = params.feature_id
local delta = tonumber(params.delta)

if not delta then
return cjson.encode({ ok = false, error = "missing delta" })
end

if not internal_feature_id and not feature_id then
return cjson.encode({ ok = false, error = "missing internal_feature_id and feature_id" })
end

-- Read the full customer to find the matching options entry
local raw = redis.call('JSON.GET', cache_key, '.')
if not raw then
return cjson.encode({ ok = false, error = "cache_miss" })
end

local full_customer = cjson.decode(raw)

if not full_customer.customer_products then
return cjson.encode({ ok = false, error = "no_customer_products" })
end

-- Iterate customer_products and their options to find the matching entry
for cp_idx, cus_product in ipairs(full_customer.customer_products) do
if cus_product.options then
for opt_idx, option in ipairs(cus_product.options) do
local matches = false
if internal_feature_id and option.internal_feature_id == internal_feature_id then
matches = true
end
if not matches and feature_id and option.feature_id == feature_id then
matches = true
end

if matches then
-- Lua arrays are 1-indexed, RedisJSON is 0-indexed
local path = '$.customer_products[' .. (cp_idx - 1) .. '].options[' .. (opt_idx - 1) .. '].quantity'
local result = redis.call('JSON.NUMINCRBY', cache_key, path, delta)
local new_quantity = cjson.decode(result)[1]
return cjson.encode({ ok = true, new_quantity = new_quantity })
end
end
end
end

return cjson.encode({ ok = false, error = "options_entry_not_found" })
12 changes: 11 additions & 1 deletion server/src/_luaScriptsV2/updateCustomerData.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
metadata?: object | null,
send_email_receipts?: boolean | null,
processor?: object | null,
processors?: object | null
processors?: object | null,
auto_topup?: array | null
}
}

Expand Down Expand Up @@ -91,4 +92,13 @@ if updates.processors ~= nil and updates.processors ~= cjson.null then
table.insert(updated_fields, 'processors')
end

if updates.auto_topup ~= nil then
if updates.auto_topup == cjson.null then
redis.call('JSON.SET', cache_key, '$.auto_topup', 'null')
else
redis.call('JSON.SET', cache_key, '$.auto_topup', cjson.encode(updates.auto_topup))
end
table.insert(updated_fields, 'auto_topup')
end

return cjson.encode({ success = true, updated_fields = updated_fields })
2 changes: 2 additions & 0 deletions server/src/external/autumn/autumnCli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
type CreateCustomerParamsV0Input,
type CreateEntityParams,
type CreateRewardProgram,
type CustomerBillingControls,
CustomerExpand,
EntityExpand,
ErrCode,
Expand Down Expand Up @@ -487,6 +488,7 @@ export class AutumnInt {
email?: string;
send_email_receipts?: boolean;
metadata?: Record<string, unknown>;
billing_controls?: CustomerBillingControls;
},
) => {
const data = await this.patch(`/customers/${customerId}`, updates);
Expand Down
20 changes: 20 additions & 0 deletions server/src/external/redis/initRedis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import {
BATCH_DELETE_FULL_CUSTOMER_CACHE_SCRIPT,
DEDUCT_FROM_CUSTOMER_ENTITLEMENTS_SCRIPT,
DELETE_FULL_CUSTOMER_CACHE_SCRIPT,
INCREMENT_CUS_ENT_BALANCE_SCRIPT,
RESET_CUSTOMER_ENTITLEMENTS_SCRIPT,
SET_FULL_CUSTOMER_CACHE_SCRIPT,
UPDATE_CUS_PRODUCT_OPTIONS_SCRIPT,
UPDATE_CUSTOMER_DATA_SCRIPT,
UPDATE_CUSTOMER_ENTITLEMENTS_SCRIPT,
} from "../../_luaScriptsV2/luaScriptsV2.js";
Expand Down Expand Up @@ -203,6 +205,16 @@ const configureRedisInstance = (redisInstance: Redis): Redis => {
lua: APPEND_ENTITY_TO_CUSTOMER_SCRIPT,
});

redisInstance.defineCommand("incrementCusEntBalance", {
numberOfKeys: 1,
lua: INCREMENT_CUS_ENT_BALANCE_SCRIPT,
});

redisInstance.defineCommand("updateCusProductOptions", {
numberOfKeys: 1,
lua: UPDATE_CUS_PRODUCT_OPTIONS_SCRIPT,
});

redisInstance.on("error", (error) => {
console.error(`[Redis] Connection error:`, error.message);
});
Expand Down Expand Up @@ -385,6 +397,14 @@ declare module "ioredis" {
cacheKey: string,
paramsJson: string,
): Promise<string>;
incrementCusEntBalance(
cacheKey: string,
paramsJson: string,
): Promise<string>;
updateCusProductOptions(
cacheKey: string,
paramsJson: string,
): Promise<string>;
updateCustomerData(cacheKey: string, paramsJson: string): Promise<string>;
appendEntityToCustomer(
cacheKey: string,
Expand Down
74 changes: 74 additions & 0 deletions server/src/internal/balances/autoTopUp/autoTopUpRateLimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import {
type AutoTopupPurchaseLimit,
billingIntervalToSeconds,
} from "@autumn/shared";
import { redis } from "@/external/redis/initRedis.js";
import { tryRedisWrite } from "@/utils/cacheUtils/cacheUtils.js";

const buildRateLimitKey = ({
orgId,
env,
customerId,
featureId,
}: {
orgId: string;
env: string;
customerId: string;
featureId: string;
}) => {
return `auto_topup_count:${orgId}:${env}:${customerId}:${featureId}`;
};

/** Check if auto top-up is within the purchase limit */
export const checkAutoTopUpRateLimit = async ({
orgId,
env,
customerId,
featureId,
purchaseLimit,
}: {
orgId: string;
env: string;
customerId: string;
featureId: string;
purchaseLimit: AutoTopupPurchaseLimit;
}): Promise<boolean> => {
if (redis.status !== "ready") {
return true;
}

const key = buildRateLimitKey({ orgId, env, customerId, featureId });
const current = await redis.get(key);

if (current === null) {
return true;
}

return Number.parseInt(current, 10) < purchaseLimit.limit;
};

/** Increment the auto top-up purchase counter. Sets TTL on first increment. */
export const incrementAutoTopUpCounter = async ({
orgId,
env,
customerId,
featureId,
purchaseLimit,
}: {
orgId: string;
env: string;
customerId: string;
featureId: string;
purchaseLimit: AutoTopupPurchaseLimit;
}): Promise<void> => {
const key = buildRateLimitKey({ orgId, env, customerId, featureId });
const ttl = billingIntervalToSeconds({ interval: purchaseLimit.interval });

await tryRedisWrite(async () => {
const count = await redis.incr(key);

if (count === 1) {
await redis.expire(key, ttl);
}
});
};
47 changes: 47 additions & 0 deletions server/src/internal/balances/autoTopUp/autoTopUpUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type {
Feature,
FeatureOptions,
FullCusEntWithFullCusProduct,
} from "@autumn/shared";
import { Decimal } from "decimal.js";

/** Build the Redis lock key for auto top-up deduplication. */
export const buildAutoTopUpLockKey = ({
orgId,
env,
customerId,
featureId,
}: {
orgId: string;
env: string;
customerId: string;
featureId: string;
}) => {
return `auto_topup:${orgId}:${env}:${customerId}:${featureId}`;
};

/** Compute updated options array with the top-up packs added. */
export const buildUpdatedOptions = ({
cusProduct,
feature,
topUpPacks,
}: {
cusProduct: FullCusEntWithFullCusProduct["customer_product"];
feature: Feature;
topUpPacks: number;
}): FeatureOptions[] => {
if (!cusProduct) return [];

return cusProduct.options.map((opt) => {
if (
opt.internal_feature_id === feature.internal_id ||
opt.feature_id === feature.id
) {
return {
...opt,
quantity: new Decimal(opt.quantity || 0).add(topUpPacks).toNumber(),
};
}
return opt;
});
};
Loading