diff --git a/engine/packages/gasoline/src/ctx/activity.rs b/engine/packages/gasoline/src/ctx/activity.rs index f1bb667e1b..0aedbaa0e0 100644 --- a/engine/packages/gasoline/src/ctx/activity.rs +++ b/engine/packages/gasoline/src/ctx/activity.rs @@ -87,6 +87,17 @@ impl ActivityCtx { .await } + /// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction. + #[tracing::instrument(skip_all)] + pub async fn find_workflows( + &self, + queries: &[(&str, serde_json::Value)], + ) -> Result>> { + common::find_workflows(&self.db, queries) + .in_current_span() + .await + } + /// Finds the first incomplete workflow with the given tags. #[tracing::instrument(skip_all)] pub async fn get_workflows(&self, workflow_ids: Vec) -> Result> { diff --git a/engine/packages/gasoline/src/ctx/common.rs b/engine/packages/gasoline/src/ctx/common.rs index c0da28c4ad..6f1d8e413f 100644 --- a/engine/packages/gasoline/src/ctx/common.rs +++ b/engine/packages/gasoline/src/ctx/common.rs @@ -66,6 +66,14 @@ pub async fn find_workflow( .map_err(Into::into) } +/// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction. +pub async fn find_workflows( + db: &DatabaseHandle, + queries: &[(&str, serde_json::Value)], +) -> Result>> { + db.find_workflows(queries).await.map_err(Into::into) +} + /// Finds the first incomplete workflow with the given tags. pub async fn get_workflows( db: &DatabaseHandle, diff --git a/engine/packages/gasoline/src/ctx/operation.rs b/engine/packages/gasoline/src/ctx/operation.rs index 7e2030205e..2b3908a88a 100644 --- a/engine/packages/gasoline/src/ctx/operation.rs +++ b/engine/packages/gasoline/src/ctx/operation.rs @@ -91,6 +91,17 @@ impl OperationCtx { .await } + /// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction. + #[tracing::instrument(skip_all)] + pub async fn find_workflows( + &self, + queries: &[(&str, serde_json::Value)], + ) -> Result>> { + common::find_workflows(&self.db, queries) + .in_current_span() + .await + } + /// Finds the first incomplete workflow with the given tags. #[tracing::instrument(skip_all)] pub async fn get_workflows(&self, workflow_ids: Vec) -> Result> { diff --git a/engine/packages/gasoline/src/ctx/standalone.rs b/engine/packages/gasoline/src/ctx/standalone.rs index 3173b4ba35..a24d2b096c 100644 --- a/engine/packages/gasoline/src/ctx/standalone.rs +++ b/engine/packages/gasoline/src/ctx/standalone.rs @@ -129,6 +129,17 @@ impl StandaloneCtx { .await } + /// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction. + #[tracing::instrument(skip_all)] + pub async fn find_workflows( + &self, + queries: &[(&str, serde_json::Value)], + ) -> Result>> { + common::find_workflows(&self.db, queries) + .in_current_span() + .await + } + /// Finds the first incomplete workflow with the given tags. #[tracing::instrument(skip_all)] pub async fn get_workflows(&self, workflow_ids: Vec) -> Result> { diff --git a/engine/packages/gasoline/src/ctx/test.rs b/engine/packages/gasoline/src/ctx/test.rs index dd2d12ea0a..f53668b431 100644 --- a/engine/packages/gasoline/src/ctx/test.rs +++ b/engine/packages/gasoline/src/ctx/test.rs @@ -149,6 +149,17 @@ impl TestCtx { .await } + /// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction. + #[tracing::instrument(skip_all)] + pub async fn find_workflows( + &self, + queries: &[(&str, serde_json::Value)], + ) -> Result>> { + common::find_workflows(&self.db, queries) + .in_current_span() + .await + } + /// Finds the first incomplete workflow with the given tags. #[tracing::instrument(skip_all)] pub async fn get_workflows(&self, workflow_ids: Vec) -> Result> { diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index 8cdb7c194e..e8d2d61eca 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -9,7 +9,7 @@ use std::{ }; use anyhow::{Context, Result, ensure}; -use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; +use futures_util::{StreamExt, TryStreamExt, future::try_join_all, stream::BoxStream}; use rivet_util::Id; use rivet_util::future::CustomInstrumentExt; use serde_json::json; @@ -996,6 +996,34 @@ impl Database for DatabaseKv { Ok(workflow_id) } + #[tracing::instrument(skip_all)] + async fn find_workflows( + &self, + queries: &[(&str, serde_json::Value)], + ) -> WorkflowResult>> { + let start_instant = Instant::now(); + + let workflow_ids = self + .pools + .udb() + .map_err(WorkflowError::PoolsGeneric)? + .run(|tx| async move { + let futures = queries.iter().map(|(workflow_name, tags)| { + self.find_workflow_inner(workflow_name, tags, &tx) + }); + try_join_all(futures).await + }) + .custom_instrument(tracing::info_span!("find_workflows_batch_tx")) + .await + .context("failed to find workflows") + .map_err(WorkflowError::Udb)?; + + let dt = start_instant.elapsed().as_secs_f64(); + metrics::FIND_WORKFLOWS_DURATION.record(dt, &[KeyValue::new("workflow_name", "batch")]); + + Ok(workflow_ids) + } + #[tracing::instrument(skip_all)] async fn pull_workflows( &self, diff --git a/engine/packages/gasoline/src/db/mod.rs b/engine/packages/gasoline/src/db/mod.rs index 0909ecfacd..6207fb69dd 100644 --- a/engine/packages/gasoline/src/db/mod.rs +++ b/engine/packages/gasoline/src/db/mod.rs @@ -101,6 +101,12 @@ pub trait Database: Send { tags: &serde_json::Value, ) -> WorkflowResult>; + /// Retrieves the first incomplete workflow for each (name, tags) pair in a single batch transaction. + async fn find_workflows( + &self, + queries: &[(&str, serde_json::Value)], + ) -> WorkflowResult>>; + /// Pulls workflows for processing by the worker. Will only pull workflows with names matching the filter. /// Should also update the ping of this worker. async fn pull_workflows(