Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ Public-facing projects:

- **Rivet Engine** (you are here): Engine that powers Rivet Actors at scale
- **[RivetKit](https://github.com/rivet-gg/rivetkit)**: Lightweight TypeScript library for building Rivet Actors — works with Redis or Rivet Engine
- **[Rivet Studio](/frontend/apps/studio)**: Like Postman, but for Rivet Actors
- **[Rivet Hub](/frontend/apps/hub)**: UI for Rivet Engine
- **[Rivet Hub](/frontend/)**: UI for Rivet Engine
- **[Rivet Documentation](/site/src/content/docs)**

Projects powering Rivet Engine:

- **[Pegboard](packages/edge/services/pegboard/)**: Actor orchestrator
- **[Guard](packages/edge/infra/guard/)**: Proxy for routing traffic to Rivet Actors
- **[Chirp](packages/common/chirp-workflow/)**: Core workflow engine that powers Rivet
- **[Pegboard](packages/services/pegboard/)**: Actor orchestrator
- **[Guard](packages/core/guard/)**: Proxy for routing traffic to Rivet Actors
- **[Gasoline](packages/common/gasoline/)**: Core durable execution engine that powers Rivet

## Get Started

Expand Down
4 changes: 2 additions & 2 deletions docker/template/grafana-dashboards/cache.json
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@
"timepicker": {},
"timezone": "browser",
"title": "Rivet Guard",
"uid": "cen785ige8fswd",
"uid": "cen785ige8fswd2",
"version": 1,
"weekStart": ""
}
}
247 changes: 0 additions & 247 deletions packages/core/guard/server/src/routing/actor.rs

This file was deleted.

7 changes: 0 additions & 7 deletions packages/core/guard/server/src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use rivet_guard_core::RoutingFn;

use crate::errors;

//pub(crate) mod actor;
mod api_peer;
mod api_public;
pub mod pegboard_gateway;
Expand Down Expand Up @@ -36,12 +35,6 @@ pub fn create_routing_function(ctx: StandaloneCtx) -> RoutingFn {
// Read target
if let Some(target) = headers.get(X_RIVET_TARGET).and_then(|x| x.to_str().ok())
{
// if let Some(routing_output) =
// actor::route_request(&ctx, target, host, path, headers).await?
// {
// return Ok(routing_output);
// }

if let Some(routing_output) =
runner_ws::route_request(&ctx, target, host, path).await?
{
Expand Down
62 changes: 14 additions & 48 deletions packages/core/guard/server/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,28 +154,20 @@ async fn find_actor(
}

// Check if actor is connectable and get runner_id
let runner_info = {
let get_runner_fut = ctx.op(pegboard::ops::actor::get_runner::Input {
actor_ids: vec![actor_id],
});
let output = tokio::time::timeout(Duration::from_secs(5), get_runner_fut).await??;
output.actors.into_iter().find(|a| a.actor_id == actor_id)
};

let Some(runner_info) = runner_info else {
return Err(errors::ActorNotFound {
actor_id,
port_name: port_name.to_string(),
}
.build());
};

if !runner_info.is_connectable {
let get_runner_fut = ctx.op(pegboard::ops::actor::get_runner::Input {
actor_ids: vec![actor_id],
});
let res = tokio::time::timeout(Duration::from_secs(5), get_runner_fut).await??;
let runner_info = res.actors.into_iter().next().filter(|x| x.is_connectable);

let runner_id = if let Some(runner_info) = runner_info {
runner_info.runner_id
} else {
tracing::info!(?actor_id, "waiting for actor to become ready");

// Wait for ready, fail, or destroy
tokio::select! {
res = ready_sub.next() => { res?; },
res = ready_sub.next() => { res?.runner_id },
res = fail_sub.next() => {
let msg = res?;
return Err(msg.error.clone().build());
Expand All @@ -185,45 +177,19 @@ async fn find_actor(
return Err(pegboard::errors::Actor::DestroyedWhileWaitingForReady.build());
}
// Ready timeout
_ = tokio::time::sleep(ACTOR_READY_TIMEOUT) => {
_ = tokio::time::sleep(ACTOR_READY_TIMEOUT) => {
return Err(errors::ActorReadyTimeout { actor_id }.build());
}
}
};

// TODO: Is this needed? Can't we just re-check the actor exists if it fails to connect?
// Verify actor is connectable again
let runner_info = {
let get_runner_fut = ctx.op(pegboard::ops::actor::get_runner::Input {
actor_ids: vec![actor_id],
});
let output = tokio::time::timeout(Duration::from_secs(5), get_runner_fut).await??;
output.actors.into_iter().find(|a| a.actor_id == actor_id)
};

let Some(runner_info) = runner_info else {
return Err(errors::ActorNotFound {
actor_id,
port_name: port_name.to_string(),
}
.build());
};

if !runner_info.is_connectable {
return Err(errors::ActorNotFound {
actor_id,
port_name: port_name.to_string(),
}
.build());
};
}

tracing::debug!(?actor_id, runner_id = ?runner_info.runner_id, "actor ready");
tracing::debug!(?actor_id, ?runner_id, "actor ready");

// Return pegboard-gateway instance
let gateway = pegboard_gateway::PegboardGateway::new(
ctx.clone(),
actor_id,
runner_info.runner_id,
runner_id,
port_name.to_string(),
);
Ok(Some(RoutingOutput::CustomServe(std::sync::Arc::new(
Expand Down
Loading
Loading