102 changes: 102 additions & 0 deletions .github/workflows/docker-publish-ghcr.yml
@@ -0,0 +1,102 @@
name: Docker

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

on:
  schedule:
    - cron: '17 10 * * *'
  push:
    branches: [ "experimental" ]
    # Publish semver tags as releases.
    tags: [ 'v*.*.*' ]
  pull_request:
    branches: [ "experimental" ]
  workflow_dispatch: {}

env:
  # Use docker.io for Docker Hub if empty
  REGISTRY: ghcr.io
  # github.repository as <account>/<repo>
  IMAGE_NAME: ${{ github.repository }}

concurrency:
  group: build-experimental
  cancel-in-progress: ${{ github.event_name == 'push' }}

jobs:
  build-experimental:

    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
      # This is used to complete the identity challenge
      # with sigstore/fulcio when running outside of PRs.
      id-token: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      # Install the cosign tool except on PR
      # https://github.com/sigstore/cosign-installer
      - name: Install cosign
        if: github.event_name != 'pull_request'
        uses: sigstore/cosign-installer@59acb6260d9c0ba8f4a2f9d9b48431a222b68e20 # v3.5.0
        with:
          cosign-release: 'v2.2.4'

      # Set up BuildKit Docker container builder to be able to build
      # multi-platform images and export cache
      # https://github.com/docker/setup-buildx-action
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3.0.0

      # Login against a Docker registry except on PR
      # https://github.com/docker/login-action
      - name: Log into registry ${{ env.REGISTRY }}
        if: github.event_name != 'pull_request'
        uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # Extract metadata (tags, labels) for Docker
      # https://github.com/docker/metadata-action
      - name: Extract Docker metadata
        id: meta
        uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

      # Build and push Docker image with Buildx (don't push on PR)
      # https://github.com/docker/build-push-action
      - name: Build and push Docker image
        id: build-and-push
        uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0
        with:
          context: .
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      # Sign the resulting Docker image digest except on PRs.
      # This will only write to the public Rekor transparency log when the Docker
      # repository is public to avoid leaking data. If you would like to publish
      # transparency data even for private images, pass --force to cosign below.
      # https://github.com/sigstore/cosign
      - name: Sign the published Docker image
        if: ${{ github.event_name != 'pull_request' }}
        env:
          # https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable
          TAGS: ${{ steps.meta.outputs.tags }}
          DIGEST: ${{ steps.build-and-push.outputs.digest }}
        # This step uses the identity token to provision an ephemeral certificate
        # against the sigstore community Fulcio instance.
        run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST}
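The signing step is keyless: the job's OIDC identity token is exchanged for a short-lived Fulcio certificate, so no long-lived signing key is stored in the repository. A published image can later be checked against that workflow identity; a minimal sketch with cosign v2, where the <account>/<repo> placeholders and the digest must be filled in for the actual repository:

    cosign verify \
      --certificate-oidc-issuer https://token.actions.githubusercontent.com \
      --certificate-identity-regexp 'https://github.com/<account>/<repo>/\.github/workflows/docker-publish-ghcr\.yml@.*' \
      ghcr.io/<account>/<repo>@<digest>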
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -40,4 +40,4 @@ jobs:
          toolchain: ${{ matrix.toolchain }}
      - uses: Swatinem/rust-cache@v2
      - name: cargo test
        run: RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=lld" cargo test --profile=ci --workspace --all-features --all-targets
        run: RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=lld" cargo test --profile=ci --workspace --no-default-features --features "alloc-tracing" --all-targets
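CI now exercises the workspace with default features disabled and only the alloc-tracing feature enabled. To reproduce the run locally (a sketch; assumes clang and lld are on PATH):

    RUSTFLAGS="-C linker=clang -C link-arg=-fuse-ld=lld" \
      cargo test --profile=ci --workspace --no-default-features \
      --features "alloc-tracing" --all-targets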
24 changes: 18 additions & 6 deletions Dockerfile
@@ -11,22 +11,32 @@ RUN apt-get update && apt-get install -y \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# Copy source code
# Copy all source code, including the pre-built frontend and entrypoint script
COPY . .

RUN chmod +x entrypoint.sh
RUN tar -xf /app/ui/dist.tar -C ui

# Build the application with optimizations
RUN cargo build --release --bin embucketd

# Stage 4: Final runtime image
# Stage 2: Final runtime image
FROM gcr.io/distroless/cc-debian12 AS runtime

# Set working directory
USER nonroot:nonroot
WORKDIR /app

# Copy the binary and required files
# Copy the compiled binary, API spec, frontend build, and entrypoint script
COPY --from=builder /app/target/release/embucketd ./embucketd
COPY --from=builder /app/rest-catalog-open-api.yaml ./rest-catalog-open-api.yaml
COPY --from=builder /app/ui/dist ./dist
COPY --from=builder /app/entrypoint.sh /usr/local/bin/entrypoint.sh

# Make the script executable and ensure the nonroot user can modify app files
# no chown chroot executables
# RUN chown -R nonroot:nonroot /app

# Switch to a non-privileged user
USER nonroot:nonroot

# Expose port (adjust as needed)
EXPOSE 8080
@@ -37,5 +47,7 @@ ENV FILE_STORAGE_PATH=data/
ENV BUCKET_HOST=0.0.0.0
ENV JWT_SECRET=63f4945d921d599f27ae4fdf5bada3f1

# Default command
# Set the entrypoint to our script
ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]

CMD ["./embucketd"]
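entrypoint.sh is copied and made executable in the builder stage, but its contents are not part of this diff. A hypothetical sketch of the conventional pattern — prepare writable state, then exec the CMD (here ./embucketd) so it keeps PID 1 — with the caveat that distroless cc images ship no shell, so the real script must rely on an interpreter that is actually present in the final image:

    #!/bin/sh
    # Hypothetical entrypoint.sh -- the real script is not shown in this diff.
    set -e
    # Ensure the storage directory from FILE_STORAGE_PATH exists.
    mkdir -p "${FILE_STORAGE_PATH:-data/}"
    # Hand off to the container CMD (./embucketd by default).
    exec "$@"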
1 change: 1 addition & 0 deletions crates/api-snowflake-rest/Cargo.toml
@@ -24,6 +24,7 @@ default-server = [
    "dep:tower",
    "dep:strum"
]
vanilla-tokio-runtime = ["core-executor/vanilla-tokio-runtime"]

[dependencies]
api-sessions = { path = "../api-sessions", optional = true }
@@ -10,6 +10,7 @@ mod tests {

    use axum::http;

    #[cfg(not(feature = "vanilla-tokio-runtime"))]
    #[tokio::test]
    async fn test_abort_by_request_id() {
        let addr = run_test_rest_api_server(JSON).await;
@@ -32,7 +33,6 @@ mod tests {
            .await
            .expect("Failed to run query");
        let query_id = query_id_from_snapshot(&res).expect("Failed to get query ID");

        let (_headers, _res) = abort::<()>(&client, &addr, &access_token, request_id, sql)
            .await
            .expect("Failed to abort query");
1 change: 1 addition & 0 deletions crates/api-snowflake-rest/src/tests/test_generic_sqls.rs
@@ -23,6 +23,7 @@ mod snowflake_generic {
    use super::*;
    use crate::tests::sql_macro::{ARROW, JSON};

    #[cfg(not(feature = "vanilla-tokio-runtime"))]
    sql_test!(
        JSON,
        submit_ok_query_with_concurrent_limit,
3 changes: 2 additions & 1 deletion crates/api-snowflake-rest/src/tests/test_rest_quick_sqls.rs
@@ -137,7 +137,7 @@ mod snowflake_compatibility {
            "!abort 1",
        ]
    );

    #[cfg(not(feature = "vanilla-tokio-runtime"))]
    sql_test!(
        JSON,
        abort_ok_query,
@@ -149,6 +149,7 @@ mod snowflake_compatibility {
        ]
    );

    #[cfg(not(feature = "vanilla-tokio-runtime"))]
    sql_test!(
        JSON,
        cancel_ok_query,
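Both gated tests above are compiled out when vanilla-tokio-runtime is active, presumably because abort/cancel depends on the dedicated executor. On a default build they can still be run individually (a sketch, assuming sql_test! names each generated test after its second argument):

    cargo test -p api-snowflake-rest abort_ok_query
    cargo test -p api-snowflake-rest cancel_ok_query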
1 change: 1 addition & 0 deletions crates/api-ui/Cargo.toml
@@ -6,6 +6,7 @@ license-file.workspace = true

[features]
client = []
none-durable-history-writes = ["core-utils/none-durable-history-write"]

[dependencies]
api-ui-static-assets = { path = "../api-ui-static-assets" }
2 changes: 2 additions & 0 deletions crates/api-ui/src/tests/worksheets.rs
@@ -98,6 +98,7 @@ async fn test_ui_worksheets_sort() {
        },
    )
    .await;
    #[cfg(not(feature = "none-durable-history-writes"))]
    assert_eq!(
        vec!["name1", "name2", "name3", "name4"],
        sort_by_name_asc
@@ -552,6 +553,7 @@ async fn test_ui_worksheets_search() {
        },
    )
    .await;
    #[cfg(not(feature = "none-durable-history-writes"))]
    assert_eq!(
        vec!["work1", "work2", "work3", "sheet1", "work4"],
        search.into_iter().map(|w| w.name).collect::<Vec<String>>(),
4 changes: 4 additions & 0 deletions crates/core-executor/Cargo.toml
@@ -71,3 +71,7 @@ aws-sdk-sts = { version = "1.80.0", features = ["behavior-version-latest"] }

[lints]
workspace = true

[features]
vanilla-tokio-runtime = []
sort-merge-join = []
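vanilla-tokio-runtime and sort-merge-join are plain feature flags with no dependencies of their own; the former is what api-snowflake-rest forwards from its identically named feature above, so it can be toggled at either level. A sketch of the two builds from the workspace root:

    # Default build: the dedicated executor path is compiled in.
    cargo test -p core-executor

    # Plain tokio runtime: dedicated_executor and the cfg-gated tests drop out.
    cargo test -p core-executor --features vanilla-tokio-runtime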
11 changes: 11 additions & 0 deletions crates/core-executor/src/error.rs
@@ -300,6 +300,17 @@ pub enum Error {
        location: Location,
    },

    #[cfg(feature = "vanilla-tokio-runtime")]
    #[snafu(display("Threaded Job error: {error}: {backtrace}"))]
    JobError {
        #[snafu(source)]
        error: tokio::task::JoinError,
        backtrace: Backtrace,
        #[snafu(implicit)]
        location: Location,
    },

    #[cfg(not(feature = "vanilla-tokio-runtime"))]
    #[snafu(display("Threaded Job error: {error}: {backtrace}"))]
    JobError {
        #[snafu(source)]
3 changes: 2 additions & 1 deletion crates/core-executor/src/lib.rs
@@ -1,6 +1,5 @@
pub use df_catalog as catalog;
pub mod datafusion;
pub mod dedicated_executor;
pub mod error;
pub mod error_code;
pub mod models;
@@ -11,6 +10,8 @@ pub mod session;
pub mod snowflake_error;
pub mod utils;

#[cfg(not(feature = "vanilla-tokio-runtime"))]
pub mod dedicated_executor;
#[cfg(test)]
pub mod tests;

66 changes: 66 additions & 0 deletions crates/core-executor/src/query.rs
@@ -2202,6 +2202,23 @@ impl UserQuery {
        let session = self.session.clone();
        let query_id = self.query_context.query_id;
        let query = query.to_string();
        #[cfg(feature = "vanilla-tokio-runtime")]
        let stream = tokio::task::spawn(async move {
            let df = session
                .ctx
                .sql(&query)
                .await
                .context(ex_error::DataFusionSnafu)?;
            let mut schema = df.schema().as_arrow().clone();
            let records = df.collect().await.context(ex_error::DataFusionSnafu)?;
            if !records.is_empty() {
                schema = records[0].schema().as_ref().clone();
            }
            Ok::<QueryResult, Error>(QueryResult::new(records, Arc::new(schema), query_id))
        })
        .await
        .context(ex_error::JobSnafu)??;
        #[cfg(not(feature = "vanilla-tokio-runtime"))]
        let stream = self
            .session
            .executor
@@ -2229,6 +2246,26 @@ impl UserQuery {

        let span = tracing::debug_span!("UserQuery::execute_logical_plan");

        #[cfg(feature = "vanilla-tokio-runtime")]
        let stream = tokio::task::spawn(async move {
            let mut schema = plan.schema().as_arrow().clone();
            let records = session
                .ctx
                .execute_logical_plan(plan)
                .await
                .context(ex_error::DataFusionSnafu)?
                .collect()
                .instrument(span)
                .await
                .context(ex_error::DataFusionSnafu)?;
            if !records.is_empty() {
                schema = records[0].schema().as_ref().clone();
            }
            Ok::<QueryResult, Error>(QueryResult::new(records, Arc::new(schema), query_id))
        })
        .await
        .context(ex_error::JobSnafu)??;
        #[cfg(not(feature = "vanilla-tokio-runtime"))]
        let stream = self
            .session
            .executor
@@ -2260,6 +2297,35 @@ impl UserQuery {
    ) -> Result<QueryResult> {
        let session = self.session.clone();
        let query_id = self.query_context.query_id;
        #[cfg(feature = "vanilla-tokio-runtime")]
        let stream = tokio::task::spawn(async move {
            let mut schema = plan.schema().as_arrow().clone();
            let df = session
                .ctx
                .execute_logical_plan(plan)
                .await
                .context(ex_error::DataFusionSnafu)?;
            let task_ctx = df.task_ctx();
            let mut physical_plan = df
                .create_physical_plan()
                .await
                .context(ex_error::DataFusionSnafu)?;
            for rule in rules {
                physical_plan = rule
                    .optimize(physical_plan, &ConfigOptions::new())
                    .context(ex_error::DataFusionSnafu)?;
            }
            let records = collect(physical_plan, Arc::new(task_ctx))
                .await
                .context(ex_error::DataFusionSnafu)?;
            if !records.is_empty() {
                schema = records[0].schema().as_ref().clone();
            }
            Ok::<QueryResult, Error>(QueryResult::new(records, Arc::new(schema), query_id))
        })
        .await
        .context(ex_error::JobSnafu)??;
        #[cfg(not(feature = "vanilla-tokio-runtime"))]
        let stream = self
            .session
            .executor