Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
423 changes: 132 additions & 291 deletions packages/core-bridge/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/core-bridge/sdk-core
Submodule sdk-core updated 144 files
105 changes: 86 additions & 19 deletions packages/core-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::time::Duration;
use std::{collections::HashMap, sync::Arc};

use neon::prelude::*;
use tonic::metadata::MetadataKey;
use tonic::metadata::{BinaryMetadataValue, MetadataKey};

use temporal_sdk_core::{ClientOptions as CoreClientOptions, CoreRuntime, RetryClient};

use bridge_macros::{TryFromJs, js_function};
use temporal_client::{ClientInitError, ConfiguredClient, TemporalServiceClientWithMetrics};
use temporal_client::{ClientInitError, ConfiguredClient, TemporalServiceClient};

use crate::runtime::Runtime;
use crate::{helpers::*, runtime::RuntimeExt as _};
Expand Down Expand Up @@ -38,7 +38,7 @@ pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult<
Ok(())
}

type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>;
type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClient>>;

pub struct Client {
// These fields are pub because they are accessed from Worker::new
Expand All @@ -61,6 +61,10 @@ pub fn client_new(

let core_client = match res {
Ok(core_client) => core_client,
Err(ClientInitError::InvalidHeaders(e)) => Err(BridgeError::TypeError {
message: format!("Invalid metadata key: {e}"),
field: None,
})?,
Err(ClientInitError::SystemInfoCallError(e)) => Err(BridgeError::TransportError(
format!("Failed to call GetSystemInfo: {e}"),
))?,
Expand All @@ -84,13 +88,27 @@ pub fn client_new(
#[js_function]
pub fn client_update_headers(
client: OpaqueInboundHandle<Client>,
headers: HashMap<String, String>,
headers: HashMap<String, MetadataValue>,
) -> BridgeResult<()> {
let (ascii_headers, bin_headers) = config::partition_headers(Some(headers));
client
.borrow()?
.core_client
.get_client()
.set_headers(ascii_headers.unwrap_or_default())
.map_err(|err| BridgeError::TypeError {
message: format!("Invalid metadata key: {err}"),
field: None,
})?;
client
.borrow()?
.core_client
.get_client()
.set_headers(headers);
.set_binary_headers(bin_headers.unwrap_or_default())
.map_err(|err| BridgeError::TypeError {
message: format!("Invalid metadata key: {err}"),
field: None,
})?;
Ok(())
}

Expand Down Expand Up @@ -122,10 +140,16 @@ pub struct RpcCall {
pub rpc: String,
pub req: Vec<u8>,
pub retry: bool,
pub metadata: HashMap<String, String>,
pub metadata: HashMap<String, MetadataValue>,
pub timeout: Option<Duration>,
}

#[derive(Debug, Clone, TryFromJs)]
pub enum MetadataValue {
Ascii { value: String },
Binary { value: Vec<u8> },
}

/// Send a request to the Workflow Service using the provided Client
#[js_function]
pub fn client_send_workflow_service_request(
Expand Down Expand Up @@ -576,16 +600,29 @@ fn rpc_req<P: prost::Message + Default>(call: RpcCall) -> BridgeResult<tonic::Re

let mut req = tonic::Request::new(proto);
for (k, v) in call.metadata {
req.metadata_mut().insert(
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
field: None,
message: format!("Invalid metadata key: {err}"),
})?,
v.parse().map_err(|err| BridgeError::TypeError {
field: None,
message: format!("Invalid metadata value: {err}"),
})?,
);
match v {
MetadataValue::Ascii { value: v } => {
req.metadata_mut().insert(
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
field: None,
message: format!("Invalid metadata key: {err}"),
})?,
v.parse().map_err(|err| BridgeError::TypeError {
field: None,
message: format!("Invalid metadata value: {err}"),
})?,
);
}
MetadataValue::Binary { value: v } => {
req.metadata_mut().insert_bin(
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
field: None,
message: format!("Invalid metadata key: {err}"),
})?,
BinaryMetadataValue::from_bytes(&v),
);
}
}
}

