Skip to content

Commit 45d33ad

Browse files
authored
Implement dogstatsd, add timestamp support, fix flushing (#648)
* create dogstatsd class and constructor * create `report` method to build dogstatsd payload * implement `send` UDP datagram * replace `hot-shots` with custom `dogstatsd` implementation * implement flushing strategy * remove `hot-shots` * lint * fix metric timestamp * set max flush timeout to prevent infinite blocking * fix metric log using fractional timestamps * fix unit tests; test that we use dogstatsd with timestamp+extension * update license * add debug log on error * unit tests * fix debug logs * log when socket times out * nits * round in dogstatsd; unit test * bind to local port before setting min send buffer size; add debug * lint * fix * remove `setSendBufferSize`, since it sets MAX send size, not MIN send size... no way to set min send size in Node
1 parent 191c5c7 commit 45d33ad

File tree

10 files changed

+214
-91
lines changed

10 files changed

+214
-91
lines changed

LICENSE-3rdparty.csv

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ ts-jest,dev,MIT,Copyright (c) 2016-2018 Kulshekhar Kabra
1515
tslint,dev,Apache-2.0,"Copyright 2013-2019 Palantir Technologies, Inc."
1616
typescript,dev,Apache-2.0,Copyright (c) Microsoft Corporation.
1717
dc-polyfill,import,MIT,"Copyright (c) 2023 Datadog, Inc."
18-
hot-shots,import,MIT,Copyright 2011 Steve Ivy. All rights reserved.
1918
promise-retry,import,MIT,Copyright (c) 2014 IndigoUnited
2019
serialize-error,import,MIT,Copyright (c) Sindre Sorhus <[email protected]> (https://sindresorhus.com)
2120
shimmer,import,BSD-2-Clause,"Copyright (c) 2013-2019, Forrest L Norvell"

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
"dependencies": {
4242
"@aws-crypto/sha256-js": "5.2.0",
4343
"dc-polyfill": "^0.1.3",
44-
"hot-shots": "8.5.0",
4544
"promise-retry": "^2.0.1",
4645
"serialize-error": "^8.1.0",
4746
"shimmer": "1.2.1"

src/index.spec.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,6 @@ describe("datadog", () => {
509509

510510
expect(mockedIncrementInvocations).toBeCalledTimes(1);
511511
expect(mockedIncrementInvocations).toBeCalledWith(expect.anything(), mockContext);
512-
expect(logger.debug).toHaveBeenCalledTimes(8);
513512
expect(logger.debug).toHaveBeenLastCalledWith('{"status":"debug","message":"datadog:Unpatching HTTP libraries"}');
514513
});
515514

src/metrics/dogstatsd.spec.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import * as dgram from "node:dgram";
2+
import { LambdaDogStatsD } from "./dogstatsd";
3+
4+
jest.mock("node:dgram", () => ({
5+
createSocket: jest.fn(),
6+
}));
7+
8+
describe("LambdaDogStatsD", () => {
9+
let mockSend: jest.Mock;
10+
11+
beforeEach(() => {
12+
// A send() that immediately calls its callback
13+
mockSend = jest.fn((msg, port, host, cb) => cb());
14+
(dgram.createSocket as jest.Mock).mockReturnValue({
15+
send: mockSend,
16+
getSendBufferSize: jest.fn().mockReturnValue(64 * 1024),
17+
setSendBufferSize: jest.fn(),
18+
bind: jest.fn(),
19+
});
20+
});
21+
22+
afterEach(() => {
23+
jest.clearAllMocks();
24+
});
25+
26+
it("sends a distribution metric without tags or timestamp", async () => {
27+
const client = new LambdaDogStatsD();
28+
client.distribution("metric", 1);
29+
await client.flush();
30+
31+
expect(mockSend).toHaveBeenCalledWith(Buffer.from("metric:1|d", "utf8"), 8125, "127.0.0.1", expect.any(Function));
32+
});
33+
34+
it("sends with tags (sanitized) and timestamp", async () => {
35+
const client = new LambdaDogStatsD();
36+
client.distribution("metric2", 2, 12345, ["tag1", "bad?tag"]);
37+
await client.flush();
38+
39+
// "bad?tag" becomes "bad_tag"
40+
expect(mockSend).toHaveBeenCalledWith(
41+
Buffer.from("metric2:2|d|#tag1,bad_tag|T12345", "utf8"),
42+
8125,
43+
"127.0.0.1",
44+
expect.any(Function),
45+
);
46+
});
47+
48+
it("rounds timestamp", async () => {
49+
const client = new LambdaDogStatsD();
50+
client.distribution("metric2", 2, 12345.678);
51+
await client.flush();
52+
53+
expect(mockSend).toHaveBeenCalledWith(
54+
Buffer.from("metric2:2|d|T12345", "utf8"),
55+
8125,
56+
"127.0.0.1",
57+
expect.any(Function),
58+
);
59+
});
60+
61+
it("flush() resolves immediately when there are no sends", async () => {
62+
const client = new LambdaDogStatsD();
63+
await expect(client.flush()).resolves.toBeUndefined();
64+
});
65+
66+
it("flush() times out if a send never invokes its callback", async () => {
67+
// replace socket.send with a never‐calling callback
68+
(dgram.createSocket as jest.Mock).mockReturnValue({
69+
send: jest.fn(), // never calls callback
70+
getSendBufferSize: jest.fn(),
71+
setSendBufferSize: jest.fn(),
72+
bind: jest.fn(),
73+
});
74+
75+
const client = new LambdaDogStatsD();
76+
client.distribution("will", 9);
77+
78+
jest.useFakeTimers();
79+
const p = client.flush();
80+
// advance past the 1000ms MAX_FLUSH_TIMEOUT
81+
jest.advanceTimersByTime(1100);
82+
83+
// expect the Promise returned by flush() to resolve successfully
84+
await expect(p).resolves.toBeUndefined();
85+
jest.useRealTimers();
86+
});
87+
});

src/metrics/dogstatsd.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import * as dgram from "node:dgram";
2+
import { SocketType } from "node:dgram";
3+
import { logDebug } from "../utils";
4+
5+
export class LambdaDogStatsD {
6+
private static readonly HOST = "127.0.0.1";
7+
private static readonly PORT = 8125;
8+
private static readonly MIN_SEND_BUFFER_SIZE = 32 * 1024;
9+
private static readonly ENCODING: BufferEncoding = "utf8";
10+
private static readonly SOCKET_TYPE: SocketType = "udp4";
11+
private static readonly TAG_RE = /[^\w\d_\-:\/\.]/gu;
12+
private static readonly TAG_SUB = "_";
13+
// The maximum amount to wait while flushing pending sends, so we don't block forever.
14+
private static readonly MAX_FLUSH_TIMEOUT = 1000;
15+
16+
private readonly socket: dgram.Socket;
17+
private readonly pendingSends = new Set<Promise<void>>();
18+
19+
constructor() {
20+
this.socket = dgram.createSocket(LambdaDogStatsD.SOCKET_TYPE);
21+
}
22+
23+
/**
24+
* Send a distribution value, optionally setting tags and timestamp.
25+
* Timestamp is seconds since epoch.
26+
*/
27+
public distribution(metric: string, value: number, timestamp?: number, tags?: string[]): void {
28+
this.report(metric, "d", value, tags, timestamp);
29+
}
30+
31+
private normalizeTags(tags: string[]): string[] {
32+
return tags.map((t) => t.replace(LambdaDogStatsD.TAG_RE, LambdaDogStatsD.TAG_SUB));
33+
}
34+
35+
private report(metric: string, metricType: string, value: number | null, tags?: string[], timestamp?: number): void {
36+
if (value == null) {
37+
return;
38+
}
39+
40+
if (timestamp) {
41+
timestamp = Math.floor(timestamp);
42+
}
43+
44+
const serializedTags = tags && tags.length ? `|#${this.normalizeTags(tags).join(",")}` : "";
45+
const timestampPart = timestamp != null ? `|T${timestamp}` : "";
46+
const payload = `${metric}:${value}|${metricType}${serializedTags}${timestampPart}`;
47+
this.send(payload);
48+
}
49+
50+
private send(packet: string) {
51+
const msg = Buffer.from(packet, LambdaDogStatsD.ENCODING);
52+
const promise = new Promise<void>((resolve) => {
53+
this.socket.send(msg, LambdaDogStatsD.PORT, LambdaDogStatsD.HOST, (err) => {
54+
if (err) {
55+
logDebug(`Unable to send metric packet: ${err.message}`);
56+
}
57+
58+
resolve();
59+
});
60+
});
61+
62+
this.pendingSends.add(promise);
63+
void promise.finally(() => this.pendingSends.delete(promise));
64+
}
65+
66+
/** Block until all in-flight sends have settled */
67+
public async flush(): Promise<void> {
68+
const allSettled = Promise.allSettled(this.pendingSends);
69+
const maxTimeout = new Promise<"timeout">((resolve) => {
70+
setTimeout(() => resolve("timeout"), LambdaDogStatsD.MAX_FLUSH_TIMEOUT);
71+
});
72+
73+
const winner = await Promise.race([allSettled, maxTimeout]);
74+
if (winner === "timeout") {
75+
logDebug("Timed out before sending all metric payloads");
76+
}
77+
78+
this.pendingSends.clear();
79+
}
80+
}

src/metrics/listener.spec.ts

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import { LogLevel, setLogLevel } from "../utils";
55
import { EXTENSION_URL } from "./extension";
66

77
import { MetricsListener } from "./listener";
8-
import StatsDClient from "hot-shots";
8+
import { LambdaDogStatsD } from "./dogstatsd";
99
import { Context } from "aws-lambda";
10-
jest.mock("hot-shots");
10+
11+
jest.mock("./dogstatsd");
1112

1213
jest.mock("@aws-sdk/client-secrets-manager", () => {
1314
return {
@@ -17,6 +18,9 @@ jest.mock("@aws-sdk/client-secrets-manager", () => {
1718
};
1819
});
1920

21+
const MOCK_TIME_SECONDS = 1487076708;
22+
const MOCK_TIME_MS = 1487076708000;
23+
2024
const siteURL = "example.com";
2125

2226
class MockKMS {
@@ -56,6 +60,7 @@ describe("MetricsListener", () => {
5660

5761
expect(nock.isDone()).toBeTruthy();
5862
});
63+
5964
it("uses encrypted kms key if it's the only value available", async () => {
6065
nock("https://api.example.com").post("/api/v1/distribution_points?api_key=kms-api-key-decrypted").reply(200, {});
6166

@@ -184,7 +189,7 @@ describe("MetricsListener", () => {
184189

185190
it("logs metrics when logForwarding is enabled", async () => {
186191
const spy = jest.spyOn(process.stdout, "write");
187-
jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
192+
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);
188193
const kms = new MockKMS("kms-api-key-decrypted");
189194
const listener = new MetricsListener(kms as any, {
190195
apiKey: "api-key",
@@ -202,22 +207,23 @@ describe("MetricsListener", () => {
202207
listener.sendDistributionMetric("my-metric", 10, false, "tag:a", "tag:b");
203208
await listener.onCompleteInvocation();
204209

205-
expect(spy).toHaveBeenCalledWith(`{"e":1487076708,"m":"my-metric","t":["tag:a","tag:b"],"v":10}\n`);
210+
expect(spy).toHaveBeenCalledWith(`{"e":${MOCK_TIME_SECONDS},"m":"my-metric","t":["tag:a","tag:b"],"v":10}\n`);
206211
});
212+
207213
it("always sends metrics to statsD when extension is enabled, ignoring logForwarding=true", async () => {
208214
const flushScope = nock(EXTENSION_URL).post("/lambda/flush", JSON.stringify({})).reply(200);
209215
mock({
210216
"/opt/extensions/datadog-agent": Buffer.from([0]),
211217
});
212218
const distributionMock = jest.fn();
213-
(StatsDClient as any).mockImplementation(() => {
219+
(LambdaDogStatsD as any).mockImplementation(() => {
214220
return {
215221
distribution: distributionMock,
216222
close: (callback: any) => callback(undefined),
217223
};
218224
});
219225

220-
jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
226+
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);
221227

222228
const kms = new MockKMS("kms-api-key-decrypted");
223229
const listener = new MetricsListener(kms as any, {
@@ -236,25 +242,26 @@ describe("MetricsListener", () => {
236242
listener.sendDistributionMetric("my-metric", 10, false, "tag:a", "tag:b");
237243
await listener.onCompleteInvocation();
238244
expect(flushScope.isDone()).toBeTruthy();
239-
expect(distributionMock).toHaveBeenCalledWith("my-metric", 10, undefined, ["tag:a", "tag:b"]);
245+
expect(distributionMock).toHaveBeenCalledWith("my-metric", 10, MOCK_TIME_SECONDS, ["tag:a", "tag:b"]);
240246
});
241247

242-
it("only sends metrics with timestamps to the API when the extension is enabled", async () => {
248+
it("sends metrics with timestamps to statsD (not API!) when the extension is enabled", async () => {
249+
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);
243250
const flushScope = nock(EXTENSION_URL).post("/lambda/flush", JSON.stringify({})).reply(200);
244251
mock({
245252
"/opt/extensions/datadog-agent": Buffer.from([0]),
246253
});
247254
const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});
248255

249256
const distributionMock = jest.fn();
250-
(StatsDClient as any).mockImplementation(() => {
257+
(LambdaDogStatsD as any).mockImplementation(() => {
251258
return {
252259
distribution: distributionMock,
253260
close: (callback: any) => callback(undefined),
254261
};
255262
});
256263

257-
const metricTimeOneMinuteAgo = new Date(Date.now() - 60000);
264+
const metricTimeOneMinuteAgo = new Date(MOCK_TIME_MS - 60000);
258265
const kms = new MockKMS("kms-api-key-decrypted");
259266
const listener = new MetricsListener(kms as any, {
260267
apiKey: "api-key",
@@ -280,12 +287,15 @@ describe("MetricsListener", () => {
280287
"tag:a",
281288
"tag:b",
282289
);
283-
listener.sendDistributionMetric("my-metric-without-a-timestamp", 10, false, "tag:a", "tag:b");
290+
listener.sendDistributionMetric("my-metric-with-a-timestamp", 10, false, "tag:a", "tag:b");
284291
await listener.onCompleteInvocation();
285292

286293
expect(flushScope.isDone()).toBeTruthy();
287-
expect(apiScope.isDone()).toBeTruthy();
288-
expect(distributionMock).toHaveBeenCalledWith("my-metric-without-a-timestamp", 10, undefined, ["tag:a", "tag:b"]);
294+
expect(apiScope.isDone()).toBeFalsy();
295+
expect(distributionMock).toHaveBeenCalledWith("my-metric-with-a-timestamp", 10, MOCK_TIME_SECONDS, [
296+
"tag:a",
297+
"tag:b",
298+
]);
289299
});
290300

291301
it("does not send historical metrics from over 4 hours ago to the API", async () => {
@@ -316,7 +326,6 @@ describe("MetricsListener", () => {
316326

317327
it("logs metrics when logForwarding is enabled with custom timestamp", async () => {
318328
const spy = jest.spyOn(process.stdout, "write");
319-
// jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
320329
const kms = new MockKMS("kms-api-key-decrypted");
321330
const listener = new MetricsListener(kms as any, {
322331
apiKey: "api-key",
@@ -328,7 +337,6 @@ describe("MetricsListener", () => {
328337
localTesting: false,
329338
siteURL,
330339
});
331-
// jest.useFakeTimers();
332340

333341
await listener.onStartInvocation({});
334342
listener.sendDistributionMetricWithDate("my-metric", 10, new Date(1584983836 * 1000), false, "tag:a", "tag:b");

0 commit comments

Comments
 (0)