Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ jobs:
- name: Build with only mysql
run: cargo check --no-default-features --features mysql

- name: Build with only trino
run: cargo check --no-default-features --features trino

integration-test-mysql:
name: Tests mysql
runs-on: ubuntu-latest
Expand Down Expand Up @@ -154,3 +157,24 @@ jobs:

- name: Run tests
run: cargo test --features mongodb

integration-test-trino:
name: Tests trino
runs-on: ubuntu-latest

env:
TRINO_DOCKER_IMAGE: trinodb/trino:latest

steps:
- uses: actions/checkout@v4

- uses: dtolnay/rust-toolchain@stable

- uses: ./.github/actions/setup-integration-test

- name: Pull the Trino image
run: |
docker pull ${{ env.TRINO_DOCKER_IMAGE }}

- name: Run tests
run: cargo test --features trino
17 changes: 17 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ itertools = "0.13.0"
dyn-clone = { version = "1.0.17", optional = true }
geo-types = "0.7.13"
fundu = "2.0.1"
reqwest = { version = "0.12.5", optional = true, features = ["json", "rustls-tls"] }
regex = { version = "1.11.1", optional = true }
chrono-tz = { version = "0.8", optional = true }

[dev-dependencies]
anyhow = "1.0.86"
Expand All @@ -82,6 +85,7 @@ tokio-stream = { version = "0.1.15", features = ["net"] }
insta = { version = "1.40.0", features = ["filters"] }
datafusion-physical-plan = { version = "49.0.2" }
tempfile = "3.8.1"
mockito = "1.7.0"

[features]
mysql = ["dep:mysql_async", "dep:async-stream"]
Expand Down Expand Up @@ -111,6 +115,14 @@ mongodb = [
"dep:rust_decimal",
"dep:num-traits",
]
trino = [
"dep:arrow-schema",
"dep:async-stream",
"dep:base64",
"dep:regex",
"dep:reqwest",
"dep:chrono-tz",
]

[patch.crates-io]
datafusion-federation = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "5ad2f52b9bafc6eaa50851f2e1fcf0585fb5184d" } # spiceai-49
Expand Down Expand Up @@ -172,3 +184,8 @@ required-features = ["postgres"]
name = "mongodb"
path = "examples/mongodb.rs"
required-features = ["mongodb"]

[[example]]
name = "trino"
path = "examples/trino.rs"
required-features = ["trino"]
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,16 @@ EOF
# Run from repo folder
cargo run -p datafusion-table-providers --example mongodb --features mongodb
```

### Trino

In order to run the Trino example, you need to have a Trino server running. You can use the following command to start a Trino server in a Docker container the example can use:

```bash
docker run -d --name trino -p 8080:8080 trinodb/trino:latest
# Wait for the Trino server to start
sleep 30

# Run from repo folder
cargo run -p datafusion-table-providers --example trino --features trino
```
61 changes: 61 additions & 0 deletions examples/trino.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::{collections::HashMap, sync::Arc};

use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use datafusion_table_providers::sql::db_connection_pool::trinodbpool::TrinoConnectionPool;
use datafusion_table_providers::trino::TrinoTableFactory;
use datafusion_table_providers::util::secrets::to_secret_map;

/// This example demonstrates how to:
/// 1. Create a Trino connection pool
/// 2. Create and use TrinoTableFactory to generate TableProvider
/// 3. Use SQL queries to access Trino table data
///
/// Prerequisites:
/// Start a Trino server using Docker:
/// ```bash
/// docker run -d --name trino -p 8080:8080 trinodb/trino:latest
/// # Wait for the Trino server to start
/// sleep 30
/// ```
#[tokio::main]
async fn main() {
// Create Trino connection parameters
let trino_params = to_secret_map(HashMap::from([
("host".to_string(), "localhost".to_string()),
("port".to_string(), "8080".to_string()),
("catalog".to_string(), "tpch".to_string()),
("schema".to_string(), "tiny".to_string()),
("user".to_string(), "test".to_string()),
("sslmode".to_string(), "disabled".to_string()),
]));

// Create Trino connection pool
let trino_pool = Arc::new(
TrinoConnectionPool::new(trino_params)
.await
.expect("unable to create Trino connection pool"),
);

// Create Trino table provider factory
let table_factory = TrinoTableFactory::new(trino_pool.clone());

// Create DataFusion session context
let ctx = SessionContext::new();

// Register the Trino "region" table as "region"
ctx.register_table(
"region",
table_factory
.table_provider(TableReference::bare("region"))
.await
.expect("failed to register table provider"),
)
.expect("failed to register table");

let df = ctx
.sql("SELECT * FROM region")
.await
.expect("select failed");
df.show().await.expect("show failed");
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub mod mysql;
pub mod postgres;
#[cfg(feature = "sqlite")]
pub mod sqlite;
#[cfg(feature = "trino")]
pub mod trino;

#[derive(Debug, Snafu)]
pub enum Error {
Expand Down
2 changes: 2 additions & 0 deletions src/sql/arrow_sql_gen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,5 @@ pub mod postgres;
#[cfg(feature = "sqlite")]
pub mod sqlite;
pub mod statement;
#[cfg(feature = "trino")]
pub mod trino;
47 changes: 47 additions & 0 deletions src/sql/arrow_sql_gen/trino.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use snafu::Snafu;

pub mod arrow;
pub mod schema;

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to build record batch: {source}"))]
FailedToBuildRecordBatch {
source: datafusion::arrow::error::ArrowError,
},

#[snafu(display("Failed to find field {column_name} in schema"))]
FailedToFindFieldInSchema { column_name: String },

#[snafu(display("Unsupported Trino type: {trino_type}"))]
UnsupportedTrinoType { trino_type: String },

#[snafu(display("Unsupported Arrow type: {arrow_type}"))]
UnsupportedArrowType { arrow_type: String },

#[snafu(display("Invalid date value: {value}"))]
InvalidDateValue { value: String },

#[snafu(display("Invalid time value: {value}"))]
InvalidTimeValue { value: String },

#[snafu(display("Invalid timestamp value: {value}"))]
InvalidTimestampValue { value: String },

#[snafu(display("Failed to parse decimal value: {value}"))]
FailedToParseDecimal { value: String },

#[snafu(display("Failed to downcast builder to expected type: {expected}"))]
BuilderDowncastError { expected: String },

#[snafu(display("Invalid or unsupported data type: {data_type}"))]
InvalidDataType { data_type: String },

#[snafu(display("Failed to parse precision from type: '{}'", trino_type))]
InvalidPrecision { trino_type: String },

#[snafu(display("Failed to compile regex pattern: {}", source))]
RegexError { source: regex::Error },
}
Loading