if let Some(timeout) = call.timeout {
Expand Down Expand Up @@ -620,7 +657,7 @@ mod config {

use bridge_macros::TryFromJs;

use crate::helpers::*;
use crate::{client::MetadataValue, helpers::*};

#[derive(Debug, Clone, TryFromJs)]
pub(super) struct ClientOptions {
Expand All @@ -629,7 +666,7 @@ mod config {
client_version: String,
tls: Option<TlsConfig>,
http_connect_proxy: Option<HttpConnectProxy>,
headers: Option<HashMap<String, String>>,
headers: Option<HashMap<String, MetadataValue>>,
api_key: Option<String>,
disable_error_code_metric_tags: bool,
}
Expand Down Expand Up @@ -669,13 +706,16 @@ mod config {
builder.tls_cfg(tls.into());
}

let (ascii_headers, bin_headers) = partition_headers(self.headers);

let client_options = builder
.target_url(self.target_url)
.client_name(self.client_name)
.client_version(self.client_version)
// tls_cfg -- above
.http_connect_proxy(self.http_connect_proxy.map(Into::into))
.headers(self.headers)
.headers(ascii_headers)
.binary_headers(bin_headers)
.api_key(self.api_key)
.disable_error_code_metric_tags(self.disable_error_code_metric_tags)
// identity -- skipped: will be set on worker
Expand Down Expand Up @@ -711,4 +751,31 @@ mod config {
}
}
}

pub(super) fn partition_headers(
headers: Option<HashMap<String, MetadataValue>>,
) -> (
Option<HashMap<String, String>>,
Option<HashMap<String, Vec<u8>>>,
) {
let Some(headers) = headers else {
return (None, None);
};
let mut ascii_headers = HashMap::default();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible optimization is to create the hashmap with the same capacity as the initial headers map. Only downside I could see would be if the headers were mostly binary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is once per native client + on clientUpdateHeader. Neither of them should be called frequently enough to justify efforts on performance optimization IMO.

But if we'd indeed want to go there, I'd initiate both hashmaps at the capacity of the initial headers map, up to a certain upper limit (something like 128 or 256). That should optimize performance in all cases while providing a reasonable upper limit on how much memory we may allocate uselessly.

But honestly, I wouldn't mind that one.

let mut bin_headers = HashMap::default();
for (k, v) in headers {
match v {
MetadataValue::Ascii { value: v } => {
ascii_headers.insert(k, v);
}
MetadataValue::Binary { value: v } => {
bin_headers.insert(k, v);
}
}
}
(
(!ascii_headers.is_empty()).then_some(ascii_headers),
(!bin_headers.is_empty()).then_some(bin_headers),
)
}
}
7 changes: 4 additions & 3 deletions packages/core-bridge/src/helpers/try_from_js.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use neon::{
};
use temporal_sdk_core::Url;

use super::{BridgeError, BridgeResult};
use super::{AppendFieldContext, BridgeError, BridgeResult};

