From d5bf6caefdafaba5c69a7b6f13334741029eac35 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Thu, 21 Aug 2025 11:36:27 -0400 Subject: [PATCH 1/2] Expose Client on Activity Context + impl all services over NativeConnection --- packages/activity/package.json | 1 + packages/activity/src/index.ts | 44 +++++ packages/activity/tsconfig.json | 2 +- packages/client/src/base-client.ts | 2 +- packages/client/src/connection.ts | 51 +++++- packages/client/src/types.ts | 5 +- packages/core-bridge/src/client.rs | 154 +++++++++++++++- packages/core-bridge/ts/native.ts | 8 +- packages/test/src/activities/helpers.ts | 15 +- packages/test/src/activities/interceptors.ts | 33 ---- .../src/helpers-integration-multi-codec.ts | 6 - packages/test/src/helpers-integration.ts | 38 ++-- packages/test/src/helpers.ts | 26 +++ packages/test/src/load/worker.ts | 10 -- .../test/src/test-integration-split-one.ts | 2 - .../test/src/test-integration-split-two.ts | 45 +++++ packages/test/src/test-local-activities.ts | 5 +- packages/test/src/test-native-connection.ts | 169 +++++++++++++++++- packages/test/src/test-otel.ts | 10 +- packages/testing/src/client.ts | 77 ++++---- packages/testing/src/connection.ts | 47 ----- packages/testing/src/index.ts | 9 +- .../src/mocking-activity-environment.ts | 3 + .../src/testing-workflow-environment.ts | 45 +++-- packages/worker/src/activity.ts | 30 +++- packages/worker/src/connection.ts | 86 +++++++-- packages/worker/src/interceptors.ts | 9 + packages/worker/src/worker-options.ts | 13 +- packages/worker/src/worker.ts | 15 ++ 29 files changed, 716 insertions(+), 244 deletions(-) delete mode 100644 packages/test/src/activities/interceptors.ts delete mode 100644 packages/testing/src/connection.ts diff --git a/packages/activity/package.json b/packages/activity/package.json index 5fcfae652..403d48c45 100644 --- a/packages/activity/package.json +++ b/packages/activity/package.json @@ -13,6 +13,7 @@ "author": "Temporal Technologies Inc. ", "license": "MIT", "dependencies": { + "@temporalio/client": "file:../client", "@temporalio/common": "file:../common", "abort-controller": "^3.0.0" }, diff --git a/packages/activity/src/index.ts b/packages/activity/src/index.ts index 32f80da13..c405a3a8c 100644 --- a/packages/activity/src/index.ts +++ b/packages/activity/src/index.ts @@ -78,10 +78,12 @@ import { MetricMeter, Priority, ActivityCancellationDetails, + IllegalStateError, } from '@temporalio/common'; import { msToNumber } from '@temporalio/common/lib/time'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { ActivityCancellationDetailsHolder } from '@temporalio/common/lib/activity-cancellation-details'; +import { Client } from '@temporalio/client'; export { ActivityFunction, @@ -273,6 +275,11 @@ export class Context { */ protected readonly heartbeatFn: (details?: any) => void; + /** + * The Worker's client, passed down through Activity context. + */ + protected readonly _client: Client | undefined; + /** * The logger for this Activity. * @@ -313,6 +320,7 @@ export class Context { cancelled: Promise, cancellationSignal: AbortSignal, heartbeat: (details?: any) => void, + client: Client | undefined, log: Logger, metricMeter: MetricMeter, details: ActivityCancellationDetailsHolder @@ -321,6 +329,7 @@ export class Context { this.cancelled = cancelled; this.cancellationSignal = cancellationSignal; this.heartbeatFn = heartbeat; + this._client = client; this.log = log; this.metricMeter = metricMeter; this._cancellationDetails = details; @@ -351,6 +360,25 @@ export class Context { this.heartbeatFn(details); }; + /** + * A Temporal Client, bound to the same Temporal Namespace as the Worker executing this Activity. + * + * May throw an {@link IllegalStateError} if the Activity is running inside a `MockActivityEnvironment` + * that was created without a Client. + * + * @experimental Client support over `NativeConnection` is experimental. Error handling may be + * incomplete or different from what would be observed using a {@link Connection} + * instead. Client doesn't support cancellation through a Signal. + */ + public get client(): Client { + if (this._client === undefined) { + throw new IllegalStateError( + 'No Client available. This may be a MockActivityEnvironment that was created without a Client.' + ); + } + return this._client; + } + /** * Helper function for sleeping in an Activity. * @param ms Sleep duration: number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} @@ -481,6 +509,22 @@ export function cancellationSignal(): AbortSignal { return Context.current().cancellationSignal; } +/** + * A Temporal Client, bound to the same Temporal Namespace as the Worker executing this Activity. + * + * May throw an {@link IllegalStateError} if the Activity is running inside a `MockActivityEnvironment` + * that was created without a Client. + * + * This is a shortcut for `Context.current().client` (see {@link Context.client}). + * + * @experimental Client support over `NativeConnection` is experimental. Error handling may be + * incomplete or different from what would be observed using a {@link Connection} + * instead. Client doesn't support cancellation through a Signal. + */ +export function getClient(): Client { + return Context.current().client; +} + /** * Get the metric meter for the current activity, with activity-specific tags. * diff --git a/packages/activity/tsconfig.json b/packages/activity/tsconfig.json index 328970e2d..1264e4313 100644 --- a/packages/activity/tsconfig.json +++ b/packages/activity/tsconfig.json @@ -4,6 +4,6 @@ "outDir": "./lib", "rootDir": "./src" }, - "references": [{ "path": "../common" }], + "references": [{ "path": "../common" }, { "path": "../client" }], "include": ["./src/**/*.ts"] } diff --git a/packages/client/src/base-client.ts b/packages/client/src/base-client.ts index dd6ef3a14..9e865a3b1 100644 --- a/packages/client/src/base-client.ts +++ b/packages/client/src/base-client.ts @@ -53,7 +53,7 @@ export function defaultBaseClientOptions(): WithDefaults { export class BaseClient { /** - * The underlying {@link Connection | connection} used by this client. + * The underlying {@link Connection | connection} or {@link NativeConnection | native connection} used by this client. * * Clients are cheap to create, but connections are expensive. Where it makes sense, * a single connection may and should be reused by multiple `Client`s. diff --git a/packages/client/src/connection.ts b/packages/client/src/connection.ts index b6a623e3c..4e18e430b 100644 --- a/packages/client/src/connection.ts +++ b/packages/client/src/connection.ts @@ -12,7 +12,7 @@ import { type temporal } from '@temporalio/proto'; import { isGrpcServiceError, ServiceError } from './errors'; import { defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from './grpc-retry'; import pkg from './pkg'; -import { CallContext, HealthService, Metadata, OperatorService, WorkflowService } from './types'; +import { CallContext, HealthService, Metadata, OperatorService, TestService, WorkflowService } from './types'; /** * The default Temporal Server's TCP port for public gRPC connections. @@ -138,6 +138,23 @@ export type ConnectionOptionsWithDefaults = Required< connectTimeoutMs: number; }; +/** + * A symbol used to attach extra, SDK-internal connection options. + * + * @internal + * @hidden + */ +export const InternalConnectionOptionsSymbol = Symbol('__temporal_internal_connection_options'); +export type InternalConnectionOptions = ConnectionOptions & { + [InternalConnectionOptionsSymbol]?: { + /** + * Indicate whether the `TestService` should be enabled on this connection. This is set to true + * on connections created internally by `TestWorkflowEnvironment.createTimeSkipping()`. + */ + supportsTestService?: boolean; + }; +}; + export const LOCAL_TARGET = 'localhost:7233'; function addDefaults(options: ConnectionOptions): ConnectionOptionsWithDefaults { @@ -239,6 +256,13 @@ export interface ConnectionCtorOptions { */ readonly operatorService: OperatorService; + /** + * Raw gRPC access to the Temporal test service. + * + * Will be `undefined` if connected to a server that does not support the test service. + */ + readonly testService: TestService | undefined; + /** * Raw gRPC access to the standard gRPC {@link https://github.com/grpc/grpc/blob/92f58c18a8da2728f571138c37760a721c8915a2/doc/health-checking.md | health service}. */ @@ -286,6 +310,13 @@ export class Connection { */ public readonly operatorService: OperatorService; + /** + * Raw gRPC access to the Temporal test service. + * + * Will be `undefined` if connected to a server that does not support the test service. + */ + public readonly testService: TestService | undefined; + /** * Raw gRPC access to the standard gRPC {@link https://github.com/grpc/grpc/blob/92f58c18a8da2728f571138c37760a721c8915a2/doc/health-checking.md | health service}. */ @@ -326,6 +357,7 @@ export class Connection { apiKeyFnRef, }); const workflowService = WorkflowService.create(workflowRpcImpl, false, false); + const operatorRpcImpl = this.generateRPCImplementation({ serviceName: 'temporal.api.operatorservice.v1.OperatorService', client, @@ -335,6 +367,20 @@ export class Connection { apiKeyFnRef, }); const operatorService = OperatorService.create(operatorRpcImpl, false, false); + + let testService: TestService | undefined = undefined; + if ((options as InternalConnectionOptions)?.[InternalConnectionOptionsSymbol]?.supportsTestService) { + const testRpcImpl = this.generateRPCImplementation({ + serviceName: 'temporal.api.testservice.v1.TestService', + client, + callContextStorage, + interceptors: optionsWithDefaults?.interceptors, + staticMetadata: optionsWithDefaults.metadata, + apiKeyFnRef, + }); + testService = TestService.create(testRpcImpl, false, false); + } + const healthRpcImpl = this.generateRPCImplementation({ serviceName: 'grpc.health.v1.Health', client, @@ -350,6 +396,7 @@ export class Connection { callContextStorage, workflowService, operatorService, + testService, healthService, options: optionsWithDefaults, apiKeyFnRef, @@ -414,6 +461,7 @@ export class Connection { client, workflowService, operatorService, + testService, healthService, callContextStorage, apiKeyFnRef, @@ -422,6 +470,7 @@ export class Connection { this.client = client; this.workflowService = this.withNamespaceHeaderInjector(workflowService); this.operatorService = operatorService; + this.testService = testService; this.healthService = healthService; this.callContextStorage = callContextStorage; this.apiKeyFnRef = apiKeyFnRef; diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index c171a99df..26f919ac0 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -91,6 +91,8 @@ export type WorkflowService = proto.temporal.api.workflowservice.v1.WorkflowServ export const { WorkflowService } = proto.temporal.api.workflowservice.v1; export type OperatorService = proto.temporal.api.operatorservice.v1.OperatorService; export const { OperatorService } = proto.temporal.api.operatorservice.v1; +export type TestService = proto.temporal.api.testservice.v1.TestService; +export const { TestService } = proto.temporal.api.testservice.v1; export type HealthService = proto.grpc.health.v1.Health; export const { Health: HealthService } = proto.grpc.health.v1; @@ -117,9 +119,6 @@ export interface CallContext { /** * Connection interface used by high level SDK clients. - * - * NOTE: Currently the SDK only supports grpc-js based connection but in the future - * we might support grpc-web and native Rust connections. */ export interface ConnectionLike { workflowService: WorkflowService; diff --git a/packages/core-bridge/src/client.rs b/packages/core-bridge/src/client.rs index 440cd3f4b..401dc4ec2 100644 --- a/packages/core-bridge/src/client.rs +++ b/packages/core-bridge/src/client.rs @@ -8,9 +8,7 @@ use tonic::metadata::MetadataKey; use temporal_sdk_core::{ClientOptions as CoreClientOptions, CoreRuntime, RetryClient}; use bridge_macros::{TryFromJs, js_function}; -use temporal_client::{ - ClientInitError, ConfiguredClient, TemporalServiceClientWithMetrics, WorkflowService, -}; +use temporal_client::{ClientInitError, ConfiguredClient, TemporalServiceClientWithMetrics}; use crate::runtime::Runtime; use crate::{helpers::*, runtime::RuntimeExt as _}; @@ -19,7 +17,22 @@ pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult< cx.export_function("newClient", client_new)?; cx.export_function("clientUpdateHeaders", client_update_headers)?; cx.export_function("clientUpdateApiKey", client_update_api_key)?; - cx.export_function("clientSendRequest", client_send_request)?; + cx.export_function( + "clientSendWorkflowServiceRequest", + client_send_workflow_service_request, + )?; + cx.export_function( + "clientSendOperatorServiceRequest", + client_send_operator_service_request, + )?; + cx.export_function( + "clientSendTestServiceRequest", + client_send_test_service_request, + )?; + cx.export_function( + "clientSendHealthServiceRequest", + client_send_health_service_request, + )?; cx.export_function("clientClose", client_close)?; Ok(()) @@ -113,9 +126,9 @@ pub struct RpcCall { pub timeout: Option, } -/// Send a request using the provided Client +/// Send a request to the Workflow Service using the provided Client #[js_function] -pub fn client_send_request( +pub fn client_send_workflow_service_request( client: OpaqueInboundHandle, call: RpcCall, ) -> BridgeResult>> { @@ -124,7 +137,50 @@ pub fn client_send_request( let core_client = client.core_client.clone(); // FIXME: "large future with a size of 18560 bytes" - core_runtime.future_to_promise(async move { client_invoke(core_client, call).await }) + core_runtime + .future_to_promise(async move { client_invoke_workflow_service(core_client, call).await }) +} + +/// Send a request to the Operator Service using the provided Client +#[js_function] +pub fn client_send_operator_service_request( + client: OpaqueInboundHandle, + call: RpcCall, +) -> BridgeResult>> { + let client = client.borrow()?; + let core_runtime = client.core_runtime.clone(); + let core_client = client.core_client.clone(); + + core_runtime + .future_to_promise(async move { client_invoke_operator_service(core_client, call).await }) +} + +/// Send a request to the Test Service using the provided Client +#[js_function] +pub fn client_send_test_service_request( + client: OpaqueInboundHandle, + call: RpcCall, +) -> BridgeResult>> { + let client = client.borrow()?; + let core_runtime = client.core_runtime.clone(); + let core_client = client.core_client.clone(); + + core_runtime + .future_to_promise(async move { client_invoke_test_service(core_client, call).await }) +} + +/// Send a request to the Health Service using the provided Client +#[js_function] +pub fn client_send_health_service_request( + client: OpaqueInboundHandle, + call: RpcCall, +) -> BridgeResult>> { + let client = client.borrow()?; + let core_runtime = client.core_runtime.clone(); + let core_client = client.core_client.clone(); + + core_runtime + .future_to_promise(async move { client_invoke_health_service(core_client, call).await }) } /// Indicates that a gRPC request failed @@ -171,7 +227,12 @@ macro_rules! rpc_call { // FIXME: "this function may allocate 1400106 bytes on the stack" #[allow(clippy::too_many_lines)] -async fn client_invoke(mut retry_client: CoreClient, call: RpcCall) -> BridgeResult> { +async fn client_invoke_workflow_service( + mut retry_client: CoreClient, + call: RpcCall, +) -> BridgeResult> { + use temporal_client::WorkflowService; + match call.rpc.as_str() { "CountWorkflowExecutions" => { rpc_call!(retry_client, call, count_workflow_executions) @@ -430,6 +491,83 @@ async fn client_invoke(mut retry_client: CoreClient, call: RpcCall) -> BridgeRes } } +async fn client_invoke_operator_service( + mut retry_client: CoreClient, + call: RpcCall, +) -> BridgeResult> { + use temporal_client::OperatorService; + + match call.rpc.as_str() { + "AddOrUpdateRemoteCluster" => { + rpc_call!(retry_client, call, add_or_update_remote_cluster) + } + "AddSearchAttributes" => { + rpc_call!(retry_client, call, add_search_attributes) + } + "CreateNexusEndpoint" => rpc_call!(retry_client, call, create_nexus_endpoint), + "DeleteNamespace" => { + rpc_call!(retry_client, call, delete_namespace) + } + "DeleteNexusEndpoint" => rpc_call!(retry_client, call, delete_nexus_endpoint), + "GetNexusEndpoint" => rpc_call!(retry_client, call, get_nexus_endpoint), + "ListClusters" => rpc_call!(retry_client, call, list_clusters), + "ListNexusEndpoints" => rpc_call!(retry_client, call, list_nexus_endpoints), + "ListSearchAttributes" => { + rpc_call!(retry_client, call, list_search_attributes) + } + "RemoveRemoteCluster" => { + rpc_call!(retry_client, call, remove_remote_cluster) + } + "RemoveSearchAttributes" => { + rpc_call!(retry_client, call, remove_search_attributes) + } + "UpdateNexusEndpoint" => rpc_call!(retry_client, call, update_nexus_endpoint), + _ => Err(BridgeError::TypeError { + field: None, + message: format!("Unknown RPC call {}", call.rpc), + }), + } +} + +async fn client_invoke_test_service( + mut retry_client: CoreClient, + call: RpcCall, +) -> BridgeResult> { + use temporal_client::TestService; + + match call.rpc.as_str() { + "GetCurrentTime" => rpc_call!(retry_client, call, get_current_time), + "LockTimeSkipping" => rpc_call!(retry_client, call, lock_time_skipping), + "SleepUntil" => rpc_call!(retry_client, call, sleep_until), + "Sleep" => rpc_call!(retry_client, call, sleep), + "UnlockTimeSkippingWithSleep" => { + rpc_call!(retry_client, call, unlock_time_skipping_with_sleep) + } + "UnlockTimeSkipping" => rpc_call!(retry_client, call, unlock_time_skipping), + _ => Err(BridgeError::TypeError { + field: None, + message: format!("Unknown RPC call {}", call.rpc), + }), + } +} + +async fn client_invoke_health_service( + mut retry_client: CoreClient, + call: RpcCall, +) -> BridgeResult> { + use temporal_client::HealthService; + + match call.rpc.as_str() { + "Check" => rpc_call!(retry_client, call, check), + // Intentionally ignore 'watch' because it's a streaming method, which is not currently + // supported by the macro and client-side code, and not needed anyway for any SDK use case. + _ => Err(BridgeError::TypeError { + field: None, + message: format!("Unknown RPC call {}", call.rpc), + }), + } +} + fn rpc_req(call: RpcCall) -> BridgeResult> { let proto = P::decode(&*call.req).map_err(|err| BridgeError::TypeError { field: None, diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index bc365fc77..6bd4fa002 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -104,7 +104,13 @@ export declare function clientUpdateHeaders(client: Client, headers: Record; +export declare function clientSendWorkflowServiceRequest(client: Client, call: RpcCall): Promise; + +export declare function clientSendOperatorServiceRequest(client: Client, call: RpcCall): Promise; + +export declare function clientSendTestServiceRequest(client: Client, call: RpcCall): Promise; + +export declare function clientSendHealthServiceRequest(client: Client, call: RpcCall): Promise; export declare function clientClose(client: Client): void; diff --git a/packages/test/src/activities/helpers.ts b/packages/test/src/activities/helpers.ts index fb0ab6bc1..f571e57f4 100644 --- a/packages/test/src/activities/helpers.ts +++ b/packages/test/src/activities/helpers.ts @@ -1,19 +1,10 @@ -import { OpenTelemetryWorkflowClientCallsInterceptor } from '@temporalio/interceptors-opentelemetry'; -import { Client, WorkflowHandle } from '@temporalio/client'; +import { WorkflowHandle } from '@temporalio/client'; import { QueryDefinition } from '@temporalio/common'; -import { getContext } from './interceptors'; +import { Context } from '@temporalio/activity'; function getSchedulingWorkflowHandle(): WorkflowHandle { - const { info, connection, dataConverter } = getContext(); + const { info, client } = Context.current(); const { workflowExecution } = info; - const client = new Client({ - connection, - namespace: info.workflowNamespace, - dataConverter, - interceptors: { - workflow: [new OpenTelemetryWorkflowClientCallsInterceptor()], - }, - }); return client.workflow.getHandle(workflowExecution.workflowId, workflowExecution.runId); } diff --git a/packages/test/src/activities/interceptors.ts b/packages/test/src/activities/interceptors.ts deleted file mode 100644 index 20a51cd72..000000000 --- a/packages/test/src/activities/interceptors.ts +++ /dev/null @@ -1,33 +0,0 @@ -import * as activity from '@temporalio/activity'; -import { ConnectionLike } from '@temporalio/client'; -import { defaultDataConverter, LoadedDataConverter } from '@temporalio/common'; -import { ActivityExecuteInput, ActivityInboundCallsInterceptor, Next } from '@temporalio/worker'; - -export class ConnectionInjectorInterceptor implements ActivityInboundCallsInterceptor { - constructor( - public readonly connection: ConnectionLike, - public readonly dataConverter = defaultDataConverter - ) {} - async execute(input: ActivityExecuteInput, next: Next): Promise { - Object.assign(activity.Context.current(), { - connection: this.connection, - dataConverter: this.dataConverter, - }); - return next(input); - } -} - -/** - * Extend the basic activity Context - */ -export interface Context extends activity.Context { - connection: ConnectionLike; - dataConverter: LoadedDataConverter; -} - -/** - * Type "safe" helper to get a context with connection - */ -export function getContext(): Context { - return activity.Context.current() as unknown as Context; -} diff --git a/packages/test/src/helpers-integration-multi-codec.ts b/packages/test/src/helpers-integration-multi-codec.ts index c021ffc0f..b37799cd2 100644 --- a/packages/test/src/helpers-integration-multi-codec.ts +++ b/packages/test/src/helpers-integration-multi-codec.ts @@ -4,7 +4,6 @@ import { defaultFailureConverter, defaultPayloadConverter, LoadedDataConverter } import { WorkerOptions, WorkflowBundle } from '@temporalio/worker'; import { TestWorkflowEnvironment } from '@temporalio/testing'; -import { ConnectionInjectorInterceptor } from './activities/interceptors'; import { configurableHelpers, createTestWorkflowEnvironment, @@ -50,11 +49,6 @@ export function makeTestFn(makeBundle: () => Promise): TestFn, opts?: Partial): Promise { return configurableHelpers(t, t.context.workflowBundle, env).createWorker({ dataConverter, - interceptors: { - activity: [ - () => ({ inbound: new ConnectionInjectorInterceptor(env.connection, loadedDataConverter) }), - ], - }, ...opts, }); }, diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 8cefb571e..b8cbbf48c 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -29,7 +29,6 @@ import { import * as workflow from '@temporalio/workflow'; import { temporal } from '@temporalio/proto'; import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes'; -import { ConnectionInjectorInterceptor } from './activities/interceptors'; import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers'; export interface Context { @@ -134,32 +133,40 @@ export function makeConfigurableEnvironmentTestFn(opts: { return test; } -export function makeTestFunction(opts: { +export interface TestFunctionOptions { workflowsPath: string; workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions; workflowInterceptorModules?: string[]; recordedLogs?: { [workflowId: string]: LogEntry[] }; runtimeOpts?: Partial | (() => Promise<[Partial, Partial]>) | undefined; -}): TestFn { +} + +export function makeTestFunction(opts: TestFunctionOptions): TestFn { return makeConfigurableEnvironmentTestFn({ recordedLogs: opts.recordedLogs, runtimeOpts: opts.runtimeOpts, - createTestContext: async (_t: ExecutionContext): Promise => { - const env = await createTestWorkflowEnvironment(opts.workflowEnvironmentOpts); - return { - workflowBundle: await createTestWorkflowBundle({ - workflowsPath: opts.workflowsPath, - workflowInterceptorModules: opts.workflowInterceptorModules, - }), - env, - } as unknown as C; - }, + createTestContext: makeDefaultTestContextFunction(opts), teardown: async (c: C) => { - await c.env.teardown(); + if (c.env) { + await c.env.teardown(); + } }, }); } +export function makeDefaultTestContextFunction(opts: TestFunctionOptions) { + return async (_t: ExecutionContext): Promise => { + const env = await createTestWorkflowEnvironment(opts.workflowEnvironmentOpts); + return { + workflowBundle: await createTestWorkflowBundle({ + workflowsPath: opts.workflowsPath, + workflowInterceptorModules: opts.workflowInterceptorModules, + }), + env, + } as unknown as C; + }; +} + export async function createTestWorkflowEnvironment( opts?: LocalTestWorkflowEnvironmentOptions ): Promise { @@ -207,9 +214,6 @@ export function configurableHelpers( connection: testEnv.nativeConnection, workflowBundle, taskQueue, - interceptors: { - activity: [() => ({ inbound: new ConnectionInjectorInterceptor(testEnv.connection) })], - }, showStackTraceSources: true, ...opts, }); diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index ea0729701..a651a9815 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -277,6 +277,32 @@ export async function registerDefaultCustomSearchAttributes(connection: Connecti console.log(`... Registered (took ${timeTaken / 1000} sec)!`); } +/** + * Return a random TCP port number, that is guaranteed to be either available, or to be in use. + * + * To get a port that is guaranteed to be available, simply call the function directly. + * + * ```ts + * const port = await getRandomPort(); + * ``` + * + * To get a port that is guaranteed to be in use, pass a function that will be called with the port + * number; the port is guaranteed to be in use until the function returns. This may be useful for + * example to test for proper error handling when a port is already in use. + * + * ```ts + * const port = await getRandomPort(async (port) => { + * t.throws( + * () => startMyService({ bindAddress: `127.0.0.1:${port}` }), + * { + * instanceOf: Error, + * message: /(Address already in use|socket address)/, + * } + * ); + * }); + * }); + * ``` + */ export async function getRandomPort(fn = (_port: number) => Promise.resolve()): Promise { return new Promise((resolve, reject) => { const srv = net.createServer(); diff --git a/packages/test/src/load/worker.ts b/packages/test/src/load/worker.ts index 379e18455..54686acb6 100644 --- a/packages/test/src/load/worker.ts +++ b/packages/test/src/load/worker.ts @@ -5,7 +5,6 @@ import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc'; import * as opentelemetry from '@opentelemetry/sdk-node'; import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; import arg from 'arg'; -import { Connection } from '@temporalio/client'; import { DefaultLogger, LogEntry, @@ -17,7 +16,6 @@ import { makeTelemetryFilterString, } from '@temporalio/worker'; import * as activities from '../activities'; -import { ConnectionInjectorInterceptor } from '../activities/interceptors'; import { getRequired, WorkerArgSpec, workerArgSpec } from './args'; /** @@ -148,11 +146,6 @@ async function main() { } : {}; - const clientConnection = await Connection.connect({ - address: serverAddress, - ...tlsConfig, - }); - const connection = await NativeConnection.connect({ address: serverAddress, ...tlsConfig, @@ -173,9 +166,6 @@ async function main() { workflowThreadPoolSize, maxConcurrentActivityTaskPolls, maxConcurrentWorkflowTaskPolls, - interceptors: { - activity: [() => ({ inbound: new ConnectionInjectorInterceptor(clientConnection) })], - }, // Can't reuse the helper because it defines `test` and ava thinks it's an ava test. reuseV8Context: ['1', 't', 'true'].includes((process.env.REUSE_V8_CONTEXT ?? 'true').toLowerCase()), }); diff --git a/packages/test/src/test-integration-split-one.ts b/packages/test/src/test-integration-split-one.ts index 225f0df5a..f3df37222 100644 --- a/packages/test/src/test-integration-split-one.ts +++ b/packages/test/src/test-integration-split-one.ts @@ -174,7 +174,6 @@ test.serial('activity-failure with Error', configMacro, async (t, config) => { dedent` Error: Fail me at throwAnError (test/src/activities/index.ts) - at ConnectionInjectorInterceptor.execute (test/src/activities/interceptors.ts) ` ); }); @@ -211,7 +210,6 @@ test.serial('activity-failure with ApplicationFailure', configMacro, async (t, c ApplicationFailure: Fail me at Function.nonRetryable (common/src/failure.ts) at throwAnError (test/src/activities/index.ts) - at ConnectionInjectorInterceptor.execute (test/src/activities/interceptors.ts) ` ); }); diff --git a/packages/test/src/test-integration-split-two.ts b/packages/test/src/test-integration-split-two.ts index 4f3d0f9d0..932167346 100644 --- a/packages/test/src/test-integration-split-two.ts +++ b/packages/test/src/test-integration-split-two.ts @@ -18,6 +18,7 @@ import { decodeOptionalSinglePayload, } from '@temporalio/common/lib/internal-non-workflow'; +import { Context } from '@temporalio/activity'; import { condition, defineQuery, @@ -948,3 +949,47 @@ test.serial('User metadata on workflow, timer, activity, child', configMacro, as t.is(wfMetadata.currentDetails, 'current wf details'); }); }); + +export async function activityContextExposesClientConnectionParentWorkflow(): Promise { + return await proxyActivities({ + startToCloseTimeout: '10s', + })['foo'](); +} + +export async function activityContextExposesClientConnectionChildWorkflow(comment: string): Promise { + return `child(${comment})`; +} + +test('Activity Context exposes Client connection', configMacro, async (t, config) => { + const { env, createWorkerWithDefaults } = config; + const { startWorkflow, taskQueue } = configurableHelpers(t, t.context.workflowBundle, env); + const worker = await createWorkerWithDefaults(t, { + activities: { + foo: async () => { + const { client } = Context.current(); + return await client.workflow.execute(activityContextExposesClientConnectionChildWorkflow, { + workflowId: uuid4(), + taskQueue, + args: ['not intercepted'], + }); + }, + }, + interceptors: { + client: { + workflow: [ + { + async start(input, next) { + input.options.args = ['native client intercepted']; + return await next(input); + }, + }, + ], + }, + }, + }); + const res = await worker.runUntil(async () => { + const handle = await startWorkflow(activityContextExposesClientConnectionParentWorkflow); + return await handle.result(); + }); + t.is(res, 'child(native client intercepted)'); +}); diff --git a/packages/test/src/test-local-activities.ts b/packages/test/src/test-local-activities.ts index cbcd05ba4..10f4de2bc 100644 --- a/packages/test/src/test-local-activities.ts +++ b/packages/test/src/test-local-activities.ts @@ -23,7 +23,6 @@ import { } from '@temporalio/worker'; import * as workflow from '@temporalio/workflow'; import { test as anyTest, bundlerOptions, Worker, TestWorkflowEnvironment } from './helpers'; -import { ConnectionInjectorInterceptor } from './activities/interceptors'; // FIXME MOVE THIS SECTION SOMEWHERE IT CAN BE SHARED // @@ -61,9 +60,7 @@ function helpers(t: ExecutionContext): Helpers { workflowBundle: t.context.workflowBundle, taskQueue, interceptors: { - activity: interceptors?.activity ?? [ - () => ({ inbound: new ConnectionInjectorInterceptor(t.context.env.connection) }), - ], + activity: interceptors?.activity ?? [], }, showStackTraceSources: true, ...rest, diff --git a/packages/test/src/test-native-connection.ts b/packages/test/src/test-native-connection.ts index 9fca252f8..532789e39 100644 --- a/packages/test/src/test-native-connection.ts +++ b/packages/test/src/test-native-connection.ts @@ -6,7 +6,8 @@ import test from 'ava'; import * as grpc from '@grpc/grpc-js'; import * as protoLoader from '@grpc/proto-loader'; import { Client, NamespaceNotFoundError, WorkflowNotFoundError } from '@temporalio/client'; -import { IllegalStateError, NativeConnection, TransportError } from '@temporalio/worker'; +import { InternalConnectionOptions, InternalConnectionOptionsSymbol } from '@temporalio/client/lib/connection'; +import { IllegalStateError, NativeConnection, NativeConnectionOptions, TransportError } from '@temporalio/worker'; import { temporal } from '@temporalio/proto'; import { TestWorkflowEnvironment } from '@temporalio/testing'; import { RUN_INTEGRATION_TESTS, Worker } from './helpers'; @@ -20,6 +21,30 @@ const workflowServicePackageDefinition = protoLoader.loadSync( ); const workflowServiceProtoDescriptor = grpc.loadPackageDefinition(workflowServicePackageDefinition) as any; +const operatorServicePackageDefinition = protoLoader.loadSync( + path.resolve( + __dirname, + '../../core-bridge/sdk-core/sdk-core-protos/protos/api_upstream/temporal/api/operatorservice/v1/service.proto' + ), + { includeDirs: [path.resolve(__dirname, '../../core-bridge/sdk-core/sdk-core-protos/protos/api_upstream')] } +); +const operatorServiceProtoDescriptor = grpc.loadPackageDefinition(operatorServicePackageDefinition) as any; + +const healthServicePackageDefinition = protoLoader.loadSync( + path.resolve(__dirname, '../../core-bridge/sdk-core/sdk-core-protos/protos/grpc/health/v1/health.proto'), + { includeDirs: [] } +); +const healthServiceProtoDescriptor = grpc.loadPackageDefinition(healthServicePackageDefinition) as any; + +const testServicePackageDefinition = protoLoader.loadSync( + path.resolve( + __dirname, + '../../core-bridge/sdk-core/sdk-core-protos/protos/testsrv_upstream/temporal/api/testservice/v1/service.proto' + ), + { includeDirs: [path.resolve(__dirname, '../../core-bridge/sdk-core/sdk-core-protos/protos/testsrv_upstream')] } +); +const testServiceProtoDescriptor = grpc.loadPackageDefinition(testServicePackageDefinition) as any; + async function bindLocalhostIpv6(server: grpc.Server): Promise { return await util.promisify(server.bindAsync.bind(server))('[::1]:0', grpc.ServerCredentials.createInsecure()); } @@ -235,7 +260,147 @@ test('all WorkflowService methods are implemented', async (t) => { server.forceShutdown(); }); -test('can power client calls', async (t) => { +test('all OperatorService methods are implemented', async (t) => { + const server = new grpc.Server(); + const calledMethods = new Set(); + server.addService( + operatorServiceProtoDescriptor.temporal.api.operatorservice.v1.OperatorService.service, + new Proxy( + {}, + { + get() { + return ( + call: grpc.ServerUnaryCall, + callback: grpc.sendUnaryData + ) => { + const parts = call.getPath().split('/'); + const method = parts[parts.length - 1]; + calledMethods.add(method[0].toLowerCase() + method.slice(1)); + callback(null, {}); + }; + }, + } + ) + ); + + const port = await util.promisify(server.bindAsync.bind(server))( + 'localhost:0', + grpc.ServerCredentials.createInsecure() + ); + const connection = await NativeConnection.connect({ + address: `127.0.0.1:${port}`, + }); + + // Transform all methods from pascal case to lower case. + const methods = Object.keys( + operatorServiceProtoDescriptor.temporal.api.operatorservice.v1.OperatorService.service + ).map((k) => k[0].toLowerCase() + k.slice(1)); + methods.sort(); + for (const method of methods) { + await (connection.operatorService as any)[method]({}); + t.true(calledMethods.has(method), `method ${method} not called`); + } + + await connection.close(); + server.forceShutdown(); +}); + +test('all HealthService methods are implemented', async (t) => { + const server = new grpc.Server(); + const calledMethods = new Set(); + server.addService( + healthServiceProtoDescriptor.grpc.health.v1.Health.service, + new Proxy( + {}, + { + get() { + return ( + call: grpc.ServerUnaryCall, + callback: grpc.sendUnaryData + ) => { + const parts = call.getPath().split('/'); + const method = parts[parts.length - 1]; + calledMethods.add(method[0].toLowerCase() + method.slice(1)); + callback(null, {}); + }; + }, + } + ) + ); + + const port = await util.promisify(server.bindAsync.bind(server))( + 'localhost:0', + grpc.ServerCredentials.createInsecure() + ); + const connection = await NativeConnection.connect({ + address: `127.0.0.1:${port}`, + }); + + // Transform all methods from pascal case to lower case. + const methods = Object.keys(healthServiceProtoDescriptor.grpc.health.v1.Health.service).map( + (k) => k[0].toLowerCase() + k.slice(1) + ); + methods.sort(); + for (const method of methods) { + // Intentionally ignore 'watch' because it's a streaming method. + if (method === 'watch') { + continue; + } + await (connection.healthService as any)[method]({}); + t.true(calledMethods.has(method), `method ${method} not called`); + } + + await connection.close(); + server.forceShutdown(); +}); + +test('all TestService methods are implemented', async (t) => { + const server = new grpc.Server(); + const calledMethods = new Set(); + server.addService( + testServiceProtoDescriptor.temporal.api.testservice.v1.TestService.service, + new Proxy( + {}, + { + get() { + return ( + call: grpc.ServerUnaryCall, + callback: grpc.sendUnaryData + ) => { + const parts = call.getPath().split('/'); + const method = parts[parts.length - 1]; + calledMethods.add(method[0].toLowerCase() + method.slice(1)); + callback(null, {}); + }; + }, + } + ) + ); + + const port = await util.promisify(server.bindAsync.bind(server))( + 'localhost:0', + grpc.ServerCredentials.createInsecure() + ); + const connection = await NativeConnection.connect({ + address: `127.0.0.1:${port}`, + [InternalConnectionOptionsSymbol]: { supportsTestService: true }, + }); + + // Transform all methods from pascal case to lower case. + const methods = Object.keys(testServiceProtoDescriptor.temporal.api.testservice.v1.TestService.service).map( + (k) => k[0].toLowerCase() + k.slice(1) + ); + methods.sort(); + for (const method of methods) { + await (connection.testService as any)[method]({}); + t.true(calledMethods.has(method), `method ${method} not called`); + } + + await connection.close(); + server.forceShutdown(); +}); + +test('can power workflow client calls', async (t) => { const env = await TestWorkflowEnvironment.createLocal(); try { { diff --git a/packages/test/src/test-otel.ts b/packages/test/src/test-otel.ts index 236c1347c..3ae8bee3b 100644 --- a/packages/test/src/test-otel.ts +++ b/packages/test/src/test-otel.ts @@ -12,8 +12,9 @@ import { BasicTracerProvider, InMemorySpanExporter, SimpleSpanProcessor } from ' import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; import test from 'ava'; import { v4 as uuid4 } from 'uuid'; -import { Connection, WorkflowClient } from '@temporalio/client'; +import { WorkflowClient } from '@temporalio/client'; import { OpenTelemetryWorkflowClientInterceptor } from '@temporalio/interceptors-opentelemetry/lib/client'; +import { OpenTelemetryWorkflowClientCallsInterceptor } from '@temporalio/interceptors-opentelemetry'; import { instrument } from '@temporalio/interceptors-opentelemetry/lib/instrumentation'; import { makeWorkflowExporter, @@ -23,7 +24,6 @@ import { import { OpenTelemetrySinks, SpanName, SPAN_DELIMITER } from '@temporalio/interceptors-opentelemetry/lib/workflow'; import { DefaultLogger, InjectedSinks, Runtime } from '@temporalio/worker'; import * as activities from './activities'; -import { ConnectionInjectorInterceptor } from './activities/interceptors'; import { RUN_INTEGRATION_TESTS, TestWorkflowEnvironment, Worker } from './helpers'; import * as workflows from './workflows'; @@ -256,20 +256,20 @@ if (RUN_INTEGRATION_TESTS) { exporter: makeWorkflowExporter(traceExporter, staticResource), }; - const connection = await Connection.connect(); - const worker = await Worker.create({ workflowsPath: require.resolve('./workflows'), activities, taskQueue: 'test-otel', interceptors: { + client: { + workflow: [new OpenTelemetryWorkflowClientCallsInterceptor()], + }, workflowModules: [require.resolve('./workflows/otel-interceptors')], activity: [ (ctx) => ({ inbound: new OpenTelemetryActivityInboundInterceptor(ctx), outbound: new OpenTelemetryActivityOutboundInterceptor(ctx), }), - () => ({ inbound: new ConnectionInjectorInterceptor(connection) }), ], }, sinks, diff --git a/packages/testing/src/client.ts b/packages/testing/src/client.ts index 1c9ef6836..899ac0328 100644 --- a/packages/testing/src/client.ts +++ b/packages/testing/src/client.ts @@ -2,21 +2,12 @@ import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned- import { Client, ClientOptions, + Connection, + TestService, WorkflowClient, WorkflowClientOptions, WorkflowResultOptions, } from '@temporalio/client'; -import { Connection, TestService } from './connection'; - -export interface TimeSkippingWorkflowClientOptions extends WorkflowClientOptions { - connection: Connection; - enableTimeSkipping: boolean; -} - -export interface TestEnvClientOptions extends ClientOptions { - connection: Connection; - enableTimeSkipping: boolean; -} /** * Subset of the "normal" client options that are used to create a client for the test environment. @@ -24,18 +15,37 @@ export interface TestEnvClientOptions extends ClientOptions { export type ClientOptionsForTestEnv = Omit; /** - * A client with the exact same API as the "normal" client with 1 exception, - * When this client waits on a Workflow's result, it will enable time skipping - * in the test server. + * A client with the exact same API as the "normal" client with one exception: + * when `TestEnvClient.workflow` (an instance of {@link TimeSkippingWorkflowClient}) waits on a Workflow's result, it will enable time skipping + * in the Test Server. + */ +export class TimeSkippingClient extends Client { + constructor(options: ClientOptions) { + super(options); + + // Recreate the client (this isn't optimal but it's better than adding public methods just for testing). + // NOTE: we cast to "any" to work around `workflow` being a readonly attribute. + (this as any).workflow = new TimeSkippingWorkflowClient({ + ...this.workflow.options, + connection: options.connection, + }); + } +} + +/** + * A client with the exact same API as the "normal" client with one exception: when this client + * waits on a Workflow's result, it will enable time skipping in the Test Server. */ export class TimeSkippingWorkflowClient extends WorkflowClient { protected readonly testService: TestService; - protected readonly enableTimeSkipping: boolean; - constructor(options: TimeSkippingWorkflowClientOptions) { + constructor(options: WorkflowClientOptions) { super(options); - this.enableTimeSkipping = options.enableTimeSkipping; - this.testService = options.connection.testService; + const testService = (options.connection as Connection).testService; + if (!testService) { + throw new TypeError('TestService must be present when creating a TimeSkippingWorkflowClient'); + } + this.testService = testService; } /** @@ -48,34 +58,11 @@ export class TimeSkippingWorkflowClient extends WorkflowClient { runId?: string | undefined, opts?: WorkflowResultOptions | undefined ): Promise { - if (this.enableTimeSkipping) { - await this.testService.unlockTimeSkipping({}); - try { - return await super.result(workflowId, runId, opts); - } finally { - await this.testService.lockTimeSkipping({}); - } - } else { + await this.testService.unlockTimeSkipping({}); + try { return await super.result(workflowId, runId, opts); + } finally { + await this.testService.lockTimeSkipping({}); } } } - -/** - * A client with the exact same API as the "normal" client with one exception: - * when `TestEnvClient.workflow` (an instance of {@link TimeSkippingWorkflowClient}) waits on a Workflow's result, it will enable time skipping - * in the Test Server. - */ -export class TestEnvClient extends Client { - constructor(options: TestEnvClientOptions) { - super(options); - - // Recreate the client (this isn't optimal but it's better than adding public methods just for testing). - // NOTE: we cast to "any" to work around `workflow` being a readonly attribute. - (this as any).workflow = new TimeSkippingWorkflowClient({ - ...this.workflow.options, - connection: options.connection, - enableTimeSkipping: options.enableTimeSkipping, - }); - } -} diff --git a/packages/testing/src/connection.ts b/packages/testing/src/connection.ts deleted file mode 100644 index aeedfd824..000000000 --- a/packages/testing/src/connection.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { Connection as BaseConnection, ConnectionOptions } from '@temporalio/client'; -import { ConnectionCtorOptions as BaseConnectionCtorOptions } from '@temporalio/client/lib/connection'; -import { temporal } from '@temporalio/proto'; - -export type TestService = temporal.api.testservice.v1.TestService; -export const { TestService } = temporal.api.testservice.v1; - -interface ConnectionCtorOptions extends BaseConnectionCtorOptions { - testService: TestService; -} - -/** - * A Connection class that can be used to interact with both the test server's TestService and WorkflowService - */ -export class Connection extends BaseConnection { - public readonly testService: TestService; - - protected static createCtorOptions(options?: ConnectionOptions): ConnectionCtorOptions { - const ctorOptions = BaseConnection.createCtorOptions(options); - const rpcImpl = this.generateRPCImplementation({ - serviceName: 'temporal.api.testservice.v1.TestService', - client: ctorOptions.client, - callContextStorage: ctorOptions.callContextStorage, - interceptors: ctorOptions.options.interceptors, - staticMetadata: ctorOptions.options.metadata, - apiKeyFnRef: {}, - }); - const testService = TestService.create(rpcImpl, false, false); - return { ...ctorOptions, testService }; - } - - static lazy(options?: ConnectionOptions): Connection { - const ctorOptions = this.createCtorOptions(options); - return new this(ctorOptions); - } - - static async connect(options?: ConnectionOptions): Promise { - const ret = this.lazy(options); - await ret.ensureConnected(); - return ret; - } - - protected constructor(options: ConnectionCtorOptions) { - super(options); - this.testService = options.testService; - } -} diff --git a/packages/testing/src/index.ts b/packages/testing/src/index.ts index 8e8077e5c..da67bc195 100644 --- a/packages/testing/src/index.ts +++ b/packages/testing/src/index.ts @@ -23,14 +23,7 @@ export { type EphemeralServerExecutable, } from './ephemeral-server'; -export { - // FIXME: Revise the pertinence of these types - type ClientOptionsForTestEnv, - type TestEnvClientOptions, - type TimeSkippingWorkflowClientOptions, - TestEnvClient, - TimeSkippingWorkflowClient, -} from './client'; +export { type ClientOptionsForTestEnv, TimeSkippingWorkflowClient } from './client'; export { type MockActivityEnvironmentOptions, diff --git a/packages/testing/src/mocking-activity-environment.ts b/packages/testing/src/mocking-activity-environment.ts index b85db15bb..cc6aaa531 100644 --- a/packages/testing/src/mocking-activity-environment.ts +++ b/packages/testing/src/mocking-activity-environment.ts @@ -12,6 +12,7 @@ import { ActivityCancellationDetails, } from '@temporalio/common'; import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; +import { Client } from '@temporalio/client'; import { ActivityInterceptorsFactory, DefaultLogger } from '@temporalio/worker'; import { Activity, CancelReason } from '@temporalio/worker/lib/activity'; @@ -19,6 +20,7 @@ export interface MockActivityEnvironmentOptions { interceptors?: ActivityInterceptorsFactory[]; logger?: Logger; metricMeter?: MetricMeter; + client?: Client; } /** @@ -48,6 +50,7 @@ export class MockActivityEnvironment extends events.EventEmitter { undefined, loadedDataConverter, heartbeatCallback, + opts?.client, LoggerWithComposedMetadata.compose(opts?.logger ?? new DefaultLogger(), { sdkComponent: SdkComponent.worker }), opts?.metricMeter ?? noopMetricMeter, opts?.interceptors ?? [] diff --git a/packages/testing/src/testing-workflow-environment.ts b/packages/testing/src/testing-workflow-environment.ts index 3702b7097..5187f996f 100644 --- a/packages/testing/src/testing-workflow-environment.ts +++ b/packages/testing/src/testing-workflow-environment.ts @@ -1,13 +1,17 @@ import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import -import { AsyncCompletionClient, Client, WorkflowClient } from '@temporalio/client'; +import { AsyncCompletionClient, Client, Connection, WorkflowClient } from '@temporalio/client'; +import { + ConnectionOptions, + InternalConnectionOptions, + InternalConnectionOptionsSymbol, +} from '@temporalio/client/lib/connection'; import { Duration, TypedSearchAttributes } from '@temporalio/common'; import { msToNumber, msToTs, tsToMs } from '@temporalio/common/lib/time'; -import { NativeConnection, Runtime } from '@temporalio/worker'; +import { NativeConnection, NativeConnectionOptions, Runtime } from '@temporalio/worker'; import { native } from '@temporalio/core-bridge'; import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; -import { Connection } from './connection'; import { toNativeEphemeralServerConfig, DevServerConfig, TimeSkippingServerConfig } from './ephemeral-server'; -import { ClientOptionsForTestEnv, TestEnvClient } from './client'; +import { ClientOptionsForTestEnv, TimeSkippingClient } from './client'; /** * Options for {@link TestWorkflowEnvironment.createLocal} @@ -54,7 +58,7 @@ export class TestWorkflowEnvironment { public readonly connection: Connection; /** - * A {@link TestEnvClient} for interacting with the ephemeral server + * A {@link TimeSkippingClient} for interacting with the ephemeral server */ public readonly client: Client; @@ -91,12 +95,17 @@ export class TestWorkflowEnvironment { this.connection = connection; this.nativeConnection = nativeConnection; this.namespace = namespace; - this.client = new TestEnvClient({ - connection, - namespace: this.namespace, - enableTimeSkipping: supportsTimeSkipping, - ...options.client, - }); + this.client = supportsTimeSkipping + ? new TimeSkippingClient({ + connection, + namespace: this.namespace, + ...options.client, + }) + : new Client({ + connection, + namespace: this.namespace, + ...options.client, + }); this.asyncCompletionClient = this.client.activity; // eslint-disable-line deprecation/deprecation this.workflowClient = this.client.workflow; // eslint-disable-line deprecation/deprecation } @@ -220,8 +229,14 @@ export class TestWorkflowEnvironment { server = 'existing'; } - const nativeConnection = await NativeConnection.connect({ address }); - const connection = await Connection.connect({ address }); + const nativeConnection = await NativeConnection.connect({ + address, + [InternalConnectionOptionsSymbol]: { supportsTestService: supportsTimeSkipping }, + }); + const connection = await Connection.connect({ + address, + [InternalConnectionOptionsSymbol]: { supportsTestService: supportsTimeSkipping }, + }); return new this(runtime, optsWithDefaults, supportsTimeSkipping, server, connection, nativeConnection, namespace); } @@ -292,7 +307,7 @@ export class TestWorkflowEnvironment { */ sleep = async (durationMs: Duration): Promise => { if (this.supportsTimeSkipping) { - await (this.connection as Connection).testService.unlockTimeSkippingWithSleep({ duration: msToTs(durationMs) }); + await this.connection.testService!.unlockTimeSkippingWithSleep({ duration: msToTs(durationMs) }); } else { await new Promise((resolve) => setTimeout(resolve, msToNumber(durationMs))); } @@ -306,7 +321,7 @@ export class TestWorkflowEnvironment { */ async currentTimeMs(): Promise { if (this.supportsTimeSkipping) { - const { time } = await (this.connection as Connection).testService.getCurrentTime({}); + const { time } = await this.connection.testService!.getCurrentTime({}); return tsToMs(time); } else { return Date.now(); diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 59388abf1..7ba463dde 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -19,6 +19,7 @@ import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { isAbortError } from '@temporalio/common/lib/type-helpers'; import { Logger, LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { MetricMeterWithComposedTags } from '@temporalio/common/lib/metrics'; +import { Client } from '@temporalio/client'; import { coresdk } from '@temporalio/proto'; import { ActivityCancellationDetailsHolder } from '@temporalio/common/lib/activity-cancellation-details'; import { @@ -41,10 +42,7 @@ export class Activity { public readonly context: Context; public cancel: (reason: CancelReason, details: ActivityCancellationDetails) => void = () => undefined; public readonly abortController: AbortController = new AbortController(); - public readonly interceptors: { - inbound: ActivityInboundCallsInterceptor[]; - outbound: ActivityOutboundCallsInterceptor[]; - }; + /** * Logger bound to `sdkComponent: worker`, with metadata from this activity. * This is the logger to use for all log messages emitted by the activity @@ -58,11 +56,17 @@ export class Activity { */ private readonly metricMeter; + public readonly interceptors: { + inbound: ActivityInboundCallsInterceptor[]; + outbound: ActivityOutboundCallsInterceptor[]; + }; + constructor( public readonly info: Info, public readonly fn: ActivityFunction | undefined, public readonly dataConverter: LoadedDataConverter, public readonly heartbeatCallback: Context['heartbeat'], + private readonly _client: Client | undefined, // May be undefined in the case of MockActivityEnvironment workerLogger: Logger, workerMetricMeter: MetricMeter, interceptors: ActivityInterceptorsFactory[] @@ -84,6 +88,7 @@ export class Activity { promise, this.abortController.signal, this.heartbeatCallback, + this._client, // This is the activity context logger, to be used exclusively from user code LoggerWithComposedMetadata.compose(this.workerLogger, { sdkComponent: SdkComponent.activity }), this.metricMeter, @@ -162,11 +167,24 @@ export class Activity { } } + // Ensure that client calls made with the worker's client in this handler's context are tied + // to the abort signal. The fact that client can be undefined (i.e. in a MockActivityEnvironment) + // makes this a bit more complex. + private executeWithClient(fn: ActivityFunction, input: ActivityExecuteInput): Promise { + if (this._client) { + return this._client.withAbortSignal(this.abortController.signal, () => { + return this.execute(fn, input); + }); + } else { + return this.execute(fn, input); + } + } + public run(input: ActivityExecuteInput): Promise { return asyncLocalStorage.run(this.context, async (): Promise => { try { if (this.fn === undefined) throw new IllegalStateError('Activity function is not defined'); - const result = await this.execute(this.fn, input); + const result = await this.executeWithClient(this.fn, input); return { completed: { result: await encodeToPayload(this.dataConverter, result) } }; } catch (err) { if (err instanceof CompleteAsyncError) { @@ -216,7 +234,7 @@ export class Activity { public runNoEncoding(fn: ActivityFunction, input: ActivityExecuteInput): Promise { if (this.fn !== undefined) throw new IllegalStateError('Activity function is defined'); - return asyncLocalStorage.run(this.context, () => this.execute(fn, input)); + return asyncLocalStorage.run(this.context, () => this.executeWithClient(fn, input)); } } diff --git a/packages/worker/src/connection.ts b/packages/worker/src/connection.ts index fa4154455..952d00422 100644 --- a/packages/worker/src/connection.ts +++ b/packages/worker/src/connection.ts @@ -3,7 +3,16 @@ import * as grpc from '@grpc/grpc-js'; import * as proto from 'protobufjs'; import { IllegalStateError } from '@temporalio/common'; import { native } from '@temporalio/core-bridge'; -import { ConnectionLike, Metadata, CallContext, WorkflowService } from '@temporalio/client'; +import { + ConnectionLike, + Metadata, + CallContext, + WorkflowService, + OperatorService, + HealthService, + TestService, +} from '@temporalio/client'; +import { InternalConnectionOptions, InternalConnectionOptionsSymbol } from '@temporalio/client/lib/connection'; import { TransportError } from './errors'; import { NativeConnectionOptions } from './connection-options'; import { Runtime } from './runtime'; @@ -21,7 +30,37 @@ export class NativeConnection implements ConnectionLike { */ private readonly referenceHolders = new Set(); + /** + * Raw gRPC access to Temporal Server's {@link + * https://github.com/temporalio/api/blob/master/temporal/api/workflowservice/v1/service.proto | Workflow service} + */ public readonly workflowService: WorkflowService; + + /** + * Raw gRPC access to Temporal Server's + * {@link https://github.com/temporalio/api/blob/master/temporal/api/operatorservice/v1/service.proto | Operator service} + * + * The Operator Service API defines how Temporal SDKs and other clients interact with the Temporal + * server to perform administrative functions like registering a search attribute or a namespace. + * + * This Service API is NOT compatible with Temporal Cloud. Attempt to use it against a Temporal + * Cloud namespace will result in gRPC `unauthorized` error. + */ + public readonly operatorService: OperatorService; + + /** + * Raw gRPC access to the standard gRPC {@link https://github.com/grpc/grpc/blob/92f58c18a8da2728f571138c37760a721c8915a2/doc/health-checking.md | health service}. + */ + public readonly healthService: HealthService; + + /** + * Raw gRPC access to Temporal Server's + * {@link https://github.com/temporalio/api/blob/master/temporal/api/testservice/v1/service.proto | Test service} + * + * Will be `undefined` if connected to a server that does not support the test service. + */ + public readonly testService: TestService | undefined; + readonly callContextStorage = new AsyncLocalStorage(); /** @@ -29,9 +68,31 @@ export class NativeConnection implements ConnectionLike { */ protected constructor( private readonly runtime: Runtime, - private readonly nativeClient: native.Client + private readonly nativeClient: native.Client, + private readonly enableTestService: boolean ) { - this.workflowService = WorkflowService.create(this.sendRequest.bind(this), false, false); + this.workflowService = WorkflowService.create( + this.sendRequest.bind(this, native.clientSendWorkflowServiceRequest.bind(undefined, this.nativeClient)), + false, + false + ); + this.operatorService = OperatorService.create( + this.sendRequest.bind(this, native.clientSendOperatorServiceRequest.bind(undefined, this.nativeClient)), + false, + false + ); + this.healthService = HealthService.create( + this.sendRequest.bind(this, native.clientSendHealthServiceRequest.bind(undefined, this.nativeClient)), + false, + false + ); + if (this.enableTestService) { + this.testService = TestService.create( + this.sendRequest.bind(this, native.clientSendTestServiceRequest.bind(undefined, this.nativeClient)), + false, + false + ); + } } /** @@ -40,6 +101,7 @@ export class NativeConnection implements ConnectionLike { async ensureConnected(): Promise {} private sendRequest( + sendRequestNative: (req: native.RpcCall) => Promise, method: proto.Method | proto.rpc.ServiceMethod, proto.Message>, requestData: any, callback: grpc.requestCallback @@ -66,7 +128,7 @@ export class NativeConnection implements ConnectionLike { timeout: ctx.deadline ? getRelativeTimeout(ctx.deadline) : null, }; - native.clientSendRequest(this.nativeClient, req).then( + sendRequestNative(req).then( (res) => { callback(null, resolvedResponseType.decode(Buffer.from(res))); }, @@ -153,26 +215,20 @@ export class NativeConnection implements ConnectionLike { * @deprecated use `connect` instead */ static async create(options?: NativeConnectionOptions): Promise { - try { - const runtime = Runtime.instance(); - const client = await runtime.createNativeClient(options); - return new this(runtime, client); - } catch (err) { - if (err instanceof TransportError) { - throw new TransportError(err.message); - } - throw err; - } + return this.connect(options); } /** * Eagerly connect to the Temporal server and return a NativeConnection instance */ static async connect(options?: NativeConnectionOptions): Promise { + const internalOptions = (options as InternalConnectionOptions)?.[InternalConnectionOptionsSymbol] ?? {}; + const enableTestService = internalOptions.supportsTestService ?? false; + try { const runtime = Runtime.instance(); const client = await runtime.createNativeClient(options); - return new this(runtime, client); + return new this(runtime, client, enableTestService); } catch (err) { if (err instanceof TransportError) { throw new TransportError(err.message); diff --git a/packages/worker/src/interceptors.ts b/packages/worker/src/interceptors.ts index 52cc89c8f..e1217d95a 100644 --- a/packages/worker/src/interceptors.ts +++ b/packages/worker/src/interceptors.ts @@ -7,6 +7,7 @@ */ import { Context as ActivityContext } from '@temporalio/activity'; +import { ClientInterceptors } from '@temporalio/client'; import { Headers, MetricTags, Next } from '@temporalio/common'; export { Next, Headers }; @@ -79,6 +80,14 @@ export type ActivityInterceptorsFactory = (ctx: ActivityContext) => ActivityInte * Structure for passing in Worker interceptors via {@link WorkerOptions} */ export interface WorkerInterceptors { + /** + * Interceptors for the Client provided by the Worker to Activities. + * + * @experimental Client support over `NativeConnection` is experimental. Error handling may be + * incomplete or different from what would be observed using a {@link Connection} + * instead. Client doesn't support cancellation through a Signal. + */ + client?: ClientInterceptors; /** * List of factory functions that instanciate {@link ActivityInboundCallsInterceptor}s and * {@link ActivityOutboundCallsInterceptor}s. diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 4614eac6b..00bc3b8aa 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -730,11 +730,16 @@ export function appendDefaultInterceptors( } function compileWorkerInterceptors({ + client, activity, activityInbound, // eslint-disable-line deprecation/deprecation workflowModules, }: Required): CompiledWorkerInterceptors { return { + client: { + workflow: client?.workflow ?? [], + schedule: client?.schedule ?? [], + }, activity: [...activityInbound.map((factory) => (ctx: Context) => ({ inbound: factory(ctx) })), ...activity], workflowModules, }; @@ -855,9 +860,9 @@ function addDefaultWorkerOptions( setTuner = rest.tuner; } else { const maxWft = maxConcurrentWorkflowTaskExecutions ?? 40; - maxWFTPolls = Math.min(10, maxWft); + maxWFTPolls = Math.min(maxWFTPolls, maxWft); const maxAT = maxConcurrentActivityTaskExecutions ?? 100; - maxATPolls = Math.min(10, maxAT); + maxATPolls = Math.min(maxATPolls, maxAT); const maxLAT = maxConcurrentLocalActivityExecutions ?? 100; setTuner = { workflowTaskSlotSupplier: { @@ -909,6 +914,10 @@ function addDefaultWorkerOptions( showStackTraceSources: showStackTraceSources ?? false, debugMode: debugMode ?? false, interceptors: { + client: { + workflow: interceptors?.client?.workflow ?? [], + schedule: interceptors?.client?.schedule ?? [], + }, activity: interceptors?.activity ?? [], // eslint-disable-next-line deprecation/deprecation activityInbound: interceptors?.activityInbound ?? [], diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index d99fc109d..87f221678 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -56,6 +56,7 @@ import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { errorMessage, NonNullableObject, OmitFirstParam } from '@temporalio/common/lib/type-helpers'; import { workflowLogAttributes } from '@temporalio/workflow/lib/logs'; import { native } from '@temporalio/core-bridge'; +import { Client } from '@temporalio/client'; import { coresdk, temporal } from '@temporalio/proto'; import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow'; import { throwIfReservedName } from '@temporalio/common/lib/reserved'; @@ -438,6 +439,7 @@ export class Worker { protected readonly workflowPollerStateSubject = new BehaviorSubject('POLLING'); protected readonly activityPollerStateSubject = new BehaviorSubject('POLLING'); + /** * Whether or not this worker has an outstanding workflow poll request */ @@ -447,6 +449,8 @@ export class Worker { */ protected hasOutstandingActivityPoll = false; + private client?: Client; + protected readonly numInFlightActivationsSubject = new BehaviorSubject(0); protected readonly numInFlightActivitiesSubject = new BehaviorSubject(0); protected readonly numInFlightNonLocalActivitiesSubject = new BehaviorSubject(0); @@ -765,6 +769,16 @@ export class Worker { protected readonly isReplayWorker: boolean = false ) { this.workflowCodecRunner = new WorkflowCodecRunner(options.loadedDataConverter.payloadCodecs); + if (connection != null) { + // connection (and consequently client) will be set IIF this is not a replay worker. + this.client = new Client({ + namespace: options.namespace, + connection, + identity: options.identity, + dataConverter: options.dataConverter, + interceptors: options.interceptors.client, + }); + } } /** @@ -1002,6 +1016,7 @@ export class Worker { ); }, }), + this.client, this.logger, this.metricMeter, this.options.interceptors.activity From 2ca8203caa11cbbbe145c2d2dcc03beb3119c102 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Thu, 21 Aug 2025 11:49:53 -0400 Subject: [PATCH 2/2] Fix compilation error --- packages/worker/src/interceptors.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/worker/src/interceptors.ts b/packages/worker/src/interceptors.ts index e1217d95a..d72e34ce9 100644 --- a/packages/worker/src/interceptors.ts +++ b/packages/worker/src/interceptors.ts @@ -111,4 +111,4 @@ export interface WorkerInterceptors { workflowModules?: string[]; } -export type CompiledWorkerInterceptors = Required>; +export type CompiledWorkerInterceptors = Required>;