Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/activity/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"author": "Temporal Technologies Inc. <[email protected]>",
"license": "MIT",
"dependencies": {
"@temporalio/client": "file:../client",
"@temporalio/common": "file:../common",
"abort-controller": "^3.0.0"
},
Expand Down
44 changes: 44 additions & 0 deletions packages/activity/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ import {
MetricMeter,
Priority,
ActivityCancellationDetails,
IllegalStateError,
} from '@temporalio/common';
import { msToNumber } from '@temporalio/common/lib/time';
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
import { ActivityCancellationDetailsHolder } from '@temporalio/common/lib/activity-cancellation-details';
import { Client } from '@temporalio/client';

export {
ActivityFunction,
Expand Down Expand Up @@ -273,6 +275,11 @@ export class Context {
*/
protected readonly heartbeatFn: (details?: any) => void;

/**
* The Worker's client, passed down through Activity context.
*/
protected readonly _client: Client | undefined;

/**
* The logger for this Activity.
*
Expand Down Expand Up @@ -313,6 +320,7 @@ export class Context {
cancelled: Promise<never>,
cancellationSignal: AbortSignal,
heartbeat: (details?: any) => void,
client: Client | undefined,
log: Logger,
metricMeter: MetricMeter,
details: ActivityCancellationDetailsHolder
Expand All @@ -321,6 +329,7 @@ export class Context {
this.cancelled = cancelled;
this.cancellationSignal = cancellationSignal;
this.heartbeatFn = heartbeat;
this._client = client;
this.log = log;
this.metricMeter = metricMeter;
this._cancellationDetails = details;
Expand Down Expand Up @@ -351,6 +360,25 @@ export class Context {
this.heartbeatFn(details);
};

/**
* A Temporal Client, bound to the same Temporal Namespace as the Worker executing this Activity.
*
* May throw an {@link IllegalStateError} if the Activity is running inside a `MockActivityEnvironment`
* that was created without a Client.
*
* @experimental Client support over `NativeConnection` is experimental. Error handling may be
* incomplete or different from what would be observed using a {@link Connection}
* instead. Client doesn't support cancellation through a Signal.
*/
public get client(): Client {
if (this._client === undefined) {
throw new IllegalStateError(
'No Client available. This may be a MockActivityEnvironment that was created without a Client.'
);
}
return this._client;
}

/**
* Helper function for sleeping in an Activity.
* @param ms Sleep duration: number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}
Expand Down Expand Up @@ -481,6 +509,22 @@ export function cancellationSignal(): AbortSignal {
return Context.current().cancellationSignal;
}

/**
* A Temporal Client, bound to the same Temporal Namespace as the Worker executing this Activity.
*
* May throw an {@link IllegalStateError} if the Activity is running inside a `MockActivityEnvironment`
* that was created without a Client.
*
* This is a shortcut for `Context.current().client` (see {@link Context.client}).
*
* @experimental Client support over `NativeConnection` is experimental. Error handling may be
* incomplete or different from what would be observed using a {@link Connection}
* instead. Client doesn't support cancellation through a Signal.
*/
export function getClient(): Client {
return Context.current().client;
}

/**
* Get the metric meter for the current activity, with activity-specific tags.
*
Expand Down
2 changes: 1 addition & 1 deletion packages/activity/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"outDir": "./lib",
"rootDir": "./src"
},
"references": [{ "path": "../common" }],
"references": [{ "path": "../common" }, { "path": "../client" }],
"include": ["./src/**/*.ts"]
}
2 changes: 1 addition & 1 deletion packages/client/src/base-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export function defaultBaseClientOptions(): WithDefaults<BaseClientOptions> {

export class BaseClient {
/**
* The underlying {@link Connection | connection} used by this client.
* The underlying {@link Connection | connection} or {@link NativeConnection | native connection} used by this client.
*
* Clients are cheap to create, but connections are expensive. Where it makes sense,
* a single connection may and should be reused by multiple `Client`s.
Expand Down
51 changes: 50 additions & 1 deletion packages/client/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { type temporal } from '@temporalio/proto';
import { isGrpcServiceError, ServiceError } from './errors';
import { defaultGrpcRetryOptions, makeGrpcRetryInterceptor } from './grpc-retry';
import pkg from './pkg';
import { CallContext, HealthService, Metadata, OperatorService, WorkflowService } from './types';
import { CallContext, HealthService, Metadata, OperatorService, TestService, WorkflowService } from './types';

/**
* The default Temporal Server's TCP port for public gRPC connections.
Expand Down Expand Up @@ -138,6 +138,23 @@ export type ConnectionOptionsWithDefaults = Required<
connectTimeoutMs: number;
};

/**
* A symbol used to attach extra, SDK-internal connection options.
*
* @internal
* @hidden
*/
export const InternalConnectionOptionsSymbol = Symbol('__temporal_internal_connection_options');
export type InternalConnectionOptions = ConnectionOptions & {
[InternalConnectionOptionsSymbol]?: {
/**
* Indicate whether the `TestService` should be enabled on this connection. This is set to true
* on connections created internally by `TestWorkflowEnvironment.createTimeSkipping()`.
*/
supportsTestService?: boolean;
};
};

export const LOCAL_TARGET = 'localhost:7233';

function addDefaults(options: ConnectionOptions): ConnectionOptionsWithDefaults {
Expand Down Expand Up @@ -239,6 +256,13 @@ export interface ConnectionCtorOptions {
*/
readonly operatorService: OperatorService;

/**
* Raw gRPC access to the Temporal test service.
*
* Will be `undefined` if connected to a server that does not support the test service.
*/
readonly testService: TestService | undefined;

/**
* Raw gRPC access to the standard gRPC {@link https://github.com/grpc/grpc/blob/92f58c18a8da2728f571138c37760a721c8915a2/doc/health-checking.md | health service}.
*/
Expand Down Expand Up @@ -286,6 +310,13 @@ export class Connection {
*/
public readonly operatorService: OperatorService;

/**
* Raw gRPC access to the Temporal test service.
*
* Will be `undefined` if connected to a server that does not support the test service.
*/
public readonly testService: TestService | undefined;

/**
* Raw gRPC access to the standard gRPC {@link https://github.com/grpc/grpc/blob/92f58c18a8da2728f571138c37760a721c8915a2/doc/health-checking.md | health service}.
*/
Expand Down Expand Up @@ -326,6 +357,7 @@ export class Connection {
apiKeyFnRef,
});
const workflowService = WorkflowService.create(workflowRpcImpl, false, false);

const operatorRpcImpl = this.generateRPCImplementation({
serviceName: 'temporal.api.operatorservice.v1.OperatorService',
client,
Expand All @@ -335,6 +367,20 @@ export class Connection {
apiKeyFnRef,
});
const operatorService = OperatorService.create(operatorRpcImpl, false, false);

let testService: TestService | undefined = undefined;
if ((options as InternalConnectionOptions)?.[InternalConnectionOptionsSymbol]?.supportsTestService) {
const testRpcImpl = this.generateRPCImplementation({
serviceName: 'temporal.api.testservice.v1.TestService',
client,
callContextStorage,
interceptors: optionsWithDefaults?.interceptors,
staticMetadata: optionsWithDefaults.metadata,
apiKeyFnRef,
});
testService = TestService.create(testRpcImpl, false, false);
}

const healthRpcImpl = this.generateRPCImplementation({
serviceName: 'grpc.health.v1.Health',
client,
Expand All @@ -350,6 +396,7 @@ export class Connection {
callContextStorage,
workflowService,
operatorService,
testService,
healthService,
options: optionsWithDefaults,
apiKeyFnRef,
Expand Down Expand Up @@ -414,6 +461,7 @@ export class Connection {
client,
workflowService,
operatorService,
testService,
healthService,
callContextStorage,
apiKeyFnRef,
Expand All @@ -422,6 +470,7 @@ export class Connection {
this.client = client;
this.workflowService = this.withNamespaceHeaderInjector(workflowService);
this.operatorService = operatorService;
this.testService = testService;
this.healthService = healthService;
this.callContextStorage = callContextStorage;
this.apiKeyFnRef = apiKeyFnRef;
Expand Down
5 changes: 2 additions & 3 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ export type WorkflowService = proto.temporal.api.workflowservice.v1.WorkflowServ
export const { WorkflowService } = proto.temporal.api.workflowservice.v1;
export type OperatorService = proto.temporal.api.operatorservice.v1.OperatorService;
export const { OperatorService } = proto.temporal.api.operatorservice.v1;
export type TestService = proto.temporal.api.testservice.v1.TestService;
export const { TestService } = proto.temporal.api.testservice.v1;
export type HealthService = proto.grpc.health.v1.Health;
export const { Health: HealthService } = proto.grpc.health.v1;

Expand All @@ -117,9 +119,6 @@ export interface CallContext {

/**
* Connection interface used by high level SDK clients.
*
* NOTE: Currently the SDK only supports grpc-js based connection but in the future
* we might support grpc-web and native Rust connections.
*/
export interface ConnectionLike {
workflowService: WorkflowService;
Expand Down
Loading
Loading