@@ -22,6 +22,7 @@ import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
2222import { getMaxDuration } from "../utils/maxDuration" ;
2323import { DevSubscriber , devPubSub } from "./devPubSub.server" ;
2424import { findQueueInEnvironment , sanitizeQueueName } from "~/models/taskQueue.server" ;
25+ import { createRedisClient , RedisClient } from "~/redis.server" ;
2526
2627const MessageBody = z . discriminatedUnion ( "type" , [
2728 z . object ( {
@@ -53,14 +54,21 @@ export class DevQueueConsumer {
5354 private _currentSpan : Span | undefined ;
5455 private _endSpanInNextIteration = false ;
5556 private _inProgressRuns : Map < string , string > = new Map ( ) ; // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids
57+ private _connectionLostAt ?: Date ;
58+ private _redisClient : RedisClient ;
5659
5760 constructor (
61+ public id : string ,
5862 public env : AuthenticatedEnvironment ,
5963 private _sender : ZodMessageSender < typeof serverWebsocketMessages > ,
6064 private _options : DevQueueConsumerOptions = { }
6165 ) {
6266 this . _traceTimeoutSeconds = _options . traceTimeoutSeconds ?? 60 ;
6367 this . _maximumItemsPerTrace = _options . maximumItemsPerTrace ?? 1_000 ;
68+ this . _redisClient = createRedisClient ( "tr:devQueueConsumer" , {
69+ keyPrefix : "tr:devQueueConsumer:" ,
70+ ...devPubSub . redisOptions ,
71+ } ) ;
6472 }
6573
6674 // This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it
@@ -235,6 +243,8 @@ export class DevQueueConsumer {
235243 return ;
236244 }
237245
246+ await this . _redisClient . set ( `connection:${ this . env . id } ` , this . id , "EX" , 60 * 60 * 24 ) ; // 24 hours
247+
238248 this . _enabled = true ;
239249 // Create the session
240250 await createNewSession ( this . env , this . _options . ipAddress ?? "unknown" ) ;
@@ -252,6 +262,38 @@ export class DevQueueConsumer {
252262 return ;
253263 }
254264
265+ const canSendMessage = await this . _sender . validateCanSendMessage ( ) ;
266+
267+ if ( ! canSendMessage ) {
268+ this . _connectionLostAt ??= new Date ( ) ;
269+
270+ if ( Date . now ( ) - this . _connectionLostAt . getTime ( ) > 60 * 1000 ) {
271+ logger . debug ( "Connection lost for more than 60 seconds, stopping the consumer" , {
272+ env : this . env ,
273+ } ) ;
274+
275+ await this . stop ( "Connection lost for more than 60 seconds" ) ;
276+ return ;
277+ }
278+
279+ setTimeout ( ( ) => this . #doWork( ) , 1000 ) ;
280+ return ;
281+ }
282+
283+ this . _connectionLostAt = undefined ;
284+
285+ const currentConnection = await this . _redisClient . get ( `connection:${ this . env . id } ` ) ;
286+
287+ if ( currentConnection && currentConnection !== this . id ) {
288+ logger . debug ( "Another connection is active, stopping the consumer" , {
289+ currentConnection,
290+ env : this . env ,
291+ } ) ;
292+
293+ await this . stop ( "Another connection is active" ) ;
294+ return ;
295+ }
296+
255297 // Check if the trace has expired
256298 if (
257299 this . _perTraceCountdown === 0 ||
0 commit comments