Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-- ============================================================================
-- INCREMENT CUSTOMER ENTITLEMENT BALANCE
-- Atomically increments a cusEnt's balance in the cached FullCustomer JSON
-- using JSON.NUMINCRBY (relative delta, safe with concurrent deductions).
-- ============================================================================
-- KEYS[1] = fullCustomer cache key
-- ARGV[1] = JSON: { cus_ent_id: string, delta: number }
-- Returns: JSON: { ok: true, new_balance: number } | { ok: false, error: string }
-- ============================================================================

local cache_key = KEYS[1]
local params = cjson.decode(ARGV[1])

local cus_ent_id = params.cus_ent_id
local delta = tonumber(params.delta)

if not cus_ent_id or not delta then
return cjson.encode({ ok = false, error = "missing cus_ent_id or delta" })
end

-- Read the full customer to find the entitlement indices
local raw = redis.call('JSON.GET', cache_key, '.')
if not raw then
return cjson.encode({ ok = false, error = "cache_miss" })
end

local full_customer = cjson.decode(raw)
local cus_ent, cus_product, ce_idx, cp_idx = find_entitlement(full_customer, cus_ent_id)

if not cus_ent then
return cjson.encode({ ok = false, error = "cus_ent_not_found" })
end

-- Build the JSON path to the balance field
local base_path
if cp_idx then
-- Lua arrays are 1-indexed, RedisJSON is 0-indexed
base_path = '$.customer_products[' .. (cp_idx - 1) .. '].customer_entitlements[' .. (ce_idx - 1) .. ']'
else
base_path = '$.extra_customer_entitlements[' .. (ce_idx - 1) .. ']'
end

local balance_path = base_path .. '.balance'

-- Atomic relative increment (JSON.NUMINCRBY with $ path returns a JSON array e.g. "[105]")
local result = redis.call('JSON.NUMINCRBY', cache_key, balance_path, delta)
local new_balance = cjson.decode(result)[1]

-- Bump cache_version so sync_balances_v2 conflict detection stays in sync
-- with CusEntService.increment (which bumps Postgres cache_version atomically).
local version_path = base_path .. '.cache_version'
local version_result = redis.call('JSON.NUMINCRBY', cache_key, version_path, 1)
local new_cache_version = cjson.decode(version_result)[1]

return cjson.encode({ ok = true, new_balance = new_balance, new_cache_version = new_cache_version })
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
--[[
Lua Script: Update Customer Product Fields in Cache

Atomically updates specific fields on a cusProduct in the cached
FullCustomer. Matches by cusProduct id, then applies targeted
JSON.SET on each provided field.

CRDT Safety Note (Active-Active Redis):
- JSON.SET on specific paths follows "Update versus update" conflict
resolution (smallest instance ID wins) — safe for field-level updates.
- We never overwrite the entire cusProduct object.

KEYS[1] = FullCustomer cache key

ARGV[1] = JSON: { cus_product_id: string, updates: { field: value, ... } }
Supported fields: status, canceled, canceled_at, ended_at,
subscription_ids, scheduled_ids, options, quantity,
entity_id, internal_entity_id, trial_ends_at, collection_method

Returns JSON:
{ "ok": true, "updated_count": number }
{ "ok": false, "error": string }
]]

local cache_key = KEYS[1]
local params = cjson.decode(ARGV[1])

local cus_product_id = params.cus_product_id
local updates = params.updates

if not cus_product_id then
return cjson.encode({ ok = false, error = "missing cus_product_id" })
end

if not updates then
return cjson.encode({ ok = false, error = "missing updates" })
end

-- Read the full customer to find the matching cusProduct index
local raw = redis.call('JSON.GET', cache_key, '.')
if not raw then
return cjson.encode({ ok = false, error = "cache_miss" })
end

local full_customer = cjson.decode(raw)

if not full_customer.customer_products then
return cjson.encode({ ok = false, error = "no_customer_products" })
end

