Skip to content

Commit 1f28bee

Browse files
committed
improve api queue management with lifo and priority bumping
- convert queue from fifo to lifo (process newest requests first) - add 10 element queue limit, drop oldest when full - add 1 minute age limit, skip stale requests - add priority bumping: if user re-queued, remove old request and process new one first - clear queue on navigation to prevent stale requests - replace setInterval with promise.race for navigation detection
1 parent 5175368 commit 1f28bee

File tree

3 files changed

+147
-53
lines changed

3 files changed

+147
-53
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "xblockorigin",
3-
"version": "23.0.0",
3+
"version": "24.0.0",
44
"type": "module",
55
"scripts": {
66
"postinstall": "bun run setup-hooks",

packages/extension/src/Content/orchestrator.ts

Lines changed: 74 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ function showToast(message: string) {
4949
async function processUser(username: string, tweetElement?: Element) {
5050
// skip if already processing this user
5151
if (inFlightUsers.has(username)) {
52-
console.log(`[xBlockOrigin] Skipping @${username} - already processing`)
52+
console.log(`[xBlockOrigin] Skipping @${username} - already in queue`)
5353
return
5454
}
5555
inFlightUsers.add(username)
@@ -67,7 +67,10 @@ async function processUser(username: string, tweetElement?: Element) {
6767
console.log(
6868
`[xBlockOrigin] Fetching user data for @${username} (cache miss)`
6969
)
70-
const userData = await apiQueue.enqueue(() => getUserData(username))
70+
const userData = await apiQueue.enqueue(
71+
() => getUserData(username),
72+
username
73+
)
7174

7275
if (!userData) {
7376
console.error(
@@ -102,7 +105,10 @@ async function processUser(username: string, tweetElement?: Element) {
102105
console.log(
103106
`[xBlockOrigin] Following status cache miss for @${username}, fetching user data`
104107
)
105-
const userData = await apiQueue.enqueue(() => getUserData(username))
108+
const userData = await apiQueue.enqueue(
109+
() => getUserData(username),
110+
username
111+
)
106112

107113
if (!userData) {
108114
console.error(
@@ -151,7 +157,7 @@ async function processUser(username: string, tweetElement?: Element) {
151157

152158
// fetch country for new users
153159
console.log(`[xBlockOrigin] Fetching country for @${username}`)
154-
country = await apiQueue.enqueue(() => getCountry(username))
160+
country = await apiQueue.enqueue(() => getCountry(username), username)
155161

156162
if (!country) {
157163
console.log(
@@ -196,7 +202,7 @@ async function processUser(username: string, tweetElement?: Element) {
196202
console.log(
197203
`[xBlockOrigin] Attempting to mute @${username} (${userId}) from ${country}...`
198204
)
199-
const success = await apiQueue.enqueue(() => muteUser(userId))
205+
const success = await apiQueue.enqueue(() => muteUser(userId), username)
200206

201207
if (!success) {
202208
console.error(`[xBlockOrigin] Failed to mute @${username}`)
@@ -252,64 +258,85 @@ function getCurrentPage(): string {
252258
return 'unknown'
253259
}
254260

261+
function waitForNavigation(currentUrl: string): Promise<string> {
262+
// race between popstate event and polling
263+
const popstatePromise = new Promise<string>((resolve) => {
264+
const handler = () => {
265+
if (window.location.href !== currentUrl) {
266+
resolve(window.location.href)
267+
}
268+
}
269+
window.addEventListener('popstate', handler, { once: true })
270+
})
271+
272+
const pollingPromise = new Promise<string>((resolve) => {
273+
const checkUrl = () => {
274+
if (window.location.href !== currentUrl) {
275+
resolve(window.location.href)
276+
} else {
277+
setTimeout(checkUrl, 100)
278+
}
279+
}
280+
checkUrl()
281+
})
282+
283+
return Promise.race([popstatePromise, pollingPromise])
284+
}
285+
255286
export function startOrchestrator() {
256287
const cleanupFns: Array<() => void> = []
288+
let running = true
257289

258290
const handleUser = (username: string, tweetElement?: Element) => {
259291
processUser(username, tweetElement)
260292
}
261293

262-
const currentPage = getCurrentPage()
263-
264-
switch (currentPage) {
265-
case 'timeline':
266-
cleanupFns.push(scanTimeline(handleUser))
267-
break
268-
case 'search':
269-
cleanupFns.push(scanSearch(handleUser))
270-
break
271-
case 'notifications':
272-
cleanupFns.push(scanReplies(handleUser))
273-
break
274-
case 'status':
275-
cleanupFns.push(scanStatus(handleUser))
276-
break
277-
case 'profile':
278-
cleanupFns.push(scanProfile(handleUser))
279-
break
294+
const startScanners = (page: string) => {
295+
switch (page) {
296+
case 'timeline':
297+
cleanupFns.push(scanTimeline(handleUser))
298+
break
299+
case 'search':
300+
cleanupFns.push(scanSearch(handleUser))
301+
break
302+
case 'notifications':
303+
cleanupFns.push(scanReplies(handleUser))
304+
break
305+
case 'status':
306+
cleanupFns.push(scanStatus(handleUser))
307+
break
308+
case 'profile':
309+
cleanupFns.push(scanProfile(handleUser))
310+
break
311+
}
280312
}
281313

282-
let lastUrl = window.location.href
283-
const urlWatcher = setInterval(() => {
284-
if (window.location.href !== lastUrl) {
285-
lastUrl = window.location.href
314+
const handleNavigation = async () => {
315+
let currentUrl = window.location.href
316+
startScanners(getCurrentPage())
317+
318+
while (running) {
319+
const newUrl = await waitForNavigation(currentUrl)
320+
if (!running) break
321+
322+
currentUrl = newUrl
286323

324+
// clear pending API requests on navigation
325+
apiQueue.clear()
326+
327+
// cleanup old scanners
287328
cleanupFns.forEach((fn) => fn())
288329
cleanupFns.length = 0
289330

290-
const newPage = getCurrentPage()
291-
switch (newPage) {
292-
case 'timeline':
293-
cleanupFns.push(scanTimeline(handleUser))
294-
break
295-
case 'search':
296-
cleanupFns.push(scanSearch(handleUser))
297-
break
298-
case 'notifications':
299-
cleanupFns.push(scanReplies(handleUser))
300-
break
301-
case 'status':
302-
cleanupFns.push(scanStatus(handleUser))
303-
break
304-
case 'profile':
305-
cleanupFns.push(scanProfile(handleUser))
306-
break
307-
}
331+
// start new scanners
332+
startScanners(getCurrentPage())
308333
}
309-
}, 1000)
334+
}
335+
336+
handleNavigation()
310337

311338
return () => {
312-
clearInterval(urlWatcher)
339+
running = false
313340
cleanupFns.forEach((fn) => fn())
314341
}
315342
}

packages/extension/src/Utils/rateLimit.ts

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@ type QueuedTask = {
22
fn: () => Promise<unknown>
33
resolve: (value: unknown) => void
44
reject: (error: unknown) => void
5+
addedAt: number
6+
key?: string
57
}
68

9+
const MAX_AGE_MS = 60 * 1000 // 1 minute
10+
const MAX_QUEUE_SIZE = 10
11+
712
export function createQueue() {
813
const queue: QueuedTask[] = []
914
let isProcessing = false
@@ -16,9 +21,23 @@ export function createQueue() {
1621
isProcessing = true
1722

1823
while (queue.length > 0) {
19-
const task = queue.shift()
24+
const task = queue.pop()
2025
if (!task) break
2126

27+
// skip tasks older than 1 minute
28+
const age = Date.now() - task.addedAt
29+
if (age > MAX_AGE_MS) {
30+
console.warn(
31+
`[xBlockOrigin] Skipping stale API request (age: ${Math.round(age / 1000)}s)`
32+
)
33+
task.reject(
34+
new Error(
35+
`Request timed out after ${MAX_AGE_MS}ms in queue`
36+
)
37+
)
38+
continue
39+
}
40+
2241
try {
2342
const result = await task.fn()
2443
task.resolve(result)
@@ -31,19 +50,67 @@ export function createQueue() {
3150
}
3251

3352
return {
34-
enqueue<T>(fn: () => Promise<T>): Promise<T> {
53+
enqueue<T>(fn: () => Promise<T>, key?: string): Promise<T> {
3554
return new Promise<T>((resolve, reject) => {
36-
queue.push({
55+
// if key is provided, check if task already exists and remove it
56+
if (key) {
57+
const existingIndex = queue.findIndex(
58+
(task) => task.key === key
59+
)
60+
if (existingIndex !== -1) {
61+
const removed = queue.splice(existingIndex, 1)[0]
62+
if (removed) {
63+
console.log(
64+
`[xBlockOrigin] Bumping priority for ${key} (was at position ${existingIndex})`
65+
)
66+
removed.reject(
67+
new Error('Task re-queued with higher priority')
68+
)
69+
}
70+
}
71+
}
72+
73+
// remove oldest item if queue is full
74+
if (queue.length >= MAX_QUEUE_SIZE) {
75+
const removed = queue.shift()
76+
if (removed) {
77+
console.warn(
78+
`[xBlockOrigin] Queue full, dropping oldest request (age: ${Math.round((Date.now() - removed.addedAt) / 1000)}s)`
79+
)
80+
removed.reject(new Error('Queue full, request dropped'))
81+
}
82+
}
83+
84+
const task: QueuedTask = {
3785
fn,
3886
resolve: (value) => resolve(value as T),
39-
reject
40-
})
87+
reject,
88+
addedAt: Date.now()
89+
}
90+
if (key) {
91+
task.key = key
92+
}
93+
queue.push(task)
4194
processQueue()
4295
})
4396
},
4497

4598
getQueueLength(): number {
4699
return queue.length
100+
},
101+
102+
clear(): void {
103+
const count = queue.length
104+
if (count > 0) {
105+
console.log(
106+
`[xBlockOrigin] Clearing ${count} pending API requests from queue`
107+
)
108+
// reject all pending tasks
109+
for (const task of queue) {
110+
task.reject(new Error('Queue cleared due to navigation'))
111+
}
112+
queue.length = 0
113+
}
47114
}
48115
}
49116
}

0 commit comments

Comments
 (0)