Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ jobs:
- name: Setup Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: nightly-2024-01-29
toolchain: nightly-2025-08-01
target: ${{ matrix.target }}
# override: true # this is by default on
rustflags: ""
Expand Down Expand Up @@ -705,7 +705,7 @@ jobs:
- name: Setup Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: nightly-2024-01-29
toolchain: nightly-2025-08-01
target: ${{ matrix.target }}
# override: true # this is by default on
rustflags: ""
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ jobs:
- name: Install Rust
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: nightly-2024-01-29
toolchain: nightly-2025-08-01
# override: true # this is by default on
rustflags: ""
components: rustfmt
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/rust-cubestore-master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
strategy:
fail-fast: false
matrix:
rust: [nightly-2024-01-29]
rust: [nightly-2025-08-01]
env:
RUST: ${{ matrix.rust }}
steps:
Expand Down Expand Up @@ -290,7 +290,7 @@ jobs:
- name: Setup Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: nightly-2024-01-29
toolchain: nightly-2025-08-01
target: ${{ matrix.target }}
# override: true # this is by default on
rustflags: ""
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/rust-cubestore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
strategy:
fail-fast: false
matrix:
rust: [nightly-2024-01-29]
rust: [nightly-2025-08-01]
container:
image: cubejs/rust-builder:bookworm-llvm-18
env:
Expand Down Expand Up @@ -229,7 +229,7 @@ jobs:
- name: Setup Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: nightly-2024-01-29
toolchain: nightly-2025-08-01
target: ${{ matrix.target }}
# override: true # this is by default on
rustflags: ""
Expand Down
8 changes: 4 additions & 4 deletions rust/cubeshared/src/codegen/http_message_generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ impl core::fmt::Debug for HttpColumnValue<'_> {
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_http_message_unchecked`.
pub fn root_as_http_message(buf: &[u8]) -> Result<HttpMessage, flatbuffers::InvalidFlatbuffer> {
pub fn root_as_http_message(buf: &[u8]) -> Result<HttpMessage<'_>, flatbuffers::InvalidFlatbuffer> {
flatbuffers::root::<HttpMessage>(buf)
}
#[inline]
Expand All @@ -1258,7 +1258,7 @@ pub fn root_as_http_message(buf: &[u8]) -> Result<HttpMessage, flatbuffers::Inva
/// `size_prefixed_root_as_http_message_unchecked`.
pub fn size_prefixed_root_as_http_message(
buf: &[u8],
) -> Result<HttpMessage, flatbuffers::InvalidFlatbuffer> {
) -> Result<HttpMessage<'_>, flatbuffers::InvalidFlatbuffer> {
flatbuffers::size_prefixed_root::<HttpMessage>(buf)
}
#[inline]
Expand Down Expand Up @@ -1291,14 +1291,14 @@ pub fn size_prefixed_root_as_http_message_with_opts<'b, 'o>(
/// Assumes, without verification, that a buffer of bytes contains a HttpMessage and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid `HttpMessage`.
pub unsafe fn root_as_http_message_unchecked(buf: &[u8]) -> HttpMessage {
pub unsafe fn root_as_http_message_unchecked(buf: &[u8]) -> HttpMessage<'_> {
flatbuffers::root_unchecked::<HttpMessage>(buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a size prefixed HttpMessage and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid size prefixed `HttpMessage`.
pub unsafe fn size_prefixed_root_as_http_message_unchecked(buf: &[u8]) -> HttpMessage {
pub unsafe fn size_prefixed_root_as_http_message_unchecked(buf: &[u8]) -> HttpMessage<'_> {
flatbuffers::size_prefixed_root_unchecked::<HttpMessage>(buf)
}
#[inline]
Expand Down
23 changes: 21 additions & 2 deletions rust/cubestore/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,27 @@ The codebase uses a custom dependency injection system defined in `config/inject

## Important Notes

- This is a Rust nightly project (see `rust-toolchain.toml`)
- **Rust Nightly**: Uses nightly-2025-08-01 (see `rust-toolchain.toml`)
- Uses custom forks of Arrow/DataFusion and sqlparser-rs for Cube-specific features
- Distributed mode involves router and worker nodes communicating via RPC
- Heavy use of async/await patterns with Tokio runtime
- Parquet files are the primary storage format for data
- Parquet files are the primary storage format for data

## Docker Configuration

The project includes Docker configurations for building and deploying CubeStore:

- **`builder.Dockerfile`**: Defines the base build image with Rust nightly-2025-08-01, LLVM 18, and build dependencies
- **`Dockerfile`**: Production Dockerfile that uses `cubejs/rust-builder:bookworm-llvm-18` base image and copies rust-toolchain.toml
- **GitHub Actions**: Multiple CI/CD workflows use the same Rust version

## Updating Rust Version

When updating the Rust version, ensure ALL these files are kept in sync:

