Skip to content

Commit 19ae5b7

Browse files
authored
Fairness protos (#971)
1 parent 148774c commit 19ae5b7

File tree

27 files changed

+2080
-254
lines changed

27 files changed

+2080
-254
lines changed

.cargo/config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[env]
22
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
3-
#CLI_VERSION_OVERRIDE = "v1.3.1-priority.0"
3+
CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"
44

55
[alias]
66
integ-test = ["test", "--features", "temporal-sdk-core-protos/serde_serialize", "--package", "temporal-sdk-core", "--test", "integ_runner", "--"]

client/src/lib.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1142,7 +1142,7 @@ pub struct WorkflowOptions {
11421142
/// The overall semantics of Priority are:
11431143
/// (more will be added here later)
11441144
/// 1. First, consider "priority_key": lower number goes first.
1145-
#[derive(Debug, Clone, Default, PartialEq, Eq)]
1145+
#[derive(Debug, Clone, Default, PartialEq)]
11461146
pub struct Priority {
11471147
/// Priority key is a positive integer from 1 to n, where smaller integers
11481148
/// correspond to higher priorities (tasks run sooner). In general, tasks in
@@ -1155,12 +1155,50 @@ pub struct Priority {
11551155
/// The default priority is (min+max)/2. With the default max of 5 and min of
11561156
/// 1, that comes out to 3.
11571157
pub priority_key: u32,
1158+
1159+
/// Fairness key is a short string that's used as a key for a fairness
1160+
/// balancing mechanism. It may correspond to a tenant id, or to a fixed
1161+
/// string like "high" or "low". The default is the empty string.
1162+
///
1163+
/// The fairness mechanism attempts to dispatch tasks for a given key in
1164+
/// proportion to its weight. For example, using a thousand distinct tenant
1165+
/// ids, each with a weight of 1.0 (the default) will result in each tenant
1166+
/// getting a roughly equal share of task dispatch throughput.
1167+
///
1168+
/// (Note: this does not imply equal share of worker capacity! Fairness
1169+
/// decisions are made based on queue statistics, not
1170+
/// current worker load.)
1171+
///
1172+
/// As another example, using keys "high" and "low" with weight 9.0 and 1.0
1173+
/// respectively will prefer dispatching "high" tasks over "low" tasks at a
1174+
/// 9:1 ratio, while allowing either key to use all worker capacity if the
1175+
/// other is not present.
1176+
///
1177+
/// All fairness mechanisms, including rate limits, are best-effort and
1178+
/// probabilistic. The results may not match what a "perfect" algorithm with
1179+
/// infinite resources would produce. The more unique keys are used, the less
1180+
/// accurate the results will be.
1181+
///
1182+
/// Fairness keys are limited to 64 bytes.
1183+
pub fairness_key: String,
1184+
1185+
/// Fairness weight for a task can come from multiple sources for
1186+
/// flexibility. From highest to lowest precedence:
1187+
/// 1. Weights for a small set of keys can be overridden in task queue
1188+
/// configuration with an API.
1189+
/// 2. It can be attached to the workflow/activity in this field.
1190+
/// 3. The default weight of 1.0 will be used.
1191+
///
1192+
/// Weight values are clamped by the server to the range [0.001, 1000].
1193+
pub fairness_weight: f32,
11581194
}
11591195

11601196
impl From<Priority> for common::v1::Priority {
11611197
fn from(priority: Priority) -> Self {
11621198
common::v1::Priority {
11631199
priority_key: priority.priority_key as i32,
1200+
fairness_key: priority.fairness_key,
1201+
fairness_weight: priority.fairness_weight,
11641202
}
11651203
}
11661204
}
@@ -1169,6 +1207,8 @@ impl From<common::v1::Priority> for Priority {
11691207
fn from(priority: common::v1::Priority) -> Self {
11701208
Self {
11711209
priority_key: priority.priority_key as u32,
1210+
fairness_key: priority.fairness_key,
1211+
fairness_weight: priority.fairness_weight,
11721212
}
11731213
}
11741214
}

client/src/raw.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1354,6 +1354,34 @@ proxier! {
13541354
r.extensions_mut().insert(labels);
13551355
}
13561356
);
1357+
(
1358+
update_task_queue_config,
1359+
UpdateTaskQueueConfigRequest,
1360+
UpdateTaskQueueConfigResponse,
1361+
|r| {
1362+
let mut labels = namespaced_request!(r);
1363+
labels.task_q_str(r.get_ref().task_queue.clone());
1364+
r.extensions_mut().insert(labels);
1365+
}
1366+
);
1367+
(
1368+
fetch_worker_config,
1369+
FetchWorkerConfigRequest,
1370+
FetchWorkerConfigResponse,
1371+
|r| {
1372+
let labels = namespaced_request!(r);
1373+
r.extensions_mut().insert(labels);
1374+
}
1375+
);
1376+
(
1377+
update_worker_config,
1378+
UpdateWorkerConfigRequest,
1379+
UpdateWorkerConfigResponse,
1380+
|r| {
1381+
let labels = namespaced_request!(r);
1382+
r.extensions_mut().insert(labels);
1383+
}
1384+
);
13571385
}
13581386

13591387
proxier! {
@@ -1543,11 +1571,17 @@ mod tests {
15431571
})
15441572
.collect();
15451573
let no_underscores: HashSet<_> = impl_list.iter().map(|x| x.replace('_', "")).collect();
1574+
let mut not_implemented = vec![];
15461575
for method in methods {
15471576
if !no_underscores.contains(&method.to_lowercase()) {
1548-
panic!("RPC method {method} is not implemented by raw client")
1577+
not_implemented.push(method);
15491578
}
15501579
}
1580+
if !not_implemented.is_empty() {
1581+
panic!(
1582+
"The following RPC methods are not implemented by raw client: {not_implemented:?}"
1583+
);
1584+
}
15511585
}
15521586
#[test]
15531587
fn verify_all_workflow_service_methods_implemented() {

core-c-bridge/src/client.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
1-
use crate::ByteArray;
2-
use crate::ByteArrayRef;
3-
use crate::CancellationToken;
4-
use crate::MetadataRef;
5-
use crate::UserDataHandle;
6-
use crate::runtime::Runtime;
7-
8-
use std::str::FromStr;
9-
use std::time::Duration;
1+
use crate::{
2+
ByteArray, ByteArrayRef, CancellationToken, MetadataRef, UserDataHandle, runtime::Runtime,
3+
};
4+
5+
use std::{str::FromStr, time::Duration};
106
use temporal_client::{
117
ClientKeepAliveConfig, ClientOptions as CoreClientOptions, ClientOptionsBuilder,
128
ClientTlsConfig, CloudService, ConfiguredClient, HealthService, HttpConnectProxyOptions,
@@ -326,6 +322,7 @@ async fn call_workflow_service(
326322
"DescribeWorkflowExecution" => rpc_call!(client, call, describe_workflow_execution),
327323
"DescribeWorkflowRule" => rpc_call!(client, call, describe_workflow_rule),
328324
"ExecuteMultiOperation" => rpc_call!(client, call, execute_multi_operation),
325+
"FetchWorkerConfig" => rpc_call!(client, call, fetch_worker_config),
329326
"GetClusterInfo" => rpc_call!(client, call, get_cluster_info),
330327
"GetCurrentDeployment" => rpc_call!(client, call, get_current_deployment),
331328
"GetDeploymentReachability" => rpc_call!(client, call, get_deployment_reachability),
@@ -419,6 +416,8 @@ async fn call_workflow_service(
419416
}
420417
"UpdateNamespace" => rpc_call_on_trait!(client, call, WorkflowService, update_namespace),
421418
"UpdateSchedule" => rpc_call!(client, call, update_schedule),
419+
"UpdateTaskQueueConfig" => rpc_call!(client, call, update_task_queue_config),
420+
"UpdateWorkerConfig" => rpc_call!(client, call, update_worker_config),
422421
"UpdateWorkerDeploymentVersionMetadata" => {
423422
rpc_call!(client, call, update_worker_deployment_version_metadata)
424423
}

core/src/pollers/poll_buffer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,6 @@ mod tests {
773773
});
774774
mock_client
775775
.expect_poll_workflow_task()
776-
.times(1)
777776
.returning(move |_, _| async { Ok(Default::default()) }.boxed());
778777

779778
let pb = LongPollBuffer::new_workflow_task(

0 commit comments

Comments
 (0)