Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ debug = 1
sentry_protos = "0.2.0"
anyhow = "1.0.92"
chrono = { version = "0.4.26" }
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "chrono"] }
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "chrono", "postgres"] }
tonic = "0.12"
tonic-health = "0.12.3"
prost = "0.13"
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# recent enough version of protobuf-compiler
FROM rust:1-bookworm AS build

RUN apt-get update && apt-get upgrade -y
RUN apt-get update && apt-get upgrade -y
RUN apt-get install -y cmake pkg-config libssl-dev librdkafka-dev protobuf-compiler

RUN USER=root cargo new --bin taskbroker
Expand All @@ -17,6 +17,7 @@ ENV TASKBROKER_VERSION=$TASKBROKER_GIT_REVISION
COPY ./Cargo.lock ./Cargo.lock
COPY ./Cargo.toml ./Cargo.toml
COPY ./migrations ./migrations
COPY ./pg_migrations ./pg_migrations
COPY ./benches ./benches

# Build dependencies in a way they can be cached
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ format: ## Run autofix mode for formatting and lint
# Tests

unit-test: ## Run unit tests
cargo test
cargo test -- --test-threads=1
.PHONY: unit-test

reset-kafka: setup ## Reset kafka
Expand Down
58 changes: 17 additions & 41 deletions benches/store_bench.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,24 @@
use std::sync::Arc;

use chrono::Utc;
use criterion::{Criterion, criterion_group, criterion_main};
use rand::Rng;
use taskbroker::{
store::inflight_activation::{
InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig,
},
test_utils::{generate_temp_filename, make_activations},
test_utils::{get_pg_database_name, get_pg_url, make_activations},
};
use tokio::task::JoinSet;