1. **`rust-toolchain.toml`** - Primary source of truth for local development
2. **`builder.Dockerfile`** - Update the rustup default command with the new nightly version
3. **`Dockerfile`** - Copies rust-toolchain.toml (no manual update needed if builder image is updated)
4. **GitHub Workflows** - Update all occurrences of the Rust nightly version in `.github/workflows/` directory

**Note**: The `cubejs/rust-builder:bookworm-llvm-18` Docker image tag may also need updating if the builder.Dockerfile changes significantly.
19 changes: 4 additions & 15 deletions rust/cubestore/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/cubestore/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ WORKDIR /build/cubestore

COPY cubeshared /build/cubeshared

COPY cubestore/rust-toolchain.toml .
COPY cubestore/Cargo.toml .
COPY cubestore/Cargo.lock .
COPY cubestore/cuberockstore cuberockstore
Expand Down
4 changes: 2 additions & 2 deletions rust/cubestore/builder.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ FROM rust:$OS_NAME
ARG LLVM_VERSION=18

RUN rustup update && \
rustup default nightly-2024-01-29 && \
rustup component add --toolchain nightly-2024-01-29 rustfmt clippy;
rustup default nightly-2025-08-01 && \
rustup component add --toolchain nightly-2025-08-01 rustfmt clippy;

RUN apt update \
&& apt upgrade -y \
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/cubehll/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ struct BitCursor<'a> {
}

