Skip to content

Commit 7a156bc

Browse files
committed
attributes for handler loggers
1 parent 35ba69f commit 7a156bc

File tree

4 files changed

+85
-13
lines changed

4 files changed

+85
-13
lines changed

apps/coordinator/src/index.ts

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { Server } from "socket.io";
33
import {
44
CoordinatorToPlatformMessages,
55
CoordinatorToProdWorkerMessages,
6+
omit,
67
PlatformToCoordinatorMessages,
78
ProdWorkerSocketData,
89
ProdWorkerToCoordinatorMessages,
@@ -127,17 +128,28 @@ class TaskCoordinator {
127128
clientMessages: CoordinatorToPlatformMessages,
128129
serverMessages: PlatformToCoordinatorMessages,
129130
authToken: PLATFORM_SECRET,
131+
logHandlerPayloads: false,
130132
handlers: {
131133
RESUME_AFTER_DEPENDENCY: async (message) => {
134+
const log = logger.child({
135+
eventName: "RESUME_AFTER_DEPENDENCY",
136+
...omit(message, "completions", "executions"),
137+
completions: message.completions.map((c) => c.id),
138+
executions: message.executions.length,
139+
});
140+
141+
log.debug("Handling RESUME_AFTER_DEPENDENCY");
142+
132143
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
133144

134145
if (!taskSocket) {
135-
logger.log("Socket for attempt not found", {
136-
attemptFriendlyId: message.attemptFriendlyId,
137-
});
146+
log.debug("Socket for attempt not found");
138147
return;
139148
}
140149

150+
log.addFields({ socketData: taskSocket.data });
151+
log.log("Found task socket for RESUME_AFTER_DEPENDENCY");
152+
141153
await chaosMonkey.call();
142154

143155
// In case the task resumed faster than we could checkpoint
@@ -146,12 +158,17 @@ class TaskCoordinator {
146158
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
147159
},
148160
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
161+
const log = logger.child({
162+
eventName: "RESUME_AFTER_DEPENDENCY_WITH_ACK",
163+
...omit(message, "completions", "executions"),
164+
});
165+
166+
log.debug("Handling RESUME_AFTER_DEPENDENCY_WITH_ACK");
167+
149168
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
150169

151170
if (!taskSocket) {
152-
logger.log("Socket for attempt not found", {
153-
attemptFriendlyId: message.attemptFriendlyId,
154-
});
171+
log.debug("Socket for attempt not found");
155172
return {
156173
success: false,
157174
error: {
@@ -161,11 +178,12 @@ class TaskCoordinator {
161178
};
162179
}
163180

181+
log.addFields({ socketData: taskSocket.data });
182+
log.log("Found task socket for RESUME_AFTER_DEPENDENCY_WITH_ACK");
183+
164184
//if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue
165185
if (taskSocket.data.requiresCheckpointResumeWithMessage) {
166-
logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", {
167-
socketData: taskSocket.data,
168-
});
186+
log.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack");
169187

170188
return {
171189
success: false,
@@ -189,6 +207,13 @@ class TaskCoordinator {
189207
};
190208
},
191209
RESUME_AFTER_DURATION: async (message) => {
210+
const log = logger.child({
211+
eventName: "RESUME_AFTER_DURATION",
212+
...message,
213+
});
214+
215+
log.debug("Handling RESUME_AFTER_DURATION");
216+
192217
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
193218

194219
if (!taskSocket) {
@@ -198,11 +223,19 @@ class TaskCoordinator {
198223
return;
199224
}
200225

226+
log.addFields({ socketData: taskSocket.data });
227+
log.log("Found task socket for RESUME_AFTER_DURATION");
228+
201229
await chaosMonkey.call();
202230

203231
taskSocket.emit("RESUME_AFTER_DURATION", message);
204232
},
205233
REQUEST_ATTEMPT_CANCELLATION: async (message) => {
234+
const log = logger.child({
235+
eventName: "REQUEST_ATTEMPT_CANCELLATION",
236+
...message,
237+
});
238+
206239
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
207240

208241
if (!taskSocket) {
@@ -212,9 +245,17 @@ class TaskCoordinator {
212245
return;
213246
}
214247

248+
log.addFields({ socketData: taskSocket.data });
249+
log.log("Found task socket for REQUEST_ATTEMPT_CANCELLATION");
250+
215251
taskSocket.emit("REQUEST_ATTEMPT_CANCELLATION", message);
216252
},
217253
REQUEST_RUN_CANCELLATION: async (message) => {
254+
const log = logger.child({
255+
eventName: "REQUEST_RUN_CANCELLATION",
256+
...message,
257+
});
258+
218259
const taskSocket = await this.#getRunSocket(message.runId);
219260

220261
if (!taskSocket) {
@@ -224,6 +265,9 @@ class TaskCoordinator {
224265
return;
225266
}
226267

268+
log.addFields({ socketData: taskSocket.data });
269+
log.log("Found task socket for REQUEST_RUN_CANCELLATION");
270+
227271
this.#cancelCheckpoint(message.runId);
228272

229273
if (message.delayInMs) {
@@ -239,6 +283,11 @@ class TaskCoordinator {
239283
}
240284
},
241285
READY_FOR_RETRY: async (message) => {
286+
const log = logger.child({
287+
eventName: "READY_FOR_RETRY",
288+
...message,
289+
});
290+
242291
const taskSocket = await this.#getRunSocket(message.runId);
243292

244293
if (!taskSocket) {
@@ -248,11 +297,19 @@ class TaskCoordinator {
248297
return;
249298
}
250299

300+
log.addFields({ socketData: taskSocket.data });
301+
log.log("Found task socket for READY_FOR_RETRY");
302+
251303
await chaosMonkey.call();
252304

253305
taskSocket.emit("READY_FOR_RETRY", message);
254306
},
255307
DYNAMIC_CONFIG: async (message) => {
308+
const log = logger.child({
309+
eventName: "DYNAMIC_CONFIG",
310+
...message,
311+
});
312+
256313
this.#delayThresholdInMs = message.checkpointThresholdInMs;
257314

258315
// The first time we receive a dynamic config, the worker namespace will be created

packages/core/src/v3/utils/structuredLogger.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ export class SimpleStructuredLogger implements StructuredLogger {
6060
this.#structuredLog(console.debug, message, "debug", ...args);
6161
}
6262

63+
addFields(fields: Record<string, unknown>) {
64+
this.fields = {
65+
...this.fields,
66+
...fields,
67+
};
68+
}
69+
6370
#structuredLog(
6471
loggerFunction: (message: string, ...args: any[]) => void,
6572
message: string,

packages/core/src/v3/zodNamespace.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ interface ZodNamespaceOptions<
4141
handlers?: ZodSocketMessageHandlers<TClientMessages>;
4242
authToken?: string;
4343
logger?: StructuredLogger;
44+
logHandlerPayloads?: boolean;
4445
preAuth?: (
4546
socket: ZodNamespaceSocket<TClientMessages, TServerMessages, TServerSideEvents, TSocketData>,
4647
next: (err?: ExtendedError) => void,
@@ -96,6 +97,7 @@ export class ZodNamespace<
9697
this.#handler = new ZodSocketMessageHandler({
9798
schema: opts.clientMessages,
9899
handlers: opts.handlers,
100+
logPayloads: opts.logHandlerPayloads,
99101
});
100102

101103
this.io = opts.io;

packages/core/src/v3/zodSocket.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export type ZodSocketMessageHandlerOptions<TMessageCatalog extends ZodSocketMess
6868
schema: TMessageCatalog;
6969
handlers?: ZodSocketMessageHandlers<TMessageCatalog>;
7070
logger?: StructuredLogger;
71+
logPayloads?: boolean;
7172
};
7273

7374
type MessageFromSocketSchema<
@@ -92,12 +93,14 @@ export class ZodSocketMessageHandler<TRPCCatalog extends ZodSocketMessageCatalog
9293
#schema: TRPCCatalog;
9394
#handlers: ZodSocketMessageHandlers<TRPCCatalog> | undefined;
9495
#logger: StructuredLogger;
96+
#logPayloads: boolean;
9597

9698
constructor(options: ZodSocketMessageHandlerOptions<TRPCCatalog>) {
9799
this.#schema = options.schema;
98100
this.#handlers = options.handlers;
99101
this.#logger =
100102
options.logger ?? new SimpleStructuredLogger("socket-message-handler", LogLevel.info);
103+
this.#logPayloads = options.logPayloads ?? !!process.env.LOG_SOCKET_HANDLER_PAYLOADS ?? false;
101104
}
102105

103106
public async handleMessage(message: unknown) {
@@ -120,7 +123,7 @@ export class ZodSocketMessageHandler<TRPCCatalog extends ZodSocketMessageCatalog
120123
const handler = this.#handlers[type];
121124

122125
if (!handler) {
123-
console.error(`No handler for message type: ${String(type)}`);
126+
this.#logger.error("No handler for message type", { type, payload });
124127
return;
125128
}
126129

@@ -164,7 +167,7 @@ export class ZodSocketMessageHandler<TRPCCatalog extends ZodSocketMessageCatalog
164167
const parsedPayload = schema.safeParse(messageWithVersion);
165168

166169
if (!parsedPayload.success) {
167-
console.error("Failed to parse message payload", {
170+
this.#logger.error("Failed to parse message payload", {
168171
message,
169172
payload: messageWithVersion,
170173
});
@@ -194,8 +197,9 @@ export class ZodSocketMessageHandler<TRPCCatalog extends ZodSocketMessageCatalog
194197

195198
for (const eventName of Object.keys(this.#handlers)) {
196199
emitter.on(eventName, async (message: any, callback?: any): Promise<void> => {
197-
log.info(`handling ${eventName}`, {
198-
payload: message,
200+
log.info(`Incoming event ${eventName}`, {
201+
eventName,
202+
...(this.#logPayloads ? { eventMessage: message } : {}),
199203
hasCallback: !!callback,
200204
});
201205

@@ -344,6 +348,7 @@ interface ZodSocketConnectionOptions<
344348
handlers?: ZodSocketMessageHandlers<TServerMessages>;
345349
authToken?: string;
346350
ioOptions?: Partial<ManagerOptions & SocketOptions>;
351+
logHandlerPayloads?: boolean;
347352
onConnection?: (
348353
socket: ZodSocket<TServerMessages, TClientMessages>,
349354
handler: ZodSocketMessageHandler<TServerMessages>,
@@ -399,6 +404,7 @@ export class ZodSocketConnection<
399404
this.#handler = new ZodSocketMessageHandler({
400405
schema: opts.serverMessages,
401406
handlers: opts.handlers,
407+
logPayloads: opts.logHandlerPayloads,
402408
});
403409
this.#handler.registerHandlers(this.socket, this.#logger);
404410

0 commit comments

Comments
 (0)