Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/docker/dev-host/grafana/dashboards/guard.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/docker/dev-multinode/grafana/dashboards/guard.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/docker/dev/grafana/dashboards/guard.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/docker/template/grafana-dashboards/guard.json
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@
},
"timepicker": {},
"timezone": "browser",
"title": "Rivet Guard",
"title": "Guard",
"uid": "cen785ige8fswd",
"version": 3
}
1 change: 1 addition & 0 deletions engine/packages/pegboard-runner/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ impl Init {
}
}

#[tracing::instrument(skip_all)]
pub async fn handle_init(
ctx: &StandaloneCtx,
conn: &Conn,
Expand Down
10 changes: 5 additions & 5 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,11 +1058,11 @@ async fn handle_stopped(

// Set Crashed failure reason for actual crashes.
// Runner failure reasons are already set at the start of handle_stopped.
if let StoppedVariant::Normal { code, message } = &variant {
ensure!(
*code != protocol::mk2::StopCode::Ok,
"expected non-Ok stop code in crash handler, got Ok"
);
if let StoppedVariant::Normal {
code: protocol::mk2::StopCode::Error,
message,
} = &variant
{
ctx.v(3)
.activity(runtime::SetFailureReasonInput {
failure_reason: FailureReason::Crashed {
Expand Down
12 changes: 2 additions & 10 deletions engine/packages/pegboard/src/workflows/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use universalpubsub::PublishOpts;
use vbare::OwnedVersionedData;

use crate::{keys, metrics, workflows::actor::Allocate};

Check failure on line 14 in engine/packages/pegboard/src/workflows/runner.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `metrics`

/// Batch size of how many events to ack.
const EVENT_ACK_BATCH_SIZE: i64 = 500;
Expand Down Expand Up @@ -971,7 +971,7 @@
let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();

// NOTE: This txn should closely resemble the one found in the allocate_actor activity of the actor wf
let (allocations, pending_actor_count) = ctx
let allocations = ctx
.udb()?
.run(|tx| async move {
let start = Instant::now();
Expand All @@ -993,7 +993,6 @@
// the one we choose
Snapshot,
);
let mut pending_actor_count = 0;
let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;

'queue_loop: loop {
Expand All @@ -1006,8 +1005,6 @@
break;
};

pending_actor_count += 1;

let (queue_key, generation) =
tx.read_entry::<keys::ns::PendingActorByRunnerNameSelectorKey>(&queue_entry)?;

Expand Down Expand Up @@ -1123,20 +1120,15 @@
},
});

pending_actor_count -= 1;
continue 'queue_loop;
}
}

Ok((allocations, pending_actor_count))
Ok(allocations)
})
.custom_instrument(tracing::info_span!("runner_allocate_pending_actors_tx"))
.await?;

metrics::ACTOR_PENDING_ALLOCATION
.with_label_values(&[&input.namespace_id.to_string(), &input.name.to_string()])
.set(pending_actor_count as i64);

Ok(AllocatePendingActorsOutput { allocations })
}

Expand Down
18 changes: 2 additions & 16 deletions engine/packages/pegboard/src/workflows/runner2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use universalpubsub::PublishOpts;
use vbare::OwnedVersionedData;

use crate::{keys, metrics, workflows::actor::Allocate};

Check failure on line 14 in engine/packages/pegboard/src/workflows/runner2.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `metrics`

const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);

Expand Down Expand Up @@ -649,7 +649,7 @@
let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();

// NOTE: This txn should closely resemble the one found in the allocate_actor activity of the actor wf
let (allocations, pending_actor_count) = ctx
let allocations = ctx
.udb()?
.run(|tx| async move {
let start = Instant::now();
Expand All @@ -671,7 +671,6 @@
// the one we choose
Snapshot,
);
let mut pending_actor_count = 0;
let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;

'queue_loop: loop {
Expand All @@ -684,8 +683,6 @@
break;
};

pending_actor_count += 1;

let (queue_key, generation) =
tx.read_entry::<keys::ns::PendingActorByRunnerNameSelectorKey>(&queue_entry)?;

Expand Down Expand Up @@ -801,20 +798,15 @@
},
});

pending_actor_count -= 1;
continue 'queue_loop;
}
}

Ok((allocations, pending_actor_count))
Ok(allocations)
})
.custom_instrument(tracing::info_span!("runner_allocate_pending_actors_tx"))
.await?;

metrics::ACTOR_PENDING_ALLOCATION
.with_label_values(&[&input.namespace_id.to_string(), &input.name.to_string()])
.set(pending_actor_count as i64);

Ok(AllocatePendingActorsOutput { allocations })
}

Expand Down Expand Up @@ -856,12 +848,6 @@
ctx: &ActivityCtx,
input: &DrainOlderVersionsInput,
) -> Result<crate::ops::runner::drain::Output> {
tracing::info!(
namespace_id = %input.namespace_id,
name = %input.name,
version = input.version,
"drain_older_versions activity called"
);
ctx.op(crate::ops::runner::drain::Input {
namespace_id: input.namespace_id,
name: input.name.clone(),
Expand Down
2 changes: 1 addition & 1 deletion engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@

ws.addEventListener("error", (ev) => {
this.log?.error({
msg: `WebSocket error: ${ev.error}`,
msg: `WebSocket error: ${stringifyError(ev.error)}`,
});

if (!this.#shutdown) {
Expand Down Expand Up @@ -1166,7 +1166,7 @@
}

async #handleCommandStopActor(commandWrapper: protocol.CommandWrapper) {
const stopCommand = commandWrapper.inner

Check warning on line 1169 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedVariables

This variable stopCommand is unused.

Check warning on line 1169 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedVariables

This variable stopCommand is unused.
.val as protocol.CommandStopActor;

const actorId = commandWrapper.checkpoint.actorId;
Expand Down
Loading