diff --git a/packages/activity/src/index.ts b/packages/activity/src/index.ts index 3a82ba5e8..f0ba9da60 100644 --- a/packages/activity/src/index.ts +++ b/packages/activity/src/index.ts @@ -70,7 +70,7 @@ */ import { AsyncLocalStorage } from 'node:async_hooks'; -import { Logger, Duration, LogLevel, LogMetadata, Priority } from '@temporalio/common'; +import { Logger, Duration, LogLevel, LogMetadata, MetricMeter, Priority } from '@temporalio/common'; import { msToNumber } from '@temporalio/common/lib/time'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; @@ -281,6 +281,14 @@ export class Context { */ public log: Logger; + /** + * Get the metric meter for this activity with activity-specific tags. + * + * To add custom tags, register a {@link ActivityOutboundCallsInterceptor} that + * intercepts the `getMetricTags()` method. + */ + public readonly metricMeter: MetricMeter; + /** * **Not** meant to instantiated by Activity code, used by the worker. * @@ -291,13 +299,15 @@ export class Context { cancelled: Promise, cancellationSignal: AbortSignal, heartbeat: (details?: any) => void, - log: Logger + log: Logger, + metricMeter: MetricMeter ) { this.info = info; this.cancelled = cancelled; this.cancellationSignal = cancellationSignal; this.heartbeatFn = heartbeat; this.log = log; + this.metricMeter = metricMeter; } /** @@ -434,3 +444,26 @@ export function cancelled(): Promise { export function cancellationSignal(): AbortSignal { return Context.current().cancellationSignal; } + +/** + * Get the metric meter for the current activity, with activity-specific tags. + * + * To add custom tags, register a {@link ActivityOutboundCallsInterceptor} that + * intercepts the `getMetricTags()` method. + * + * This is a shortcut for `Context.current().metricMeter` (see {@link Context.metricMeter}). + */ +export const metricMeter: MetricMeter = { + createCounter(name, unit, description) { + return Context.current().metricMeter.createCounter(name, unit, description); + }, + createHistogram(name, valueType = 'int', unit, description) { + return Context.current().metricMeter.createHistogram(name, valueType, unit, description); + }, + createGauge(name, valueType = 'int', unit, description) { + return Context.current().metricMeter.createGauge(name, valueType, unit, description); + }, + withTags(tags) { + return Context.current().metricMeter.withTags(tags); + }, +}; diff --git a/packages/client/src/async-completion-client.ts b/packages/client/src/async-completion-client.ts index 58ca6234c..fd41fd134 100644 --- a/packages/client/src/async-completion-client.ts +++ b/packages/client/src/async-completion-client.ts @@ -1,10 +1,7 @@ import { status as grpcStatus } from '@grpc/grpc-js'; import { ensureTemporalFailure } from '@temporalio/common'; -import { - encodeErrorToFailure, - encodeToPayloads, - filterNullAndUndefined, -} from '@temporalio/common/lib/internal-non-workflow'; +import { encodeErrorToFailure, encodeToPayloads } from '@temporalio/common/lib/internal-non-workflow'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { BaseClient, diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index 12e4b992d..8de7daa68 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -1,4 +1,4 @@ -import { filterNullAndUndefined } from '@temporalio/common/lib/internal-non-workflow'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { AsyncCompletionClient } from './async-completion-client'; import { BaseClient, BaseClientOptions, defaultBaseClientOptions, LoadedWithDefaults } from './base-client'; import { ClientInterceptors } from './interceptors'; diff --git a/packages/client/src/connection.ts b/packages/client/src/connection.ts index 6d089cec7..b6a623e3c 100644 --- a/packages/client/src/connection.ts +++ b/packages/client/src/connection.ts @@ -2,11 +2,11 @@ import { AsyncLocalStorage } from 'node:async_hooks'; import * as grpc from '@grpc/grpc-js'; import type * as proto from 'protobufjs'; import { - filterNullAndUndefined, normalizeTlsConfig, TLSConfig, normalizeGrpcEndpointAddress, } from '@temporalio/common/lib/internal-non-workflow'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { Duration, msOptionalToNumber } from '@temporalio/common/lib/time'; import { type temporal } from '@temporalio/proto'; import { isGrpcServiceError, ServiceError } from './errors'; diff --git a/packages/client/src/schedule-client.ts b/packages/client/src/schedule-client.ts index d0c0180e2..6f9838151 100644 --- a/packages/client/src/schedule-client.ts +++ b/packages/client/src/schedule-client.ts @@ -7,11 +7,8 @@ import { encodeUnifiedSearchAttributes, } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors, Headers } from '@temporalio/common/lib/interceptors'; -import { - encodeMapToPayloads, - decodeMapFromPayloads, - filterNullAndUndefined, -} from '@temporalio/common/lib/internal-non-workflow'; +import { encodeMapToPayloads, decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { temporal } from '@temporalio/proto'; import { optionalDateToTs, diff --git a/packages/client/src/task-queue-client.ts b/packages/client/src/task-queue-client.ts index d551cac63..4c0c5d589 100644 --- a/packages/client/src/task-queue-client.ts +++ b/packages/client/src/task-queue-client.ts @@ -1,7 +1,6 @@ import { status } from '@grpc/grpc-js'; -import { filterNullAndUndefined } from '@temporalio/common/lib/internal-non-workflow'; import { assertNever, SymbolBasedInstanceOfError, RequireAtLeastOne } from '@temporalio/common/lib/type-helpers'; -import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; +import { filterNullAndUndefined, makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import { temporal } from '@temporalio/proto'; import { BaseClient, BaseClientOptions, defaultBaseClientOptions, LoadedWithDefaults } from './base-client'; import { WorkflowService } from './types'; diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index f1bc4b194..42899800d 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -34,8 +34,8 @@ import { decodeOptionalFailureToOptionalError, encodeMapToPayloads, encodeToPayloads, - filterNullAndUndefined, } from '@temporalio/common/lib/internal-non-workflow'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { temporal } from '@temporalio/proto'; import { ServiceError, diff --git a/packages/cloud/src/cloud-operations-client.ts b/packages/cloud/src/cloud-operations-client.ts index ee962913d..86c41d48d 100644 --- a/packages/cloud/src/cloud-operations-client.ts +++ b/packages/cloud/src/cloud-operations-client.ts @@ -2,11 +2,11 @@ import { AsyncLocalStorage } from 'node:async_hooks'; import * as grpc from '@grpc/grpc-js'; import type { RPCImpl } from 'protobufjs'; import { - filterNullAndUndefined, normalizeTlsConfig, TLSConfig, normalizeGrpcEndpointAddress, } from '@temporalio/common/lib/internal-non-workflow'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { Duration, msOptionalToNumber } from '@temporalio/common/lib/time'; import { CallContext, diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 7aecd6dc7..ba703f3f0 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -20,6 +20,7 @@ export { Headers, Next } from './interceptors'; export * from './interfaces'; export * from './logger'; export * from './priority'; +export * from './metrics'; export * from './retry-policy'; export type { Timestamp, Duration, StringValue } from './time'; export * from './worker-deployments'; diff --git a/packages/common/src/internal-non-workflow/index.ts b/packages/common/src/internal-non-workflow/index.ts index 13c993d92..11b2feadb 100644 --- a/packages/common/src/internal-non-workflow/index.ts +++ b/packages/common/src/internal-non-workflow/index.ts @@ -9,4 +9,3 @@ export * from './data-converter-helpers'; export * from './parse-host-uri'; export * from './proxy-config'; export * from './tls-config'; -export * from './utils'; diff --git a/packages/common/src/internal-non-workflow/utils.ts b/packages/common/src/internal-non-workflow/utils.ts deleted file mode 100644 index 504d8526c..000000000 --- a/packages/common/src/internal-non-workflow/utils.ts +++ /dev/null @@ -1,6 +0,0 @@ -/** - * Helper to prevent undefined and null values overriding defaults when merging maps - */ -export function filterNullAndUndefined>(obj: T): T { - return Object.fromEntries(Object.entries(obj).filter(([_k, v]) => v != null)) as any; -} diff --git a/packages/common/src/internal-workflow/index.ts b/packages/common/src/internal-workflow/index.ts index e0a2dc34a..c0198c6c1 100644 --- a/packages/common/src/internal-workflow/index.ts +++ b/packages/common/src/internal-workflow/index.ts @@ -1 +1,2 @@ export * from './enums-helpers'; +export * from './objects-helpers'; diff --git a/packages/common/src/internal-workflow/objects-helpers.ts b/packages/common/src/internal-workflow/objects-helpers.ts new file mode 100644 index 000000000..addf156d0 --- /dev/null +++ b/packages/common/src/internal-workflow/objects-helpers.ts @@ -0,0 +1,37 @@ +/** + * Helper to prevent `undefined` and `null` values overriding defaults when merging maps. + */ +export function filterNullAndUndefined>(obj: T): T { + return Object.fromEntries(Object.entries(obj).filter(([_k, v]) => v != null)) as any; +} + +/** + * Merge two objects, possibly removing keys. + * + * More specifically: + * - Any key/value pair in `delta` overrides the corresponding key/value pair in `original`; + * - A key present in `delta` with value `undefined` removes the key from the resulting object; + * - If `original` is `undefined` or empty, return `delta`; + * - If `delta` is `undefined` or empty, return `original` (or undefined if `original` is also undefined); + * - If there are no changes, then return `original`. + */ +export function mergeObjects>(original: T, delta: T | undefined): T; +export function mergeObjects>( + original: T | undefined, + delta: T | undefined +): T | undefined { + if (original == null) return delta; + if (delta == null) return original; + + const merged: Record = { ...original }; + let changed = false; + for (const [k, v] of Object.entries(delta)) { + if (v !== merged[k]) { + if (v == null) delete merged[k]; + else merged[k] = v; + changed = true; + } + } + + return changed ? (merged as T) : original; +} diff --git a/packages/common/src/logger.ts b/packages/common/src/logger.ts index 93a2f6c19..f1e1ff2bd 100644 --- a/packages/common/src/logger.ts +++ b/packages/common/src/logger.ts @@ -1,3 +1,5 @@ +import { filterNullAndUndefined, mergeObjects } from './internal-workflow'; + export type LogLevel = 'TRACE' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR'; export type LogMetadata = Record; @@ -53,3 +55,109 @@ export enum SdkComponent { */ core = 'core', } + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/** + * @internal + * @hidden + */ +export type LogMetaOrFunc = LogMetadata | (() => LogMetadata); + +/** + * A logger implementation that adds metadata before delegating calls to a parent logger. + * + * @internal + * @hidden + */ +export class LoggerWithComposedMetadata implements Logger { + /** + * Return a {@link Logger} that adds metadata before delegating calls to a parent logger. + * + * New metadata may either be specified statically as a delta object, or as a function evaluated + * every time a log is emitted that will return a delta object. + * + * Some optimizations are performed to avoid creating unnecessary objects and to keep runtime + * overhead associated with resolving metadata as low as possible. + */ + public static compose(logger: Logger, metaOrFunc: LogMetaOrFunc): Logger { + // Flatten recursive LoggerWithComposedMetadata instances + if (logger instanceof LoggerWithComposedMetadata) { + const contributors = appendToChain(logger.contributors, metaOrFunc); + // If the new contributor results in no actual change to the chain, then we don't need a new logger + if (contributors === undefined) return logger; + return new LoggerWithComposedMetadata(logger.parentLogger, contributors); + } else { + const contributors = appendToChain(undefined, metaOrFunc); + if (contributors === undefined) return logger; + return new LoggerWithComposedMetadata(logger, contributors); + } + } + + constructor( + private readonly parentLogger: Logger, + private readonly contributors: LogMetaOrFunc[] + ) {} + + log(level: LogLevel, message: string, extraMeta?: LogMetadata): void { + this.parentLogger.log(level, message, resolveMetadata(this.contributors, extraMeta)); + } + + trace(message: string, extraMeta?: LogMetadata): void { + this.parentLogger.trace(message, resolveMetadata(this.contributors, extraMeta)); + } + + debug(message: string, extraMeta?: LogMetadata): void { + this.parentLogger.debug(message, resolveMetadata(this.contributors, extraMeta)); + } + + info(message: string, extraMeta?: LogMetadata): void { + this.parentLogger.info(message, resolveMetadata(this.contributors, extraMeta)); + } + + warn(message: string, extraMeta?: LogMetadata): void { + this.parentLogger.warn(message, resolveMetadata(this.contributors, extraMeta)); + } + + error(message: string, extraMeta?: LogMetadata): void { + this.parentLogger.error(message, resolveMetadata(this.contributors, extraMeta)); + } +} + +function resolveMetadata(contributors: LogMetaOrFunc[], extraMeta?: LogMetadata): LogMetadata { + const resolved = {}; + for (const contributor of contributors) { + Object.assign(resolved, typeof contributor === 'function' ? contributor() : contributor); + } + Object.assign(resolved, extraMeta); + return filterNullAndUndefined(resolved); +} + +/** + * Append a metadata contributor to the chain, merging it with the former last contributor if both are plain objects + */ +function appendToChain( + existingContributors: LogMetaOrFunc[] | undefined, + newContributor: LogMetaOrFunc +): LogMetaOrFunc[] | undefined { + // If the new contributor is an empty object, then it results in no actual change to the chain + if (typeof newContributor === 'object' && Object.keys(newContributor).length === 0) { + return existingContributors; + } + + // If existing chain is empty, then the new contributor is the chain + if (existingContributors == null || existingContributors.length === 0) { + return [newContributor]; + } + + // If both last contributor and new contributor are plain objects, merge them to a single object. + const last = existingContributors[existingContributors.length - 1]; + if (typeof last === 'object' && typeof newContributor === 'object') { + const merged = mergeObjects(last, newContributor); + if (merged === last) return existingContributors; + return [...existingContributors.slice(0, -1), merged]; + } + + // Otherwise, just append the new contributor to the chain. + return [...existingContributors, newContributor]; +} diff --git a/packages/common/src/metrics.ts b/packages/common/src/metrics.ts new file mode 100644 index 000000000..c7fe003ca --- /dev/null +++ b/packages/common/src/metrics.ts @@ -0,0 +1,443 @@ +import { filterNullAndUndefined, mergeObjects } from './internal-workflow'; + +/** + * A meter for creating metrics to record values on. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export interface MetricMeter { + /** + * Create a new counter metric that supports adding values. + * + * @param name Name for the counter metric. + * @param unit Unit for the counter metric. Optional. + * @param description Description for the counter metric. Optional. + */ + createCounter(name: string, unit?: string, description?: string): MetricCounter; + + /** + * Create a new histogram metric that supports recording values. + * + * @param name Name for the histogram metric. + * @param valueType Type of value to record. Defaults to `int`. + * @param unit Unit for the histogram metric. Optional. + * @param description Description for the histogram metric. Optional. + */ + createHistogram( + name: string, + valueType?: NumericMetricValueType, + unit?: string, + description?: string + ): MetricHistogram; + + /** + * Create a new gauge metric that supports setting values. + * + * @param name Name for the gauge metric. + * @param valueType Type of value to set. Defaults to `int`. + * @param unit Unit for the gauge metric. Optional. + * @param description Description for the gauge metric. Optional. + */ + createGauge(name: string, valueType?: NumericMetricValueType, unit?: string, description?: string): MetricGauge; + + /** + * Return a clone of this meter, with additional tags. All metrics created off the meter will + * have the tags. + * + * @param tags Tags to append. + */ + withTags(tags: MetricTags): MetricMeter; +} + +/** + * Base interface for all metrics. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export interface Metric { + /** + * The name of the metric. + */ + name: string; + + /** + * The unit of the metric, if any. + */ + unit?: string; + + /** + * The description of the metric, if any. + */ + description?: string; +} + +/** + * A metric that supports adding values as a counter. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export interface MetricCounter extends Metric { + /** + * Add the given value to the counter. + * + * @param value Value to add. + * @param extraTags Extra tags if any. + */ + add(value: number, extraTags?: MetricTags): void; + + /** + * Return a clone of this counter, with additional tags. + * + * @param tags Tags to append to existing tags. + */ + withTags(tags: MetricTags): MetricCounter; +} + +/** + * A metric that supports recording values on a histogram. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export interface MetricHistogram extends Metric { + /** + * The type of value to record. Either `int` or `float`. + */ + valueType: NumericMetricValueType; + + /** + * Record the given value on the histogram. + * + * @param value Value to record. Must be a non-negative number. Value will be casted to the given + * {@link valueType}. Loss of precision may occur if the value is not already of the + * correct type. + * @param extraTags Extra tags if any. + */ + record(value: number, extraTags?: MetricTags): void; + + /** + * Return a clone of this histogram, with additional tags. + * + * @param tags Tags to append to existing tags. + */ + withTags(tags: MetricTags): MetricHistogram; +} + +/** + * A metric that supports setting values. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export interface MetricGauge extends Metric { + /** + * The type of value to set. Either `int` or `float`. + */ + valueType: NumericMetricValueType; + + /** + * Set the given value on the gauge. + * + * @param value Value to set. + * @param extraTags Extra tags if any. + */ + set(value: number, extraTags?: MetricTags): void; + + /** + * Return a clone of this gauge, with additional tags. + * + * @param tags Tags to append to existing tags. + */ + withTags(tags: MetricTags): MetricGauge; +} + +/** + * Tags to be attached to some metrics. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export type MetricTags = Record; + +export type NumericMetricValueType = 'int' | 'float'; + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/** + * A meter implementation that does nothing. + */ +class NoopMetricMeter implements MetricMeter { + createCounter(name: string, unit?: string, description?: string): MetricCounter { + return { + name, + unit, + description, + + add(_value, _extraTags) {}, + + withTags(_extraTags) { + return this; + }, + }; + } + + createHistogram( + name: string, + valueType: NumericMetricValueType = 'int', + unit?: string, + description?: string + ): MetricHistogram { + return { + name, + valueType, + unit, + description, + + record(_value, _extraTags) {}, + + withTags(_extraTags) { + return this; + }, + }; + } + + createGauge(name: string, valueType?: NumericMetricValueType, unit?: string, description?: string): MetricGauge { + return { + name, + valueType: valueType ?? 'int', + unit, + description, + + set(_value, _extraTags) {}, + + withTags(_extraTags) { + return this; + }, + }; + } + + withTags(_extraTags: MetricTags): MetricMeter { + return this; + } +} + +export const noopMetricMeter = new NoopMetricMeter(); + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +export type MetricTagsOrFunc = MetricTags | (() => MetricTags); + +/** + * A meter implementation that adds tags before delegating calls to a parent meter. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + * @internal + * @hidden + */ +export class MetricMeterWithComposedTags implements MetricMeter { + /** + * Return a {@link MetricMeter} that adds tags before delegating calls to a parent meter. + * + * New tags may either be specified statically as a delta object, or as a function evaluated + * every time a metric is recorded that will return a delta object. + * + * Some optimizations are performed to avoid creating unnecessary objects and to keep runtime + * overhead associated with resolving tags as low as possible. + * + * @param meter The parent meter to delegate calls to. + * @param tagsOrFunc New tags may either be specified statically as a delta object, or as a function + * evaluated every time a metric is recorded that will return a delta object. + * @param force if `true`, then a `MetricMeterWithComposedTags` will be created even if there + * is no tags to add. This is useful to add tags support to an underlying meter + * implementation that does not support tags directly. + */ + public static compose(meter: MetricMeter, tagsOrFunc: MetricTagsOrFunc, force: boolean = false): MetricMeter { + if (meter instanceof MetricMeterWithComposedTags) { + const contributors = appendToChain(meter.contributors, tagsOrFunc); + // If the new contributor results in no actual change to the chain, then we don't need a new meter + if (contributors === undefined && !force) return meter; + return new MetricMeterWithComposedTags(meter.parentMeter, contributors ?? []); + } else { + const contributors = appendToChain(undefined, tagsOrFunc); + if (contributors === undefined && !force) return meter; + return new MetricMeterWithComposedTags(meter, contributors ?? []); + } + } + + private constructor( + private readonly parentMeter: MetricMeter, + private readonly contributors: MetricTagsOrFunc[] + ) {} + + createCounter(name: string, unit?: string, description?: string): MetricCounter { + const parentCounter = this.parentMeter.createCounter(name, unit, description); + return new MetricCounterWithComposedTags(parentCounter, this.contributors); + } + + createHistogram( + name: string, + valueType: NumericMetricValueType = 'int', + unit?: string, + description?: string + ): MetricHistogram { + const parentHistogram = this.parentMeter.createHistogram(name, valueType, unit, description); + return new MetricHistogramWithComposedTags(parentHistogram, this.contributors); + } + + createGauge( + name: string, + valueType: NumericMetricValueType = 'int', + unit?: string, + description?: string + ): MetricGauge { + const parentGauge = this.parentMeter.createGauge(name, valueType, unit, description); + return new MetricGaugeWithComposedTags(parentGauge, this.contributors); + } + + withTags(tags: MetricTags): MetricMeter { + return MetricMeterWithComposedTags.compose(this, tags); + } +} + +/** + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +class MetricCounterWithComposedTags implements MetricCounter { + constructor( + private parentCounter: MetricCounter, + private contributors: MetricTagsOrFunc[] + ) {} + + add(value: number, extraTags?: MetricTags | undefined): void { + this.parentCounter.add(value, resolveTags(this.contributors, extraTags)); + } + + withTags(extraTags: MetricTags): MetricCounter { + const contributors = appendToChain(this.contributors, extraTags); + if (contributors === undefined) return this; + return new MetricCounterWithComposedTags(this.parentCounter, contributors); + } + + get name(): string { + return this.parentCounter.name; + } + + get unit(): string | undefined { + return this.parentCounter.unit; + } + + get description(): string | undefined { + return this.parentCounter.description; + } +} + +/** + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +class MetricHistogramWithComposedTags implements MetricHistogram { + constructor( + private parentHistogram: MetricHistogram, + private contributors: MetricTagsOrFunc[] + ) {} + + record(value: number, extraTags?: MetricTags): void { + this.parentHistogram.record(value, resolveTags(this.contributors, extraTags)); + } + + withTags(extraTags: MetricTags): MetricHistogram { + const contributors = appendToChain(this.contributors, extraTags); + if (contributors === undefined) return this; + return new MetricHistogramWithComposedTags(this.parentHistogram, contributors); + } + + get name(): string { + return this.parentHistogram.name; + } + + get valueType(): NumericMetricValueType { + return this.parentHistogram.valueType; + } + + get unit(): string | undefined { + return this.parentHistogram.unit; + } + + get description(): string | undefined { + return this.parentHistogram.description; + } +} + +/** + * @internal + * @hidden + */ +class MetricGaugeWithComposedTags implements MetricGauge { + constructor( + private parentGauge: MetricGauge, + private contributors: MetricTagsOrFunc[] + ) {} + + set(value: number, extraTags?: MetricTags): void { + this.parentGauge.set(value, resolveTags(this.contributors, extraTags)); + } + + withTags(extraTags: MetricTags): MetricGauge { + const contributors = appendToChain(this.contributors, extraTags); + if (contributors === undefined) return this; + return new MetricGaugeWithComposedTags(this.parentGauge, contributors); + } + + get name(): string { + return this.parentGauge.name; + } + + get valueType(): NumericMetricValueType { + return this.parentGauge.valueType; + } + + get unit(): string | undefined { + return this.parentGauge.unit; + } + + get description(): string | undefined { + return this.parentGauge.description; + } +} + +function resolveTags(contributors: MetricTagsOrFunc[], extraTags?: MetricTags): MetricTags { + const resolved = {}; + for (const contributor of contributors) { + Object.assign(resolved, typeof contributor === 'function' ? contributor() : contributor); + } + Object.assign(resolved, extraTags); + return filterNullAndUndefined(resolved); +} + +/** + * Append a tags contributor to the chain, merging it with the former last contributor if possible. + * + * If appending the new contributor results in no actual change to the chain of contributors, return + * `existingContributors`; in that case, the caller should avoid creating a new object if possible. + */ +function appendToChain( + existingContributors: MetricTagsOrFunc[] | undefined, + newContributor: MetricTagsOrFunc +): MetricTagsOrFunc[] | undefined { + // If the new contributor is an empty object, then it results in no actual change to the chain + if (typeof newContributor === 'object' && Object.keys(newContributor).length === 0) { + return existingContributors; + } + + // If existing chain is empty, then the new contributor is the chain + if (existingContributors == null || existingContributors.length === 0) { + return [newContributor]; + } + + // If both last contributor and new contributor are plain objects, merge them to a single object. + const last = existingContributors[existingContributors.length - 1]; + if (typeof last === 'object' && typeof newContributor === 'object') { + const merged = mergeObjects(last, newContributor); + if (merged === last) return existingContributors; + return [...existingContributors.slice(0, -1), merged!]; + } + + // Otherwise, just append the new contributor to the chain. + return [...existingContributors, newContributor]; +} diff --git a/packages/core-bridge/src/helpers/abort_controller.rs b/packages/core-bridge/src/helpers/abort_controller.rs index 33e8fb092..946baaf27 100644 --- a/packages/core-bridge/src/helpers/abort_controller.rs +++ b/packages/core-bridge/src/helpers/abort_controller.rs @@ -57,9 +57,9 @@ struct AbortControllerJsCounterpart { } const STATE_UNINITIALIZED: u8 = 0; -const STATE_ARMED: u8 = 1; -const STATE_ABORTED: u8 = 2; -const STATE_DISARMED: u8 = 3; +const STATE_ARMED: u8 = 1 << 0; +const STATE_ABORTED: u8 = 1 << 1; +const STATE_DISARMED: u8 = STATE_ARMED | STATE_ABORTED; impl AbortController { /// Create a new `AbortController`. diff --git a/packages/core-bridge/src/helpers/json_string.rs b/packages/core-bridge/src/helpers/json_string.rs index 75e9a83b4..0290c96bd 100644 --- a/packages/core-bridge/src/helpers/json_string.rs +++ b/packages/core-bridge/src/helpers/json_string.rs @@ -1,10 +1,13 @@ -use std::marker::PhantomData; - -use neon::{prelude::Context, result::JsResult, types::JsString}; -use serde::Serialize; +use neon::{ + handle::Handle, + prelude::Context, + result::JsResult, + types::{JsString, JsValue}, +}; +use serde::{Serialize, de::DeserializeOwned}; use serde_json; -use super::{BridgeError, TryIntoJs}; +use super::{BridgeError, BridgeResult, TryFromJs, TryIntoJs}; /// A newtype wrapper for a T serialized as a JSON string. /// @@ -15,31 +18,41 @@ use super::{BridgeError, TryIntoJs}; /// (claim from the Neon's author, circa April 2025). /// /// This newtype wrapper allows specifying values that will be serialized to a JSON string -/// when being transferred to JS using the `TryIntoJs` trait. The JSON serialization happens -/// on the caller Rust thread, therefore limiting the time spent in the JS thread. +/// when being transferred to JS using the `TryIntoJs` trait, and to be deserialized from JSON +/// when being transferred from JS using the `TryFromJs` trait. #[derive(Debug, Clone)] pub struct JsonString { - json: String, - _phantom: PhantomData, + pub json: String, + pub value: T, +} + +impl TryIntoJs for JsonString { + type Output = JsString; + fn try_into_js<'a>(self, cx: &mut impl Context<'a>) -> JsResult<'a, JsString> { + Ok(cx.string(&self.json)) + } } -impl JsonString -where - T: Serialize, -{ +impl JsonString { pub fn try_from_value(value: T) -> Result { let json = serde_json::to_string(&value) .map_err(|e| BridgeError::Other(anyhow::Error::from(e)))?; - Ok(Self { - json, - _phantom: PhantomData, - }) + Ok(Self { json, value }) } } -impl TryIntoJs for JsonString { - type Output = JsString; - fn try_into_js<'a>(self, cx: &mut impl Context<'a>) -> JsResult<'a, JsString> { - Ok(cx.string(&self.json)) +impl TryFromJs for JsonString { + fn try_from_js<'cx, 'b>( + cx: &mut impl Context<'cx>, + js_value: Handle<'b, JsValue>, + ) -> BridgeResult { + let json = js_value.downcast::(cx)?.value(cx); + match serde_json::from_str(&json) { + Ok(value) => Ok(Self { json, value }), + Err(e) => Err(BridgeError::TypeError { + field: None, + message: e.to_string(), + }), + } } } diff --git a/packages/core-bridge/src/lib.rs b/packages/core-bridge/src/lib.rs index e53bd05e5..5c7b46404 100644 --- a/packages/core-bridge/src/lib.rs +++ b/packages/core-bridge/src/lib.rs @@ -17,6 +17,7 @@ pub mod helpers; mod client; mod logs; +mod metrics; mod runtime; mod testing; mod worker; @@ -25,6 +26,7 @@ mod worker; fn main(mut cx: neon::prelude::ModuleContext) -> neon::prelude::NeonResult<()> { client::init(&mut cx)?; logs::init(&mut cx)?; + metrics::init(&mut cx)?; runtime::init(&mut cx)?; testing::init(&mut cx)?; worker::init(&mut cx)?; diff --git a/packages/core-bridge/src/metrics.rs b/packages/core-bridge/src/metrics.rs new file mode 100644 index 000000000..0f7d37b54 --- /dev/null +++ b/packages/core-bridge/src/metrics.rs @@ -0,0 +1,325 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Context as _; +use neon::prelude::*; +use serde::Deserialize; + +use temporal_sdk_core::api::telemetry::metrics::{ + CoreMeter, Counter as CoreCounter, Gauge as CoreGauge, Histogram as CoreHistogram, + MetricParametersBuilder, NewAttributes, TemporalMeter, +}; +use temporal_sdk_core::api::telemetry::metrics::{ + GaugeF64 as CoreGaugeF64, HistogramF64 as CoreHistogramF64, +}; +use temporal_sdk_core::api::telemetry::metrics::{ + MetricKeyValue as CoreMetricKeyValue, MetricValue as CoreMetricValue, +}; + +use bridge_macros::js_function; + +use crate::helpers::{ + BridgeError, BridgeResult, JsonString, MutableFinalize, OpaqueInboundHandle, + OpaqueOutboundHandle, +}; +use crate::runtime::*; + +pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult<()> { + cx.export_function("newMetricCounter", new_metric_counter)?; + cx.export_function("newMetricHistogram", new_metric_histogram)?; + cx.export_function("newMetricHistogramF64", new_metric_histogram_f64)?; + cx.export_function("newMetricGauge", new_metric_gauge)?; + cx.export_function("newMetricGaugeF64", new_metric_gauge_f64)?; + + cx.export_function("addMetricCounterValue", add_metric_counter_value)?; + cx.export_function("recordMetricHistogramValue", record_metric_histogram_value)?; + cx.export_function( + "recordMetricHistogramF64Value", + record_metric_histogram_f64_value, + )?; + cx.export_function("setMetricGaugeValue", set_metric_gauge_value)?; + cx.export_function("setMetricGaugeF64Value", set_metric_gauge_f64_value)?; + + Ok(()) +} + +pub struct Counter { + pub(crate) meter: TemporalMeter, + pub(crate) counter: Arc, +} + +impl MutableFinalize for Counter {} + +pub struct Histogram { + pub(crate) meter: TemporalMeter, + pub(crate) histogram: Arc, +} +impl MutableFinalize for Histogram {} + +pub struct HistogramF64 { + pub(crate) meter: TemporalMeter, + pub(crate) histogram: Arc, +} +impl MutableFinalize for HistogramF64 {} + +pub struct Gauge { + pub(crate) meter: TemporalMeter, + pub(crate) gauge: Arc, +} +impl MutableFinalize for Gauge {} + +pub struct GaugeF64 { + pub(crate) meter: TemporalMeter, + pub(crate) gauge: Arc, +} +impl MutableFinalize for GaugeF64 {} + +#[derive(Debug, Deserialize)] +pub struct MetricAttributes { + #[serde(flatten)] + pub attributes: HashMap, +} + +#[derive(Debug, Deserialize)] +#[serde(untagged)] +pub enum MetricValue { + Int(i64), + Float(f64), + Bool(bool), + String(String), +} + +impl From for CoreMetricValue { + fn from(value: MetricValue) -> Self { + match value { + MetricValue::Int(i) => CoreMetricValue::Int(i), + MetricValue::Float(f) => CoreMetricValue::Float(f), + MetricValue::Bool(b) => CoreMetricValue::Bool(b), + MetricValue::String(s) => CoreMetricValue::String(s), + } + } +} + +/// Create a new metric counter +#[js_function] +pub fn new_metric_counter( + runtime: OpaqueInboundHandle, + name: String, + unit: String, + description: String, +) -> BridgeResult> { + let core_runtime = runtime.borrow()?.core_runtime.clone(); + let meter = core_runtime + .telemetry() + .get_metric_meter() + .ok_or(BridgeError::UnexpectedError( + "Failed to get metric meter".into(), + ))?; + + let counter = meter.inner.counter( + MetricParametersBuilder::default() + .name(name) + .unit(unit) + .description(description) + .build() + .context("Failed to build metric parameters")?, + ); + + Ok(OpaqueOutboundHandle::new(Counter { meter, counter })) +} + +#[js_function] +pub fn new_metric_histogram( + runtime: OpaqueInboundHandle, + name: String, + unit: String, + description: String, +) -> BridgeResult> { + let core_runtime = runtime.borrow()?.core_runtime.clone(); + let meter = core_runtime + .telemetry() + .get_metric_meter() + .ok_or(BridgeError::UnexpectedError( + "Failed to get metric meter".into(), + ))?; + + let histogram = meter.inner.histogram( + MetricParametersBuilder::default() + .name(name) + .unit(unit) + .description(description) + .build() + .context("Failed to build metric parameters")?, + ); + + Ok(OpaqueOutboundHandle::new(Histogram { meter, histogram })) +} + +#[js_function] +pub fn new_metric_histogram_f64( + runtime: OpaqueInboundHandle, + name: String, + unit: String, + description: String, +) -> BridgeResult> { + let core_runtime = runtime.borrow()?.core_runtime.clone(); + let meter = core_runtime + .telemetry() + .get_metric_meter() + .ok_or(BridgeError::UnexpectedError( + "Failed to get metric meter".into(), + ))?; + + let histogram = meter.inner.histogram_f64( + MetricParametersBuilder::default() + .name(name) + .unit(unit) + .description(description) + .build() + .context("Failed to build metric parameters")?, + ); + + Ok(OpaqueOutboundHandle::new(HistogramF64 { meter, histogram })) +} + +#[js_function] +pub fn new_metric_gauge( + runtime: OpaqueInboundHandle, + name: String, + unit: String, + description: String, +) -> BridgeResult> { + let core_runtime = runtime.borrow()?.core_runtime.clone(); + let meter = core_runtime + .telemetry() + .get_metric_meter() + .ok_or(BridgeError::UnexpectedError( + "Failed to get metric meter".into(), + ))?; + + let gauge = meter.inner.gauge( + MetricParametersBuilder::default() + .name(name) + .unit(unit) + .description(description) + .build() + .context("Failed to build metric parameters")?, + ); + + Ok(OpaqueOutboundHandle::new(Gauge { meter, gauge })) +} + +#[js_function] +pub fn new_metric_gauge_f64( + runtime: OpaqueInboundHandle, + name: String, + unit: String, + description: String, +) -> BridgeResult> { + let core_runtime = runtime.borrow()?.core_runtime.clone(); + let meter = core_runtime + .telemetry() + .get_metric_meter() + .ok_or(BridgeError::UnexpectedError( + "Failed to get metric meter".into(), + ))?; + + let gauge = meter.inner.gauge_f64( + MetricParametersBuilder::default() + .name(name) + .unit(unit) + .description(description) + .build() + .context("Failed to build metric parameters")?, + ); + + Ok(OpaqueOutboundHandle::new(GaugeF64 { meter, gauge })) +} + +#[js_function] +pub fn add_metric_counter_value( + counter_handle: OpaqueInboundHandle, + value: f64, + attributes: JsonString, +) -> BridgeResult<()> { + let counter_handle = counter_handle.borrow()?; + let attributes = counter_handle + .meter + .inner + .new_attributes(parse_metric_attributes(attributes.value)); + counter_handle.counter.add(value as u64, &attributes); + Ok(()) +} + +#[js_function] +pub fn record_metric_histogram_value( + histogram_handle: OpaqueInboundHandle, + value: u64, + attributes: JsonString, +) -> BridgeResult<()> { + let histogram_handle = histogram_handle.borrow()?; + let attributes = histogram_handle + .meter + .inner + .new_attributes(parse_metric_attributes(attributes.value)); + histogram_handle.histogram.record(value, &attributes); + Ok(()) +} + +#[js_function] +pub fn record_metric_histogram_f64_value( + histogram_handle: OpaqueInboundHandle, + value: f64, + attributes: JsonString, +) -> BridgeResult<()> { + let histogram_handle = histogram_handle.borrow()?; + let attributes = histogram_handle + .meter + .inner + .new_attributes(parse_metric_attributes(attributes.value)); + histogram_handle.histogram.record(value, &attributes); + Ok(()) +} + +#[js_function] +pub fn set_metric_gauge_value( + gauge_handle: OpaqueInboundHandle, + value: u64, + attributes: JsonString, +) -> BridgeResult<()> { + let gauge_handle = gauge_handle.borrow()?; + let attributes = gauge_handle + .meter + .inner + .new_attributes(parse_metric_attributes(attributes.value)); + gauge_handle.gauge.record(value, &attributes); + Ok(()) +} + +#[js_function] +pub fn set_metric_gauge_f64_value( + gauge_handle: OpaqueInboundHandle, + value: f64, + attributes: JsonString, +) -> BridgeResult<()> { + let gauge_handle = gauge_handle.borrow()?; + let attributes = gauge_handle + .meter + .inner + .new_attributes(parse_metric_attributes(attributes.value)); + gauge_handle.gauge.record(value, &attributes); + Ok(()) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +fn parse_metric_attributes(attrs: MetricAttributes) -> NewAttributes { + let attrs = attrs + .attributes + .into_iter() + .map(|(key, value)| CoreMetricKeyValue { + key, + value: value.into(), + }) + .collect(); + NewAttributes { attributes: attrs } +} diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index 90ec514f7..bc365fc77 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -404,3 +404,93 @@ type LogLevel = 'TRACE' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR'; type LogEntryMetadata = { [key: string]: string | number | boolean | LogEntryMetadata; }; + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Custom Metrics +//////////////////////////////////////////////////////////////////////////////////////////////////// + +export interface MetricMeter { + type: 'metric-meter'; +} + +export interface MetricCounter { + type: 'metric-counter'; +} + +export interface MetricHistogram { + type: 'metric-histogram'; +} + +export interface MetricHistogramF64 { + type: 'metric-histogram-f64'; +} + +export interface MetricGauge { + type: 'metric-gauge'; +} + +export interface MetricGaugeF64 { + type: 'metric-gauge-f64'; +} + +export type MetricAttributes = Record; + +export declare function newMetricCounter( + runtime: Runtime, + name: string, + unit: string, + description: string +): MetricCounter; + +export declare function newMetricHistogram( + runtime: Runtime, + name: string, + unit: string, + description: string +): MetricHistogram; + +export declare function newMetricHistogramF64( + runtime: Runtime, + name: string, + unit: string, + description: string +): MetricHistogramF64; + +export declare function newMetricGauge(runtime: Runtime, name: string, unit: string, description: string): MetricGauge; + +export declare function newMetricGaugeF64( + runtime: Runtime, + name: string, + unit: string, + description: string +): MetricGaugeF64; + +export declare function addMetricCounterValue( + counter: MetricCounter, + value: number, + attrs: JsonString +): void; + +export declare function recordMetricHistogramValue( + histogram: MetricHistogram, + value: number, + attrs: JsonString +): void; + +export declare function recordMetricHistogramF64Value( + histogram: MetricHistogramF64, + value: number, + attrs: JsonString +): void; + +export declare function setMetricGaugeValue( + gauge: MetricGauge, + value: number, + attrs: JsonString +): void; + +export declare function setMetricGaugeF64Value( + gauge: MetricGaugeF64, + value: number, + attrs: JsonString +): void; diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 5cb73758e..bbaad5ff8 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -19,6 +19,7 @@ import { LogLevel, ReplayWorkerOptions, Runtime, + RuntimeOptions, WorkerOptions, WorkflowBundle, WorkflowBundleWithSourceMap, @@ -48,7 +49,7 @@ const defaultDynamicConfigOptions = [ 'worker.removableBuildIdDurationSinceDefault=1', ]; -function setupRuntime(recordedLogs?: { [workflowId: string]: LogEntry[] }) { +function setupRuntime(recordedLogs?: { [workflowId: string]: LogEntry[] }, runtimeOpts?: Partial) { const logger = recordedLogs ? new DefaultLogger('DEBUG', (entry) => { const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId; @@ -57,9 +58,12 @@ function setupRuntime(recordedLogs?: { [workflowId: string]: LogEntry[] }) { }) : new DefaultLogger((process.env.TEST_LOG_LEVEL || 'WARN').toUpperCase() as LogLevel); Runtime.install({ + ...runtimeOpts, logger, telemetryOptions: { + ...runtimeOpts?.telemetryOptions, logging: { + ...runtimeOpts?.telemetryOptions?.logging, filter: makeTelemetryFilterString({ core: (process.env.TEST_LOG_LEVEL || 'INFO').toUpperCase() as LogLevel, }), @@ -114,11 +118,14 @@ export function makeConfigurableEnvironmentTestFn(opts: { recordedLogs?: { [workflowId: string]: LogEntry[] }; createTestContext: (t: ExecutionContext) => Promise; teardown: (t: T) => Promise; + runtimeOpts?: Partial | (() => Promise<[Partial, Partial]>) | undefined; }): TestFn { const test = anyTest as TestFn; test.before(async (t) => { - setupRuntime(opts.recordedLogs); - t.context = await opts.createTestContext(t); + const [runtimeOpts, extraContext] = + typeof opts.runtimeOpts === 'function' ? await opts.runtimeOpts() : [opts.runtimeOpts, {}]; + setupRuntime(opts.recordedLogs, runtimeOpts); + t.context = { ...(await opts.createTestContext(t)), ...extraContext }; }); test.after.always(async (t) => { await opts.teardown(t.context); @@ -126,15 +133,17 @@ export function makeConfigurableEnvironmentTestFn(opts: { return test; } -export function makeTestFunction(opts: { +export function makeTestFunction(opts: { workflowsPath: string; workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions; workflowInterceptorModules?: string[]; recordedLogs?: { [workflowId: string]: LogEntry[] }; -}): TestFn { - return makeConfigurableEnvironmentTestFn({ + runtimeOpts?: Partial | (() => Promise<[Partial, Partial]>) | undefined; +}): TestFn { + return makeConfigurableEnvironmentTestFn({ recordedLogs: opts.recordedLogs, - createTestContext: async (_t: ExecutionContext): Promise => { + runtimeOpts: opts.runtimeOpts, + createTestContext: async (_t: ExecutionContext): Promise => { let env: TestWorkflowEnvironment; if (process.env.TEMPORAL_SERVICE_ADDRESS) { env = await TestWorkflowEnvironment.createFromExistingServer({ @@ -149,9 +158,9 @@ export function makeTestFunction(opts: { workflowInterceptorModules: opts.workflowInterceptorModules, }), env, - }; + } as unknown as C; }, - teardown: async (c: Context) => { + teardown: async (c: C) => { await c.env.teardown(); }, }); diff --git a/packages/test/src/mock-native-worker.ts b/packages/test/src/mock-native-worker.ts index 385233e97..5b536046d 100644 --- a/packages/test/src/mock-native-worker.ts +++ b/packages/test/src/mock-native-worker.ts @@ -6,7 +6,8 @@ import { coresdk } from '@temporalio/proto'; import { DefaultLogger, Runtime, ShutdownError } from '@temporalio/worker'; import { byteArrayToBuffer } from '@temporalio/worker/lib/utils'; import { NativeReplayHandle, NativeWorkerLike, Worker as RealWorker } from '@temporalio/worker/lib/worker'; -import { withMetadata } from '@temporalio/worker/lib/logger'; +import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; +import { MetricMeterWithComposedTags } from '@temporalio/common/lib/metrics'; import { CompiledWorkerOptions, compileWorkerOptions, WorkerOptions } from '@temporalio/worker/lib/worker-options'; import type { WorkflowCreator } from '@temporalio/worker/lib/workflow/interface'; import * as activities from './activities'; @@ -160,12 +161,12 @@ export class Worker extends RealWorker { public constructor(workflowCreator: WorkflowCreator, opts: CompiledWorkerOptions) { const runtime = Runtime.instance(); - const logger = withMetadata(runtime.logger, { + const logger = LoggerWithComposedMetadata.compose(runtime.logger, { sdkComponent: SdkComponent.worker, taskQueue: opts.taskQueue, }); const nativeWorker = new MockNativeWorker(); - super(runtime, nativeWorker, workflowCreator, opts, logger); + super(runtime, nativeWorker, workflowCreator, opts, logger, runtime.metricMeter); } public runWorkflows(...args: Parameters): Promise { @@ -181,10 +182,15 @@ export const defaultOptions: WorkerOptions = { }; export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Worker { - const logger = withMetadata(Runtime.instance().logger, { + const runtime = Runtime.instance(); + const logger = LoggerWithComposedMetadata.compose(runtime.logger, { sdkComponent: SdkComponent.worker, taskQueue: options.taskQueue ?? 'default', }); + const metricMeter = MetricMeterWithComposedTags.compose(runtime.metricMeter, { + namespace: options.namespace ?? 'default', + taskQueue: options.taskQueue ?? 'default', + }); return new Worker( { async createWorkflow() { @@ -194,6 +200,6 @@ export function isolateFreeWorker(options: WorkerOptions = defaultOptions): Work /* Nothing to destroy */ }, }, - compileWorkerOptions(options, logger) + compileWorkerOptions(options, logger, metricMeter) ); } diff --git a/packages/test/src/test-metrics-custom.ts b/packages/test/src/test-metrics-custom.ts new file mode 100644 index 000000000..5e73505a2 --- /dev/null +++ b/packages/test/src/test-metrics-custom.ts @@ -0,0 +1,374 @@ +import { ExecutionContext } from 'ava'; +import { ActivityInboundCallsInterceptor, ActivityOutboundCallsInterceptor, Runtime } from '@temporalio/worker'; +import * as workflow from '@temporalio/workflow'; +import { MetricTags } from '@temporalio/common'; +import { Context as ActivityContext, metricMeter as activityMetricMeter } from '@temporalio/activity'; +import { Context as BaseContext, helpers, makeTestFunction } from './helpers-integration'; +import { getRandomPort } from './helpers'; + +interface Context extends BaseContext { + port: number; +} + +const test = makeTestFunction({ + workflowsPath: __filename, + workflowInterceptorModules: [__filename], + runtimeOpts: async () => { + const port = await getRandomPort(); + return [ + { + telemetryOptions: { + metrics: { + metricPrefix: 'foo_', + prometheus: { + bindAddress: `127.0.0.1:${port}`, + histogramBucketOverrides: { + 'my-float-histogram': [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1], + 'workflow-float-histogram': [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1], + }, + }, + }, + }, + }, + { port }, + ]; + }, +}); + +async function assertMetricReported(t: ExecutionContext, regexp: RegExp) { + const resp = await fetch(`http://127.0.0.1:${t.context.port}/metrics`); + const text = await resp.text(); + const matched = t.regex(text, regexp); + if (!matched) { + t.log(text); + } +} + +/** + * Asserts custom metrics works properly at the bridge level. + */ +test('Custom Metrics - Bridge supports works properly (no tags)', async (t) => { + const meter = Runtime.instance().metricMeter; + + // Counter + const counter = meter.createCounter('my-counter', 'my-counter-unit', 'my-counter-description'); + counter.add(1); + counter.add(1); + counter.add(40); // 1+1+40 => 42 + await assertMetricReported(t, /my_counter 42/); + + // Int Gauge + const gaugeInt = meter.createGauge('my-int-gauge', 'int', 'my-int-gauge-description'); + gaugeInt.set(1); + gaugeInt.set(40); + await assertMetricReported(t, /my_int_gauge 40/); + + // Float Gauge + const gaugeFloat = meter.createGauge('my-float-gauge', 'float', 'my-float-gauge-description'); + gaugeFloat.set(1.1); + gaugeFloat.set(1.1); + gaugeFloat.set(40.1); + await assertMetricReported(t, /my_float_gauge 40.1/); + + // Int Histogram + const histogramInt = meter.createHistogram('my-int-histogram', 'int', 'my-int-histogram-description'); + histogramInt.record(20); + histogramInt.record(200); + histogramInt.record(2000); + await assertMetricReported(t, /my_int_histogram_bucket{le="50"} 1/); + await assertMetricReported(t, /my_int_histogram_bucket{le="500"} 2/); + await assertMetricReported(t, /my_int_histogram_bucket{le="10000"} 3/); + + // Float Histogram + const histogramFloat = meter.createHistogram('my-float-histogram', 'float', 'my-float-histogram-description'); + histogramFloat.record(0.02); + histogramFloat.record(0.07); + histogramFloat.record(0.99); + await assertMetricReported(t, /my_float_histogram_bucket{le="0.05"} 1/); + await assertMetricReported(t, /my_float_histogram_bucket{le="0.1"} 2/); + await assertMetricReported(t, /my_float_histogram_bucket{le="1"} 3/); +}); + +/** + * Asserts custom metrics tags composition works properly + */ +test('Custom Metrics - Tags composition works properly', async (t) => { + const meter = Runtime.instance().metricMeter; + + // Counter + const counter = meter.createCounter('my-counter', 'my-counter-unit', 'my-counter-description'); + counter.add(1, { labelA: 'value-a', labelB: true, labelC: 123, labelD: 123.456 }); + await assertMetricReported(t, /my_counter{labelA="value-a",labelB="true",labelC="123",labelD="123.456"} 1/); + + // Int Gauge + const gaugeInt = meter.createGauge('my-int-gauge', 'int', 'my-int-gauge-description'); + gaugeInt.set(1, { labelA: 'value-a', labelB: true, labelC: 123, labelD: 123.456 }); + await assertMetricReported(t, /my_int_gauge{labelA="value-a",labelB="true",labelC="123",labelD="123.456"} 1/); + + // Float Gauge + const gaugeFloat = meter.createGauge('my-float-gauge', 'float', 'my-float-gauge-description'); + gaugeFloat.set(1.2, { labelA: 'value-a', labelB: true, labelC: 123, labelD: 123.456 }); + await assertMetricReported(t, /my_float_gauge{labelA="value-a",labelB="true",labelC="123",labelD="123.456"} 1.2/); + + // Int Histogram + const histogramInt = meter.createHistogram('my-int-histogram', 'int', 'my-int-histogram-description'); + histogramInt.record(1, { labelA: 'value-a', labelB: true, labelC: 123, labelD: 123.456 }); + await assertMetricReported( + t, + /my_int_histogram_bucket{labelA="value-a",labelB="true",labelC="123",labelD="123.456",le="50"} 1/ + ); + + // Float Histogram + const histogramFloat = meter.createHistogram('my-float-histogram', 'float', 'my-float-histogram-description'); + histogramFloat.record(0.4, { labelA: 'value-a', labelB: true, labelC: 123, labelD: 123.456 }); + await assertMetricReported( + t, + /my_float_histogram_bucket{labelA="value-a",labelB="true",labelC="123",labelD="123.456",le="0.5"} 1/ + ); +}); + +export async function metricWorksWorkflow(): Promise { + const metricMeter = workflow.metricMeter; + + const myCounterMetric = metricMeter.createCounter( + 'workflow-counter', + 'workflow-counter-unit', + 'workflow-counter-description' + ); + const myHistogramMetric = metricMeter.createHistogram( + 'workflow-histogram', + 'int', + 'workflow-histogram-unit', + 'workflow-histogram-description' + ); + const myFloatHistogramMetric = metricMeter.createHistogram( + 'workflow-float-histogram', + 'float', + 'workflow-float-histogram-unit', + 'workflow-float-histogram-description' + ); + const myGaugeMetric = metricMeter.createGauge( + 'workflow-gauge', + 'int', + 'workflow-gauge-unit', + 'workflow-gauge-description' + ); + const myFloatGaugeMetric = metricMeter.createGauge( + 'workflow-float-gauge', + 'float', + 'workflow-float-gauge-unit', + 'workflow-float-gauge-description' + ); + + myCounterMetric.add(1); + myHistogramMetric.record(1); + myFloatHistogramMetric.record(0.01); + myGaugeMetric.set(1); + myFloatGaugeMetric.set(0.1); + + // Pause here, so that we can force replay to a distinct worker + let signalReceived = false; + workflow.setHandler(workflow.defineUpdate('checkpoint'), () => {}); + workflow.setHandler(workflow.defineSignal('unblock'), () => { + signalReceived = true; + }); + await workflow.condition(() => signalReceived); + + myCounterMetric.add(3); + myHistogramMetric.record(3); + myFloatHistogramMetric.record(0.03); + myGaugeMetric.set(3); + myFloatGaugeMetric.set(0.3); +} + +test('Metric in Workflow works and are not replayed', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ + // Avoid delay when transitioning to the second worker + stickyQueueScheduleToStartTimeout: '100ms', + }); + + const [handle1, handle2] = await worker.runUntil(async () => { + // Start two workflows, and wait for both to reach the checkpoint. + // Why two workflows? To confirm that there's no problem in having multiple + // workflows and sink reinstantiating a same metric. + return await Promise.all([ + // FIXME: Add support for Update with Start to our internal test helpers + startWorkflow(metricWorksWorkflow).then(async (handle) => { + await handle.executeUpdate('checkpoint'); + return handle; + }), + startWorkflow(metricWorksWorkflow).then(async (handle) => { + await handle.executeUpdate('checkpoint'); + return handle; + }), + ]); + }); + + await assertMetricReported(t, /workflow_counter{[^}]+} 2/); // 1 + 1 + await assertMetricReported(t, /workflow_histogram_bucket{[^}]+?,le="50"} 2/); + await assertMetricReported(t, /workflow_float_histogram_bucket{[^}]+?,le="0.05"} 2/); + await assertMetricReported(t, /workflow_gauge{[^}]+} 1/); + await assertMetricReported(t, /workflow_float_gauge{[^}]+} 0.1/); + + const worker2 = await createWorker(); + await worker2.runUntil(async () => { + await Promise.all([handle1.signal('unblock'), handle2.signal('unblock')]); + await Promise.all([handle1.result(), handle2.result()]); + }); + + await assertMetricReported(t, /workflow_counter{[^}]+} 8/); // 1 + 1 + 3 + 3 + await assertMetricReported(t, /workflow_histogram_bucket{[^}]+?,le="50"} 4/); + await assertMetricReported(t, /workflow_float_histogram_bucket{[^}]+?,le="0.05"} 4/); + await assertMetricReported(t, /workflow_gauge{[^}]+} 3/); + await assertMetricReported(t, /workflow_float_gauge{[^}]+} 0.3/); +}); + +export async function MetricTagsWorkflow(): Promise { + const metricMeter = workflow.metricMeter.withTags({ labelX: 'value-x', labelY: 'value-y' }); + + const myCounterMetric = metricMeter.createCounter( + 'workflow2-counter', + 'workflow2-counter-unit', + 'workflow2-counter-description' + ); + const myHistogramMetric = metricMeter.createHistogram( + 'workflow2-histogram', + 'int', + 'workflow2-histogram-unit', + 'workflow2-histogram-description' + ); + const myGaugeMetric = metricMeter.createGauge( + 'workflow2-gauge', + 'int', + 'workflow2-gauge-unit', + 'workflow2-gauge-description' + ); + + myCounterMetric + .withTags({ labelA: 'value-a', labelB: 'value-b' }) + .withTags({ labelC: 'value-c', labelB: 'value-b2' }) + .add(2, { labelD: 'value-d' }); + + myHistogramMetric + .withTags({ labelA: 'value-a', labelB: 'value-b' }) + .withTags({ labelC: 'value-c', labelB: 'value-b2' }) + .record(2, { labelD: 'value-d' }); + + myGaugeMetric + .withTags({ labelA: 'value-a', labelB: 'value-b' }) + .withTags({ labelC: 'value-c', labelB: 'value-b2' }) + .set(2, { labelD: 'value-d' }); +} + +test('Metric tags in Workflow works', async (t) => { + const { createWorker, executeWorkflow, taskQueue } = helpers(t); + const tags = `labelA="value-a",labelB="value-b2",labelC="value-c",labelD="value-d",labelX="value-x",labelY="value-y",namespace="default",taskQueue="${taskQueue}",workflowType="MetricTagsWorkflow"`; + + const worker = await createWorker(); + await worker.runUntil(executeWorkflow(MetricTagsWorkflow)); + + await assertMetricReported(t, new RegExp(`workflow2_counter{${tags}} 2`)); + await assertMetricReported(t, new RegExp(`workflow2_histogram_bucket{${tags},le="50"} 1`)); + await assertMetricReported(t, new RegExp(`workflow2_gauge{${tags}} 2`)); +}); + +// Define workflow interceptor for metrics +export const interceptors = (): workflow.WorkflowInterceptors => ({ + outbound: [ + { + getMetricTags(tags: MetricTags): MetricTags { + if (!workflow.workflowInfo().workflowType.includes('Interceptor')) return tags; + return { + ...tags, + intercepted: 'workflow-interceptor', + }; + }, + }, + ], +}); + +// Define activity interceptor for metrics +export function activityInterceptorFactory(_ctx: ActivityContext): { + inbound?: ActivityInboundCallsInterceptor; + outbound?: ActivityOutboundCallsInterceptor; +} { + return { + outbound: { + getMetricTags(tags: MetricTags): MetricTags { + return { + ...tags, + intercepted: 'activity-interceptor', + }; + }, + }, + }; +} + +// Activity that uses metrics +export async function metricActivity(): Promise { + const { metricMeter } = ActivityContext.current(); + + const counter = metricMeter.createCounter('activity-counter'); + counter.add(5); + + const histogram = metricMeter.createHistogram('activity-histogram'); + histogram.record(10); + + // Use the `metricMeter` exported from the top level of the activity module rather than the one in the context + const gauge = activityMetricMeter.createGauge('activity-gauge'); + gauge.set(15); +} + +// Workflow that uses metrics and calls the activity +export async function metricsInterceptorWorkflow(): Promise { + const metricMeter = workflow.metricMeter; + + // Use workflow metrics + const counter = metricMeter.createCounter('intercepted-workflow-counter'); + counter.add(3); + + const histogram = metricMeter.createHistogram('intercepted-workflow-histogram'); + histogram.record(6); + + const gauge = metricMeter.createGauge('intercepted-workflow-gauge'); + gauge.set(9); + + // Call activity with metrics + await workflow + .proxyActivities({ + startToCloseTimeout: '1 minute', + }) + .metricActivity(); +} + +// Test for workflow metrics interceptor +test('Workflow and Activity Context metrics interceptors add tags', async (t) => { + const { createWorker, executeWorkflow, taskQueue } = helpers(t); + + const worker = await createWorker({ + taskQueue, + workflowsPath: __filename, + activities: { + metricActivity, + }, + interceptors: { + activity: [activityInterceptorFactory], + }, + }); + + await worker.runUntil(executeWorkflow(metricsInterceptorWorkflow)); + + // Verify workflow metrics have interceptor tag + await assertMetricReported(t, /intercepted_workflow_counter{[^}]*intercepted="workflow-interceptor"[^}]*} 3/); + await assertMetricReported( + t, + /intercepted_workflow_histogram_bucket{[^}]*intercepted="workflow-interceptor"[^}]*} \d+/ + ); + await assertMetricReported(t, /intercepted_workflow_gauge{[^}]*intercepted="workflow-interceptor"[^}]*} 9/); + + // Verify activity metrics have interceptor tag + await assertMetricReported(t, /activity_counter{[^}]*intercepted="activity-interceptor"[^}]*} 5/); + await assertMetricReported(t, /activity_histogram_bucket{[^}]*intercepted="activity-interceptor"[^}]*} \d+/); + await assertMetricReported(t, /activity_gauge{[^}]*intercepted="activity-interceptor"[^}]*} 15/); +}); diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index a39bfcb1a..ebe358da4 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -2,7 +2,7 @@ import test from 'ava'; import { v4 as uuid4 } from 'uuid'; import { Connection, WorkflowClient } from '@temporalio/client'; -import { DefaultLogger, InjectedSinks, Runtime, WorkerOptions, LogEntry } from '@temporalio/worker'; +import { DefaultLogger, InjectedSinks, Runtime, WorkerOptions, LogEntry, NativeConnection } from '@temporalio/worker'; import { SearchAttributes, WorkflowInfo } from '@temporalio/workflow'; import { UnsafeWorkflowInfo } from '@temporalio/workflow/lib/interfaces'; import { SdkComponent, TypedSearchAttributes } from '@temporalio/common'; @@ -21,6 +21,7 @@ class DependencyError extends Error { if (RUN_INTEGRATION_TESTS) { const recordedLogs: { [workflowId: string]: LogEntry[] } = {}; + let nativeConnection: NativeConnection; test.before(async (_) => { await registerDefaultCustomSearchAttributes(await Connection.connect({})); @@ -31,6 +32,17 @@ if (RUN_INTEGRATION_TESTS) { recordedLogs[workflowId].push(entry); }), }); + + // FIXME(JWH): At some point, tests in this file ends up creating a situation where we no longer have any + // native resource tracked by the lang side Runtime object, so the lang Runtime tries to shutdown itself, + // but in the mean time, another test tries to create another resource. which results in a rust side + // finalization error. Holding on to a nativeConnection object avoids that situation. That's a dirty hack. + // Proper fix will be implemented in a distinct PR. + nativeConnection = await NativeConnection.connect({}); + }); + + test.after.always(async () => { + await nativeConnection.close(); }); test('Worker injects sinks', async (t) => { @@ -298,7 +310,7 @@ if (RUN_INTEGRATION_TESTS) { await worker.runUntil(client.execute(workflows.logSinkTester, { taskQueue, workflowId })); const history = await client.getHandle(workflowId).fetchHistory(); - // Last 3 events are WorkflowExecutionStarted, WorkflowTaskCompleted and WorkflowExecutionCompleted + // Last 3 events are WorkflowTaskStarted, WorkflowTaskCompleted and WorkflowExecutionCompleted history.events = history!.events!.slice(0, -3); recordedMessages.length = 0; diff --git a/packages/testing/src/mocking-activity-environment.ts b/packages/testing/src/mocking-activity-environment.ts index 8b1cec47b..b9f9c05b0 100644 --- a/packages/testing/src/mocking-activity-environment.ts +++ b/packages/testing/src/mocking-activity-environment.ts @@ -7,14 +7,17 @@ import { SdkComponent, defaultFailureConverter, defaultPayloadConverter, + MetricMeter, + noopMetricMeter, } from '@temporalio/common'; +import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { ActivityInterceptorsFactory, DefaultLogger } from '@temporalio/worker'; -import { withMetadata } from '@temporalio/worker/lib/logger'; import { Activity } from '@temporalio/worker/lib/activity'; export interface MockActivityEnvironmentOptions { interceptors?: ActivityInterceptorsFactory[]; logger?: Logger; + metricMeter?: MetricMeter; } /** @@ -44,7 +47,8 @@ export class MockActivityEnvironment extends events.EventEmitter { undefined, loadedDataConverter, heartbeatCallback, - withMetadata(opts?.logger ?? new DefaultLogger(), { sdkComponent: SdkComponent.worker }), + LoggerWithComposedMetadata.compose(opts?.logger ?? new DefaultLogger(), { sdkComponent: SdkComponent.worker }), + opts?.metricMeter ?? noopMetricMeter, opts?.interceptors ?? [] ); this.context = this.activity.context; diff --git a/packages/testing/src/testing-workflow-environment.ts b/packages/testing/src/testing-workflow-environment.ts index 76a5556dd..3702b7097 100644 --- a/packages/testing/src/testing-workflow-environment.ts +++ b/packages/testing/src/testing-workflow-environment.ts @@ -4,7 +4,7 @@ import { Duration, TypedSearchAttributes } from '@temporalio/common'; import { msToNumber, msToTs, tsToMs } from '@temporalio/common/lib/time'; import { NativeConnection, Runtime } from '@temporalio/worker'; import { native } from '@temporalio/core-bridge'; -import { filterNullAndUndefined } from '@temporalio/common/lib/internal-non-workflow'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import { Connection } from './connection'; import { toNativeEphemeralServerConfig, DevServerConfig, TimeSkippingServerConfig } from './ephemeral-server'; import { ClientOptionsForTestEnv, TestEnvClient } from './client'; diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 8774dd3aa..df0c10b1d 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -8,11 +8,15 @@ import { FAILURE_SOURCE, IllegalStateError, LoadedDataConverter, + MetricMeter, + MetricTags, SdkComponent, } from '@temporalio/common'; import { encodeErrorToFailure, encodeToPayload } from '@temporalio/common/lib/internal-non-workflow'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { isAbortError } from '@temporalio/common/lib/type-helpers'; +import { Logger, LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; +import { MetricMeterWithComposedTags } from '@temporalio/common/lib/metrics'; import { coresdk } from '@temporalio/proto'; import { ActivityExecuteInput, @@ -20,7 +24,6 @@ import { ActivityInterceptorsFactory, ActivityOutboundCallsInterceptor, } from './interceptors'; -import { Logger, withMetadata } from './logger'; const UNINITIALIZED = Symbol('UNINITIALIZED'); @@ -46,15 +49,23 @@ export class Activity { */ private readonly workerLogger; + /** + * Metric Meter with tags from this activity, including tags from interceptors. + */ + private readonly metricMeter; + constructor( public readonly info: Info, public readonly fn: ActivityFunction | undefined, public readonly dataConverter: LoadedDataConverter, public readonly heartbeatCallback: Context['heartbeat'], workerLogger: Logger, + workerMetricMeter: MetricMeter, interceptors: ActivityInterceptorsFactory[] ) { - this.workerLogger = withMetadata(workerLogger, () => this.getLogAttributes()); + this.workerLogger = LoggerWithComposedMetadata.compose(workerLogger, this.getLogAttributes.bind(this)); + this.metricMeter = MetricMeterWithComposedTags.compose(workerMetricMeter, this.getMetricTags.bind(this)); + const promise = new Promise((_, reject) => { this.cancel = (reason: CancelReason) => { this.cancelReason = reason; @@ -68,8 +79,9 @@ export class Activity { promise, this.abortController.signal, this.heartbeatCallback, - // This is the activity context logger - withMetadata(this.workerLogger, { sdkComponent: SdkComponent.activity }) + // This is the activity context logger, to be used exclusively from user code + LoggerWithComposedMetadata.compose(this.workerLogger, { sdkComponent: SdkComponent.activity }), + this.metricMeter ); // Prevent unhandled rejection promise.catch(() => undefined); @@ -89,6 +101,17 @@ export class Activity { return composeInterceptors(this.interceptors.outbound, 'getLogAttributes', (a) => a)(logAttributes); } + protected getMetricTags(): MetricTags { + const baseTags = { + namespace: this.info.workflowNamespace, + taskQueue: this.info.taskQueue, + activityType: this.info.activityType, + }; + // In case some interceptors use the metric meter while initializing... + if (this.interceptors == null) return baseTags; + return composeInterceptors(this.interceptors.outbound, 'getMetricTags', (a) => a)(baseTags); + } + /** * Actually executes the function. * diff --git a/packages/worker/src/interceptors.ts b/packages/worker/src/interceptors.ts index f278a82ee..52cc89c8f 100644 --- a/packages/worker/src/interceptors.ts +++ b/packages/worker/src/interceptors.ts @@ -7,7 +7,7 @@ */ import { Context as ActivityContext } from '@temporalio/activity'; -import { Headers, Next } from '@temporalio/common'; +import { Headers, MetricTags, Next } from '@temporalio/common'; export { Next, Headers }; @@ -41,6 +41,9 @@ export interface ActivityInboundCallsInterceptorFactory { /** Input for ActivityOutboundCallsInterceptor.getLogAttributes */ export type GetLogAttributesInput = Record; +/** Input for ActivityOutboundCallsInterceptor.getMetricTags */ +export type GetMetricTagsInput = MetricTags; + /** * Implement any of these methods to intercept Activity outbound calls */ @@ -51,6 +54,15 @@ export interface ActivityOutboundCallsInterceptor { * The attributes returned in this call are attached to every log message. */ getLogAttributes?: (input: GetLogAttributesInput, next: Next) => Record; + + /** + * Called once every time a metric is emitted from an Activity metric + * (ie. a metric created from {@link activity.metricMeter}). + * + * Tags returned by this hook are _prepended_ to tags defined at the metric level and tags defined + * on the emitter function itself. + */ + getMetricTags?: (input: GetMetricTagsInput, next: Next) => MetricTags; } export interface ActivityInterceptors { diff --git a/packages/worker/src/logger.ts b/packages/worker/src/logger.ts index 3f625538a..40288e4a5 100644 --- a/packages/worker/src/logger.ts +++ b/packages/worker/src/logger.ts @@ -28,13 +28,6 @@ export interface LogEntry { meta?: LogMetadata; } -/** - * @internal - */ -interface LoggerWithColorSupport extends Logger { - [loggerHasColorsSymbol]?: boolean; -} - export const LogTimestamp = Symbol.for('log_timestamp'); const severities: LogLevel[] = ['TRACE', 'DEBUG', 'INFO', 'WARN', 'ERROR']; @@ -112,74 +105,15 @@ export class DefaultLogger implements Logger { /** * @internal */ -export function hasColorSupport(logger: Logger): boolean { - return (logger as LoggerWithColorSupport)[loggerHasColorsSymbol] ?? false; -} - -export function withMetadata(logger: Logger, meta: LogMetadata | (() => LogMetadata)): Logger { - return new LoggerWithMetadata(logger, meta); +interface LoggerWithColorSupport extends Logger { + [loggerHasColorsSymbol]?: boolean; } -class LoggerWithMetadata implements Logger { - private parentLogger: Logger; - private metaChain: (LogMetadata | (() => LogMetadata))[]; - - constructor(parent: Logger, meta: LogMetadata | (() => LogMetadata)) { - // Flatten recusive LoggerWithMetadata instances - if (parent instanceof LoggerWithMetadata) { - this.parentLogger = parent.parentLogger; - this.metaChain = LoggerWithMetadata.appendToChain(parent.metaChain, meta); - } else { - this.parentLogger = parent; - this.metaChain = [meta]; - } - (this as LoggerWithColorSupport)[loggerHasColorsSymbol] = hasColorSupport(parent); - } - - log(level: LogLevel, message: string, meta?: LogMetadata): void { - this.parentLogger.log(level, message, this.resolveMetadata(meta)); - } - - trace(message: string, meta?: LogMetadata): void { - this.parentLogger.trace(message, this.resolveMetadata(meta)); - } - - debug(message: string, meta?: LogMetadata): void { - this.parentLogger.debug(message, this.resolveMetadata(meta)); - } - - info(message: string, meta?: LogMetadata): void { - this.parentLogger.info(message, this.resolveMetadata(meta)); - } - - warn(message: string, meta?: LogMetadata): void { - this.parentLogger.warn(message, this.resolveMetadata(meta)); - } - - error(message: string, meta?: LogMetadata): void { - this.parentLogger.error(message, this.resolveMetadata(meta)); - } - - private resolveMetadata(meta?: LogMetadata): LogMetadata { - const resolved = {}; - for (const contributor of this.metaChain) { - Object.assign(resolved, typeof contributor === 'function' ? contributor() : contributor); - } - Object.assign(resolved, meta); - return resolved; - } - - /** - * Append a metadata contributor to the chain, merging it with the former last contributor if both are plain objects - */ - private static appendToChain(chain: (LogMetadata | (() => LogMetadata))[], meta: LogMetadata | (() => LogMetadata)) { - if (chain.length === 0) return [meta]; - const last = chain[chain.length - 1]; - if (typeof last === 'object' && typeof meta === 'object') { - return [...chain.slice(0, -1), { ...last, ...meta }]; - } - return [...chain, meta]; - } +/** + * @internal + */ +export function hasColorSupport(logger: Logger): boolean { + return (logger as LoggerWithColorSupport)[loggerHasColorsSymbol] ?? false; } export interface FlushableLogger extends Logger { diff --git a/packages/worker/src/native-log-forward.ts b/packages/worker/src/runtime-logger.ts similarity index 99% rename from packages/worker/src/native-log-forward.ts rename to packages/worker/src/runtime-logger.ts index 8d09e38de..66c5eeb53 100644 --- a/packages/worker/src/native-log-forward.ts +++ b/packages/worker/src/runtime-logger.ts @@ -10,6 +10,7 @@ import { DefaultLogger, LogEntry, Logger, LogTimestamp } from './logger'; * logger, in the right order. * * @internal + * @hidden */ export class NativeLogCollector { /** diff --git a/packages/worker/src/runtime-metrics.ts b/packages/worker/src/runtime-metrics.ts new file mode 100644 index 000000000..bc9a7f0ea --- /dev/null +++ b/packages/worker/src/runtime-metrics.ts @@ -0,0 +1,172 @@ +import { + MetricCounter, + MetricGauge, + MetricHistogram, + MetricMeter, + MetricTags, + NumericMetricValueType, +} from '@temporalio/common'; +import { native } from '@temporalio/core-bridge'; + +export class RuntimeMetricMeter implements MetricMeter { + public constructor(protected runtime: native.Runtime) {} + + createCounter(name: string, unit: string = '', description: string = ''): MetricCounter { + const nativeMetric = native.newMetricCounter(this.runtime, name, unit, description); + return new RuntimeMetricCounter(nativeMetric, name, unit, description); + } + + createHistogram( + name: string, + valueType: NumericMetricValueType = 'int', + unit: string = '', + description: string = '' + ): MetricHistogram { + switch (valueType) { + case 'int': { + const nativeMetric = native.newMetricHistogram(this.runtime, name, unit, description); + return new RuntimeMetricHistogram(nativeMetric, name, unit, description); + } + case 'float': { + const nativeMetric = native.newMetricHistogramF64(this.runtime, name, unit, description); + return new RuntimeMetricHistogramF64(nativeMetric, name, unit, description); + } + } + } + + createGauge( + name: string, + valueType: NumericMetricValueType = 'int', + unit: string = '', + description: string = '' + ): MetricGauge { + switch (valueType) { + case 'int': { + const nativeMetric = native.newMetricGauge(this.runtime, name, unit, description); + return new RuntimeMetricGauge(nativeMetric, name, unit, description); + } + case 'float': { + const nativeMetric = native.newMetricGaugeF64(this.runtime, name, unit, description); + return new RuntimeMetricGaugeF64(nativeMetric, name, unit, description); + } + } + } + + withTags(_extraTags: MetricTags): MetricMeter { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error('withTags is not supported directly on RuntimeMetricMeter'); + } +} + +class RuntimeMetricCounter implements MetricCounter { + public constructor( + private readonly native: native.MetricCounter, + public readonly name: string, + public readonly unit: string, + public readonly description: string + ) {} + + add(value: number, tags: MetricTags = {}): void { + if (value < 0) { + throw new Error(`MetricCounter value must be non-negative (got ${value})`); + } + native.addMetricCounterValue(this.native, value, JSON.stringify(tags)); + } + + withTags(_tags: MetricTags): MetricCounter { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error('withTags is not supported directly on RuntimeMetricCounter'); + } +} + +class RuntimeMetricHistogram implements MetricHistogram { + public readonly valueType = 'int'; + + public constructor( + private readonly native: native.MetricHistogram, + public readonly name: string, + public readonly unit: string, + public readonly description: string + ) {} + + record(value: number, tags: MetricTags = {}): void { + if (value < 0) { + throw new Error(`MetricHistogram value must be non-negative (got ${value})`); + } + native.recordMetricHistogramValue(this.native, value, JSON.stringify(tags)); + } + + withTags(_tags: MetricTags): MetricHistogram { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error('withTags is not supported directly on RuntimeMetricHistogram'); + } +} + +class RuntimeMetricHistogramF64 implements MetricHistogram { + public readonly valueType = 'float'; + + public constructor( + private readonly native: native.MetricHistogramF64, + public readonly name: string, + public readonly unit: string, + public readonly description: string + ) {} + + record(value: number, tags: MetricTags = {}): void { + if (value < 0) { + throw new Error(`MetricHistogram value must be non-negative (got ${value})`); + } + native.recordMetricHistogramF64Value(this.native, value, JSON.stringify(tags)); + } + + withTags(_tags: MetricTags): MetricHistogram { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error('withTags is not supported directly on RuntimeMetricHistogramF64'); + } +} + +class RuntimeMetricGauge implements MetricGauge { + public readonly valueType = 'int'; + + public constructor( + private readonly native: native.MetricGauge, + public readonly name: string, + public readonly unit: string, + public readonly description: string + ) {} + + set(value: number, tags: MetricTags = {}): void { + if (value < 0) { + throw new Error(`MetricGauge value must be non-negative (got ${value})`); + } + native.setMetricGaugeValue(this.native, value, JSON.stringify(tags)); + } + + withTags(_tags: MetricTags): MetricGauge { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error('withTags is not supported directly on RuntimeMetricGauge'); + } +} + +class RuntimeMetricGaugeF64 implements MetricGauge { + public readonly valueType = 'float'; + + public constructor( + private readonly native: native.MetricGaugeF64, + public readonly name: string, + public readonly unit: string, + public readonly description: string + ) {} + + set(value: number, tags: MetricTags = {}): void { + if (value < 0) { + throw new Error(`MetricGauge value must be non-negative (got ${value})`); + } + native.setMetricGaugeF64Value(this.native, value, JSON.stringify(tags)); + } + + withTags(_tags: MetricTags): MetricGauge { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error('withTags is not supported directly on RuntimeMetricGaugeF64'); + } +} diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index 83f3675f7..5597c8268 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -2,7 +2,7 @@ import { native } from '@temporalio/core-bridge'; import { Logger, LogLevel } from '@temporalio/common'; import { Duration, msToNumber } from '@temporalio/common/lib/time'; import { DefaultLogger } from './logger'; -import { NativeLogCollector } from './native-log-forward'; +import { NativeLogCollector } from './runtime-logger'; /** * Options used to create a Temporal Runtime. diff --git a/packages/worker/src/runtime.ts b/packages/worker/src/runtime.ts index 82dcf7142..c70c7ab8e 100644 --- a/packages/worker/src/runtime.ts +++ b/packages/worker/src/runtime.ts @@ -2,11 +2,13 @@ import * as v8 from 'node:v8'; import * as fs from 'node:fs'; import * as os from 'node:os'; import { native } from '@temporalio/core-bridge'; -import { filterNullAndUndefined } from '@temporalio/common/lib/internal-non-workflow'; -import { IllegalStateError, Logger, SdkComponent } from '@temporalio/common'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; +import { IllegalStateError, Logger, noopMetricMeter, SdkComponent, MetricMeter } from '@temporalio/common'; import { temporal } from '@temporalio/proto'; import { History } from '@temporalio/common/lib/proto-utils'; +import { MetricMeterWithComposedTags } from '@temporalio/common/lib/metrics'; import { isFlushableLogger } from './logger'; +import { RuntimeMetricMeter } from './runtime-metrics'; import { toNativeClientOptions, NativeConnectionOptions } from './connection-options'; import { byteArrayToBuffer, toMB } from './utils'; import { CompiledRuntimeOptions, compileOptions, RuntimeOptions } from './runtime-options'; @@ -23,6 +25,9 @@ type TrackedNativeObject = native.Client | native.Worker | native.EphemeralServe export class Runtime { public readonly logger: Logger; + /** The metric meter associated with this runtime. */ + public readonly metricMeter: MetricMeter; + /** Track the number of pending creation calls into the tokio runtime to prevent shut down */ protected pendingCreations = 0; /** Track the registered native objects to automatically shutdown when all have been deregistered */ @@ -45,6 +50,10 @@ export class Runtime { public readonly options: CompiledRuntimeOptions ) { this.logger = options.logger; + this.metricMeter = options.telemetryOptions.metricsExporter + ? MetricMeterWithComposedTags.compose(new RuntimeMetricMeter(this.native), {}, true) + : noopMetricMeter; + this.checkHeapSizeLimit(); this.setupShutdownHook(); } @@ -267,7 +276,9 @@ export class Runtime { if (this.native === undefined) return; try { if (Runtime._instance === this) delete Runtime._instance; + (this as any).metricMeter = noopMetricMeter; this.teardownShutdownHook(); + // FIXME(JWH): I think we no longer need this, but will have to thoroughly validate. native.runtimeShutdown(this.native); this.flushLogs(); } finally { diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 354afd188..6b27f0be0 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -5,6 +5,7 @@ import { ActivityFunction, DataConverter, LoadedDataConverter, + MetricMeter, VersioningBehavior, WorkerDeploymentVersion, } from '@temporalio/common'; @@ -18,6 +19,7 @@ import { NativeConnection } from './connection'; import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors'; import { Logger } from './logger'; import { initLoggerSink } from './workflow/logger'; +import { initMetricSink } from './workflow/metrics'; import { Runtime } from './runtime'; import { InjectedSinks } from './sinks'; import { MiB } from './utils'; @@ -800,7 +802,11 @@ export type CompiledWorkerOptionsWithBuildId = CompiledWorkerOptions & { buildId: string; }; -function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): WorkerOptionsWithDefaults { +function addDefaultWorkerOptions( + options: WorkerOptions, + logger: Logger, + metricMeter: MetricMeter +): WorkerOptionsWithDefaults { const { buildId, // eslint-disable-line deprecation/deprecation useVersioning, // eslint-disable-line deprecation/deprecation @@ -909,6 +915,7 @@ function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): Worker nonStickyToStickyPollRatio: nonStickyToStickyPollRatio ?? 0.2, sinks: { ...initLoggerSink(logger), + ...initMetricSink(metricMeter), // Fix deprecated registration of the 'defaultWorkerLogger' sink ...(sinks?.defaultWorkerLogger ? { __temporal_logger: sinks.defaultWorkerLogger } : {}), ...sinks, @@ -919,8 +926,12 @@ function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): Worker }; } -export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): CompiledWorkerOptions { - const opts = addDefaultWorkerOptions(rawOpts, logger); +export function compileWorkerOptions( + rawOpts: WorkerOptions, + logger: Logger, + metricMeter: MetricMeter +): CompiledWorkerOptions { + const opts = addDefaultWorkerOptions(rawOpts, logger, metricMeter); if (opts.maxCachedWorkflows !== 0 && opts.maxCachedWorkflows < 2) { logger.warn('maxCachedWorkflows must be either 0 (ie. cache is disabled) or greater than 1. Defaulting to 2.'); opts.maxCachedWorkflows = 2; diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index d93a058cf..7a1487d72 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -32,6 +32,7 @@ import { ensureApplicationFailure, TypedSearchAttributes, decodePriority, + MetricMeter, } from '@temporalio/common'; import { decodeArrayFromPayloads, @@ -50,6 +51,7 @@ import { tsToDate, tsToMs, } from '@temporalio/common/lib/time'; +import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { errorMessage, NonNullableObject, OmitFirstParam } from '@temporalio/common/lib/type-helpers'; import { workflowLogAttributes } from '@temporalio/workflow/lib/logs'; import { native } from '@temporalio/core-bridge'; @@ -58,7 +60,7 @@ import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; import { Activity, CancelReason, activityLogAttributes } from './activity'; import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection'; import { ActivityExecuteInput } from './interceptors'; -import { Logger, withMetadata } from './logger'; +import { Logger } from './logger'; import pkg from './pkg'; import { EvictionReason, @@ -149,8 +151,12 @@ export interface NativeReplayHandle { } interface NativeWorkerConstructor { - create(connection: NativeConnection, options: CompiledWorkerOptionsWithBuildId): Promise; - createReplay(options: CompiledWorkerOptionsWithBuildId): Promise; + create( + runtime: Runtime, + connection: NativeConnection, + options: CompiledWorkerOptionsWithBuildId + ): Promise; + createReplay(runtime: Runtime, options: CompiledWorkerOptionsWithBuildId): Promise; } interface WorkflowWithLogAttributes { @@ -177,16 +183,18 @@ export class NativeWorker implements NativeWorkerLike { public readonly initiateShutdown: OmitFirstParam; public static async create( + runtime: Runtime, connection: NativeConnection, options: CompiledWorkerOptionsWithBuildId ): Promise { - const runtime = Runtime.instance(); const nativeWorker = await runtime.registerWorker(extractNativeClient(connection), toNativeWorkerOptions(options)); return new NativeWorker(runtime, nativeWorker); } - public static async createReplay(options: CompiledWorkerOptionsWithBuildId): Promise { - const runtime = Runtime.instance(); + public static async createReplay( + runtime: Runtime, + options: CompiledWorkerOptionsWithBuildId + ): Promise { const [worker, historyPusher] = await runtime.createReplayWorker(toNativeWorkerOptions(options)); return { worker: new NativeWorker(runtime, worker), @@ -460,12 +468,16 @@ export class Worker { */ public static async create(options: WorkerOptions): Promise { const runtime = Runtime.instance(); - const logger = withMetadata(runtime.logger, { + const logger = LoggerWithComposedMetadata.compose(runtime.logger, { sdkComponent: SdkComponent.worker, taskQueue: options.taskQueue ?? 'default', }); + const metricMeter = runtime.metricMeter.withTags({ + namespace: options.namespace ?? 'default', + taskQueue: options.taskQueue ?? 'default', + }); const nativeWorkerCtor: NativeWorkerConstructor = this.nativeWorkerCtor; - const compiledOptions = compileWorkerOptions(options, logger); + const compiledOptions = compileWorkerOptions(options, logger, metricMeter); logger.debug('Creating worker', { options: { ...compiledOptions, @@ -490,7 +502,7 @@ export class Worker { let nativeWorker: NativeWorkerLike; const compiledOptionsWithBuildId = addBuildIdIfMissing(compiledOptions, bundle?.code); try { - nativeWorker = await nativeWorkerCtor.create(connection, compiledOptionsWithBuildId); + nativeWorker = await nativeWorkerCtor.create(runtime, connection, compiledOptionsWithBuildId); } catch (err) { // We just created this connection, close it if (!options.connection) { @@ -499,7 +511,15 @@ export class Worker { throw err; } extractReferenceHolders(connection).add(nativeWorker); - return new this(runtime, nativeWorker, workflowCreator, compiledOptionsWithBuildId, logger, connection); + return new this( + runtime, + nativeWorker, + workflowCreator, + compiledOptionsWithBuildId, + logger, + metricMeter, + connection + ); } protected static async createWorkflowCreator( @@ -567,7 +587,7 @@ export class Worker { histories: ReplayHistoriesIterable ): AsyncIterableIterator { const [worker, pusher] = await this.constructReplayWorker(options); - const rt = Runtime.instance(); + const rt = worker.runtime; const evictions = on(worker.evictionsEmitter, 'eviction') as AsyncIterableIterator<[EvictionWithRunID]>; const runPromise = worker.run().then(() => { throw new ShutdownError('Worker was shutdown'); @@ -648,19 +668,26 @@ export class Worker { }; this.replayWorkerCount++; const runtime = Runtime.instance(); - const logger = withMetadata(runtime.logger, { + const logger = LoggerWithComposedMetadata.compose(runtime.logger, { sdkComponent: 'worker', taskQueue: fixedUpOptions.taskQueue, }); - const compiledOptions = compileWorkerOptions(fixedUpOptions, logger); + const metricMeter = runtime.metricMeter.withTags({ + namespace: 'default', + taskQueue: fixedUpOptions.taskQueue, + }); + const compiledOptions = compileWorkerOptions(fixedUpOptions, logger, metricMeter); const bundle = await this.getOrCreateBundle(compiledOptions, logger); if (!bundle) { throw new TypeError('ReplayWorkerOptions must contain workflowsPath or workflowBundle'); } const workflowCreator = await this.createWorkflowCreator(bundle, compiledOptions, logger); - const replayHandle = await nativeWorkerCtor.createReplay(addBuildIdIfMissing(compiledOptions, bundle.code)); + const replayHandle = await nativeWorkerCtor.createReplay( + runtime, + addBuildIdIfMissing(compiledOptions, bundle.code) + ); return [ - new this(runtime, replayHandle.worker, workflowCreator, compiledOptions, logger, undefined, true), + new this(runtime, replayHandle.worker, workflowCreator, compiledOptions, logger, metricMeter, undefined, true), replayHandle.historyPusher, ]; } @@ -727,6 +754,7 @@ export class Worker { public readonly options: CompiledWorkerOptions, /** Logger bound to 'sdkComponent: worker' */ protected readonly logger: Logger, + protected readonly metricMeter: MetricMeter, protected readonly connection?: NativeConnection, protected readonly isReplayWorker: boolean = false ) { @@ -964,6 +992,7 @@ export class Worker { }, }), this.logger, + this.metricMeter, this.options.interceptors.activity ); output = { type: 'run', activity, input }; diff --git a/packages/worker/src/workflow/logger.ts b/packages/worker/src/workflow/logger.ts index 9c1c9c216..37508d44f 100644 --- a/packages/worker/src/workflow/logger.ts +++ b/packages/worker/src/workflow/logger.ts @@ -1,13 +1,14 @@ import { type LoggerSinksInternal } from '@temporalio/workflow/lib/logs'; import { SdkComponent } from '@temporalio/common'; +import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { type InjectedSinks } from '../sinks'; -import { withMetadata, type Logger } from '../logger'; +import { type Logger } from '../logger'; /** * Injects a logger sink that forwards to the worker's logger */ export function initLoggerSink(logger: Logger): InjectedSinks { - logger = withMetadata(logger, { sdkComponent: SdkComponent.workflow }); + logger = LoggerWithComposedMetadata.compose(logger, { sdkComponent: SdkComponent.workflow }); return { __temporal_logger: { trace: { diff --git a/packages/worker/src/workflow/metrics.ts b/packages/worker/src/workflow/metrics.ts new file mode 100644 index 000000000..40a367215 --- /dev/null +++ b/packages/worker/src/workflow/metrics.ts @@ -0,0 +1,72 @@ +import { NumericMetricValueType, type MetricMeter, type MetricTags, Metric } from '@temporalio/common'; +import { MetricSinks } from '@temporalio/workflow/lib/metrics'; +import { InjectedSinks } from '../sinks'; + +export function initMetricSink(metricMeter: MetricMeter): InjectedSinks { + // Creation of a new metric object isn't quite cheap, requiring a call down the bridge to the + // actual Metric Meter. Unfortunately, the workflow sandbox execution model doesn't allow to + // reuse metric objects from the caller side. We therefore maintain local caches of metric + // objects to avoid creating a new one for every single metric value being emitted. + const cache = new Map>(); + + function getOrCreate(key: string, create: () => T): T { + let value = cache.get(key)?.deref(); + if (value === undefined) { + value = create(); + cache.set(key, new WeakRef(value)); + } + return value as T; + } + + return { + __temporal_metrics: { + addMetricCounterValue: { + fn( + _, + metricName: string, + unit: string | undefined, + description: string | undefined, + value: number, + attrs: MetricTags + ) { + const cacheKey = `${metricName}:counter`; + const createFn = () => metricMeter.createCounter(metricName, unit, description); + getOrCreate(cacheKey, createFn).add(value, attrs); + }, + callDuringReplay: false, + }, + recordMetricHistogramValue: { + fn( + _, + metricName: string, + valueType: NumericMetricValueType, + unit: string | undefined, + description: string | undefined, + value: number, + attrs: MetricTags + ) { + const cacheKey = `histogram:${valueType}:${metricName}`; + const createFn = () => metricMeter.createHistogram(metricName, valueType, unit, description); + getOrCreate(cacheKey, createFn).record(value, attrs); + }, + callDuringReplay: false, + }, + setMetricGaugeValue: { + fn( + _, + metricName: string, + valueType: NumericMetricValueType, + unit: string | undefined, + description: string | undefined, + value: number, + attrs: MetricTags + ) { + const cacheKey = `gauge:${valueType}:${metricName}`; + const createFn = () => metricMeter.createGauge(metricName, valueType, unit, description); + getOrCreate(cacheKey, createFn).set(value, attrs); + }, + callDuringReplay: false, + }, + }, + }; +} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 79882aa5a..d64e35615 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -107,6 +107,7 @@ export { log } from './logs'; export { Trigger } from './trigger'; export * from './workflow'; export { ChildWorkflowHandle, ExternalWorkflowHandle } from './workflow-handle'; +export { metricMeter } from './metrics'; // Anything below this line is deprecated diff --git a/packages/workflow/src/interceptors.ts b/packages/workflow/src/interceptors.ts index 1ae70c4b5..978fd5a08 100644 --- a/packages/workflow/src/interceptors.ts +++ b/packages/workflow/src/interceptors.ts @@ -6,7 +6,15 @@ * @module */ -import { ActivityOptions, Headers, LocalActivityOptions, Next, Timestamp, WorkflowExecution } from '@temporalio/common'; +import { + ActivityOptions, + Headers, + LocalActivityOptions, + MetricTags, + Next, + Timestamp, + WorkflowExecution, +} from '@temporalio/common'; import type { coresdk } from '@temporalio/proto'; import { ChildWorkflowOptionsWithDefaults, ContinueAsNewOptions } from './interfaces'; @@ -139,6 +147,9 @@ export interface SignalWorkflowInput { /** Input for WorkflowOutboundCallsInterceptor.getLogAttributes */ export type GetLogAttributesInput = Record; +/** Input for WorkflowOutboundCallsInterceptor.getMetricTags */ +export type GetMetricTagsInput = MetricTags; + /** * Implement any of these methods to intercept Workflow code calls to the Temporal APIs, like scheduling an activity and starting a timer */ @@ -189,6 +200,15 @@ export interface WorkflowOutboundCallsInterceptor { * The attributes returned in this call are attached to every log message. */ getLogAttributes?: (input: GetLogAttributesInput, next: Next) => Record; + + /** + * Called once every time a metric is emitted from a Workflow metric (ie. a metric created + * from {@link workflow.metricMeter}). + * + * Tags returned by this hook are _prepended_ to tags defined at the metric level and tags + * defined on the emitter function itself. + */ + getMetricTags?: (input: GetMetricTagsInput, next: Next) => MetricTags; } /** Input for WorkflowInternalsInterceptor.concludeActivation */ diff --git a/packages/workflow/src/metrics.ts b/packages/workflow/src/metrics.ts new file mode 100644 index 000000000..b13819263 --- /dev/null +++ b/packages/workflow/src/metrics.ts @@ -0,0 +1,191 @@ +import { + MetricCounter, + MetricGauge, + MetricHistogram, + MetricMeter, + MetricMeterWithComposedTags, + MetricTags, + NumericMetricValueType, +} from '@temporalio/common'; +import { composeInterceptors } from '@temporalio/common/lib/interceptors'; +import { proxySinks, Sink, Sinks } from './sinks'; +import { workflowInfo } from './workflow'; +import { assertInWorkflowContext } from './global-attributes'; + +class WorkflowMetricMeterImpl implements MetricMeter { + constructor() {} + + createCounter(name: string, unit?: string, description?: string): MetricCounter { + assertInWorkflowContext("Workflow's `metricMeter` can only be used while in Workflow Context"); + return new WorkflowMetricCounter(name, unit, description); + } + + createHistogram( + name: string, + valueType: NumericMetricValueType = 'int', + unit?: string, + description?: string + ): MetricHistogram { + assertInWorkflowContext("Workflow's `metricMeter` can only be used while in Workflow Context"); + return new WorkflowMetricHistogram(name, valueType, unit, description); + } + + createGauge( + name: string, + valueType: NumericMetricValueType = 'int', + unit?: string, + description?: string + ): MetricGauge { + assertInWorkflowContext("Workflow's `metricMeter` can only be used while in Workflow Context"); + return new WorkflowMetricGauge(name, valueType, unit, description); + } + + withTags(_tags: MetricTags): MetricMeter { + assertInWorkflowContext("Workflow's `metricMeter` can only be used while in Workflow Context"); + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error(`withTags is not supported directly on WorkflowMetricMeter`); + } +} + +class WorkflowMetricCounter implements MetricCounter { + constructor( + public readonly name: string, + public readonly unit: string | undefined, + public readonly description: string | undefined + ) {} + + add(value: number, extraTags: MetricTags = {}): void { + if (value < 0) { + throw new Error(`MetricCounter value must be non-negative (got ${value})`); + } + if (!workflowInfo().unsafe.isReplaying) { + metricSink.addMetricCounterValue(this.name, this.unit, this.description, value, extraTags); + } + } + + withTags(_tags: MetricTags): MetricCounter { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error(`withTags is not supported directly on WorkflowMetricCounter`); + } +} + +class WorkflowMetricHistogram implements MetricHistogram { + constructor( + public readonly name: string, + public readonly valueType: NumericMetricValueType, + public readonly unit: string | undefined, + public readonly description: string | undefined + ) {} + + record(value: number, extraTags: MetricTags = {}): void { + if (value < 0) { + throw new Error(`MetricHistogram value must be non-negative (got ${value})`); + } + if (!workflowInfo().unsafe.isReplaying) { + metricSink.recordMetricHistogramValue(this.name, this.valueType, this.unit, this.description, value, extraTags); + } + } + + withTags(_tags: MetricTags): MetricHistogram { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error(`withTags is not supported directly on WorkflowMetricHistogram`); + } +} + +class WorkflowMetricGauge implements MetricGauge { + constructor( + public readonly name: string, + public readonly valueType: NumericMetricValueType, + public readonly unit: string | undefined, + public readonly description: string | undefined + ) {} + + set(value: number, tags?: MetricTags): void { + if (value < 0) { + throw new Error(`MetricGauge value must be non-negative (got ${value})`); + } + if (!workflowInfo().unsafe.isReplaying) { + metricSink.setMetricGaugeValue(this.name, this.valueType, this.unit, this.description, value, tags ?? {}); + } + } + + withTags(_tags: MetricTags): MetricGauge { + // Tags composition is handled by a MetricMeterWithComposedTags wrapper over this one + throw new Error(`withTags is not supported directly on WorkflowMetricGauge`); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +// Note: given that forwarding metrics outside of the sanbox can be quite chatty and add non +// negligeable overhead, we eagerly check for `isReplaying` and completely skip doing sink +// calls if we are replaying. +const metricSink = proxySinks().__temporal_metrics; + +/** + * Sink interface for forwarding metrics from the Workflow sandbox to the Worker. + * + * These sink functions are not intended to be called directly from workflow code; instead, + * developers should use the `metricMeter` object exposed to workflow code by the SDK, which + * provides an API that is easier to work with. + * + * This sink interface is also not meant to be implemented by user. + * + * @hidden + * @internal Users should not implement this interface, nor use it directly. Use `metricMeter` instead. + */ +export interface MetricSinks extends Sinks { + __temporal_metrics: WorkflowMetricMeter; +} + +/** + * @hidden + * @internal Users should not implement this interface, nor use it directly. Use `metricMeter` instead. + */ +export interface WorkflowMetricMeter extends Sink { + addMetricCounterValue( + metricName: string, + unit: string | undefined, + description: string | undefined, + value: number, + attrs: MetricTags + ): void; + + recordMetricHistogramValue( + metricName: string, + valueType: NumericMetricValueType, + unit: string | undefined, + description: string | undefined, + value: number, + attrs: MetricTags + ): void; + + setMetricGaugeValue( + metricName: string, + valueType: NumericMetricValueType, + unit: string | undefined, + description: string | undefined, + value: number, + attrs: MetricTags + ): void; +} + +/** + * A MetricMeter that can be used to emit metrics from within a Workflow. + * + * @experimental The Metric API is an experimental feature and may be subject to change. + */ +export const metricMeter: MetricMeter = MetricMeterWithComposedTags.compose( + new WorkflowMetricMeterImpl(), + () => { + const activator = assertInWorkflowContext('Workflow.metricMeter may only be used from workflow context.'); + const getMetricTags = composeInterceptors(activator.interceptors.outbound, 'getMetricTags', (a) => a); + + const info = activator.info; + return getMetricTags({ + // namespace and taskQueue will be added by the Worker + workflowType: info.workflowType, + }); + }, + true +);