
Commit ba60834

refactor(usage): 🔨 centralize window aggregation and reconcile usage counts
- Extract `_aggregate_model_windows` to unify how usage stats are summed across quota groups and credentials, reducing code duplication.
- Implement `_reconcile_window_counts` to ensure success and failure counters remain consistent (mathematically valid) when total request counts are updated from external quota sources.
- Enable synchronization and backfilling of credential-level windows to reflect aggregated model usage.
- Simplify `update_usage` and `_backfill_group_usage` logic by leveraging the new shared aggregation helpers.
1 parent 7ed4c9c commit ba60834
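The `_reconcile_window_counts` bullet above is about one invariant: after a sync from an external quota source, `success_count + failure_count` must equal `request_count`. A minimal standalone sketch of that clamping logic, using a simplified `Window` stand-in rather than the library's `WindowStats` model:

```python
from dataclasses import dataclass


@dataclass
class Window:
    # Simplified stand-in for WindowStats; only the reconciled counters.
    request_count: int = 0
    success_count: int = 0
    failure_count: int = 0


def reconcile(window: Window, request_count: int) -> None:
    """Sketch of the reconciliation rule in the diff below."""
    local_total = window.success_count + window.failure_count
    window.request_count = request_count
    if local_total == 0 and request_count > 0:
        # No local outcomes recorded: attribute the whole total to successes.
        window.success_count = request_count
        window.failure_count = 0
    elif request_count < local_total:
        # External total is smaller: keep failures (capped), shrink successes.
        window.failure_count = min(window.failure_count, request_count)
        window.success_count = max(0, request_count - window.failure_count)
    elif request_count > local_total:
        # External total is larger: credit the difference as successes.
        window.success_count += request_count - local_total


w = Window(request_count=10, success_count=7, failure_count=3)
reconcile(w, 8)
assert (w.request_count, w.success_count, w.failure_count) == (8, 5, 3)
assert w.success_count + w.failure_count == w.request_count
```

Note the asymmetry when the external count shrinks: the local failure count is preserved (up to the new total) and successes absorb the correction.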

1 file changed (+170, -160)

src/rotator_library/usage/manager.py

Lines changed: 170 additions & 160 deletions
```diff
@@ -1149,201 +1149,186 @@ async def _sync_quota_group_counts(
         model: str,
     ) -> None:
         """
-        Synchronize request counts across models in the same quota group.
+        Synchronize quota-group and credential windows for model usage.
 
-        For providers with shared quota pools (e.g., Antigravity), all models
-        in a quota group share the same daily limit. When one model's count
-        increases, we sync that count to all other models in the group so
-        selection correctly accounts for the shared quota.
+        For providers with shared quota pools (e.g., Antigravity), we aggregate
+        per-model windows into group windows for shared quota tracking. We also
+        aggregate per-model windows into credential-level windows for display.
 
         Args:
             state: Credential state that was just updated
             model: The normalized model that was just used
         """
-        # Check if this model is in a quota group
+        model_window_names = {
+            window_def.name
+            for window_def in self._config.windows
+            if window_def.applies_to == "model"
+        }
+
+        if model_window_names:
+            self._sync_credential_windows(state, model_window_names)
+
+        # Aggregate windows for quota groups
         group = self._get_model_quota_group(model)
         if not group:
             return
 
-        # Get all models in the group
         grouped_models = self._get_grouped_models(group)
-        if len(grouped_models) <= 1:
+        if not grouped_models:
             return
 
-        # Get the current request count for the used model
-        current_count = state.usage.model_request_counts.get(model, 0)
-
-        # Sync to all other models in the group
-        for grouped_model in grouped_models:
-            if grouped_model != model:
-                state.usage.model_request_counts[grouped_model] = current_count
-
-        # Sync per-model windows if configured at model scope
-        source_usage = state.model_usage.get(model)
-        if source_usage:
-            for window_def in self._config.windows:
-                if window_def.applies_to != "model":
-                    continue
-                source_window = source_usage.windows.get(window_def.name)
-                if not source_window:
-                    continue
-                for grouped_model in grouped_models:
-                    if grouped_model == model:
-                        continue
-                    target_usage = state.model_usage.setdefault(
-                        grouped_model, UsageStats()
-                    )
-                    target_usage.windows[window_def.name] = WindowStats(
-                        name=source_window.name,
-                        request_count=source_window.request_count,
-                        success_count=source_window.success_count,
-                        failure_count=source_window.failure_count,
-                        total_tokens=source_window.total_tokens,
-                        prompt_tokens=source_window.prompt_tokens,
-                        completion_tokens=source_window.completion_tokens,
-                        thinking_tokens=source_window.thinking_tokens,
-                        output_tokens=source_window.output_tokens,
-                        prompt_tokens_cache_read=source_window.prompt_tokens_cache_read,
-                        prompt_tokens_cache_write=source_window.prompt_tokens_cache_write,
-                        approx_cost=source_window.approx_cost,
-                        started_at=source_window.started_at,
-                        reset_at=source_window.reset_at,
-                        limit=source_window.limit,
-                    )
+        group_usages = [
+            state.model_usage[m] for m in grouped_models if m in state.model_usage
+        ]
+        if not group_usages:
+            return
 
-                group_usage = state.group_usage.setdefault(group, UsageStats())
-                group_usage.windows[window_def.name] = WindowStats(
-                    name=source_window.name,
-                    request_count=source_window.request_count,
-                    success_count=source_window.success_count,
-                    failure_count=source_window.failure_count,
-                    total_tokens=source_window.total_tokens,
-                    prompt_tokens=source_window.prompt_tokens,
-                    completion_tokens=source_window.completion_tokens,
-                    thinking_tokens=source_window.thinking_tokens,
-                    output_tokens=source_window.output_tokens,
-                    prompt_tokens_cache_read=source_window.prompt_tokens_cache_read,
-                    prompt_tokens_cache_write=source_window.prompt_tokens_cache_write,
-                    approx_cost=source_window.approx_cost,
-                    started_at=source_window.started_at,
-                    reset_at=source_window.reset_at,
-                    limit=source_window.limit,
-                )
+        aggregated = self._aggregate_model_windows(group_usages, model_window_names)
+        if not aggregated:
+            return
 
-        lib_logger.debug(
-            f"Synced request_count={current_count} across quota group '{group}' "
-            f"({len(grouped_models)} models)"
-        )
+        group_usage = state.group_usage.setdefault(group, UsageStats())
+        group_usage.windows.update(aggregated)
 
     def _backfill_group_usage(self) -> None:
         """Backfill group usage windows and cooldowns from model data."""
         for state in self._states.values():
             self._backfill_model_usage(state)
+            self._backfill_credential_windows(state)
             self._backfill_group_usage_for_state(state)
             self._backfill_group_cooldowns(state)
 
-    def _backfill_group_usage_for_state(self, state: CredentialState) -> None:
-        group_windows: Dict[str, Dict[str, Dict[str, Any]]] = {}
+    def _aggregate_model_windows(
+        self,
+        usages: List[UsageStats],
+        model_window_names: Set[str],
+    ) -> Dict[str, WindowStats]:
+        buckets: Dict[str, Dict[str, Any]] = {}
 
-        for model, usage in state.model_usage.items():
-            group = self._get_model_quota_group(model)
-            if not group:
-                continue
+        for usage in usages:
             for window_name, window in usage.windows.items():
-                bucket = group_windows.setdefault(group, {}).setdefault(
+                if window_name not in model_window_names:
+                    continue
+                bucket = buckets.setdefault(
                     window_name,
                     {
-                        "counts": [],
-                        "successes": [],
-                        "failures": [],
-                        "prompt_tokens": [],
-                        "completion_tokens": [],
-                        "thinking_tokens": [],
-                        "output_tokens": [],
-                        "prompt_tokens_cache_read": [],
-                        "prompt_tokens_cache_write": [],
-                        "total_tokens": [],
-                        "approx_cost": [],
+                        "request_count": 0,
+                        "success_count": 0,
+                        "failure_count": 0,
+                        "prompt_tokens": 0,
+                        "completion_tokens": 0,
+                        "thinking_tokens": 0,
+                        "output_tokens": 0,
+                        "prompt_tokens_cache_read": 0,
+                        "prompt_tokens_cache_write": 0,
+                        "total_tokens": 0,
+                        "approx_cost": 0.0,
                         "started_at": [],
                         "reset_at": [],
                         "limit": [],
                     },
                 )
-                bucket["counts"].append(window.request_count)
-                bucket["successes"].append(window.success_count)
-                bucket["failures"].append(window.failure_count)
-                bucket["prompt_tokens"].append(window.prompt_tokens)
-                bucket["completion_tokens"].append(window.completion_tokens)
-                bucket["thinking_tokens"].append(window.thinking_tokens)
-                bucket["output_tokens"].append(window.output_tokens)
-                bucket["prompt_tokens_cache_read"].append(
-                    window.prompt_tokens_cache_read
-                )
-                bucket["prompt_tokens_cache_write"].append(
-                    window.prompt_tokens_cache_write
-                )
-                bucket["total_tokens"].append(window.total_tokens)
-                bucket["approx_cost"].append(window.approx_cost)
+                bucket["request_count"] += window.request_count
+                bucket["success_count"] += window.success_count
+                bucket["failure_count"] += window.failure_count
+                bucket["prompt_tokens"] += window.prompt_tokens
+                bucket["completion_tokens"] += window.completion_tokens
+                bucket["thinking_tokens"] += window.thinking_tokens
+                bucket["output_tokens"] += window.output_tokens
+                bucket["prompt_tokens_cache_read"] += window.prompt_tokens_cache_read
+                bucket["prompt_tokens_cache_write"] += window.prompt_tokens_cache_write
+                bucket["total_tokens"] += window.total_tokens
+                bucket["approx_cost"] += window.approx_cost
                 if window.started_at is not None:
                     bucket["started_at"].append(window.started_at)
                 if window.reset_at is not None:
                     bucket["reset_at"].append(window.reset_at)
                 if window.limit is not None:
                     bucket["limit"].append(window.limit)
 
-        for group, windows in group_windows.items():
+        aggregated: Dict[str, WindowStats] = {}
+        for window_name, bucket in buckets.items():
+            success_count = bucket["success_count"]
+            failure_count = bucket["failure_count"]
+            request_count = success_count + failure_count
+            if request_count == 0:
+                request_count = bucket["request_count"]
+                if request_count > 0:
+                    success_count = request_count
+            started_at = min(bucket["started_at"]) if bucket["started_at"] else None
+            reset_at = max(bucket["reset_at"]) if bucket["reset_at"] else None
+            limit = max(bucket["limit"]) if bucket["limit"] else None
+            aggregated[window_name] = WindowStats(
+                name=window_name,
+                request_count=request_count,
+                success_count=success_count,
+                failure_count=failure_count,
+                total_tokens=bucket["total_tokens"],
+                prompt_tokens=bucket["prompt_tokens"],
+                completion_tokens=bucket["completion_tokens"],
+                thinking_tokens=bucket["thinking_tokens"],
+                output_tokens=bucket["output_tokens"],
+                prompt_tokens_cache_read=bucket["prompt_tokens_cache_read"],
+                prompt_tokens_cache_write=bucket["prompt_tokens_cache_write"],
+                approx_cost=bucket["approx_cost"],
+                started_at=started_at,
+                reset_at=reset_at,
+                limit=limit,
+            )
+
+        return aggregated
+
+    def _sync_credential_windows(
+        self,
+        state: CredentialState,
+        model_window_names: Set[str],
+    ) -> None:
+        if not model_window_names:
+            return
+        usages = list(state.model_usage.values())
+        if not usages:
+            return
+        aggregated = self._aggregate_model_windows(usages, model_window_names)
+        if not aggregated:
+            return
+        state.usage.windows.update(aggregated)
+
+    def _backfill_group_usage_for_state(self, state: CredentialState) -> None:
+        model_window_names = {
+            window_def.name
+            for window_def in self._config.windows
+            if window_def.applies_to == "model"
+        }
+        if not model_window_names:
+            return
+
+        for model in state.model_usage.keys():
+            group = self._get_model_quota_group(model)
+            if not group:
+                continue
             group_usage = state.group_usage.setdefault(group, UsageStats())
-            for window_name, bucket in windows.items():
-                counts = bucket["counts"]
-                if not counts:
-                    continue
-                synced = all(count == counts[0] for count in counts)
-                if synced:
-                    request_count = counts[0]
-                    success_count = bucket["successes"][0]
-                    failure_count = bucket["failures"][0]
-                    prompt_tokens = bucket["prompt_tokens"][0]
-                    completion_tokens = bucket["completion_tokens"][0]
-                    thinking_tokens = bucket["thinking_tokens"][0]
-                    output_tokens = bucket["output_tokens"][0]
-                    prompt_tokens_cache_read = bucket["prompt_tokens_cache_read"][0]
-                    prompt_tokens_cache_write = bucket["prompt_tokens_cache_write"][0]
-                    total_tokens = bucket["total_tokens"][0]
-                    approx_cost = bucket["approx_cost"][0]
-                else:
-                    request_count = sum(counts)
-                    success_count = sum(bucket["successes"])
-                    failure_count = sum(bucket["failures"])
-                    prompt_tokens = sum(bucket["prompt_tokens"])
-                    completion_tokens = sum(bucket["completion_tokens"])
-                    thinking_tokens = sum(bucket["thinking_tokens"])
-                    output_tokens = sum(bucket["output_tokens"])
-                    prompt_tokens_cache_read = sum(bucket["prompt_tokens_cache_read"])
-                    prompt_tokens_cache_write = sum(bucket["prompt_tokens_cache_write"])
-                    total_tokens = sum(bucket["total_tokens"])
-                    approx_cost = sum(bucket["approx_cost"])
-
-                started_at = min(bucket["started_at"]) if bucket["started_at"] else None
-                reset_at = max(bucket["reset_at"]) if bucket["reset_at"] else None
-                limit = max(bucket["limit"]) if bucket["limit"] else None
-
-                group_usage.windows[window_name] = WindowStats(
-                    name=window_name,
-                    request_count=request_count,
-                    success_count=success_count,
-                    failure_count=failure_count,
-                    total_tokens=total_tokens,
-                    prompt_tokens=prompt_tokens,
-                    completion_tokens=completion_tokens,
-                    thinking_tokens=thinking_tokens,
-                    output_tokens=output_tokens,
-                    prompt_tokens_cache_read=prompt_tokens_cache_read,
-                    prompt_tokens_cache_write=prompt_tokens_cache_write,
-                    approx_cost=approx_cost,
-                    started_at=started_at,
-                    reset_at=reset_at,
-                    limit=limit,
-                )
+            grouped_models = self._get_grouped_models(group)
+            group_usages = [
+                state.model_usage[m] for m in grouped_models if m in state.model_usage
+            ]
+            aggregated = self._aggregate_model_windows(group_usages, model_window_names)
+            if aggregated:
+                group_usage.windows.update(aggregated)
+
+    def _backfill_credential_windows(self, state: CredentialState) -> None:
+        model_window_names = {
+            window_def.name
+            for window_def in self._config.windows
+            if window_def.applies_to == "model"
+        }
+        if not model_window_names:
+            return
+        usages = list(state.model_usage.values())
+        if not usages:
+            return
+        aggregated = self._aggregate_model_windows(usages, model_window_names)
+        if aggregated:
+            state.usage.windows.update(aggregated)
 
     def _backfill_model_usage(self, state: CredentialState) -> None:
         if not state.usage.model_request_counts:
@@ -1396,6 +1381,24 @@ def _backfill_group_cooldowns(self, state: CredentialState) -> None:
                 if not existing or existing.until < cooldown.until:
                     state.cooldowns[grouped_model] = cooldown
 
+    def _reconcile_window_counts(self, window: WindowStats, request_count: int) -> None:
+        local_total = window.success_count + window.failure_count
+        window.request_count = request_count
+        if local_total == 0 and request_count > 0:
+            window.success_count = request_count
+            window.failure_count = 0
+            return
+
+        if request_count < local_total:
+            failure_count = min(window.failure_count, request_count)
+            success_count = max(0, request_count - failure_count)
+            window.success_count = success_count
+            window.failure_count = failure_count
+            return
+
+        if request_count > local_total:
+            window.success_count += request_count - local_total
+
     async def save(self, force: bool = False) -> bool:
         """
         Save usage data to file.
@@ -1558,8 +1561,12 @@ async def update_quota_baseline(
             if force:
                 synced_count = quota_used
             else:
-                synced_count = max(primary_window.request_count, quota_used)
-            primary_window.request_count = synced_count
+                synced_count = max(
+                    primary_window.request_count,
+                    quota_used,
+                    primary_window.success_count + primary_window.failure_count,
+                )
+            self._reconcile_window_counts(primary_window, synced_count)
             state.usage.model_request_counts[normalized_model] = synced_count
         else:
            state.usage.model_request_counts.setdefault(normalized_model, 0)
@@ -1580,11 +1587,14 @@ async def update_quota_baseline(
             if quota_used is not None:
                 # Same stale-data protection for group windows
                 if force:
-                    group_window.request_count = quota_used
+                    synced_count = quota_used
                 else:
-                    group_window.request_count = max(
-                        group_window.request_count, quota_used
+                    synced_count = max(
+                        group_window.request_count,
+                        quota_used,
+                        group_window.success_count + group_window.failure_count,
                    )
+                self._reconcile_window_counts(group_window, synced_count)
             else:
                 group_window.request_count = group_window.request_count or 0
```
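For reference, the merge rules in `_aggregate_model_windows` reduce to: sum every counter across the models' windows, take the earliest `started_at`, the latest `reset_at`, and the largest `limit`, then backfill `success_count` when no outcome data exists. A runnable sketch of those rules, using a trimmed `Win` stand-in instead of the real `WindowStats` (token fields are omitted but follow the same additive pattern):

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Win:
    # Trimmed stand-in for WindowStats: only the fields the merge rules touch.
    request_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    started_at: Optional[float] = None
    reset_at: Optional[float] = None
    limit: Optional[int] = None


def aggregate(windows: List[Win]) -> Win:
    """Sketch of the group-window merge: sum counters, min/max the bounds."""
    out = Win()
    starts = [w.started_at for w in windows if w.started_at is not None]
    resets = [w.reset_at for w in windows if w.reset_at is not None]
    limits = [w.limit for w in windows if w.limit is not None]
    for w in windows:
        out.request_count += w.request_count
        out.success_count += w.success_count
        out.failure_count += w.failure_count
    # Outcome counters win when present; otherwise fall back to the raw
    # request_count sum and attribute it all to successes.
    total = out.success_count + out.failure_count
    if total == 0:
        if out.request_count > 0:
            out.success_count = out.request_count
    else:
        out.request_count = total
    out.started_at = min(starts) if starts else None
    out.reset_at = max(resets) if resets else None
    out.limit = max(limits) if limits else None
    return out


# Two models sharing a quota group: counters add, the window spans the
# earliest start to the latest reset, and the shared limit is the max.
g = aggregate([
    Win(3, 2, 1, started_at=50.0, reset_at=100.0, limit=50),
    Win(5, 5, 0, started_at=60.0, reset_at=200.0, limit=50),
])
assert (g.request_count, g.success_count, g.failure_count) == (8, 7, 1)
assert (g.started_at, g.reset_at, g.limit) == (50.0, 200.0, 50)
```

Summing per-model windows (rather than mirroring one model's window to all group members, as the removed code did) keeps the group window a pure function of the per-model data, which is what lets `update_usage` and the backfill paths share one helper.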