diff --git a/README.md b/README.md index bd81d78..b0ae1aa 100644 --- a/README.md +++ b/README.md @@ -114,8 +114,8 @@ Where `options` is an object and can contain the following: * `prefix`: Sets a default prefix for all metrics. (optional) * Use this to namespace your metrics. * `flushIntervalSeconds`: How often to send metrics to Datadog. (optional) - * This defaults to 15 seconds. Set it to 0 to disable auto-flushing which - means you must call `flush()` manually. + * This defaults to 15 seconds. Set it to `0` to disable auto-flushing (which + means you must call `flush()` manually). * `site`: Sets the Datadog "site", or server where metrics are sent. (optional) * Defaults to `datadoghq.com`. * See more details on setting your site at: @@ -197,12 +197,12 @@ metrics.init({ `metrics.gauge(key, value[, tags[, timestamp]])` -Record the current *value* of a metric. The most recent value in -a given flush interval will be recorded. Optionally, specify a set of -tags to associate with the metric. This should be used for sum values -such as total hard disk space, process uptime, total number of active -users, or number of rows in a database table. The optional timestamp -is in milliseconds since 1 Jan 1970 00:00:00 UTC, e.g. from `Date.now()`. +Record the current *value* of a metric. The most recent value since the last +flush will be recorded. Optionally, specify a set of tags to associate with the +metric. This should be used for sum values such as total hard disk space, +process uptime, total number of active users, or number of rows in a database +table. The optional timestamp is in milliseconds since 1 Jan 1970 00:00:00 UTC, +e.g. from `Date.now()`. Example: @@ -284,15 +284,34 @@ metrics.distribution('test.service_time', 0.248); ### Flushing -`metrics.flush()` +By default, datadog-metrics will automatically flush, or send accumulated +metrics to Datadog, at regular intervals, and, in environments that support it, +before your program exits. (However, if you call `process.exit()` to cause a +hard exit, datadog-metrics doesn’t get a chance to flush. In this case, you may +want to call `await metrics.stop()` first.) + +You can adjust the interval by using the `flushIntervalSeconds` option. Setting +it to `0` will disable auto-flushing entirely: + +```js +// Set auto-flush interval to 10 seconds. +metrics.init({ flushIntervalSeconds: 10 }); +``` + +You can also send accumulated metrics manually at any time by calling +`metrics.flush()`. + +Please note that, when calling the `BufferedMetricsLogger` constructor directly, +`flushIntervalSeconds` defaults to `0` instead. When constructing your own +logger this way, you must expicitly opt-in to auto-flushing by setting a +positive value. -Calling `flush` sends any buffered metrics to Datadog and returns a promise. -This function will be called automatically unless you set `flushIntervalSeconds` -to `0`. -It can be useful to trigger a manual flush by calling if you want to -make sure pending metrics have been sent before you quit the application -process, for example. +#### `metrics.flush()` + +Sends any buffered metrics to Datadog and returns a promise. By default, +`flush()` will be called for you automatically unless you set +`flushIntervalSeconds` to `0` (see above for more details). ⚠️ This method used to take two callback arguments for handling successes and errors. That form is deprecated and will be removed in a future update: @@ -318,6 +337,18 @@ metrics.flush() .catch((error) => console.log('Flush error:', error)) ; ``` +#### `metrics.stop(options)` + +Stops auto-flushing (if enabled) and flushes any currently buffered metrics. +This is mainly useful if you want to manually clean up and send remaining +metrics before hard-quitting your program (usually by calling `process.exit()`). +Returns a promise for the result of the flush. + +Takes an optional object with properties: +* `flush` (boolean) Whether to flush any remaining metrics after stopping. + Defaults to `true`. + + ## Logging Datadog-metrics uses the [debug](https://github.com/visionmedia/debug) @@ -344,7 +375,9 @@ TBD **New Features:** -TBD +* When auto-flushing is enabled, metrics are now also flushed before the process exits. In previous versions, you needed to do this manually by calling `metrics.flush()` at the every end of your program. + + You will still need to flush manually if you set `flushIntervalSeconds` to `0` or you are quitting your program by calling `process.exit()` [(which interrupts a variety of operations)](https://nodejs.org/docs/latest/api/process.html#processexitcode). **Deprecations:** diff --git a/index.js b/index.js index 1f8291b..e0dd183 100644 --- a/index.js +++ b/index.js @@ -35,6 +35,12 @@ function callOnSharedLogger(func) { // compiler that this satisfies the types. :( return (...args) => { if (sharedLogger === null) { + // Special case: don't make a new logger just to stop it. + // @ts-expect-error TypeScript compiler can't figure this one out. + if (func === BufferedMetricsLogger.prototype.stop) { + return Promise.resolve(undefined); + } + init(); } return func.apply(sharedLogger, args); @@ -44,6 +50,7 @@ function callOnSharedLogger(func) { module.exports = { init, flush: callOnSharedLogger(BufferedMetricsLogger.prototype.flush), + stop: callOnSharedLogger(BufferedMetricsLogger.prototype.stop), gauge: callOnSharedLogger(BufferedMetricsLogger.prototype.gauge), increment: callOnSharedLogger(BufferedMetricsLogger.prototype.increment), histogram: callOnSharedLogger(BufferedMetricsLogger.prototype.histogram), diff --git a/lib/loggers.js b/lib/loggers.js index c6ad360..429cb81 100644 --- a/lib/loggers.js +++ b/lib/loggers.js @@ -7,6 +7,9 @@ const Counter = require('./metrics').Counter; const Histogram = require('./metrics').Histogram; const Distribution = require('./metrics').Distribution; +const supportsProcessExit = typeof process !== 'undefined' + && typeof process.once === 'function'; + /** * @typedef {object} AggregatorType Buffers metrics to send. * @property {( @@ -103,6 +106,9 @@ class BufferedMetricsLogger { opts.site = opts.site || opts.apiHost; } + this.performAutoFlush = this.performAutoFlush.bind(this); + this.handleProcessExit = this.handleProcessExit.bind(this); + /** @private */ this.aggregator = opts.aggregator || new Aggregator(opts.defaultTags); /** @private @type {ReporterType} */ @@ -117,34 +123,27 @@ class BufferedMetricsLogger { /** @private */ this.prefix = opts.prefix || ''; /** @private */ - this.flushIntervalSeconds = opts.flushIntervalSeconds; - /** @private */ this.histogramOptions = opts.histogram; + /** @private */ + this.onError = null; if (typeof opts.onError === 'function') { - /** @private */ this.onError = opts.onError; } else if (opts.onError != null) { throw new TypeError('The `onError` option must be a function'); } - if (this.flushIntervalSeconds) { - logDebug('Auto-flushing every %d seconds', this.flushIntervalSeconds); + /** @private */ + this.flushTimer = null; + /** @private */ + this.flushIntervalSeconds = 0; + if (opts.flushIntervalSeconds < 0) { + throw new TypeError(`flushIntervalSeconds must be >= 0 (got: ${opts.flushIntervalSeconds})`); } else { - logDebug('Auto-flushing is disabled'); + this.flushIntervalSeconds = opts.flushIntervalSeconds; } - const autoFlushCallback = () => { - this.flush(); - if (this.flushIntervalSeconds) { - const interval = this.flushIntervalSeconds * 1000; - const tid = setTimeout(autoFlushCallback, interval); - // Let the event loop exit if this is the only active timer. - if (tid.unref) tid.unref(); - } - }; - - autoFlushCallback(); + this.start(); } /** @@ -326,6 +325,60 @@ class BufferedMetricsLogger { return result; } + + /** + * Start auto-flushing metrics. + */ + start() { + if (this.flushTimer) { + logDebug('Auto-flushing is already enabled'); + } else if (this.flushIntervalSeconds > 0) { + logDebug('Auto-flushing every %d seconds', this.flushIntervalSeconds); + if (supportsProcessExit) { + process.once('beforeExit', this.handleProcessExit); + } + this.performAutoFlush(); + } else { + logDebug('Auto-flushing is disabled'); + } + } + + /** + * Stop auto-flushing metrics. By default, this will also flush any + * currently buffered metrics. You can leave them in the buffer and not + * flush by setting the `flush` option to `false`. + * @param {Object} [options] + * @param {boolean} [options.flush] Whether to flush before returning. + * Defaults to true. + * @returns {Promise} + */ + async stop(options) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + if (supportsProcessExit) { + process.off('beforeExit', this.handleProcessExit); + } + if (!options || options.flush) { + await this.flush(); + } + } + + /** @private */ + performAutoFlush() { + this.flush(); + if (this.flushIntervalSeconds) { + const interval = this.flushIntervalSeconds * 1000; + this.flushTimer = setTimeout(this.performAutoFlush, interval); + // Let the event loop exit if this is the only active timer. + if (this.flushTimer.unref) this.flushTimer.unref(); + } + } + + /** @private */ + async handleProcessExit() { + logDebug('Auto-flushing before process exits...'); + this.flush(); + } } module.exports = { diff --git a/test-other/types_check.ts b/test-other/types_check.ts index ddad7b5..9e36813 100644 --- a/test-other/types_check.ts +++ b/test-other/types_check.ts @@ -4,6 +4,7 @@ import { reporters, init, flush, + stop, gauge, increment, histogram, @@ -17,6 +18,8 @@ function useLogger(logger: BufferedMetricsLogger) { logger.histogram('histogram.key', 11); logger.distribution('distribution.key', 11); logger.flush(); + logger.stop(); + logger.stop({ flush: false }); } useLogger(new BufferedMetricsLogger()); @@ -51,3 +54,5 @@ increment('increment.key'); histogram('histogram.key', 11); distribution('distribution.key', 11); flush(); +stop(); +stop({ flush: false }); diff --git a/test/loggers_tests.js b/test/loggers_tests.js index 2c26c95..e51664c 100644 --- a/test/loggers_tests.js +++ b/test/loggers_tests.js @@ -8,6 +8,22 @@ chai.should(); const { BufferedMetricsLogger } = require('../lib/loggers'); const { NullReporter } = require('../lib/reporters'); +class MockReporter { + constructor() { + this.calls = []; + this.error = null; + } + + async report(metrics) { + this.calls.push(metrics); + if (!metrics || metrics.length === 0) { + throw new Error('No metrics were sent to the reporter!'); + } else if (this.error) { + throw this.error; + } + } +} + describe('BufferedMetricsLogger', function() { let warnLogs = []; let errorLogs = []; @@ -229,18 +245,18 @@ describe('BufferedMetricsLogger', function() { describe('on error', function () { beforeEach(() => { - reporter.expectError = new Error('test error'); + reporter.error = new Error('test error'); }); it('should reject the promise with the reporter error', async () => { - await logger.flush().should.be.rejectedWith(reporter.expectError); + await logger.flush().should.be.rejectedWith(reporter.error); }); it('should call the flush error handler with the reporter error', (done) => { logger.flush( () => done(new Error('The success handler was called!')), (error) => { - if (error === reporter.expectError) { + if (error === reporter.error) { done(); } else { done(new Error('Error was not the reporter error')); @@ -265,7 +281,7 @@ describe('BufferedMetricsLogger', function() { await logger.flush().should.be.rejected; onErrorCalled.should.equal(true); - onErrorValue.should.equal(reporter.expectError); + onErrorValue.should.equal(reporter.error); }); it('should log if `onError` init option is not set', async () => { @@ -278,16 +294,7 @@ describe('BufferedMetricsLogger', function() { describe('with a promise-based reporter', function() { beforeEach(() => { - reporter = { - expectError: null, - async report(metrics) { - if (!metrics || metrics.length === 0) { - throw new Error('No metrics were sent to the reporter!'); - } else if (this.expectError) { - throw this.expectError; - } - } - }; + reporter = new MockReporter(); }); standardFlushTests(); @@ -295,23 +302,107 @@ describe('BufferedMetricsLogger', function() { describe('[deprecated] with a callback-based reporter', function() { beforeEach(() => { - reporter = { - expectError: null, - report(metrics, onSuccess, onError) { - setTimeout(() => { - if (!metrics || metrics.length === 0) { - throw new Error('No metrics were sent to the reporter!'); - } else if (this.expectError) { - onError(this.expectError); - } else { - onSuccess(); - } - }, 0); - } + reporter = new MockReporter(); + reporter.report = function(metrics, onSuccess, onError) { + return this.__proto__.report.call(this, metrics) + .then(onSuccess, onError); }; }); standardFlushTests(); }); }); + + describe('stop()', function () { + beforeEach(function () { + this.reporter = new MockReporter(); + this.logger = new BufferedMetricsLogger({ + flushIntervalSeconds: 0.1, + reporter: this.reporter + }); + this.logger.gauge('test.gauge', 23); + }); + + it('flushes by default', async function () { + this.reporter.calls.should.have.lengthOf(0); + await this.logger.stop(); + this.reporter.calls.should.have.lengthOf(1); + }); + + it('does not flush if `flush: false`', async function () { + this.reporter.calls.should.have.lengthOf(0); + await this.logger.stop({ flush: false }); + this.reporter.calls.should.have.lengthOf(0); + }); + + it('stops auto-flushing', async function () { + await this.logger.stop({ flush: false }); + this.reporter.calls.should.have.lengthOf(0); + + await new Promise(r => setTimeout(r, 125)); + this.reporter.calls.should.have.lengthOf(0); + }); + + it('stops auto-flushing on exit', async function () { + await this.logger.stop({ flush: false }); + this.reporter.calls.should.have.lengthOf(0); + + process.emit('beforeExit', 0); + this.reporter.calls.should.have.lengthOf(0); + }); + }); + + describe('option: flushIntervalSeconds', function () { + beforeEach(function () { + this.reporter = new MockReporter(); + }); + + it('flushes after the specified number of seconds', async function () { + this.logger = new BufferedMetricsLogger({ + flushIntervalSeconds: 0.1, + reporter: this.reporter + }); + this.logger.gauge('test.gauge', 23); + + this.reporter.calls.should.have.lengthOf(0); + await new Promise(r => setTimeout(r, 125)); + this.reporter.calls.should.have.lengthOf(1); + }); + + it('flushes before exiting if auto-flushing', async function () { + this.logger = new BufferedMetricsLogger({ + flushIntervalSeconds: 0.1, + reporter: this.reporter + }); + this.logger.gauge('test.gauge', 23); + + this.reporter.calls.should.have.lengthOf(0); + process.emit('beforeExit', 0); + this.reporter.calls.should.have.lengthOf(1); + }); + + it('does not auto-flush if set to 0', async function () { + this.logger = new BufferedMetricsLogger({ + flushIntervalSeconds: 0, + reporter: this.reporter + }); + this.logger.gauge('test.gauge', 23); + this.reporter.calls.should.have.lengthOf(0); + + await new Promise(r => setTimeout(r, 125)); + this.reporter.calls.should.have.lengthOf(0); + + process.emit('beforeExit', 0); + this.reporter.calls.should.have.lengthOf(0); + }); + + it('throws if set to a negative value', async function () { + (() => { + this.logger = new BufferedMetricsLogger({ + flushIntervalSeconds: -1, + reporter: this.reporter + }); + }).should.throw(/flushIntervalSeconds/); + }); + }); }); diff --git a/test/module_tests.js b/test/module_tests.js index 720aa48..66c4a79 100644 --- a/test/module_tests.js +++ b/test/module_tests.js @@ -1,10 +1,12 @@ 'use strict'; const chai = require('chai'); +const reporters = require('../lib/reporters.js'); + chai.should(); +/** @type {import("..") DotadogMetrics} */ let metrics = null; -const reporters = require('../lib/reporters.js'); // Force-reload the module before every test so we // can realistically test all the scenarios. @@ -13,6 +15,10 @@ beforeEach(function() { metrics = require('../index.js'); }); +afterEach(async function() { + await metrics.stop({ flush: false }); +}); + describe('datadog-metrics', function() { it('should let me create a metrics logger instance', function() { metrics.BufferedMetricsLogger.should.be.a('function'); diff --git a/test/reporters_tests.js b/test/reporters_tests.js index 53084e3..dad02d3 100644 --- a/test/reporters_tests.js +++ b/test/reporters_tests.js @@ -3,10 +3,12 @@ const chai = require('chai'); const nock = require('nock'); -chai.should(); const { DatadogReporter, NullReporter } = require('../lib/reporters'); const { AuthorizationError } = require('../lib/errors'); +chai.use(require('chai-as-promised')); +chai.should(); + const mockMetric = { metric: 'test.gauge', points: [[Math.floor(Date.now() / 1000), 1]],