Skip to content

Commit cd029df

Browse files
authored
Merge pull request #52 from Balatro-Multiplayer/queue-fixes
Queue fixes and cron job
2 parents 0aa6021 + cb79262 commit cd029df

File tree

4 files changed

+61
-26
lines changed

4 files changed

+61
-26
lines changed

src/events/interactionCreate.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,10 @@ export default {
158158
: 'You left the queue.',
159159
})
160160

161-
await updateQueueMessage()
162-
163161
// Delete the message after 10 seconds
164162
setTimeout(async () => {
165163
await interaction.deleteReply(reply.id).catch(() => {})
166164
}, 10000)
167-
} else {
168-
await updateQueueMessage()
169165
}
170166
} finally {
171167
// Always remove user from processing set
@@ -491,8 +487,6 @@ export default {
491487
content: message,
492488
})
493489

494-
await updateQueueMessage()
495-
496490
// Delete the message after 10 seconds
497491
setTimeout(async () => {
498492
await interaction.deleteReply(reply.id).catch(() => {})

src/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
runDecayTick,
77
updateMatchCountCronJob,
88
deleteExpiredStrikesCronJob,
9+
updateQueueMessageCronJob,
910
} from './utils/cronJobs'
1011
import { app } from './api/app'
1112
import { client } from './client'
@@ -70,6 +71,9 @@ void updateMatchCountCronJob().catch((error) =>
7071
void deleteExpiredStrikesCronJob().catch((error) =>
7172
console.error('[STRIKES CRON ERROR]', error),
7273
)
74+
void updateQueueMessageCronJob().catch((error) =>
75+
console.error('[QUEUE MESSAGE CRON ERROR]', error),
76+
)
7377

7478
// Start API server
7579
const port = Number(process.env.PORT) || 4931

src/utils/cronJobs.ts

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ import {
1414
removeUserFromQueue,
1515
updateCurrentEloRangeForUser,
1616
} from './queryDB'
17-
import { createMatch, timeSpentInQueue } from './queueHelpers'
17+
import {
18+
createMatch,
19+
timeSpentInQueue,
20+
updateQueueMessage,
21+
} from './queueHelpers'
1822
import { updateMatchCountChannel } from './matchHelpers'
1923
import { pool } from '../db'
2024
import * as fs from 'fs'
@@ -159,21 +163,44 @@ export async function incrementEloCronJobAllQueues() {
159163
if (bestPair.length === 2) {
160164
const matchupUsers = bestPair.map((u) => u.userId)
161165

162-
// remove users from all queues they are in
163-
// Wait for all removal operations to complete before creating match
164-
await Promise.all(
165-
matchupUsers.map(async (userId) => {
166-
const userQueues = await getUserQueues(userId)
167-
await Promise.all(
168-
userQueues.map((queue) =>
169-
removeUserFromQueue(queue.id, userId),
170-
),
171-
)
172-
}),
173-
)
166+
// Check with a lock to ensure that only one match is created at a time
167+
const dbClient = await pool.connect()
168+
try {
169+
await dbClient.query('BEGIN')
170+
171+
// Lock the rows and check if all users are still in queue
172+
const checkResult = await dbClient.query(
173+
`SELECT user_id FROM queue_users
174+
WHERE user_id = ANY($1::varchar[])
175+
AND queue_join_time IS NOT NULL
176+
FOR UPDATE`,
177+
[matchupUsers],
178+
)
179+
180+
// If not all users are still available, skip this match
181+
if (checkResult.rowCount !== matchupUsers.length) {
182+
await dbClient.query('ROLLBACK')
183+
dbClient.release()
184+
continue
185+
}
186+
187+
// Remove users from queue (set queue_join_time to NULL)
188+
await dbClient.query(
189+
`UPDATE queue_users SET queue_join_time = NULL
190+
WHERE user_id = ANY($1::varchar[])`,
191+
[matchupUsers],
192+
)
174193

175-
// queue them for the match
176-
await createMatch(matchupUsers, bestPair[0].queueId)
194+
await dbClient.query('COMMIT')
195+
dbClient.release()
196+
197+
// Now create the match (users are already removed from queue)
198+
await createMatch(matchupUsers, bestPair[0].queueId)
199+
} catch (err) {
200+
await dbClient.query('ROLLBACK')
201+
dbClient.release()
202+
console.error('Error creating match in incrementEloCronJob:', err)
203+
}
177204
}
178205
}
179206
} catch (err) {
@@ -258,3 +285,14 @@ export async function deleteExpiredStrikesCronJob() {
258285
2 * 60 * 60 * 1000,
259286
) // every 2 hours
260287
}
288+
289+
// update queue message every 2 seconds
290+
export async function updateQueueMessageCronJob() {
291+
setInterval(async () => {
292+
try {
293+
await updateQueueMessage()
294+
} catch (err) {
295+
console.error('Error updating queue message:', err)
296+
}
297+
}, 2 * 1000) // every 2 seconds
298+
}

src/utils/queueHelpers.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,17 +287,18 @@ export async function joinQueues(
287287
// Batch upsert all queue joins
288288
for (const qId of selectedQueueIds) {
289289
const queueId = parseInt(qId)
290-
const queue = queueMap.get(queueId)
290+
const queue: Queues = queueMap.get(queueId)
291291
if (!queue) continue
292292

293293
await createQueueUser(userId, queueId)
294294
await client.query(
295295
`UPDATE queue_users
296296
SET queue_join_time = NOW(),
297297
elo = COALESCE(elo, $3),
298-
peak_elo = COALESCE(peak_elo, $3)
298+
peak_elo = COALESCE(peak_elo, $3),
299+
current_elo_range = $4
299300
WHERE user_id = $1 AND queue_id = $2`,
300-
[userId, queueId, queue.default_elo],
301+
[userId, queueId, queue.default_elo, queue.elo_search_start],
301302
)
302303
}
303304

@@ -543,8 +544,6 @@ export async function createMatch(
543544
} catch (err) {}
544545
}
545546

546-
await updateQueueMessage()
547-
548547
// Wait 2 seconds for channel to fully propagate in Discord's API
549548
await new Promise((resolve) => setTimeout(resolve, 2000))
550549

0 commit comments

Comments
 (0)