Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 161 additions & 148 deletions packages/core-bridge/Cargo.lock

Large diffs are not rendered by default.

27 changes: 14 additions & 13 deletions packages/core-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,29 @@ async-trait = "0.1.83"
bridge-macros = { path = "bridge-macros" }
futures = { version = "0.3", features = ["executor"] }
neon = { version = "1.0.0", default-features = false, features = [
"napi-6",
"futures",
"napi-6",
"futures",
] }
opentelemetry = "0.29"
opentelemetry = "0.31"
os_pipe = "1.2.1"
parking_lot = "0.12"
prost = "0.13"
prost-types = "0.13"
prost = "0.14"
prost-types = "0.14"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
temporal-sdk-core = { version = "*", path = "./sdk-core/core", features = [
"ephemeral-server",
temporalio-sdk-core = { version = "*", path = "./sdk-core/crates/sdk-core", features = [
"ephemeral-server",
] }
temporal-client = { version = "*", path = "./sdk-core/client" }
temporalio-client = { version = "*", path = "./sdk-core/crates/client" }
temporalio-common = { version = "*", path = "./sdk-core/crates/common" }
thiserror = "2"
tokio = "1.13"
tokio-stream = "0.1"
tonic = "0.13"
tonic = "0.14"
tracing = "0.1"
tracing-subscriber = { version = "0.3", default-features = false, features = [
"parking_lot",
"env-filter",
"registry",
"ansi",
"parking_lot",
"env-filter",
"registry",
"ansi",
] }
2 changes: 1 addition & 1 deletion packages/core-bridge/sdk-core
Submodule sdk-core updated 360 files
22 changes: 14 additions & 8 deletions packages/core-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use std::{collections::HashMap, sync::Arc};
use neon::prelude::*;
use tonic::metadata::{BinaryMetadataValue, MetadataKey};

use temporal_sdk_core::{ClientOptions as CoreClientOptions, CoreRuntime, RetryClient};
use temporalio_sdk_core::{ClientOptions as CoreClientOptions, CoreRuntime, RetryClient};

use bridge_macros::{TryFromJs, js_function};
use temporal_client::{ClientInitError, ConfiguredClient, TemporalServiceClient};
use temporalio_client::{ClientInitError, ConfiguredClient, TemporalServiceClient};

use crate::runtime::Runtime;
use crate::{helpers::*, runtime::RuntimeExt as _};
Expand Down Expand Up @@ -255,7 +255,7 @@ async fn client_invoke_workflow_service(
mut retry_client: CoreClient,
call: RpcCall,
) -> BridgeResult<Vec<u8>> {
use temporal_client::WorkflowService;
use temporalio_client::WorkflowService;

match call.rpc.as_str() {
"CountWorkflowExecutions" => {
Expand Down Expand Up @@ -288,6 +288,9 @@ async fn client_invoke_workflow_service(
"DescribeDeployment" => {
rpc_call!(retry_client, call, describe_deployment)
}
"DescribeWorker" => {
rpc_call!(retry_client, call, describe_worker)
}
"DeprecateNamespace" => rpc_call!(retry_client, call, deprecate_namespace),
"DescribeNamespace" => rpc_call!(retry_client, call, describe_namespace),
"DescribeSchedule" => rpc_call!(retry_client, call, describe_schedule),
Expand Down Expand Up @@ -448,6 +451,9 @@ async fn client_invoke_workflow_service(
"SetWorkerDeploymentCurrentVersion" => {
rpc_call!(retry_client, call, set_worker_deployment_current_version)
}
"SetWorkerDeploymentManager" => {
rpc_call!(retry_client, call, set_worker_deployment_manager)
}
"SetWorkerDeploymentRampingVersion" => {
rpc_call!(retry_client, call, set_worker_deployment_ramping_version)
}
Expand Down Expand Up @@ -519,7 +525,7 @@ async fn client_invoke_operator_service(
mut retry_client: CoreClient,
call: RpcCall,
) -> BridgeResult<Vec<u8>> {
use temporal_client::OperatorService;
use temporalio_client::OperatorService;

match call.rpc.as_str() {
"AddOrUpdateRemoteCluster" => {
Expand Down Expand Up @@ -557,7 +563,7 @@ async fn client_invoke_test_service(
mut retry_client: CoreClient,
call: RpcCall,
) -> BridgeResult<Vec<u8>> {
use temporal_client::TestService;
use temporalio_client::TestService;

match call.rpc.as_str() {
"GetCurrentTime" => rpc_call!(retry_client, call, get_current_time),
Expand All @@ -579,7 +585,7 @@ async fn client_invoke_health_service(
mut retry_client: CoreClient,
call: RpcCall,
) -> BridgeResult<Vec<u8>> {
use temporal_client::HealthService;
use temporalio_client::HealthService;

match call.rpc.as_str() {
"Check" => rpc_call!(retry_client, call, check),
Expand Down Expand Up @@ -649,8 +655,8 @@ mod config {

use anyhow::Context as _;

use temporal_client::HttpConnectProxyOptions;
use temporal_sdk_core::{
use temporalio_client::HttpConnectProxyOptions;
use temporalio_sdk_core::{
ClientOptions as CoreClientOptions, ClientOptionsBuilder,
ClientTlsConfig as CoreClientTlsConfig, TlsConfig as CoreTlsConfig, Url,
};
Expand Down
2 changes: 1 addition & 1 deletion packages/core-bridge/src/helpers/try_from_js.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use neon::{
Value, buffer::TypedArray,
},
};
use temporal_sdk_core::Url;
use temporalio_sdk_core::Url;

use super::{AppendFieldContext, BridgeError, BridgeResult};

Expand Down
2 changes: 1 addition & 1 deletion packages/core-bridge/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use neon::prelude::*;

use serde::{Serialize, ser::SerializeMap as _};
use temporal_sdk_core::api::telemetry::CoreLog;
use temporalio_common::telemetry::CoreLog;

use bridge_macros::js_function;

Expand Down
6 changes: 3 additions & 3 deletions packages/core-bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use anyhow::Context as _;
use neon::prelude::*;
use serde::Deserialize;

use temporal_sdk_core::api::telemetry::metrics::{
use temporalio_common::telemetry::metrics::{
CoreMeter, Counter as CoreCounter, Gauge as CoreGauge, Histogram as CoreHistogram,
MetricParametersBuilder, NewAttributes, TemporalMeter,
};
use temporal_sdk_core::api::telemetry::metrics::{
use temporalio_common::telemetry::metrics::{
GaugeF64 as CoreGaugeF64, HistogramF64 as CoreHistogramF64,
};
use temporal_sdk_core::api::telemetry::metrics::{
use temporalio_common::telemetry::metrics::{
MetricKeyValue as CoreMetricKeyValue, MetricValue as CoreMetricValue,
};

Expand Down
35 changes: 18 additions & 17 deletions packages/core-bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use futures::channel::mpsc::Receiver;
use neon::prelude::*;
use tracing::{Instrument, warn};

use temporal_sdk_core::{
CoreRuntime, TokioRuntimeBuilder,
api::telemetry::{
CoreLog, OtelCollectorOptions as CoreOtelCollectorOptions,
PrometheusExporterOptions as CorePrometheusExporterOptions, metrics::CoreMeter,
},
use temporalio_common::telemetry::{
CoreLog, OtelCollectorOptions as CoreOtelCollectorOptions,
PrometheusExporterOptions as CorePrometheusExporterOptions, metrics::CoreMeter,
};
use temporalio_sdk_core::{
CoreRuntime, RuntimeOptionsBuilder, TokioRuntimeBuilder,
telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
};

Expand Down Expand Up @@ -62,7 +62,11 @@ pub fn runtime_new(
let (telemetry_options, metrics_options, logging_options) = bridge_options.try_into()?;

// Create core runtime which starts tokio multi-thread runtime
let mut core_runtime = CoreRuntime::new(telemetry_options, TokioRuntimeBuilder::default())
let runtime_options = RuntimeOptionsBuilder::default()
.telemetry_options(telemetry_options)
.build()
.context("Failed to build runtime options")?;
let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default())
.context("Failed to initialize Core Runtime")?;

enter_sync!(core_runtime);
Expand Down Expand Up @@ -235,17 +239,14 @@ mod config {
use anyhow::Context as _;

use neon::prelude::*;
use temporal_sdk_core::{
Url,
api::telemetry::{
HistogramBucketOverrides, Logger as CoreTelemetryLogger, MetricTemporality,
OtelCollectorOptions as CoreOtelCollectorOptions, OtelCollectorOptionsBuilder,
OtlpProtocol, PrometheusExporterOptions as CorePrometheusExporterOptions,
PrometheusExporterOptionsBuilder, TelemetryOptions as CoreTelemetryOptions,
TelemetryOptionsBuilder,
},
telemetry::CoreLogStreamConsumer,
use temporalio_common::telemetry::{
HistogramBucketOverrides, Logger as CoreTelemetryLogger, MetricTemporality,
OtelCollectorOptions as CoreOtelCollectorOptions, OtelCollectorOptionsBuilder,
OtlpProtocol, PrometheusExporterOptions as CorePrometheusExporterOptions,
PrometheusExporterOptionsBuilder, TelemetryOptions as CoreTelemetryOptions,
TelemetryOptionsBuilder,
};
use temporalio_sdk_core::{Url, telemetry::CoreLogStreamConsumer};

use bridge_macros::TryFromJs;

Expand Down
6 changes: 3 additions & 3 deletions packages/core-bridge/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::{process::Stdio, sync::Arc};
use anyhow::Context as _;
use neon::prelude::*;

use temporal_sdk_core::ephemeral_server::{
use temporalio_sdk_core::ephemeral_server::{
EphemeralServer as CoreEphemeralServer, TemporalDevServerConfig as CoreTemporalDevServerConfig,
TestServerConfig as CoreTestServerConfig,
};

use bridge_macros::js_function;
use temporal_sdk_core::CoreRuntime;
use temporalio_sdk_core::CoreRuntime;

use crate::helpers::*;
use crate::runtime::{Runtime, RuntimeExt as _};
Expand Down Expand Up @@ -191,7 +191,7 @@ mod config {

use anyhow::Context as _;

use temporal_sdk_core::ephemeral_server::{
use temporalio_sdk_core::ephemeral_server::{
EphemeralExe, EphemeralExeVersion, TemporalDevServerConfig as CoreTemporalDevServerConfig,
TemporalDevServerConfigBuilder, TestServerConfig as CoreTestServerConfig,
TestServerConfigBuilder,
Expand Down
64 changes: 31 additions & 33 deletions packages/core-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@ use prost::Message;
use tokio::sync::mpsc::{Sender, channel};
use tokio_stream::wrappers::ReceiverStream;

use temporal_sdk_core::{
CoreRuntime,
api::{
Worker as CoreWorkerTrait,
errors::{CompleteActivityError, CompleteNexusError, CompleteWfError, PollError},
},
init_replay_worker, init_worker,
protos::{
coresdk::{
ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion,
workflow_completion::WorkflowActivationCompletion,
},
temporal::api::history::v1::History,
use temporalio_common::Worker as CoreWorkerTrait;
use temporalio_common::errors::{
CompleteActivityError, CompleteNexusError, CompleteWfError, PollError,
};
use temporalio_common::protos::{
coresdk::{
ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion,
workflow_completion::WorkflowActivationCompletion,
},
temporal::api::history::v1::History,
};
use temporalio_sdk_core::{
CoreRuntime, init_replay_worker, init_worker,
replay::{HistoryForReplay, ReplayWorkerInput},
};

Expand Down Expand Up @@ -70,7 +69,7 @@ pub struct Worker {
core_runtime: Arc<CoreRuntime>,

// Arc so that we can send reference into async closures
core_worker: Arc<temporal_sdk_core::Worker>,
core_worker: Arc<temporalio_sdk_core::Worker>,
}

/// Create a new worker.
Expand Down Expand Up @@ -400,9 +399,10 @@ pub fn replay_worker_new(
OpaqueOutboundHandle<Worker>,
OpaqueOutboundHandle<HistoryForReplayTunnelHandle>,
)> {
let config = config
let mut config = config
.into_core_config()
.context("Failed to convert WorkerOptions to CoreWorkerConfig")?;
config.skip_client_worker_set_check = true;

let runtime = runtime.borrow()?.core_runtime.clone();
enter_sync!(runtime);
Expand Down Expand Up @@ -466,16 +466,16 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {}
mod config {
use std::{sync::Arc, time::Duration};

use temporal_sdk_core::{
use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior;
use temporalio_common::worker::{
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind,
PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder,
WorkerConfigBuilderError, WorkerDeploymentOptions as CoreWorkerDeploymentOptions,
WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind,
};
use temporalio_sdk_core::{
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder,
api::worker::{
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind,
PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder,
WorkerConfigBuilderError, WorkerDeploymentOptions as CoreWorkerDeploymentOptions,
WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind,
},
protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior,
};

use super::custom_slot_supplier::CustomSlotSupplierOptions;
Expand All @@ -485,7 +485,7 @@ mod config {
use neon::object::Object;
use neon::prelude::JsResult;
use neon::types::JsObject;
use temporal_sdk_core::api::worker::WorkerVersioningStrategy;
use temporalio_common::worker::WorkerVersioningStrategy;

#[derive(TryFromJs)]
pub struct BridgeWorkerOptions {
Expand Down Expand Up @@ -749,16 +749,14 @@ mod custom_slot_supplier {

use neon::{context::Context, handle::Handle, prelude::*};

use temporal_sdk_core::{
SlotSupplierOptions as CoreSlotSupplierOptions,
api::worker::{
SlotInfo as CoreSlotInfo, SlotInfoTrait as _, SlotKind,
SlotKindType as CoreSlotKindType, SlotMarkUsedContext as CoreSlotMarkUsedContext,
SlotReleaseContext as CoreSlotReleaseContext,
SlotReservationContext as CoreSlotReservationContext, SlotSupplier as CoreSlotSupplier,
SlotSupplierPermit as CoreSlotSupplierPermit,
},
use temporalio_common::worker::{
SlotInfo as CoreSlotInfo, SlotInfoTrait as _, SlotKind, SlotKindType as CoreSlotKindType,
SlotMarkUsedContext as CoreSlotMarkUsedContext,
SlotReleaseContext as CoreSlotReleaseContext,
SlotReservationContext as CoreSlotReservationContext, SlotSupplier as CoreSlotSupplier,
SlotSupplierPermit as CoreSlotSupplierPermit,
};
use temporalio_sdk_core::SlotSupplierOptions as CoreSlotSupplierOptions;

use bridge_macros::{TryFromJs, TryIntoJs};
use tracing::warn;
Expand Down
2 changes: 1 addition & 1 deletion packages/proto/scripts/compile-proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const outputDir = resolve(__dirname, '../protos');
const jsOutputFile = resolve(outputDir, 'json-module.js');
const tempFile = resolve(outputDir, 'temp.js');

const protoBaseDir = resolve(__dirname, '../../core-bridge/sdk-core/sdk-core-protos/protos');
const protoBaseDir = resolve(__dirname, '../../core-bridge/sdk-core/crates/common/protos');

function mtime(path) {
try {
Expand Down
6 changes: 6 additions & 0 deletions packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
DefaultLogger,
LogEntry,
LogLevel,
NativeConnection,
NativeConnectionOptions,
ReplayWorkerOptions,
Runtime,
RuntimeOptions,
Expand Down Expand Up @@ -184,6 +186,7 @@ export async function createTestWorkflowEnvironment(
export interface Helpers {
taskQueue: string;
createWorker(opts?: Partial<WorkerOptions>): Promise<Worker>;
createNativeConnection(opts?: Partial<NativeConnectionOptions>): Promise<NativeConnection>;
runReplayHistory(opts: Partial<ReplayWorkerOptions>, history: temporal.api.history.v1.IHistory): Promise<void>;
executeWorkflow<T extends () => Promise<any>>(workflowType: T): Promise<workflow.WorkflowResultType<T>>;
executeWorkflow<T extends workflow.Workflow>(
Expand Down Expand Up @@ -218,6 +221,9 @@ export function configurableHelpers<T>(
...opts,
});
},
async createNativeConnection(opts?: Partial<NativeConnectionOptions>): Promise<NativeConnection> {
return await NativeConnection.connect({ address: testEnv.address, ...opts });
},
async runReplayHistory(
opts: Partial<ReplayWorkerOptions>,
history: temporal.api.history.v1.IHistory
Expand Down
Loading
Loading