diff --git a/.github/workflows/docker-publish-ghcr.yml b/.github/workflows/docker-publish-ghcr.yml new file mode 100644 index 000000000..a81ee5c77 --- /dev/null +++ b/.github/workflows/docker-publish-ghcr.yml @@ -0,0 +1,102 @@ +name: Docker + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +on: + schedule: + - cron: '17 10 * * *' + push: + branches: [ "experimental" ] + # Publish semver tags as releases. + tags: [ 'v*.*.*' ] + pull_request: + branches: [ "experimental" ] + workflow_dispatch: {} + +env: + # Use docker.io for Docker Hub if empty + REGISTRY: ghcr.io + # github.repository as / + IMAGE_NAME: ${{ github.repository }} + +concurrency: + group: build-experimental + cancel-in-progress: ${{ github.event_name == 'push' }} + +jobs: + build-experimental: + + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + # This is used to complete the identity challenge + # with sigstore/fulcio when running outside of PRs. + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Install the cosign tool except on PR + # https://github.com/sigstore/cosign-installer + - name: Install cosign + if: github.event_name != 'pull_request' + uses: sigstore/cosign-installer@59acb6260d9c0ba8f4a2f9d9b48431a222b68e20 #v3.5.0 + with: + cosign-release: 'v2.2.4' + + # Set up BuildKit Docker container builder to be able to build + # multi-platform images and export cache + # https://github.com/docker/setup-buildx-action + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3.0.0 + + # Login against a Docker registry except on PR + # https://github.com/docker/login-action + - name: Log into registry ${{ env.REGISTRY }} + if: github.event_name != 'pull_request' + uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + # Extract metadata (tags, labels) for Docker + # https://github.com/docker/metadata-action + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + # Build and push Docker image with Buildx (don't push on PR) + # https://github.com/docker/build-push-action + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0 + with: + context: . + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max + + # Sign the resulting Docker image digest except on PRs. + # This will only write to the public Rekor transparency log when the Docker + # repository is public to avoid leaking data. If you would like to publish + # transparency data even for private images, pass --force to cosign below. 
+ # https://github.com/sigstore/cosign + - name: Sign the published Docker image + if: ${{ github.event_name != 'pull_request' }} + env: + # https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable + TAGS: ${{ steps.meta.outputs.tags }} + DIGEST: ${{ steps.build-and-push.outputs.digest }} + # This step uses the identity token to provision an ephemeral certificate + # against the sigstore community Fulcio instance. + run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST} diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 24b73cdb1..f7153a38e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -40,4 +40,4 @@ jobs: toolchain: ${{ matrix.toolchain }} - uses: Swatinem/rust-cache@v2 - name: cargo test - run: RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=lld" cargo test --profile=ci --workspace --all-features --all-targets + run: RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=lld" cargo test --profile=ci --workspace --no-default-features --features "alloc-tracing" --all-targets diff --git a/Dockerfile b/Dockerfile index c9296ba30..160aefcfe 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,22 +11,32 @@ RUN apt-get update && apt-get install -y \ ca-certificates \ && rm -rf /var/lib/apt/lists/* -# Copy source code +# Copy all source code, including the pre-built frontend and entrypoint script COPY . . +RUN chmod +x entrypoint.sh +RUN tar -xf /app/ui/dist.tar -C ui + # Build the application with optimizations RUN cargo build --release --bin embucketd -# Stage 4: Final runtime image +# Stage 2: Final runtime image FROM gcr.io/distroless/cc-debian12 AS runtime -# Set working directory -USER nonroot:nonroot WORKDIR /app -# Copy the binary and required files +# Copy the compiled binary, API spec, frontend build, and entrypoint script COPY --from=builder /app/target/release/embucketd ./embucketd COPY --from=builder /app/rest-catalog-open-api.yaml ./rest-catalog-open-api.yaml +COPY --from=builder /app/ui/dist ./dist +COPY --from=builder /app/entrypoint.sh /usr/local/bin/entrypoint.sh + +# Ownership of /app is left unchanged: the distroless runtime image ships no +# chown or chroot executables, so the step below stays commented out +# RUN chown -R nonroot:nonroot /app + +# Switch to a non-privileged user +USER nonroot:nonroot # Expose port (adjust as needed) EXPOSE 8080 @@ -37,5 +47,7 @@ ENV FILE_STORAGE_PATH=data/ ENV BUCKET_HOST=0.0.0.0 ENV JWT_SECRET=63f4945d921d599f27ae4fdf5bada3f1 -# Default command +# Set the entrypoint to our script +ENTRYPOINT ["/usr/local/bin/entrypoint.sh"] + CMD ["./embucketd"] diff --git a/crates/api-snowflake-rest/Cargo.toml b/crates/api-snowflake-rest/Cargo.toml index 117a6961c..fd516bfe4 100644 --- a/crates/api-snowflake-rest/Cargo.toml +++ b/crates/api-snowflake-rest/Cargo.toml @@ -24,6 +24,7 @@ default-server = [ "dep:tower", "dep:strum" ] +vanilla-tokio-runtime = ["core-executor/vanilla-tokio-runtime"] [dependencies] api-sessions = { path = "../api-sessions", optional = true }
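The `vanilla-tokio-runtime` feature above is only forwarded to `core-executor`; the test changes below then compile out the abort/cancel cases, since aborting a running query relies on the dedicated executor rather than plain `tokio::spawn`. A minimal sketch of that gating pattern (hypothetical test names, assuming a `tokio` dev-dependency with the `macros` and `rt` features):

```rust
#[cfg(test)]
mod tests {
    // Compiled only when the dedicated executor is available, i.e. when the
    // `vanilla-tokio-runtime` feature is NOT enabled.
    #[cfg(not(feature = "vanilla-tokio-runtime"))]
    #[tokio::test]
    async fn abort_requires_dedicated_executor() {
        // submit a long-running query here, then abort it by request id
    }

    // Runtime-agnostic tests remain unconditional.
    #[tokio::test]
    async fn plain_query_roundtrip() {
        // submit a quick query here and assert on the response
    }
}
```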
diff --git a/crates/api-snowflake-rest/src/tests/test_abort_by_request_id.rs b/crates/api-snowflake-rest/src/tests/test_abort_by_request_id.rs index 960ff26ad..195229d38 100644 --- a/crates/api-snowflake-rest/src/tests/test_abort_by_request_id.rs +++ b/crates/api-snowflake-rest/src/tests/test_abort_by_request_id.rs @@ -10,6 +10,7 @@ mod tests { use axum::http; + #[cfg(not(feature = "vanilla-tokio-runtime"))] #[tokio::test] async fn test_abort_by_request_id() { let addr = run_test_rest_api_server(JSON).await; @@ -32,7 +33,6 @@ mod tests { .await .expect("Failed to run query"); let query_id = query_id_from_snapshot(&res).expect("Failed to get query ID"); - let (_headers, _res) = abort::<()>(&client, &addr, &access_token, request_id, sql) .await .expect("Failed to abort query"); diff --git a/crates/api-snowflake-rest/src/tests/test_generic_sqls.rs b/crates/api-snowflake-rest/src/tests/test_generic_sqls.rs index 916b5eeba..dc13f1bb9 100644 --- a/crates/api-snowflake-rest/src/tests/test_generic_sqls.rs +++ b/crates/api-snowflake-rest/src/tests/test_generic_sqls.rs @@ -23,6 +23,7 @@ mod snowflake_generic { use super::*; use crate::tests::sql_macro::{ARROW, JSON}; + #[cfg(not(feature = "vanilla-tokio-runtime"))] sql_test!( JSON, submit_ok_query_with_concurrent_limit, diff --git a/crates/api-snowflake-rest/src/tests/test_rest_quick_sqls.rs b/crates/api-snowflake-rest/src/tests/test_rest_quick_sqls.rs index d4c311357..8c1e68813 100644 --- a/crates/api-snowflake-rest/src/tests/test_rest_quick_sqls.rs +++ b/crates/api-snowflake-rest/src/tests/test_rest_quick_sqls.rs @@ -137,7 +137,7 @@ mod snowflake_compatibility { "!abort 1", ] ); - + #[cfg(not(feature = "vanilla-tokio-runtime"))] sql_test!( JSON, abort_ok_query, @@ -149,6 +149,7 @@ mod snowflake_compatibility { ] ); + #[cfg(not(feature = "vanilla-tokio-runtime"))] sql_test!( JSON, cancel_ok_query, diff --git a/crates/api-ui/Cargo.toml b/crates/api-ui/Cargo.toml index 97f903932..0b106b09e 100644 --- a/crates/api-ui/Cargo.toml +++ b/crates/api-ui/Cargo.toml @@ -6,6 +6,7 @@ license-file.workspace = true [features] client = [] +none-durable-history-writes = ["core-utils/none-durable-history-write"] [dependencies] api-ui-static-assets = { path = "../api-ui-static-assets" } diff --git a/crates/api-ui/src/tests/worksheets.rs b/crates/api-ui/src/tests/worksheets.rs index 6aef23b39..f72360737 100644 --- a/crates/api-ui/src/tests/worksheets.rs +++ b/crates/api-ui/src/tests/worksheets.rs @@ -98,6 +98,7 @@ async fn test_ui_worksheets_sort() { }, ) .await; + #[cfg(not(feature = "none-durable-history-writes"))] assert_eq!( vec!["name1", "name2", "name3", "name4"], sort_by_name_asc @@ -552,6 +553,7 @@ async fn test_ui_worksheets_search() { }, ) .await; + #[cfg(not(feature = "none-durable-history-writes"))] assert_eq!( vec!["work1", "work2", "work3", "sheet1", "work4"], search.into_iter().map(|w| w.name).collect::<Vec<String>>(), diff --git a/crates/core-executor/Cargo.toml b/crates/core-executor/Cargo.toml index 987fd3682..76aaf4095 100644 --- a/crates/core-executor/Cargo.toml +++ b/crates/core-executor/Cargo.toml @@ -71,3 +71,7 @@ aws-sdk-sts = { version = "1.80.0", features = ["behavior-version-latest"] } [lints] workspace = true + +[features] +vanilla-tokio-runtime = [] +sort-merge-join = [] diff --git a/crates/core-executor/src/error.rs b/crates/core-executor/src/error.rs index 85c10cfd7..7f93107fa 100644 --- a/crates/core-executor/src/error.rs +++ b/crates/core-executor/src/error.rs @@ -300,6 +300,17 @@ pub enum Error { location: Location, }, + #[cfg(feature = "vanilla-tokio-runtime")] + #[snafu(display("Threaded Job error: {error}: {backtrace}"))] + JobError { + #[snafu(source)] + error: tokio::task::JoinError, + backtrace: Backtrace, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(not(feature = "vanilla-tokio-runtime"))] #[snafu(display("Threaded Job error: {error}: {backtrace}"))] JobError { #[snafu(source)]
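The two cfg-gated `JobError` variants above are mutually exclusive, so call sites built on the generated `JobSnafu` context selector compile unchanged while the wrapped source type follows the runtime choice. A stripped-down sketch of the pattern (omitting the backtrace/location fields; `DedicatedJobError` is a stand-in name for the dedicated executor's error type, which the hunk truncates):

```rust
use snafu::Snafu;

// Stand-in for the dedicated executor's job error type.
#[derive(Debug, Snafu)]
#[snafu(display("dedicated executor job failed"))]
pub struct DedicatedJobError;

#[derive(Debug, Snafu)]
pub enum Error {
    // Plain tokio runtime: the job failure is a JoinError.
    #[cfg(feature = "vanilla-tokio-runtime")]
    #[snafu(display("Threaded Job error: {error}"))]
    JobError {
        #[snafu(source)]
        error: tokio::task::JoinError,
    },

    // Dedicated executor build: the same variant name wraps the executor's
    // error instead, so `JobSnafu` works identically in both builds.
    #[cfg(not(feature = "vanilla-tokio-runtime"))]
    #[snafu(display("Threaded Job error: {error}"))]
    JobError {
        #[snafu(source)]
        error: DedicatedJobError,
    },
}
```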
diff --git a/crates/core-executor/src/lib.rs b/crates/core-executor/src/lib.rs index aed97467c..ff2c632a7 100644 --- a/crates/core-executor/src/lib.rs +++ b/crates/core-executor/src/lib.rs @@ -1,6 +1,5 @@ pub use df_catalog as catalog; pub mod datafusion; -pub mod dedicated_executor; pub mod error; pub mod error_code; pub mod models; @@ -11,6 +10,8 @@ pub mod session; pub mod snowflake_error; pub mod utils; +#[cfg(not(feature = "vanilla-tokio-runtime"))] +pub mod dedicated_executor; #[cfg(test)] pub mod tests; diff --git a/crates/core-executor/src/query.rs b/crates/core-executor/src/query.rs index 1ccb0d23e..aaf70a0f1 100644 --- a/crates/core-executor/src/query.rs +++ b/crates/core-executor/src/query.rs @@ -2202,6 +2202,23 @@ impl UserQuery { let session = self.session.clone(); let query_id = self.query_context.query_id; let query = query.to_string(); + #[cfg(feature = "vanilla-tokio-runtime")] + let stream = tokio::task::spawn(async move { + let df = session + .ctx + .sql(&query) + .await + .context(ex_error::DataFusionSnafu)?; + let mut schema = df.schema().as_arrow().clone(); + let records = df.collect().await.context(ex_error::DataFusionSnafu)?; + if !records.is_empty() { + schema = records[0].schema().as_ref().clone(); + } + Ok::<QueryResult, ex_error::Error>(QueryResult::new(records, Arc::new(schema), query_id)) + }) + .await + .context(ex_error::JobSnafu)??; + #[cfg(not(feature = "vanilla-tokio-runtime"))] let stream = self .session .executor @@ -2229,6 +2246,26 @@ impl UserQuery { let span = tracing::debug_span!("UserQuery::execute_logical_plan"); + #[cfg(feature = "vanilla-tokio-runtime")] + let stream = tokio::task::spawn(async move { + let mut schema = plan.schema().as_arrow().clone(); + let records = session + .ctx + .execute_logical_plan(plan) + .await + .context(ex_error::DataFusionSnafu)? + .collect() + .instrument(span) + .await + .context(ex_error::DataFusionSnafu)?; + if !records.is_empty() { + schema = records[0].schema().as_ref().clone(); + } + Ok::<QueryResult, ex_error::Error>(QueryResult::new(records, Arc::new(schema), query_id)) + }) + .await + .context(ex_error::JobSnafu)??; + #[cfg(not(feature = "vanilla-tokio-runtime"))] let stream = self .session .executor @@ -2260,6 +2297,35 @@ impl UserQuery { ) -> Result<QueryResult> { let session = self.session.clone(); let query_id = self.query_context.query_id; + #[cfg(feature = "vanilla-tokio-runtime")] + let stream = tokio::task::spawn(async move { + let mut schema = plan.schema().as_arrow().clone(); + let df = session + .ctx + .execute_logical_plan(plan) + .await + .context(ex_error::DataFusionSnafu)?; + let task_ctx = df.task_ctx(); + let mut physical_plan = df + .create_physical_plan() + .await + .context(ex_error::DataFusionSnafu)?; + for rule in rules { + physical_plan = rule + .optimize(physical_plan, &ConfigOptions::new()) + .context(ex_error::DataFusionSnafu)?; + } + let records = collect(physical_plan, Arc::new(task_ctx)) + .await + .context(ex_error::DataFusionSnafu)?; + if !records.is_empty() { + schema = records[0].schema().as_ref().clone(); + } + Ok::<QueryResult, ex_error::Error>(QueryResult::new(records, Arc::new(schema), query_id)) + }) + .await + .context(ex_error::JobSnafu)??; + #[cfg(not(feature = "vanilla-tokio-runtime"))] let stream = self .session .executor
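In the hunks above, `tokio::task::spawn` yields `Result<Result<QueryResult, Error>, JoinError>`, hence the `.context(ex_error::JobSnafu)??`: the first `?` propagates a join failure as the new cfg-gated `JobError`, the second propagates the query's own error. A self-contained sketch of that shape (assuming `tokio` and `snafu` dependencies; the inner computation is a stand-in for the DataFusion work):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Threaded Job error: {source}"))]
    Job { source: tokio::task::JoinError },
}

type Result<T> = std::result::Result<T, Error>;

async fn run() -> Result<u64> {
    let value = tokio::task::spawn(async move {
        // stand-in for the sql()/execute_logical_plan() + collect() work;
        // the turbofish pins the error type for inference, as in the diff
        Ok::<u64, Error>(42)
    })
    .await
    .context(JobSnafu)??; // first `?`: join failure, second `?`: query failure
    Ok(value)
}
```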
diff --git a/crates/core-executor/src/session.rs b/crates/core-executor/src/session.rs index 64fb8e64e..fb18a24a3 100644 --- a/crates/core-executor/src/session.rs +++ b/crates/core-executor/src/session.rs @@ -1,7 +1,6 @@ //use super::datafusion::functions::geospatial::register_udfs as register_geo_udfs; use super::datafusion::functions::register_udfs; use super::datafusion::type_planner::CustomTypePlanner; -use super::dedicated_executor::DedicatedExecutor; use super::error::{self as ex_error, Result}; // TODO: We need to fix this after geodatafusion is updated to datafusion 47 //use geodatafusion::udf::native::register_native as register_geo_native; @@ -9,6 +8,8 @@ use crate::datafusion::logical_analyzer::analyzer_rules; use crate::datafusion::logical_optimizer::split_ordered_aggregates::SplitOrderedAggregates; use crate::datafusion::physical_optimizer::physical_optimizer_rules; use crate::datafusion::query_planner::CustomQueryPlanner; +#[cfg(not(feature = "vanilla-tokio-runtime"))] +use crate::dedicated_executor::DedicatedExecutor; use crate::models::QueryContext; use crate::query::UserQuery; use crate::running_queries::RunningQueries; @@ -51,10 +52,11 @@ pub struct UserSession { pub running_queries: Arc<RunningQueries>, pub ctx: SessionContext, pub ident_normalizer: IdentNormalizer, - pub executor: DedicatedExecutor, pub config: Arc, pub expiry: AtomicI64, pub session_params: Arc, + #[cfg(not(feature = "vanilla-tokio-runtime"))] + pub executor: DedicatedExecutor, } impl UserSession { @@ -93,9 +95,10 @@ impl UserSession { .set_str("datafusion.catalog.default_catalog", DEFAULT_CATALOG) .set_bool( "datafusion.execution.skip_physical_aggregate_schema_check", - true, + cfg!(feature = "sort-merge-join"), ) .set_bool("datafusion.sql_parser.parse_float_as_decimal", true) + .set_bool("datafusion.optimizer.prefer_hash_join", false) .set_usize( "datafusion.execution.minimum_parallel_output_files", MINIMUM_PARALLEL_OUTPUT_FILES, @@ -128,13 +131,28 @@ impl UserSession { //register_geo_udfs(&ctx); let enable_ident_normalization = ctx.enable_ident_normalization(); + #[cfg(not(feature = "vanilla-tokio-runtime"))] let session = Self { metastore, history_store, running_queries, ctx, ident_normalizer: IdentNormalizer::new(enable_ident_normalization), + config, + expiry: AtomicI64::new(to_unix( + OffsetDateTime::now_utc() + + Duration::seconds(SESSION_INACTIVITY_EXPIRATION_SECONDS), + )), + session_params: session_params_arc, executor: DedicatedExecutor::builder().build(), + }; + #[cfg(feature = "vanilla-tokio-runtime")] + let session = Self { + metastore, + history_store, + running_queries, + ctx, + ident_normalizer: IdentNormalizer::new(enable_ident_normalization), config, expiry: AtomicI64::new(to_unix( OffsetDateTime::now_utc() diff --git a/crates/core-utils/Cargo.toml b/crates/core-utils/Cargo.toml index 74f1c1bee..077106c47 100644 --- a/crates/core-utils/Cargo.toml +++ b/crates/core-utils/Cargo.toml @@ -27,3 +27,6 @@ workspace = true [dev-dependencies] insta = { workspace = true } tokio = { workspace = true } + +[features] +none-durable-history-write = [] diff --git a/crates/core-utils/src/lib.rs b/crates/core-utils/src/lib.rs index fcb6d74ed..0528ff90f 100644 --- a/crates/core-utils/src/lib.rs +++ b/crates/core-utils/src/lib.rs @@ -13,7 +13,8 @@ use serde_json::de; use serde_json::ser; use slatedb::Db as SlateDb; use slatedb::DbIterator; -// use slatedb::config::{PutOptions, WriteOptions}; +#[cfg(feature = "none-durable-history-write")] +use slatedb::config::{PutOptions, WriteOptions}; use snafu::location; use snafu::prelude::*; use std::fmt::Debug; @@ -150,18 +151,26 @@ impl Db { entity: &T, ) -> Result<()> { let serialized = ser::to_vec(entity).context(errors::SerializeValueSnafu)?; - self.0 + #[cfg(feature = "none-durable-history-write")] + let result = self + .0 + .put_with_options( + entity.key().as_ref(), + serialized, + &PutOptions::default(), + &WriteOptions { + await_durable: false, + }, + ) + .await + .context(errors::DatabaseSnafu); + #[cfg(not(feature = "none-durable-history-write"))] + let result = self + .0 .put(entity.key().as_ref(), serialized) - // .put_with_options( - // entity.key().as_ref(), - // serialized, - // &PutOptions::default(), - // &WriteOptions { - // await_durable: false, - // }, - // ) .await - .context(errors::DatabaseSnafu) + .context(errors::DatabaseSnafu); + result } /// Iterator for iterating in range
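With `none-durable-history-write` enabled, the put above returns as soon as the write is applied in SlateDB's in-memory state rather than awaiting the durable flush, trading crash-durability of the most recent history entries for lower write latency. A minimal sketch of the call shape (mirroring the `slatedb` API as used in the hunk; the error type name varies across slatedb versions):

```rust
use slatedb::Db;
use slatedb::config::{PutOptions, WriteOptions};

// Non-durable put: do not wait for the write to become durable before
// returning (error type name is an assumption for this sketch).
async fn put_fast(db: &Db, key: &[u8], value: &[u8]) -> Result<(), slatedb::SlateDBError> {
    db.put_with_options(
        key,
        value,
        &PutOptions::default(),
        &WriteOptions {
            await_durable: false,
        },
    )
    .await
}
```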
"none-durable-history-write"))] + let result = self + .0 .put(entity.key().as_ref(), serialized) - // .put_with_options( - // entity.key().as_ref(), - // serialized, - // &PutOptions::default(), - // &WriteOptions { - // await_durable: false, - // }, - // ) .await - .context(errors::DatabaseSnafu) + .context(errors::DatabaseSnafu); + result } /// Iterator for iterating in range diff --git a/crates/embucketd/Cargo.toml b/crates/embucketd/Cargo.toml index 8b05e66d2..53c1f143d 100644 --- a/crates/embucketd/Cargo.toml +++ b/crates/embucketd/Cargo.toml @@ -49,3 +49,7 @@ workspace = true [features] alloc-tracing = ["dep:tracing-allocations"] +vanilla-tokio-runtime = ["core-executor/vanilla-tokio-runtime"] +none-durable-history-write = ["core-utils/none-durable-history-write"] +sort-merge-join = ["core-executor/sort-merge-join"] +experimental = ["vanilla-tokio-runtime", "none-durable-history-write", "sort-merge-join"] diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 000000000..e73b1cc95 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,14 @@ +#!/bin/sh +set -e + +INDEX_FILE=/app/dist/index.html + +# Use the provided API_URL or default to http://localhost:3000 if it's not set +FINAL_API_URL=${API_URL:-http://localhost:3000} + +echo "Setting API URL to $FINAL_API_URL in $INDEX_FILE" + +# Use sed to replace the placeholder with the actual API_URL. +sed -i "s#__API_URL__#$FINAL_API_URL#g" $INDEX_FILE + +exec "$@" \ No newline at end of file