-- Find cusProduct by id. Returns (cus_product, 0-indexed position) or (nil, nil).
local function find_customer_product(customer_products, id)
for idx, cp in ipairs(customer_products) do
if cp.id == id then
return cp, idx - 1
end
end
return nil, nil
end

local _, cp_idx = find_customer_product(full_customer.customer_products, cus_product_id)

if cp_idx == nil then
return cjson.encode({ ok = false, error = "cus_product_not_found" })
end

local base_path = '$.customer_products[' .. cp_idx .. '].'
local updated_count = 0

-- Apply each update field via targeted JSON.SET
for field, value in pairs(updates) do
redis.call('JSON.SET', cache_key, base_path .. field, cjson.encode(value))
updated_count = updated_count + 1
end

return cjson.encode({ ok = true, updated_count = updated_count })
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
metadata?: object | null,
send_email_receipts?: boolean | null,
processor?: object | null,
processors?: object | null
processors?: object | null,
auto_topups?: array | null
}
}

Expand Down Expand Up @@ -91,4 +92,13 @@ if updates.processors ~= nil and updates.processors ~= cjson.null then
table.insert(updated_fields, 'processors')
end

if updates.auto_topups ~= nil then
if updates.auto_topups == cjson.null then
redis.call('JSON.SET', cache_key, '$.auto_topups', 'null')
else
redis.call('JSON.SET', cache_key, '$.auto_topups', cjson.encode(updates.auto_topups))
end
table.insert(updated_fields, 'auto_topups')
end

return cjson.encode({ success = true, updated_fields = updated_fields })
63 changes: 63 additions & 0 deletions server/src/_luaScriptsV2/customers/upsertInvoice.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
--[[
Lua Script: Upsert Invoice in Customer Cache

Atomically upserts an invoice in the customer's invoices array:
- If invoice with same stripe_id exists: replace it via JSON.SET
- Otherwise: append via JSON.ARRAPPEND

CRDT Safety Note (Active-Active Redis):
- JSON.ARRAPPEND follows "Update versus update array" conflict resolution
- Resolution type: Merge - results from all instances are merged
- JSON.SET on specific paths follows "Update versus update" (smallest instance ID wins)

KEYS[1] = FullCustomer cache key

ARGV[1] = JSON-encoded invoice object (must have stripe_id field)

Returns JSON:
{ "success": true, "action": "appended" }
{ "success": true, "action": "updated" }
{ "success": false, "cache_miss": true }
]]

local cache_key = KEYS[1]
local invoice_json = ARGV[1]

-- Check if cache exists
local key_exists = redis.call('EXISTS', cache_key)
if key_exists == 0 then
return cjson.encode({ success = false, cache_miss = true })
end

-- Parse the incoming invoice to get stripe_id for matching
local invoice = cjson.decode(invoice_json)
local stripe_id = invoice.stripe_id

-- Get current invoices array
local invoices_json = redis.call('JSON.GET', cache_key, '$.invoices')
if not invoices_json then
-- invoices array doesn't exist, create it with the new invoice
redis.call('JSON.SET', cache_key, '$.invoices', cjson.encode({invoice}))
return cjson.encode({ success = true, action = "appended" })
end

-- Parse invoices array (JSON.GET with JSONPath returns array of results)
local invoices_wrapper = cjson.decode(invoices_json)
local invoices = invoices_wrapper[1] or {}

-- Search for existing invoice by stripe_id
if stripe_id then
for idx, existing_invoice in ipairs(invoices) do
if existing_invoice.stripe_id == stripe_id then
-- Replace the entire invoice at this index
local array_idx = idx - 1 -- JSON arrays are 0-indexed
redis.call('JSON.SET', cache_key, '$.invoices[' .. array_idx .. ']', invoice_json)
return cjson.encode({ success = true, action = "updated" })
end
end
end

-- No existing invoice found, append
redis.call('JSON.ARRAPPEND', cache_key, '$.invoices', invoice_json)

