Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/webapp/app/v3/authenticatedSocketConnection.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ export class AuthenticatedSocketConnection {
});
});
},
canSendMessage: () => ws.readyState === WebSocket.OPEN,
});

this._consumer = new DevQueueConsumer(authenticatedEnv, this._sender, {
this._consumer = new DevQueueConsumer(this.id, authenticatedEnv, this._sender, {
ipAddress: Array.isArray(this.ipAddress) ? this.ipAddress.join(", ") : this.ipAddress,
});

Expand Down
41 changes: 41 additions & 0 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
import { getMaxDuration } from "../utils/maxDuration";
import { DevSubscriber, devPubSub } from "./devPubSub.server";
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
import { createRedisClient, RedisClient } from "~/redis.server";

const MessageBody = z.discriminatedUnion("type", [
z.object({
Expand Down Expand Up @@ -53,14 +54,21 @@ export class DevQueueConsumer {
private _currentSpan: Span | undefined;
private _endSpanInNextIteration = false;
private _inProgressRuns: Map<string, string> = new Map(); // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids
private _connectionLostAt?: Date;
private _redisClient: RedisClient;

constructor(
public id: string,
public env: AuthenticatedEnvironment,
private _sender: ZodMessageSender<typeof serverWebsocketMessages>,
private _options: DevQueueConsumerOptions = {}
) {
this._traceTimeoutSeconds = _options.traceTimeoutSeconds ?? 60;
this._maximumItemsPerTrace = _options.maximumItemsPerTrace ?? 1_000;
this._redisClient = createRedisClient("tr:devQueueConsumer", {
keyPrefix: "tr:devQueueConsumer:",
...devPubSub.redisOptions,
});
}

// This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it
Expand Down Expand Up @@ -235,6 +243,8 @@ export class DevQueueConsumer {
return;
}

await this._redisClient.set(`connection:${this.env.id}`, this.id);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add TTL or cleanup routine for the Redis key
Setting connection:${this.env.id} may lead to stale entries if the consumer crashes. Consider using an expiration or a cleanup process, so inactive connections won't block new consumers.

this._enabled = true;
// Create the session
await createNewSession(this.env, this._options.ipAddress ?? "unknown");
Expand All @@ -252,6 +262,37 @@ export class DevQueueConsumer {
return;
}

const canSendMessage = await this._sender.validateCanSendMessage();

if (!canSendMessage) {
this._connectionLostAt ??= new Date();

if (Date.now() - this._connectionLostAt.getTime() > 60 * 1000) {
logger.debug("Connection lost for more than 60 seconds, stopping the consumer", {
env: this.env,
});

await this.stop("Connection lost for more than 60 seconds");
return;
}

setTimeout(() => this.#doWork(), 1000);
}

this._connectionLostAt = undefined;

const currentConnection = await this._redisClient.get(`connection:${this.env.id}`);

if (currentConnection !== this.id) {
logger.debug("Another connection is active, stopping the consumer", {
currentConnection,
env: this.env,
});

await this.stop("Another connection is active");
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Assess concurrency handling
Using Redis to store the active connection ID is a good start, but there may be a race condition if multiple consumers try to claim the same environment concurrently. For robust exclusivity, consider using distributed locks or atomic check-and-set operations.


// Check if the trace has expired
if (
this._perTraceCountdown === 0 ||
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/utils/zodPubSub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ export class ZodPubSub<TMessageCatalog extends ZodMessageCatalogSchema> {
this._publisher = createRedisClient("trigger:zodSubscriber", _options.redis);
}

get redisOptions() {
return this._options.redis;
}

public async publish<K extends keyof TMessageCatalog>(
channel: string,
type: K,
Expand Down
Loading