Skip to content

Commit 8db0952

Browse files
committed
Allow subscribing to 'rule-event', to monitor rule processing events
The first and main motivating example here is upstream passthrough events. This allows you to see exactly what a passthrough rule is send & receiving from the server, not just what the client is sending & receiving to Mockttp in general. This can be useful for low level debugging of proxy traffic and complex processing or transformations, where the forwarded traffic is confusingly different to the original input/resulting output. This does not attempt to do the same for websocket passthrough yet, as websocket passthrough doesn't support very much in terms of traffic modification, but this will be added in future as that develops. This required touching on a few details of how passthrough worked for server response bodies, and _may_ have fixed some related bugs there where the body could be read, not modified, but then lost and not forwarded back to clients.
1 parent b8b3e60 commit 8db0952

File tree

9 files changed

+364
-34
lines changed

9 files changed

+364
-34
lines changed

src/admin/mockttp-admin-model.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const TLS_PASSTHROUGH_OPENED_TOPIC = 'tls-passthrough-opened';
3232
const TLS_PASSTHROUGH_CLOSED_TOPIC = 'tls-passthrough-closed';
3333
const TLS_CLIENT_ERROR_TOPIC = 'tls-client-error';
3434
const CLIENT_ERROR_TOPIC = 'client-error';
35+
const RULE_EVENT_TOPIC = 'rule-event';
3536

3637
async function buildMockedEndpointData(endpoint: ServerMockedEndpoint): Promise<MockedEndpointData> {
3738
return {
@@ -131,6 +132,12 @@ export function buildAdminServerModel(
131132
})
132133
});
133134

