Skip to content

Commit a6201d7

Browse files
committed
fix(gas): clear chunked keys before writing new (#3010)
1 parent 7e1ba5c commit a6201d7

File tree

11 files changed

+36
-36
lines changed

11 files changed

+36
-36
lines changed

packages/common/gasoline/core/src/db/kv/keys/history.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ impl FormalChunkedKey for InputKey {
577577
.flatten()
578578
.collect(),
579579
)?)
580-
.map_err(Into::into)
580+
.context("failed to combine `InputKey`")
581581
}
582582

583583
fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
@@ -693,7 +693,7 @@ impl FormalChunkedKey for OutputKey {
693693
.flatten()
694694
.collect(),
695695
)?)
696-
.map_err(Into::into)
696+
.context("failed to combine `OutputKey`")
697697
}
698698

699699
fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
@@ -1621,8 +1621,12 @@ pub mod insert {
16211621
);
16221622

16231623
let state_key = super::InputKey::new(workflow_id, location.clone());
1624+
let state_subspace = subspace.subspace(&state_key);
16241625

1625-
// Write state
1626+
// Clear old state
1627+
tx.clear_subspace_range(&state_subspace);
1628+
1629+
// Write new state
16261630
for (i, chunk) in state_key.split_ref(&state)?.into_iter().enumerate() {
16271631
let chunk_key = state_key.chunk(i);
16281632

packages/common/gasoline/core/src/db/kv/keys/signal.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl FormalChunkedKey for BodyKey {
4242
.flatten()
4343
.collect(),
4444
)?)
45-
.map_err(Into::into)
45+
.context("failed to combine `BodyKey`")
4646
}
4747

4848
fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {

packages/common/gasoline/core/src/db/kv/keys/workflow.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl FormalChunkedKey for InputKey {
184184
.flatten()
185185
.collect(),
186186
)?)
187-
.map_err(Into::into)
187+
.context("failed to combine `InputKey`")
188188
}
189189

190190
fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
@@ -271,7 +271,7 @@ impl FormalChunkedKey for OutputKey {
271271
.flatten()
272272
.collect(),
273273
)?)
274-
.map_err(Into::into)
274+
.context("failed to combine `OutputKey`")
275275
}
276276

277277
fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {
@@ -358,7 +358,7 @@ impl FormalChunkedKey for StateKey {
358358
.flatten()
359359
.collect(),
360360
)?)
361-
.map_err(Into::into)
361+
.context("failed to combine `StateKey`")
362362
}
363363