async fn get_pending_activations(num_activations: u32, num_workers: u32) {
let url = if cfg!(feature = "bench-with-mnt-disk") {
let mut rng = rand::thread_rng();
format!(
"/mnt/disks/sqlite/{}-{}.sqlite",
Utc::now(),
rng.r#gen::<u64>()
)
} else {
generate_temp_filename()
};
let store = Arc::new(
InflightActivationStore::new(
&url,
InflightActivationStoreConfig {
max_processing_attempts: 1,
vacuum_page_count: None,
processing_deadline_grace_sec: 3,
enable_sqlite_status_metrics: false,
},
)
InflightActivationStore::new(InflightActivationStoreConfig {
pg_url: get_pg_url(),
pg_database_name: get_pg_database_name(),
max_processing_attempts: 1,
vacuum_page_count: None,
processing_deadline_grace_sec: 3,
enable_sqlite_status_metrics: false,
})
.await
.unwrap(),
);
Expand Down Expand Up @@ -71,26 +58,15 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) {
async fn set_status(num_activations: u32, num_workers: u32) {
assert!(num_activations.is_multiple_of(num_workers));

let url = if cfg!(feature = "bench-with-mnt-disk") {
let mut rng = rand::thread_rng();
format!(
"/mnt/disks/sqlite/{}-{}.sqlite",
Utc::now(),
rng.r#gen::<u64>()
)
} else {
generate_temp_filename()
};
let store = Arc::new(
InflightActivationStore::new(
&url,
InflightActivationStoreConfig {
max_processing_attempts: 1,
vacuum_page_count: None,
processing_deadline_grace_sec: 3,
enable_sqlite_status_metrics: false,
},
)
InflightActivationStore::new(InflightActivationStoreConfig {
pg_url: get_pg_url(),
pg_database_name: get_pg_database_name(),
max_processing_attempts: 1,
vacuum_page_count: None,
processing_deadline_grace_sec: 3,
enable_sqlite_status_metrics: false,
})
.await
.unwrap(),
);
Expand Down
1 change: 1 addition & 0 deletions default_migrations/0001_create_database.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Bootstrap migration: create the taskbroker database itself.
-- Must run against the server's default database, since you cannot
-- connect to a database that does not exist yet (the application then
-- connects to `taskbroker` to run the table migrations).
-- NOTE(review): PostgreSQL has no CREATE DATABASE IF NOT EXISTS, so the
-- migration runner must tolerate a "database already exists" error on
-- re-runs — confirm how this is invoked.
CREATE DATABASE taskbroker;
22 changes: 22 additions & 0 deletions pg_migrations/0001_create_inflight_activations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- PostgreSQL equivalent of the inflight_taskactivations table
-- (mirrors the SQLite schema used by the inflight activation store;
-- column names and types must stay in sync with the Rust store code).
CREATE TABLE IF NOT EXISTS inflight_taskactivations (
    -- Activation identifier; primary key, stored as text.
    id TEXT NOT NULL PRIMARY KEY,
    -- Raw activation payload — presumably the serialized protobuf
    -- message (the project depends on prost/sentry_protos); confirm.
    activation BYTEA NOT NULL,
    -- Kafka coordinates of the consumed message: source partition and
    -- offset, so progress/commits can be reconciled.
    partition INTEGER NOT NULL,
    kafka_offset BIGINT NOT NULL,
    -- When the row was added to the store vs. when the activation was
    -- received upstream (NOTE(review): presumed distinction — confirm
    -- against deserialize_activation, which sets added_at = now()).
    added_at TIMESTAMPTZ NOT NULL,
    received_at TIMESTAMPTZ NOT NULL,
    -- Number of processing attempts so far; compared against the store's
    -- max_processing_attempts config.
    processing_attempts INTEGER NOT NULL,
    -- Optional scheduling bounds: both nullable; presumably the task is
    -- dropped after expires_at and held until delay_until — confirm.
    expires_at TIMESTAMPTZ,
    delay_until TIMESTAMPTZ,
    -- Allowed processing duration (likely seconds, given the store's
    -- processing_deadline_grace_sec setting — confirm) and the concrete
    -- deadline, NULL until a worker claims the activation.
    processing_deadline_duration INTEGER NOT NULL,
    processing_deadline TIMESTAMPTZ,
    -- Lifecycle status stored as text (see InflightActivationStatus in
    -- the Rust code for the value set).
    status TEXT NOT NULL,
    -- TRUE for at-most-once delivery semantics (no retry after a claim
    -- is handed out) — NOTE(review): inferred from the name, confirm.
    at_most_once BOOLEAN NOT NULL DEFAULT FALSE,
    -- Task routing/identity metadata.
    namespace TEXT NOT NULL,
    taskname TEXT NOT NULL,
    -- Integer-coded action to take once attempts are exhausted;
    -- default 1 — NOTE(review): confirm the enum mapping in Rust.
    on_attempts_exceeded INTEGER NOT NULL DEFAULT 1
);

-- Composite index backing the hot "fetch next pending activation"
-- query path: filter by status, order by added_at, scoped by namespace.
CREATE INDEX IF NOT EXISTS idx_pending_activation
ON inflight_taskactivations (status, added_at, namespace, id);
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ pub struct Config {
/// The number of ms for timeouts when publishing messages to kafka.
pub kafka_send_timeout_ms: u64,

/// The url of the postgres database to use for the inflight activation store.
pub pg_url: String,

/// The name of the postgres database to use for the inflight activation store.
pub pg_database_name: String,
Comment on lines +125 to +128
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to combine these, and include the database name into the url.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I separated it because technically I interact with 2 databases: default, to create the DB, and then the actual DB to create the table.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, because you can't connect to a database that doesn't exist, and you need to create the db initially.


/// The path to the sqlite database
pub db_path: String,

Expand Down Expand Up @@ -256,6 +262,8 @@ impl Default for Config {
kafka_auto_offset_reset: "latest".to_owned(),
kafka_send_timeout_ms: 500,
db_path: "./taskbroker-inflight.sqlite".to_owned(),
pg_url: "postgres://postgres:password@sentry-postgres-1:5432/".to_owned(),
pg_database_name: "taskbroker".to_owned(),
db_write_failure_backoff_ms: 4000,
db_insert_batch_max_len: 256,
db_insert_batch_max_size: 16_000_000,
Expand Down
2 changes: 1 addition & 1 deletion src/kafka/deserialize_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub fn new(
added_at: Utc::now(),
received_at: activation_time,
processing_deadline: None,
processing_deadline_duration: activation.processing_deadline_duration as u32,
processing_deadline_duration: activation.processing_deadline_duration as i32,
processing_attempts: 0,
expires_at,
delay_until,
Expand Down
2 changes: 0 additions & 2 deletions src/kafka/inflight_activation_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,6 @@ demoted_namespaces:
ActivationBatcherConfig::from_config(&config),
runtime_config,
);
println!("kafka_topic: {:?}", config.kafka_topic);
println!("kafka_long_topic: {:?}", config.kafka_long_topic);

let inflight_activation_0 = InflightActivation {
id: "0".to_string(),
Expand Down
Loading
Loading