impl BitCursor<'_> {
pub fn new(input: &[u8]) -> BitCursor {
pub fn new(input: &[u8]) -> BitCursor<'_> {
BitCursor {
input,
pos: 0,
Expand Down
1 change: 0 additions & 1 deletion rust/cubestore/cubestore-sql-tests/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![feature(async_closure)]
#![feature(test)]

pub use crate::benches::cubestore_benches;
Expand Down
12 changes: 8 additions & 4 deletions rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,10 @@ impl RocksCacheStore {
.upload_loop
.process(
cachestore.clone(),
async move |_| Ok(Delay::new(Duration::from_secs(upload_interval)).await),
async move |m, _| m.store.run_upload().await,
move |_| async move {
Ok(Delay::new(Duration::from_secs(upload_interval)).await)
},
move |m, _| async move { m.store.run_upload().await },
)
.await;

Expand All @@ -292,8 +294,10 @@ impl RocksCacheStore {
.metrics_loop
.process(
cachestore.clone(),
async move |_| Ok(Delay::new(Duration::from_secs(metrics_interval)).await),
async move |m, _| {
move |_| async move {
Ok(Delay::new(Duration::from_secs(metrics_interval)).await)
},
move |m, _| async move {
if let Err(err) = m.submit_metrics().await {
log::error!("Error while submitting cachestore metrics: {}", err)
};
Expand Down
6 changes: 3 additions & 3 deletions rust/cubestore/cubestore/src/cachestore/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub enum LazyRocksCacheStoreState {
metastore_fs: Arc<dyn MetaStoreFs>,
config: Arc<dyn ConfigObj>,
listeners: Vec<tokio::sync::broadcast::Sender<MetaStoreEvent>>,
init_flag: Sender<bool>,
_init_flag: Sender<bool>,
},
Closed {},
Initialized {
Expand Down Expand Up @@ -72,7 +72,7 @@ impl LazyRocksCacheStore {
metastore_fs,
config,
listeners,
init_flag,
_init_flag: init_flag,
}),
}))
}
Expand Down Expand Up @@ -101,7 +101,7 @@ impl LazyRocksCacheStore {
config,
listeners,
// receiver will be closed on drop
init_flag: _,
_init_flag: _,
} => {
let store =
RocksCacheStore::load_from_remote(&path, metastore_fs.clone(), config.clone())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl QueueItemPayload {
}

#[derive(Clone, Copy, Debug)]
#[allow(dead_code)]
pub(crate) enum QueueItemPayloadRocksIndex {}

pub struct QueueItemPayloadRocksTable<'a> {
Expand Down Expand Up @@ -80,6 +81,7 @@ rocks_table_new!(
);

#[derive(Hash, Clone, Debug)]
#[allow(dead_code)]
pub enum QueueItemPayloadIndexKey {}

base_rocks_secondary_index!(QueueItemPayload, QueueItemPayloadRocksIndex);
Expand Down
4 changes: 2 additions & 2 deletions rust/cubestore/cubestore/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ impl HttpServer {
let drop_processing_messages_after = self.drop_processing_messages_after.clone();
let drop_orphaned_messages_loop = self.drop_orphaned_messages_loop.process(
messages_state,
async move |_| Ok(Delay::new(check_orphaned_messages_interval.clone()).await),
async move |messages_state, _| {
move |_| async move { Ok(Delay::new(check_orphaned_messages_interval.clone()).await) },
move |messages_state, _| async move {
let mut messages_state = messages_state.lock().await;
let mut keys_to_remove = Vec::new();
let mut orphaned_complete_results = 0;
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl<'a> CsvLineParser<'a> {
}
}

fn next_value(&mut self) -> Result<MaybeOwnedStr, CubeError> {
fn next_value(&mut self) -> Result<MaybeOwnedStr<'_>, CubeError> {
Ok(
if let Some(b'"') = self.remaining.as_bytes().iter().nth(0) {
let mut closing_index = None;
Expand Down
3 changes: 0 additions & 3 deletions rust/cubestore/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#![feature(test)]
#![feature(async_closure)]
#![feature(box_patterns)]
#![feature(vec_into_raw_parts)]
#![feature(hash_set_entry)]
#![feature(is_sorted)]
#![feature(result_flattening)]
// #![feature(trace_macros)]

// trace_macros!(true);
Expand Down
6 changes: 3 additions & 3 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1428,8 +1428,8 @@ impl RocksMetaStore {
self.upload_loop
.process(
self.clone(),
async move |_| Ok(Delay::new(Duration::from_secs(upload_interval)).await),
async move |m, _| m.store.run_upload().await,
move |_| async move { Ok(Delay::new(Duration::from_secs(upload_interval)).await) },
move |m, _| async move { m.store.run_upload().await },
)
.await;
}
Expand Down Expand Up @@ -4956,7 +4956,7 @@ mod tests {

#[test]
fn test_structures_size() {
assert_eq!(std::mem::size_of::<MetaStoreEvent>(), 680);
assert_eq!(std::mem::size_of::<MetaStoreEvent>(), 672);
}

#[tokio::test]
Expand Down
8 changes: 4 additions & 4 deletions rust/cubestore/cubestore/src/metastore/rocks_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ macro_rules! rocks_table_new {
self.db.db
}

fn snapshot(&self) -> &cuberockstore::rocksdb::Snapshot {
fn snapshot(&self) -> &cuberockstore::rocksdb::Snapshot<'_> {
self.db.snapshot
}

fn mem_seq(&self) -> &crate::metastore::MemorySequence {
&self.db.mem_seq
}

fn table_ref(&self) -> &crate::metastore::DbTableRef {
fn table_ref(&self) -> &crate::metastore::DbTableRef<'_> {
&self.db
}

Expand Down Expand Up @@ -448,8 +448,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
fn delete_event(&self, row: IdRow<Self::T>) -> MetaStoreEvent;
fn update_event(&self, old_row: IdRow<Self::T>, new_row: IdRow<Self::T>) -> MetaStoreEvent;
fn db(&self) -> &DB;
fn table_ref(&self) -> &DbTableRef;
fn snapshot(&self) -> &Snapshot;
fn table_ref(&self) -> &DbTableRef<'_>;
fn snapshot(&self) -> &Snapshot<'_>;
fn mem_seq(&self) -> &MemorySequence;
fn index_id(index_num: IndexId) -> IndexId;
fn table_id() -> TableId;
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/queryplanner/topk/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ fn extract_having(p: &Arc<LogicalPlan>) -> (Option<Expr>, &Arc<LogicalPlan>) {
}
}

fn extract_projection_and_having(p: &LogicalPlan) -> Option<ColumnProjection> {
fn extract_projection_and_having(p: &LogicalPlan) -> Option<ColumnProjection<'_>> {
match p {
LogicalPlan::Projection {
expr,
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/remotefs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub trait ExtendedRemoteFs: DIService + RemoteFs {
async fn list_by_page(
&self,
remote_prefix: String,
) -> Result<BoxStream<Result<Vec<String>, CubeError>>, CubeError> {
) -> Result<BoxStream<'static, Result<Vec<String>, CubeError>>, CubeError> {
// Note, this implementation doesn't actually paginate.
let list: Vec<String> = self.list(remote_prefix).await?;

Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/remotefs/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl ExtendedRemoteFs for QueueRemoteFs {
async fn list_by_page(
&self,
remote_prefix: String,
) -> Result<BoxStream<Result<Vec<String>, CubeError>>, CubeError> {
) -> Result<BoxStream<'static, Result<Vec<String>, CubeError>>, CubeError> {
self.remote_fs.list_by_page(remote_prefix).await
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/remotefs/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl ExtendedRemoteFs for S3RemoteFs {
async fn list_by_page(
&self,
remote_prefix: String,
) -> Result<BoxStream<Result<Vec<String>, CubeError>>, CubeError> {
) -> Result<BoxStream<'static, Result<Vec<String>, CubeError>>, CubeError> {
let path = self.s3_path(&remote_prefix);
let bucket = self.bucket.load();
let leading_subpath = self.leading_subpath_regex();
Expand Down
Loading
Loading