Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 54 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,22 @@ using the [tpchgen](https://github.com/clflushopt/tpchgen-rs) crates.

## Usage

The `datafusion-tpch` crate offers two possible ways to register the TPCH individual
table functions.
The `datafusion-tpch` crate offers two possible ways to register the TPCH table
functions.

You can register functions individually.
You can register the individual udtfs separately.

```rust
use datafusion_tpch::register_tpch_udtfs;

#[tokio::main]
async fn main() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

// Register all the UDTFs.
ctx.register_udtf(TpchNation::name(), Arc::new(TpchNation {}));
ctx.register_udtf(TpchCustomer::name(), Arc::new(TpchCustomer {}));
ctx.register_udtf(TpchOrders::name(), Arc::new(TpchOrders {}));
ctx.register_udtf(TpchLineitem::name(), Arc::new(TpchLineitem {}));
ctx.register_udtf(TpchPart::name(), Arc::new(TpchPart {}));
ctx.register_udtf(TpchPartsupp::name(), Arc::new(TpchPartsupp {}));
ctx.register_udtf(TpchSupplier::name(), Arc::new(TpchSupplier {}));
ctx.register_udtf(TpchRegion::name(), Arc::new(TpchRegion {}));

register_tpch_udtfs(&ctx);

// Generate the nation table with a scale factor of 1.
let df = ctx
.sql(format!("SELECT * FROM tpch_nation(1.0);").as_str())
Expand All @@ -45,8 +40,7 @@ async fn main() -> Result<()> {
}
```

Or use the helper function `register_tpch_udtfs` to register all of them
at once (which is the preferred approach).
Or you can register a single UDTF which generates all tables at once.

```rust
use datafusion_tpch::register_tpch_udtfs;
Expand All @@ -57,17 +51,60 @@ async fn main() -> Result<()> {
let ctx = SessionContext::new();

// Register all the UDTFs.
register_tpch_udtfs(&ctx);
register_tpch_udtf(&ctx);

// Generate the nation table with a scale factor of 1.
let df = ctx
.sql(format!("SELECT * FROM tpch_nation(1.0);").as_str())
.sql(format!("SELECT * FROM tpch(1.0);").as_str())
.await?;
df.show().await?;
Ok(())
}
```

## Examples

To keep things simple we don't bundle writing to parquet in the table provider
but instead defer that to the user who can use the `COPY` command.


```rust
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_tpch::{register_tpch_udtf, register_tpch_udtfs};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new_with_config(SessionConfig::new().with_information_schema(true));
register_tpch_udtf(&ctx);

let sql_df = ctx.sql(&format!("SELECT * FROM tpch(1.0);")).await?;
sql_df.show().await?;

let sql_df = ctx.sql(&format!("SHOW TABLES;")).await?;
sql_df.show().await?;

let sql_df = ctx
.sql(&format!(
"COPY nation TO './tpch_nation.parquet' STORED AS PARQUET"
))
.await?;
sql_df.show().await?;

register_tpch_udtfs(&ctx)?;

let sql_df = ctx
.sql(&format!(
"COPY (SELECT * FROM tpch_lineitem(1.0)) TO './tpch_lineitem_sf_10.parquet' STORED AS PARQUET"
))
.await?;
sql_df.show().await?;

Ok(())
}
```

You can find other examples in the [examples](examples/) directory.

## License

The project is licensed under the [APACHE 2.0](LICENSE) license.
The project is licensed under the [APACHE 2.0](LICENSE) license.
1 change: 0 additions & 1 deletion examples/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Example of using the datafusion-tpch extension to generate TPCH tables
//! and writing them to disk via `COPY`.

use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_tpch::{register_tpch_udtf, register_tpch_udtfs};

Expand Down
11 changes: 2 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,8 @@ macro_rules! define_tpch_udtf_provider {
/// async fn main() -> Result<(), Error> {
/// // create local execution context
/// let ctx = SessionContext::new();
/// // Register all the UDTFs.
/// ctx.register_udtf(TpchNation::name(), Arc::new(TpchNation {}));
/// ctx.register_udtf(TpchCustomer::name(), Arc::new(TpchCustomer {}));
/// ctx.register_udtf(TpchOrders::name(), Arc::new(TpchOrders {}));
/// ctx.register_udtf(TpchLineitem::name(), Arc::new(TpchLineitem {}));
/// ctx.register_udtf(TpchPart::name(), Arc::new(TpchPart {}));
/// ctx.register_udtf(TpchPartsupp::name(), Arc::new(TpchPartsupp {}));
/// ctx.register_udtf(TpchSupplier::name(), Arc::new(TpchSupplier {}));
/// ctx.register_udtf(TpchRegion::name(), Arc::new(TpchRegion {}));
/// // Register all udtfs.
/// register_tpch_udtfs(&ctx);
/// // Generate the nation table with a scale factor of 1.
/// let df = ctx
/// .sql(format!("SELECT * FROM tpch_nation(1.0);").as_str())
Expand Down