364364
fn split(&self, value: Self::Value) -> Result<Vec<Vec<u8>>> {

packages/common/gasoline/core/src/db/kv/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2305,8 +2305,12 @@ impl Database for DatabaseKv {
23052305
.run(|tx| {
23062306
async move {
23072307
let state_key = keys::workflow::StateKey::new(workflow_id);
2308+
let state_subspace = self.subspace.subspace(&state_key);
2309+
2310+
// Clear old state
2311+
tx.clear_subspace_range(&state_subspace);
23082312

2309-
// Write state
2313+
// Write new state
23102314
for (i, chunk) in state_key.split_ref(&state)?.into_iter().enumerate() {
23112315
let chunk_key = state_key.chunk(i);
23122316

packages/common/universaldb/src/database.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::future::Future;
22

3-
use anyhow::{Result, anyhow};
3+
use anyhow::{Context, Result, anyhow};
44
use futures_util::FutureExt;
55

66
use crate::{
@@ -37,6 +37,7 @@ impl Database {
3737
.map(|x| *x)
3838
.map_err(|_| anyhow!("failed to downcast `run` return type"))
3939
})
40+
.context("transaction failed")
4041
}
4142

4243
/// Creates a new txn instance.

packages/common/universaldb/src/driver/postgres/transaction_task.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -394,9 +394,7 @@ impl TransactionTask {
394394
} => {
395395
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
396396
{
397-
tracing::error!("cannot set in read only txn");
398-
let _ =
399-
response.send(Err(anyhow!("postgres transaction connection failed")));
397+
let _ = response.send(Err(anyhow!("cannot set in read only txn")));
400398
continue;
401399
};
402400

@@ -418,9 +416,7 @@ impl TransactionTask {
418416
TransactionCommand::Clear { key, response } => {
419417
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
420418
{
421-
tracing::error!("cannot set in read only txn");
422-
let _ =
423-
response.send(Err(anyhow!("postgres transaction connection failed")));
419+
let _ = response.send(Err(anyhow!("cannot set in read only txn")));
424420
continue;
425421
};
426422

@@ -443,9 +439,7 @@ impl TransactionTask {
443439
} => {
444440
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
445441
{
446-
tracing::error!("cannot clear range in read only txn");
447-
let _ =
448-
response.send(Err(anyhow!("postgres transaction connection failed")));
442+
let _ = response.send(Err(anyhow!("cannot clear range in read only txn")));
449443
continue;
450444
};
451445

@@ -478,9 +472,8 @@ impl TransactionTask {
478472
} => {
479473
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
480474
{
481-
tracing::error!("cannot apply atomic op in read only txn");
482475
let _ =
483-
response.send(Err(anyhow!("postgres transaction connection failed")));
476+
response.send(Err(anyhow!("cannot apply atomic op in read only txn")));
484477
continue;
485478
};
486479

@@ -539,9 +532,9 @@ impl TransactionTask {
539532
if let TransactionIsolationLevel::RepeatableReadReadOnly =
540533
self.isolation_level
541534
{
542-
tracing::error!("cannot release conflict ranges in read only txn");
543-
let _ = response
544-
.send(Err(anyhow!("postgres transaction connection failed")));
535+
let _ = response.send(Err(anyhow!(
536+
"cannot release conflict ranges in read only txn"
537+
)));
545538
continue;
546539
};
547540

@@ -567,9 +560,8 @@ impl TransactionTask {
567560
} => {
568561
if let TransactionIsolationLevel::RepeatableReadReadOnly = self.isolation_level
569562
{
570-
tracing::error!("cannot add conflict range in read only txn");
571-
let _ =
572-
response.send(Err(anyhow!("postgres transaction connection failed")));
563+
let _ = response
564+
.send(Err(anyhow!("cannot add conflict range in read only txn")));
573565
continue;
574566
};
575567

packages/common/universaldb/src/transaction.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ impl Transaction {
232232
self.driver.clear_range(begin, end)
233233
}
234234

235-
pub fn clear_subspace_range(&self, subspace: &Subspace) {
235+
pub fn clear_subspace_range(&self, subspace: &tuple::Subspace) {
236236
let (begin, end) = subspace.range();
237237
self.driver.clear_range(&begin, &end);
238238
}
@@ -321,10 +321,6 @@ impl<'t> InformalTransaction<'t> {
321321
self.inner.driver.clear_range(&begin, &end);
322322
}
323323

324-
// pub fn commit(self: Box<Self>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
325-
// self.inner.driver.commit()
326-
// }
327-
328324
pub fn cancel(&self) {
329325
self.inner.driver.cancel()
330326
}

packages/core/pegboard-runner/src/ping_task.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ async fn task_inner(ctx: StandaloneCtx, conn: Arc<Conn>) -> Result<()> {
1919
loop {
2020
tokio::time::sleep(UPDATE_PING_INTERVAL).await;
2121

22-
// Check that workflow is not dead
2322
let Some(wf) = ctx
2423
.workflow::<pegboard::workflows::runner::Input>(conn.workflow_id)
2524
.get()

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,8 @@ async fn outbound_handler(
232232
let mut req = client.get(url).headers(headers);
233233

234234
// Add admin token if configured
235-
if let Some(auth) = ctx.config().auth {
236-
req = req.header(X_RIVET_TOKEN, &auth.admin_token);
235+
if let Some(auth) = &ctx.config().auth {
236+
req = req.header(X_RIVET_TOKEN, auth.admin_token.read());
237237
}
238238

239239
let mut source = sse::EventSource::new(req)?;

packages/services/pegboard/src/keys/runner.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,8 @@ impl FormalChunkedKey for MetadataKey {
760760
.map(|x| x.value().iter().map(|x| *x))
761761
.flatten()
762762
.collect::<Vec<_>>(),
763-
)?
763+
)
764+
.context("failed to combine `MetadataKey`")?
764765
.try_into()
765766
}
766767

0 commit comments

Comments
 (0)