/// Trait for Rust types that can be created from JavaScript values, possibly throwing an error.
pub trait TryFromJs: Sized {
Expand Down Expand Up @@ -175,8 +175,9 @@ impl<T: TryFromJs> TryFromJs for HashMap<String, T> {
let mut map = Self::new();
for key_handle in props {
let key = key_handle.to_string(cx)?.value(cx);
let value = obj.get_value(cx, key_handle)?;
map.insert(key, T::try_from_js(cx, value)?);
let js_value = obj.get_value(cx, key_handle)?;
let value = T::try_from_js(cx, js_value).field(&key)?;
map.insert(key, value);
}
Ok(map)
}
Expand Down
16 changes: 13 additions & 3 deletions packages/core-bridge/ts/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export interface OtelMetricsExporterOptions {

export declare function newClient(runtime: Runtime, clientOptions: ClientOptions): Promise<Client>;

export declare function clientUpdateHeaders(client: Client, headers: Record<string, string>): void;
export declare function clientUpdateHeaders(client: Client, headers: Record<string, MetadataValue>): void;

export declare function clientUpdateApiKey(client: Client, apiKey: string): void;

Expand All @@ -124,7 +124,7 @@ export interface ClientOptions {
clientVersion: string;
tls: Option<TLSConfig>;
httpConnectProxy: Option<HttpConnectProxy>;
headers: Option<Record<string, string>>;
headers: Option<Record<string, MetadataValue>>;
apiKey: Option<string>;
disableErrorCodeMetricTags: boolean;
}
Expand Down Expand Up @@ -157,7 +157,7 @@ export interface RpcCall {
rpc: string;
req: Buffer;
retry: boolean;
metadata: Record<string, string>;
metadata: Record<string, MetadataValue>;
timeout: Option<number>;
}

Expand Down Expand Up @@ -191,6 +191,16 @@ export interface Worker {
type: 'worker';
}

export type MetadataValue =
| {
type: 'ascii';
value: string;
}
| {
type: 'binary';
value: Buffer;
};

export interface WorkerOptions {
identity: string;
buildId: string;
Expand Down
10 changes: 8 additions & 2 deletions packages/test/src/test-client-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ async function bindLocalhostTls(server: grpc.Server): Promise<number> {

test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC call', async (t) => {
let gotTestHeaders = false;
let gotStaticBinValue;
let gotOtherBinValue;
let gotDeadline = false;
const authTokens: string[] = [];
const deadline = Date.now() + 10000;
Expand Down Expand Up @@ -89,6 +91,8 @@ test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC
) {
gotTestHeaders = true;
}
gotStaticBinValue = call.metadata.get('staticKey-bin');
gotOtherBinValue = call.metadata.get('otherKey-bin');
const receivedDeadline = call.getDeadline();
// For some reason the deadline the server gets is slightly different from the one we send in the client
if (typeof receivedDeadline === 'number' && receivedDeadline >= deadline && receivedDeadline - deadline < 1000) {
Expand All @@ -108,16 +112,18 @@ test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC
const port = await bindLocalhost(server);
const conn = await Connection.connect({
address: `127.0.0.1:${port}`,
metadata: { staticKey: 'set' },
metadata: { staticKey: 'set', 'staticKey-bin': Buffer.from([0x00]) },
apiKey: 'test-token',
});
await conn.withMetadata({ test: 'true' }, () =>
conn.withMetadata({ otherKey: 'set' }, () =>
conn.withMetadata({ otherKey: 'set', 'otherKey-bin': Buffer.from([0x01]) }, () =>
conn.withDeadline(deadline, () => conn.workflowService.registerNamespace({}))
)
);
t.true(gotTestHeaders);
t.true(gotDeadline);
t.deepEqual(gotStaticBinValue, [Buffer.from([0x00])]);
t.deepEqual(gotOtherBinValue, [Buffer.from([0x01])]);
await conn.withApiKey('tt-2', () => conn.workflowService.startWorkflowExecution({}));
conn.setApiKey('tt-3');
await conn.workflowService.startWorkflowExecution({});
Expand Down
43 changes: 42 additions & 1 deletion packages/test/src/test-native-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,18 @@ test('withMetadata and withDeadline propagate metadata and deadline', async (t)
);
const connection = await NativeConnection.connect({
address: `127.0.0.1:${port}`,
metadata: { 'default-bin': Buffer.from([0x00]) },
});

await connection.withDeadline(Date.now() + 10_000, () =>
connection.withMetadata({ test: 'true' }, () => connection.workflowService.getSystemInfo({}))
connection.withMetadata({ test: 'true', 'other-bin': Buffer.from([0x01]) }, () =>
connection.workflowService.getSystemInfo({})
)
);
t.is(requests.length, 2);
t.is(requests[1].metadata.get('test').toString(), 'true');
t.deepEqual(requests[1].metadata.get('default-bin'), [Buffer.from([0x00])]);
t.deepEqual(requests[1].metadata.get('other-bin'), [Buffer.from([0x01])]);
t.true(typeof requests[1].deadline === 'number' && requests[1].deadline > 5_000);
await connection.close();
server.forceShutdown();
Expand Down Expand Up @@ -434,3 +439,39 @@ test('can power workflow client calls', async (t) => {
await env.teardown();
}
});

test('setMetadata accepts binary headers', async (t) => {
const requests = new Array<{ metadata: grpc.Metadata; deadline: grpc.Deadline }>();
const server = new grpc.Server();
server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
getSystemInfo(
call: grpc.ServerUnaryCall<
temporal.api.workflowservice.v1.IGetSystemInfoRequest,
temporal.api.workflowservice.v1.IGetSystemInfoResponse
>,
callback: grpc.sendUnaryData<temporal.api.workflowservice.v1.IGetSystemInfoResponse>
) {
requests.push({ metadata: call.metadata, deadline: call.getDeadline() });
callback(null, {});
},
});

const port = await util.promisify(server.bindAsync.bind(server))(
'localhost:0',
grpc.ServerCredentials.createInsecure()
);
const connection = await NativeConnection.connect({
address: `127.0.0.1:${port}`,
metadata: { 'start-ascii': 'a', 'start-bin': Buffer.from([0x00]) },
});

await connection.setMetadata({ 'end-bin': Buffer.from([0x01]) });

await connection.workflowService.getSystemInfo({});
t.is(requests.length, 2);
t.deepEqual(requests[1].metadata.get('start-bin'), []);
t.deepEqual(requests[1].metadata.get('start-ascii'), []);
t.deepEqual(requests[1].metadata.get('end-bin'), [Buffer.from([0x01])]);
await connection.close();
server.forceShutdown();
});
Loading