135+
mockServer.on('rule-event', (evt) => {
136+
pubsub.publish(RULE_EVENT_TOPIC, {
137+
ruleEvent: evt
138+
})
139+
});
140+
134141
return <any> {
135142
Query: {
136143
mockedEndpoints: async (): Promise<MockedEndpointData[]> => {
@@ -229,6 +236,9 @@ export function buildAdminServerModel(
229236
},
230237
failedClientRequest: {
231238
subscribe: () => pubsub.asyncIterator(CLIENT_ERROR_TOPIC)
239+
},
240+
ruleEvent: {
241+
subscribe: () => pubsub.asyncIterator(RULE_EVENT_TOPIC)
232242
}
233243
},
234244

src/admin/mockttp-schema.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export const MockttpSchema = gql`
3232
tlsPassthroughClosed: TlsPassthroughEvent!
3333
failedTlsRequest: TlsHandshakeFailure!
3434
failedClientRequest: ClientError!
35+
ruleEvent: RuleEvent!
3536
}
3637
3738
type MockedEndpoint {
@@ -113,6 +114,13 @@ export const MockttpSchema = gql`
113114
remotePort: Int!
114115
}
115116
117+
type RuleEvent {
118+
requestId: ID!
119+
ruleId: ID!
120+
eventType: String!
121+
eventData: Raw!
122+
}
123+
116124
type InitiatedRequest {
117125
id: ID!
118126
timingEvents: Json!

src/client/mockttp-admin-request-builder.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,14 @@ export class MockttpAdminRequestBuilder {
435435
body
436436
}
437437
}
438+
}`,
439+
'rule-event': gql`subscription OnRuleEvent {
440+
ruleEvent {
441+
requestId
442+
ruleId
443+
eventType
444+
eventData
445+
}
438446
}`
439447
}[event];
440448

@@ -460,6 +468,13 @@ export class MockttpAdminRequestBuilder {
460468
} else if (event === 'abort') {
461469
normalizeHttpMessage(data, event);
462470
data.error = data.error ? JSON.parse(data.error) : undefined;
471+
} else if (event === 'rule-event') {
472+
const { eventData } = data;
473+
474+
// Events may include raw body data buffers, serialized as base64:
475+
if (eventData.rawBody !== undefined) {
476+
eventData.rawBody = Buffer.from(eventData.rawBody, 'base64');
477+
}
463478
} else {
464479
normalizeHttpMessage(data, event);
465480
}
@@ -477,6 +492,7 @@ export class MockttpAdminRequestBuilder {
477492
query GetEndpointData($id: ID!) {
478493
mockedEndpoint(id: $id) {
479494
seenRequests {
495+
id,
480496
protocol,
481497
method,
482498
url,

src/mockttp.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import {
1919
RulePriority,
2020
WebSocketMessage,
2121
WebSocketClose,
22-
AbortedRequest
22+
AbortedRequest,
23+
RuleEvent
2324
} from "./types";
2425
import type { RequestRuleData } from "./rules/requests/request-rule";
2526
import type { WebSocketRuleData } from "./rules/websockets/websocket-rule";
@@ -544,6 +545,25 @@ export interface Mockttp {
544545
*/
545546
on(event: 'client-error', callback: (error: ClientError) => void): Promise<void>;
546547

548+
/**
549+
* Some rules may emit events with metadata about request processing. For example,
550+
* passthrough rules may emit events about upstream server interactions.
551+
*
552+
* You can listen to rule-event to hear about all these events. When emitted,
553+
* this will include the id of the request being processed, the id of the rule
554+
* that fired the event, the type of the event, and the event data itself.
555+
*
556+
* This is only useful in some niche use cases, such as logging all proxied upstream
557+
* requests made by the server, separately from the client connections handled.
558+
*
559+
* The callback will be called asynchronously from request handling. This function
560+
* returns a promise, and the callback is not guaranteed to be registered until
561+
* the promise is resolved.
562+
*
563+
* @category Events
564+
*/
565+
on<T = unknown>(event: 'rule-event', callback: (event: RuleEvent<T>) => void): Promise<void>;
566+
547567
/**
548568
* Adds the given HTTP request rules to the server.
549569
*
@@ -765,7 +785,8 @@ export type SubscribableEvent =
765785
| 'tls-passthrough-opened'
766786
| 'tls-passthrough-closed'
767787
| 'tls-client-error'
768-
| 'client-error';
788+
| 'client-error'
789+
| 'rule-event';
769790

770791
/**
771792
* @hidden

src/rules/requests/request-handlers.ts

Lines changed: 94 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,15 @@ function isSerializedBuffer(obj: any): obj is SerializedBuffer {
140140
}
141141

142142
export interface RequestHandler extends RequestHandlerDefinition {
143-
handle(request: OngoingRequest, response: OngoingResponse): Promise<void>;
143+
handle(
144+
request: OngoingRequest,
145+
response: OngoingResponse,
146+
options: RequestHandlerOptions
147+
): Promise<void>;
148+
}
149+
150+
export interface RequestHandlerOptions {
151+
emitEventCallback?: (type: string, event: unknown) => void;
144152
}
145153

146154
export class SimpleHandler extends SimpleHandlerDefinition {
@@ -380,7 +388,11 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
380388
return this._trustedCACertificates;
381389
}
382390

383-
async handle(clientReq: OngoingRequest, clientRes: OngoingResponse) {
391+
async handle(
392+
clientReq: OngoingRequest,
393+
clientRes: OngoingResponse,
394+
options: RequestHandlerOptions
395+
) {
384396
// Don't let Node add any default standard headers - we want full control
385397
dropDefaultHeaders(clientRes);
386398

@@ -735,8 +747,24 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
735747
let serverStatusCode = serverRes.statusCode!;
736748
let serverStatusMessage = serverRes.statusMessage
737749
let serverRawHeaders = pairFlatRawHeaders(serverRes.rawHeaders);
750+
751+
// This is only set if we need to read the body here, for a callback or similar. If so,
752+
// we keep the buffer in case we need it afterwards (if the cb doesn't replace it).
753+
let originalBody: Buffer | undefined;
754+
755+
// This is set when we override the body data. Note that this doesn't mean we actually
756+
// read & buffered the original data! With a fixed replacement body we can skip that.
738757
let resBodyOverride: Uint8Array | undefined;
739758

759+
if (options.emitEventCallback) {
760+
options.emitEventCallback('passthrough-response-head', {
761+
statusCode: serverStatusCode,
762+
statusMessage: serverStatusMessage,
763+
httpVersion: serverRes.httpVersion,
764+
rawHeaders: serverRawHeaders
765+
});
766+
}
767+
740768
if (isH2Downstream) {
741769
serverRawHeaders = h1HeadersToH2(serverRawHeaders);
742770
}
@@ -775,8 +803,8 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
775803
} else if (replaceBodyFromFile) {
776804
resBodyOverride = await fs.readFile(replaceBodyFromFile);
777805
} else if (updateJsonBody) {
778-
const rawBody = await streamToBuffer(serverRes);
779-
const realBody = buildBodyReader(rawBody, serverRes.headers);
806+
originalBody = await streamToBuffer(serverRes);
807+
const realBody = buildBodyReader(originalBody, serverRes.headers);
780808

781809
if (await realBody.getJson() === undefined) {
782810
throw new Error("Can't transform non-JSON response body");
@@ -795,26 +823,27 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
795823

796824
resBodyOverride = asBuffer(JSON.stringify(updatedBody));
797825
} else if (matchReplaceBody) {
798-
const rawBody = await streamToBuffer(serverRes);
799-
const realBody = buildBodyReader(rawBody, serverRes.headers);
826+
originalBody = await streamToBuffer(serverRes);
827+
const realBody = buildBodyReader(originalBody, serverRes.headers);
800828

801-
const originalBody = await realBody.getText();
802-
if (originalBody === undefined) {
829+
const originalBodyText = await realBody.getText();
830+
if (originalBodyText === undefined) {
803831
throw new Error("Can't match & replace non-decodeable response body");
804832
}
805833

806-
let replacedBody = originalBody;
834+
let replacedBody = originalBodyText;
807835
for (let [match, result] of matchReplaceBody) {
808836
replacedBody = replacedBody!.replace(match, result);
809837
}
810838

811-
if (replacedBody !== originalBody) {
839+
if (replacedBody !== originalBodyText) {
812840
resBodyOverride = asBuffer(replacedBody);
813841
}
814842
}
815843

816844
if (resBodyOverride) {
817-
// We always re-encode the body to match the resulting content-encoding header:
845+
// In the above cases, the overriding data is assumed to always be in decoded form,
846+
// so we re-encode the body to match the resulting content-encoding header:
818847
resBodyOverride = await encodeBodyBuffer(
819848
resBodyOverride,
820849
serverHeaders
@@ -833,9 +862,8 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
833862
serverRawHeaders = objectHeadersToRaw(serverHeaders);
834863
} else if (this.beforeResponse) {
835864
let modifiedRes: CallbackResponseResult | void;
836-
let body: Buffer;
837865

838-
body = await streamToBuffer(serverRes);
866+
originalBody = await streamToBuffer(serverRes);
839867
let serverHeaders = rawHeadersToObject(serverRawHeaders);
840868

841869
modifiedRes = await this.beforeResponse({
@@ -844,19 +872,14 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
844872
statusMessage: serverRes.statusMessage,
845873
headers: serverHeaders,
846874
rawHeaders: _.cloneDeep(serverRawHeaders),
847-
body: buildBodyReader(body, serverHeaders)
875+
body: buildBodyReader(originalBody, serverHeaders)
848876
});
849877

850878
if (modifiedRes === 'close') {
851-
// Dump the real response data and kill the client socket:
852-
serverRes.resume();
853879
(clientReq as any).socket.end();
854880
throw new AbortError('Connection closed intentionally by rule');
855881
} else if (modifiedRes === 'reset') {
856882
requireSocketResetSupport();
857-
858-
// Dump the real response data and kill the client socket:
859-
serverRes.resume();
860883
resetOrDestroy(clientReq);
861884
throw new AbortError('Connection reset intentionally by rule');
862885
}
@@ -880,11 +903,6 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
880903
modifiedRes?.headers,
881904
method === 'HEAD' // HEAD responses are allowed mismatched content-length
882905
);
883-
} else {
884-
// If you don't specify a body override, we need to use the real
885-
// body anyway, because as we've read it already streaming it to
886-
// the response won't work
887-
resBodyOverride = body;
888906
}
889907

890908
serverRawHeaders = objectHeadersToRaw(serverHeaders);
@@ -910,14 +928,40 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
910928
if (resBodyOverride) {
911929
// Return the override data to the client:
912930
clientRes.end(resBodyOverride);
913-
// Dump the real response data:
914-
serverRes.resume();
915931

932+
// Dump the real response data, in case that body wasn't read yet:
933+
serverRes.resume();
934+
resolve();
935+
} else if (originalBody) {
936+
// If the original body was read, and not overridden, then send it
937+
// onward directly:
938+
clientRes.end(originalBody);
916939
resolve();
917940
} else {
941+
// Otherwise the body hasn't been read - stream it live:
918942
serverRes.pipe(clientRes);
919943
serverRes.once('end', resolve);
920944
}
945+
946+
if (options.emitEventCallback) {
947+
if (!!resBodyOverride) {
948+
(originalBody
949+
? Promise.resolve(originalBody)
950+
: streamToBuffer(serverRes)
951+
).then((upstreamBody) => {
952+
options.emitEventCallback!('passthrough-response-body', {
953+
overridden: true,
954+
rawBody: upstreamBody
955+
});
956+
});
957+
} else {
958+
options.emitEventCallback('passthrough-response-body', {
959+
overridden: false
960+
// We don't bother buffering & re-sending the body if
961+
// it's the same as the one being sent to the client.
962+
});
963+
}
964+
}
921965
})().catch(reject));
922966

923967
serverReq.once('socket', (socket: net.Socket) => {
@@ -979,6 +1023,30 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
9791023

9801024
// For similar reasons, we don't want any buffering on outgoing data at all if possible:
9811025
serverReq.setNoDelay(true);
1026+
1027+
// Fire rule events, to allow in-depth debugging of upstream traffic & modifications,
1028+
// so anybody interested can see _exactly_ what we're sending upstream here:
1029+
if (options.emitEventCallback) {
1030+
options.emitEventCallback('passthrough-request-head', {
1031+
method,
1032+
protocol: protocol!.replace(/:$/, ''),
1033+
hostname,
1034+
port,
1035+
path,
1036+
rawHeaders
1037+
});
1038+
1039+
if (!!reqBodyOverride) {
1040+
options.emitEventCallback('passthrough-request-body', {
1041+
overridden: true,
1042+
rawBody: reqBodyOverride
1043+
});
1044+
} else {
1045+
options.emitEventCallback!('passthrough-request-body', {
1046+
overridden: false
1047+
});
1048+
}
1049+
}
9821050
})().catch(reject)
9831051
).catch((e: ErrorLike) => {
9841052
// All errors anywhere above (thrown or from explicit reject()) should end up here.

src/rules/requests/request-rule.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ export interface RequestRule extends Explainable {
1818

1919
// We don't extend the main interfaces for these, because MockRules are not Serializable
2020
matches(request: OngoingRequest): MaybePromise<boolean>;
21-
handle(request: OngoingRequest, response: OngoingResponse, record: boolean): Promise<void>;
21+
handle(request: OngoingRequest, response: OngoingResponse, options: {
22+
record: boolean,
23+
emitEventCallback?: (type: string, event: unknown) => void
24+
}): Promise<void>;
2225
isComplete(): boolean | null;
2326
}
2427

@@ -64,14 +67,19 @@ export class RequestRule implements RequestRule {
6467
return matchers.matchesAll(request, this.matchers);
6568
}
6669

67-
handle(req: OngoingRequest, res: OngoingResponse, record: boolean): Promise<void> {
70+
handle(req: OngoingRequest, res: OngoingResponse, options: {
71+
record?: boolean,
72+
emitEventCallback?: (type: string, event: unknown) => void
73+
}): Promise<void> {
6874
let handlerPromise = (async () => { // Catch (a)sync errors
69-
return this.handler.handle(req, res);
75+
return this.handler.handle(req, res, {
76+
emitEventCallback: options.emitEventCallback
77+
});
7078
})();
7179

7280
// Requests are added to rule.requests as soon as they start being handled,
7381
// as promises, which resolve only when the response & request body is complete.
74-
if (record) {
82+
if (options.record) {
7583
this.requests.push(
7684
Promise.race([
7785
// When the handler resolves, the request is completed:

0 commit comments

Comments
 (0)