Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions frontend/packages/components/src/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const getApiEndpoint = (apiEndpoint: string) => {
}
// Default to staging servers for all other endpoints
return "https://api.staging2.gameinc.io";
} else if (apiEndpoint === "__SAME__") {
return location.origin;
}
return apiEndpoint;
};
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/components/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const getApiEndpoint = (apiEndpoint: string) => {
}
// Default to staging servers for all other endpoints
return "https://api.staging2.gameinc.io";
} else if (apiEndpoint === "__SAME__") {
return location.origin;
}
return apiEndpoint;
};
Expand Down
2 changes: 0 additions & 2 deletions packages/common/universaldb/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ pub enum DatabaseError {

#[error("operation issued while a commit was outstanding")]
UsedDuringCommit,
// #[error(transparent)]
// Custom(Box<dyn std::error::Error + Send + Sync>),
}

impl DatabaseError {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/bootstrap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn setup_epoxy_coordinator(ctx: &StandaloneCtx) -> Result<()> {
//
// This does not guarantee the config will change immediately since we can't guarantee that the
// coordinator workflow is running on a node with the newest version of the config.
ctx.signal(epoxy::workflows::coordinator::ReconfigureSignal {})
ctx.signal(epoxy::workflows::coordinator::Reconfigure {})
.to_workflow_id(workflow_id)
.send()
.await?;
Expand Down
5 changes: 4 additions & 1 deletion packages/services/epoxy/src/ops/explicit_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ pub enum ExplicitPrepareResult {
}

#[operation]
pub async fn explicit_prepare(ctx: &OperationCtx, input: &Input) -> Result<ExplicitPrepareResult> {
pub async fn epoxy_explicit_prepare(
ctx: &OperationCtx,
input: &Input,
) -> Result<ExplicitPrepareResult> {
let replica_id = ctx.config().epoxy_replica_id();
let instance = &input.instance;

Expand Down
2 changes: 1 addition & 1 deletion packages/services/epoxy/src/ops/kv/get_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Output {
}

#[operation]
pub async fn get_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
// Read from local KV store only
let kv_key = keys::keys::KvValueKey::new(input.key.clone());
let subspace = keys::subspace(input.replica_id);
Expand Down
2 changes: 1 addition & 1 deletion packages/services/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct Output {
///
/// We cannot use quorum reads for the fanout read because of the constraints of Epaxos.
#[operation]
pub async fn get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
// Try to read locally
let kv_key = keys::keys::KvValueKey::new(input.key.clone());
let cache_key = keys::keys::KvOptimisticCacheKey::new(input.key.clone());
Expand Down
2 changes: 1 addition & 1 deletion packages/services/epoxy/src/ops/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct Input {
}

#[operation]
pub async fn propose(ctx: &OperationCtx, input: &Input) -> Result<ProposalResult> {
pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<ProposalResult> {
let replica_id = ctx.config().epoxy_replica_id();

// Read config
Expand Down
2 changes: 1 addition & 1 deletion packages/services/epoxy/src/ops/read_cluster_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Output {
}

#[operation]
pub async fn read_config(ctx: &OperationCtx, input: &Input) -> Result<Output> {
pub async fn epoxy_read_cluster_config(ctx: &OperationCtx, input: &Input) -> Result<Output> {
let config = ctx
.udb()?
.run(|tx| {
Expand Down
4 changes: 2 additions & 2 deletions packages/services/epoxy/src/replica/message_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub async fn message_request(
"received coordinator update replica status request"
);

ctx.signal(crate::workflows::coordinator::ReplicaStatusChangeSignal {
ctx.signal(crate::workflows::coordinator::ReplicaStatusChange {
replica_id: req.replica_id,
status: req.status.into(),
})
Expand All @@ -118,7 +118,7 @@ pub async fn message_request(
"received begin learning request"
);

ctx.signal(crate::workflows::replica::BeginLearningSignal {
ctx.signal(crate::workflows::replica::BeginLearning {
config: req.config.clone().into(),
})
.to_workflow::<crate::workflows::replica::Workflow>()
Expand Down
14 changes: 7 additions & 7 deletions packages/services/epoxy/src/workflows/coordinator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ pub struct ReplicaState {
}

#[workflow]
pub async fn coordinator(ctx: &mut WorkflowCtx, _input: &Input) -> Result<()> {
pub async fn epoxy_coordinator(ctx: &mut WorkflowCtx, _input: &Input) -> Result<()> {
ctx.activity(InitInput {}).await?;

ctx.repeat(|ctx| {
async move {
match ctx.listen::<Main>().await? {
Main::ReconfigureSignal(_) => {
Main::Reconfigure(_) => {
reconfigure::reconfigure(ctx).await?;
}
Main::ReplicaStatusChangeSignal(sig) => {
Main::ReplicaStatusChange(sig) => {
replica_status_change::replica_status_change(ctx, sig).await?;
}
}
Expand Down Expand Up @@ -73,15 +73,15 @@ pub struct ConfigChangeMessage {
///
/// This gets called any time an engine node starts.
#[signal("epoxy_coordinator_reconfigure")]
pub struct ReconfigureSignal {}
pub struct Reconfigure {}

#[signal("epoxy_coordinator_replica_status_change")]
pub struct ReplicaStatusChangeSignal {
pub struct ReplicaStatusChange {
pub replica_id: protocol::ReplicaId,
pub status: types::ReplicaStatus,
}

join_signal!(Main {
ReconfigureSignal,
ReplicaStatusChangeSignal,
Reconfigure,
ReplicaStatusChange,
});
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::types;

pub async fn replica_status_change(
ctx: &mut WorkflowCtx,
signal: super::ReplicaStatusChangeSignal,
signal: super::ReplicaStatusChange,
) -> Result<()> {
// Update replica status
let should_increment_epoch = ctx
Expand Down
10 changes: 5 additions & 5 deletions packages/services/epoxy/src/workflows/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ pub use setup::*;
pub struct Input {}

#[workflow]
pub async fn replica(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
pub async fn epoxy_replica(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
setup_replica(ctx, input).await?;

// Main loop
ctx.repeat(|ctx| {
async move {
// Noop for now
ctx.listen::<MainSignal>().await?;
ctx.listen::<Main>().await?;
Ok(Loop::<()>::Continue)
}
.boxed()
Expand All @@ -32,9 +32,9 @@ pub async fn replica(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
}

#[signal("epoxy_replica_begin_learning")]
pub struct BeginLearningSignal {
pub struct BeginLearning {
pub config: types::ClusterConfig,
}

#[signal("main")]
pub struct MainSignal {}
#[signal("epoxy_replica_main")]
pub struct Main {}
14 changes: 7 additions & 7 deletions packages/services/epoxy/src/workflows/replica/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use universaldb::{KeySelector, RangeOption, options::StreamingMode};
use crate::types;

// IMPORTANT: Do not use `read_cluster_config`. Instead, use the config provided by
// `BeginLearningSignal`. This is because the value of `read_cluster_config` may change between
// `BeginLearning`. This is because the value of `read_cluster_config` may change between
// activities which can cause the learning process to enter an invalid state.

pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Result<()> {
// Wait for cooridinator to send begin learning signal
let begin_learning = ctx.listen::<super::BeginLearningSignal>().await?;
// Wait for coordiinator to send begin learning signal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a small typo in the comment: coordiinator should be spelled coordinator.

Suggested change
// Wait for coordiinator to send begin learning signal
// Wait for coordinator to send begin learning signal

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

let begin_learning = ctx.listen::<super::BeginLearning>().await?;

// TODO: Paralellize replicas
// TODO: Parallelize replicas
let total_replicas = begin_learning.config.replicas.len();
let mut replica_index = 0;

Expand Down Expand Up @@ -134,7 +134,7 @@ pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Resu

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub struct DownloadInstancesChunkInput {
/// Config received from BeginLearningSignal
/// Config received from BeginLearning
pub learning_config: types::ClusterConfig,
pub from_replica_id: protocol::ReplicaId,
pub after_instance: Option<types::Instance>,
Expand Down Expand Up @@ -312,7 +312,7 @@ async fn apply_log_entry(

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub struct RecoverKeysChunkInput {
/// Config received from BeginLearningSignal
/// Config received from BeginLearning
pub learning_config: types::ClusterConfig,
/// The last key value from the previous chunk, used for pagination
pub after_key: Option<Vec<u8>>,
Expand Down Expand Up @@ -728,7 +728,7 @@ struct CommittedEntry {

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub struct NotifyActiveInput {
/// Config received from BeginLearningSignal
/// Config received from BeginLearning
pub learning_config: types::ClusterConfig,
}

Expand Down
2 changes: 1 addition & 1 deletion packages/services/epoxy/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async fn setup_epoxy_coordinator_wf(
)
.await?;
leader_ctx
.signal(epoxy::workflows::coordinator::ReconfigureSignal {})
.signal(epoxy::workflows::coordinator::Reconfigure {})
.to_workflow_id(workflow_id)
.send()
.await?;
Expand Down
2 changes: 1 addition & 1 deletion packages/services/epoxy/tests/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ async fn test_inner(config: TestConfig) {
.await
.unwrap();
leader_ctx
.signal(epoxy::workflows::coordinator::ReconfigureSignal {})
.signal(epoxy::workflows::coordinator::Reconfigure {})
.to_workflow_id(test_ctx.coordinator_workflow_id)
.send()
.await
Expand Down
10 changes: 6 additions & 4 deletions sdks/typescript/test-runner/src/main.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading