Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker/dev-host/grafana/dashboards/cache.json
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@
"timepicker": {},
"timezone": "browser",
"title": "Rivet Guard",
"uid": "cen785ige8fswd",
"uid": "cen785ige8fswd2",
"version": 1,
"weekStart": ""
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@
"timepicker": {},
"timezone": "browser",
"title": "Rivet Guard",
"uid": "cen785ige8fswd",
"uid": "cen785ige8fswd2",
"version": 1,
"weekStart": ""
}
}
4 changes: 2 additions & 2 deletions docker/dev-multidc/core/grafana/dashboards/cache.json
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@
"timepicker": {},
"timezone": "browser",
"title": "Rivet Guard",
"uid": "cen785ige8fswd",
"uid": "cen785ige8fswd2",
"version": 1,
"weekStart": ""
}
}
4 changes: 2 additions & 2 deletions docker/dev-multinode/grafana/dashboards/cache.json
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@
"timepicker": {},
"timezone": "browser",
"title": "Rivet Guard",
"uid": "cen785ige8fswd",
"uid": "cen785ige8fswd2",
"version": 1,
"weekStart": ""
}
}
4 changes: 2 additions & 2 deletions docker/dev/grafana/dashboards/cache.json
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@
"timepicker": {},
"timezone": "browser",
"title": "Rivet Guard",
"uid": "cen785ige8fswd",
"uid": "cen785ige8fswd2",
"version": 1,
"weekStart": ""
}
}
17 changes: 8 additions & 9 deletions packages/common/config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,14 @@
}

pub fn validate_and_set_defaults(&mut self) -> Result<()> {
// TODO: Add back
//// Set default pubsub to Postgres if configured for database
//if self.pubsub.is_none()
// && let Some(Database::Postgres(pg)) = &self.database
//{
// self.pubsub = Some(PubSub::PostgresNotify(pubsub::Postgres {
// url: pg.url.clone(),
// }));
//}
// Set default pubsub to Postgres if configured for database
if self.pubsub.is_none()
&& let Some(Database::Postgres(pg)) = &self.database
{
self.pubsub = Some(PubSub::PostgresNotify(pubsub::Postgres {

Check failure on line 194 in packages/common/config/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Test

missing field `memory_optimization` in initializer of `pubsub::Postgres`

Check failure on line 194 in packages/common/config/src/config/mod.rs

View workflow job for this annotation

GitHub Actions / Check

missing field `memory_optimization` in initializer of `pubsub::Postgres`
url: pg.url.clone(),
}));
}

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion packages/common/config/src/config/pegboard_tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::net::IpAddr;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// The tunnel service that forwards tunnel-protocol messages between NATS and WebSocket connections.
/// The tunnel service that forwards tunnel-protocol messages between pubsub and WebSocket connections.
#[derive(Debug, Serialize, Deserialize, Clone, Default, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct PegboardTunnel {
Expand Down
7 changes: 7 additions & 0 deletions packages/common/config/src/config/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ pub struct Postgres {
///
/// See: https://docs.rs/postgres/0.19.10/postgres/config/struct.Config.html#url
pub url: Secret<String>,
#[serde(default = "default_mem_opt")]
pub memory_optimization: bool,
}

impl Default for Postgres {
fn default() -> Self {
Self {
url: Secret::new("postgresql://postgres:[email protected]:5432/postgres".into()),
memory_optimization: true,
}
}
}
Expand Down Expand Up @@ -81,3 +84,7 @@ impl Memory {
"default".to_string()
}
}

fn default_mem_opt() -> bool {
true
}
66 changes: 33 additions & 33 deletions packages/common/gasoline/core/src/ctx/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use universalpubsub::{NextOutput, Subscriber};

use crate::{
error::{WorkflowError, WorkflowResult},
message::{Message, NatsMessage, NatsMessageWrapper},
message::{Message, PubsubMessage, PubsubMessageWrapper},
utils::{self, tags::AsTags},
};

#[derive(Clone)]
pub struct MessageCtx {
/// The connection used to communicate with NATS.
nats: UpsPool,
/// The connection used to communicate with pubsub.
pubsub: UpsPool,

ray_id: Id,

Expand All @@ -35,7 +35,7 @@ impl MessageCtx {
ray_id: Id,
) -> WorkflowResult<Self> {
Ok(MessageCtx {
nats: pools.ups().map_err(WorkflowError::PoolsGeneric)?,
pubsub: pools.ups().map_err(WorkflowError::PoolsGeneric)?,
ray_id,
config: config.clone(),
})
Expand All @@ -44,7 +44,7 @@ impl MessageCtx {

// MARK: Publishing messages
impl MessageCtx {
/// Publishes a message to NATS and to a durable message stream if a topic is
/// Publishes a message to pubsub and to a durable message stream if a topic is
/// set.
///
/// Use `subscribe` to consume these messages ephemerally and `tail` to read
Expand Down Expand Up @@ -94,7 +94,7 @@ impl MessageCtx {
where
M: Message,
{
let nats_subject = M::nats_subject();
let subject = M::subject();
let duration_since_epoch = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_else(|err| unreachable!("time is broken: {}", err));
Expand All @@ -109,7 +109,7 @@ impl MessageCtx {

// Serialize message
let req_id = Id::new_v1(self.config.dc_label());
let message = NatsMessageWrapper {
let message = PubsubMessageWrapper {
req_id,
ray_id: self.ray_id,
tags: tags.as_tags()?,
Expand All @@ -119,7 +119,7 @@ impl MessageCtx {
let message_buf = serde_json::to_vec(&message).map_err(WorkflowError::SerializeMessage)?;

tracing::debug!(
%nats_subject,
%subject,
body_bytes = ?body_buf_len,
message_bytes = ?message_buf.len(),
"publish message"
Expand All @@ -128,15 +128,15 @@ impl MessageCtx {
// It's important to write to the stream as fast as possible in order to
// ensure messages are handled quickly.
let message_buf = Arc::new(message_buf);
self.message_publish_nats::<M>(&nats_subject, message_buf)
self.message_publish_pubsub::<M>(&subject, message_buf)
.await;

Ok(())
}

/// Publishes the message to NATS.
/// Publishes the message to pubsub.
#[tracing::instrument(level = "debug", skip_all)]
async fn message_publish_nats<M>(&self, nats_subject: &str, message_buf: Arc<Vec<u8>>)
async fn message_publish_pubsub<M>(&self, subject: &str, message_buf: Arc<Vec<u8>>)
where
M: Message,
{
Expand All @@ -146,19 +146,19 @@ impl MessageCtx {
// Ignore for infinite backoff
backoff.tick().await;

let nats_subject = nats_subject.to_owned();
let subject = subject.to_owned();

tracing::trace!(
%nats_subject,
%subject,
message_len = message_buf.len(),
"publishing message to nats"
"publishing message to pubsub"
);
if let Err(err) = self.nats.publish(&nats_subject, &(*message_buf)).await {
if let Err(err) = self.pubsub.publish(&subject, &(*message_buf)).await {
tracing::warn!(?err, "publish message failed, trying again");
continue;
}

tracing::debug!("publish nats message succeeded");
tracing::debug!("publish pubsub message succeeded");
break;
}
}
Expand All @@ -172,21 +172,21 @@ impl MessageCtx {

// MARK: Subscriptions
impl MessageCtx {
/// Listens for gasoline messages globally on NATS.
/// Listens for gasoline messages globally on pubsub.
#[tracing::instrument(skip_all, fields(message = M::NAME))]
pub async fn subscribe<M>(&self, tags: impl AsTags) -> WorkflowResult<SubscriptionHandle<M>>
where
M: Message,
{
self.subscribe_opt::<M>(SubscribeOpts {
tags: tags.as_tags()?,
flush_nats: true,
flush: true,
})
.in_current_span()
.await
}

/// Listens for gasoline messages globally on NATS.
/// Listens for gasoline messages globally on pubsub.
#[tracing::instrument(skip_all, fields(message = M::NAME))]
pub async fn subscribe_opt<M>(
&self,
Expand All @@ -195,32 +195,32 @@ impl MessageCtx {
where
M: Message,
{
let nats_subject = M::nats_subject();
let subject = M::subject();

// Create subscription and flush immediately.
tracing::debug!(%nats_subject, tags = ?opts.tags, "creating subscription");
tracing::debug!(%subject, tags = ?opts.tags, "creating subscription");
let subscription = self
.nats
.subscribe(&nats_subject)
.pubsub
.subscribe(&subject)
.await
.map_err(|x| WorkflowError::CreateSubscription(x.into()))?;
if opts.flush_nats {
self.nats
if opts.flush {
self.pubsub
.flush()
.await
.map_err(|x| WorkflowError::FlushNats(x.into()))?;
.map_err(|x| WorkflowError::FlushPubsub(x.into()))?;
}

// Return handle
let subscription = SubscriptionHandle::new(nats_subject, subscription, opts.tags.clone());
let subscription = SubscriptionHandle::new(subject, subscription, opts.tags.clone());
Ok(subscription)
}
}

#[derive(Debug)]
pub struct SubscribeOpts {
pub tags: serde_json::Value,
pub flush_nats: bool,
pub flush: bool,
}

/// Used to receive messages from other contexts.
Expand Down Expand Up @@ -291,15 +291,15 @@ where
///
/// This future can be safely dropped.
#[tracing::instrument(name="message_next", skip_all, fields(message = M::NAME))]
pub async fn next(&mut self) -> WorkflowResult<NatsMessage<M>> {
pub async fn next(&mut self) -> WorkflowResult<PubsubMessage<M>> {
tracing::debug!("waiting for message");

loop {
// Poll the subscription.
//
// Use blocking threads instead of `try_next`, since I'm not sure
// try_next works as intended.
let nats_message = match self.subscription.next().await {
let message = match self.subscription.next().await {
Ok(NextOutput::Message(msg)) => msg,
Ok(NextOutput::Unsubscribed) => {
tracing::debug!("unsubscribed");
Expand All @@ -311,11 +311,11 @@ where
}
};

let message_wrapper = NatsMessage::<M>::deserialize_wrapper(&nats_message.payload)?;
let message_wrapper = PubsubMessage::<M>::deserialize_wrapper(&message.payload)?;

// Check if the subscription tags match a subset of the message tags
if utils::is_value_subset(&self.tags, &message_wrapper.tags) {
let message = NatsMessage::<M>::deserialize_from_wrapper(message_wrapper)?;
let message = PubsubMessage::<M>::deserialize_from_wrapper(message_wrapper)?;
tracing::debug!(?message, "received message");

return Ok(message);
Expand All @@ -326,7 +326,7 @@ where
}

/// Converts the subscription in to a stream.
pub fn into_stream(self) -> impl futures_util::Stream<Item = WorkflowResult<NatsMessage<M>>> {
pub fn into_stream(self) -> impl futures_util::Stream<Item = WorkflowResult<PubsubMessage<M>>> {
futures_util::stream::try_unfold(self, |mut sub| {
async move {
let message = sub.next().await?;
Expand Down
12 changes: 6 additions & 6 deletions packages/common/gasoline/core/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Implementation of a workflow database driver with UniversalDB and NATS.
//! Implementation of a workflow database driver with UniversalDB and UniversalPubSub.
// TODO: Move code to smaller functions for readability

use std::{
Expand Down Expand Up @@ -43,7 +43,7 @@ mod keys;
const WORKER_INSTANCE_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30);
/// How long before overwriting an existing metrics lock.
const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30);
/// For NATS wake mechanism.
/// For pubsub wake mechanism.
const WORKER_WAKE_SUBJECT: &str = "gasoline.worker.wake";

pub struct DatabaseKv {
Expand All @@ -52,17 +52,17 @@ pub struct DatabaseKv {
}

impl DatabaseKv {
/// Spawns a new thread and publishes a worker wake message to nats.
/// Spawns a new thread and publishes a worker wake message to pubsub.
fn wake_worker(&self) {
let Ok(nats) = self.pools.ups() else {
tracing::debug!("failed to acquire nats pool");
let Ok(pubsub) = self.pools.ups() else {
tracing::debug!("failed to acquire pubsub pool");
return;
};

let spawn_res = tokio::task::Builder::new().name("wake").spawn(
async move {
// Fail gracefully
if let Err(err) = nats.publish(WORKER_WAKE_SUBJECT, &Vec::new()).await {
if let Err(err) = pubsub.publish(WORKER_WAKE_SUBJECT, &Vec::new()).await {
tracing::warn!(?err, "failed to publish wake message");
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/common/gasoline/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ pub enum WorkflowError {
#[error("failed to create subscription: {0}")]
CreateSubscription(#[source] anyhow::Error),

#[error("failed to flush nats: {0}")]
FlushNats(#[source] anyhow::Error),
#[error("failed to flush pubsub: {0}")]
FlushPubsub(#[source] anyhow::Error),

#[error("subscription unsubscribed")]
SubscriptionUnsubscribed,
Expand Down
Loading
Loading