return cjson.encode({ success = true, action = "appended" })
59 changes: 47 additions & 12 deletions server/src/_luaScriptsV2/luaScriptsV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,30 +129,65 @@ export const UPDATE_CUSTOMER_ENTITLEMENTS_SCRIPT = `${LUA_UTILS}
${updateMainScript}`;

// ============================================================================
// UPDATE CUSTOMER DATA SCRIPT (top-level customer fields)
// ADJUST CUSTOMER ENTITLEMENT BALANCE SCRIPT
// ============================================================================

const CUS_ENT_DIR = join(__dirname, "customerEntitlements");

const adjustBalanceMainScript = readFileSync(
join(CUS_ENT_DIR, "adjustCustomerEntitlementBalance.lua"),
"utf-8",
);

/**
* Lua script for atomically updating top-level customer fields in the cached
* FullCustomer (name, email, metadata, send_email_receipts, etc.).
* Lua script for atomically incrementing a cusEnt balance in the cached
* FullCustomer via JSON.NUMINCRBY. Safe with concurrent deductions.
*/
export const ADJUST_CUSTOMER_ENTITLEMENT_BALANCE_SCRIPT = `${LUA_UTILS}
${adjustBalanceMainScript}`;

// ============================================================================
// CUSTOMER SCRIPTS (top-level customer fields, entities, invoices)
// ============================================================================

const CUSTOMER_DIR = join(__dirname, "customers");

/** Atomically update top-level customer fields (name, email, metadata, etc.). */
export const UPDATE_CUSTOMER_DATA_SCRIPT = readFileSync(
join(__dirname, "updateCustomerData.lua"),
join(CUSTOMER_DIR, "updateCustomerData.lua"),
"utf-8",
);

/**
* Atomically append an entity to the customer's entities array.
* CRDT-safe: JSON.ARRAPPEND uses merge conflict resolution in Active-Active.
*/
export const APPEND_ENTITY_TO_CUSTOMER_SCRIPT = readFileSync(
join(CUSTOMER_DIR, "appendEntityToCustomer.lua"),
"utf-8",
);

/**
* Atomically upsert an invoice in the customer's invoices array.
* Matches by stripe_id — replaces if found, appends if not.
* CRDT-safe: JSON.ARRAPPEND uses merge, JSON.SET uses update-vs-update.
*/
export const UPSERT_INVOICE_IN_CUSTOMER_SCRIPT = readFileSync(
join(CUSTOMER_DIR, "upsertInvoice.lua"),
"utf-8",
);

// ============================================================================
// APPEND ENTITY TO CUSTOMER SCRIPT
// CUSTOMER PRODUCT SCRIPTS
// ============================================================================

const CUS_PRODUCT_DIR = join(__dirname, "customerProducts");

/**
* Lua script for atomically appending an entity to the customer's entities
* array in the cached FullCustomer. Checks for duplicates before appending.
*
* CRDT-safe: JSON.ARRAPPEND uses merge conflict resolution in Active-Active,
* so concurrent appends from different regions will both succeed.
* Atomically update specific fields on a cusProduct in the cached FullCustomer.
* CRDT-safe: JSON.SET on specific paths uses "update vs update" resolution.
*/
export const APPEND_ENTITY_TO_CUSTOMER_SCRIPT = readFileSync(
join(__dirname, "appendEntityToCustomer.lua"),
export const UPDATE_CUSTOMER_PRODUCT_SCRIPT = readFileSync(
join(CUS_PRODUCT_DIR, "updateCustomerProduct.lua"),
"utf-8",
);
2 changes: 2 additions & 0 deletions server/src/external/autumn/autumnCli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
type CreateCustomerParamsV0Input,
type CreateEntityParams,
type CreateRewardProgram,
type CustomerBillingControlsInput,
CustomerExpand,
EntityExpand,
ErrCode,
Expand Down Expand Up @@ -509,6 +510,7 @@ export class AutumnInt {
email?: string;
send_email_receipts?: boolean;
metadata?: Record<string, unknown>;
billing_controls?: CustomerBillingControlsInput;
},
) => {
const data = await this.patch(`/customers/${customerId}`, updates);
Expand Down
30 changes: 30 additions & 0 deletions server/src/external/redis/initRedis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
SET_SUBSCRIPTIONS_SCRIPT,
} from "../../_luaScripts/luaScripts.js";
import {
ADJUST_CUSTOMER_ENTITLEMENT_BALANCE_SCRIPT,
APPEND_ENTITY_TO_CUSTOMER_SCRIPT,
BATCH_DELETE_FULL_CUSTOMER_CACHE_SCRIPT,
DEDUCT_FROM_CUSTOMER_ENTITLEMENTS_SCRIPT,
Expand All @@ -22,6 +23,8 @@ import {
SET_FULL_CUSTOMER_CACHE_SCRIPT,
UPDATE_CUSTOMER_DATA_SCRIPT,
UPDATE_CUSTOMER_ENTITLEMENTS_SCRIPT,
UPDATE_CUSTOMER_PRODUCT_SCRIPT,
UPSERT_INVOICE_IN_CUSTOMER_SCRIPT,
} from "../../_luaScriptsV2/luaScriptsV2.js";

// if (!process.env.CACHE_URL) {
Expand Down Expand Up @@ -203,6 +206,21 @@ const configureRedisInstance = (redisInstance: Redis): Redis => {
lua: APPEND_ENTITY_TO_CUSTOMER_SCRIPT,
});

redisInstance.defineCommand("upsertInvoiceInCustomer", {
numberOfKeys: 1,
lua: UPSERT_INVOICE_IN_CUSTOMER_SCRIPT,
});

redisInstance.defineCommand("adjustCustomerEntitlementBalance", {
numberOfKeys: 1,
lua: ADJUST_CUSTOMER_ENTITLEMENT_BALANCE_SCRIPT,
});

redisInstance.defineCommand("updateCustomerProduct", {
numberOfKeys: 1,
lua: UPDATE_CUSTOMER_PRODUCT_SCRIPT,
});

redisInstance.on("error", (error) => {
console.error(`[Redis] Connection error:`, error.message);
});
Expand Down Expand Up @@ -385,11 +403,23 @@ declare module "ioredis" {
cacheKey: string,
paramsJson: string,
): Promise<string>;
adjustCustomerEntitlementBalance(
cacheKey: string,
paramsJson: string,
): Promise<string>;
updateCustomerData(cacheKey: string, paramsJson: string): Promise<string>;
appendEntityToCustomer(
cacheKey: string,
entityJson: string,
): Promise<string>;
upsertInvoiceInCustomer(
cacheKey: string,
invoiceJson: string,
): Promise<string>;
updateCustomerProduct(
cacheKey: string,
paramsJson: string,
): Promise<string>;
}
}

Expand Down
3 changes: 1 addition & 2 deletions server/src/external/stripe/handleStripeWebhookEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { Stripe } from "stripe";
import { handleStripeInvoicePaid } from "@/external/stripe/webhookHandlers/handleStripeInvoicePaid/handleStripeInvoicePaid.js";
import { handleStripeSubscriptionUpdated } from "@/external/stripe/webhookHandlers/handleStripeSubscriptionUpdated/handleStripeSubscriptionUpdated.js";
import { unsetOrgStripeKeys } from "@/internal/orgs/orgUtils.js";
import type { ExtendedRequest } from "@/utils/models/Request.js";
import { handleWebhookErrorSkip } from "@/utils/routerUtils/webhookErrorSkip.js";
import { getSentryTags } from "../sentry/sentryUtils.js";
import { handleCusDiscountDeleted } from "./webhookHandlers/handleCusDiscountDeleted.js";
Expand Down Expand Up @@ -52,8 +51,8 @@ export const handleStripeWebhookEvent = async (

case "invoice.updated":
await handleInvoiceUpdated({
ctx,
event,
req: ctx as unknown as ExtendedRequest,
});
break;

Expand Down
Loading
Loading