Skip to content

Commit b53a40b

Browse files
committed
feat: time travel docs added
1 parent c4890de commit b53a40b

File tree

5 files changed

+53
-44
lines changed

5 files changed

+53
-44
lines changed

docs/data_engineering/data_lakehouse/delta_lake/rust/01_create_table.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
# Create Table
1+
description: delta-rs, rust, create table
2+
3+
# Create table
24

35
```rust
46
use deltalake::{kernel::DataType, DeltaOps};

docs/data_engineering/data_lakehouse/delta_lake/rust/02_insert_data.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
description: delta-rs, rust, insert data
2+
13
# Insert data
24

35
```rust
Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,32 @@
1+
description: delta-rs, rust, load table
2+
13
# Load table
24

5+
## Open table
6+
37
```rust
4-
use deltalake::operations::collect_sendable_stream;
5-
use deltalake::DeltaOps;
68

79
#[tokio::main()]
810
async fn main() {
911
// ...
1012

11-
let (_, stream) = DeltaOps(table).load().await.expect("Load failed");
12-
let records = collect_sendable_stream(stream).await.unwrap();
13-
14-
println!("{:?}", records)
13+
let table = deltalake::open_table("s3://data-lakehouse/employee").await.expect("Load failed");
1514
}
1615
```
1716

18-
or
17+
## Load table data
1918

2019
```rust
20+
use deltalake::operations::collect_sendable_stream;
21+
use deltalake::DeltaOps;
2122

2223
#[tokio::main()]
2324
async fn main() {
2425
// ...
2526

26-
let table = deltalake::open_table("s3://data-lakehouse/employee").await.expect("Load failed");
27+
let (_, stream) = DeltaOps(table).load().await.expect("Load failed");
28+
let records = collect_sendable_stream(stream).await.unwrap();
29+
30+
println!("{:?}", records)
2731
}
2832
```
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
description: delta-rs, rust, time travel
2+
3+
# Time travel
4+
5+
To load a previous version of a table, use the `open_table_with_version` function:
6+
7+
```rust
8+
let version = 1;
9+
let mut table = deltalake::open_table_with_version("s3://data-lakehouse/employee", version).await.expect("Load failed");
10+
```
11+
12+
If the table is already loaded and you want to switch to a different version, use the `load_version` method:
13+
14+
```rust
15+
table.load_version(2).await.expect("Load failed");
16+
```
Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,29 @@
1-
# S3 storage integration
1+
description: delta-rs, rust, S3 storage integration
22

3-
```rust
4-
use deltalake::{DeltaOps, DeltaTableBuilder, DeltaTableError};
5-
use std::env;
6-
7-
fn configure_s3() {
8-
// Set S3 configuration options using environment variables
9-
env::set_var("AWS_ENDPOINT_URL", "http://localhost:5561");
10-
env::set_var("AWS_REGION", "us-east-1");
11-
env::set_var("AWS_ACCESS_KEY_ID", "admin");
12-
env::set_var("AWS_SECRET_ACCESS_KEY", "password");
13-
env::set_var("AWS_ALLOW_HTTP", "true");
14-
env::set_var("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
15-
16-
// Register AWS S3 handlers for Delta Lake operations
17-
deltalake::aws::register_handlers(None);
18-
}
3+
# S3
194

20-
/// Builds a `DeltaOps` instance for the specified Delta table.
21-
/// Enabling operations such as creating, reading and writing data in the Delta Lake format.
22-
async fn get_delta_ops(table_name: &str, load_state: bool) -> Result<DeltaOps, DeltaTableError> {
23-
let delta_table_builder = DeltaTableBuilder::from_uri(format!("s3://data-lakehouse/{}", table_name));
24-
let delta_table = match load_state {
25-
// Load the existing table state
26-
true => delta_table_builder.load().await?,
27-
// Build the table without loading existing state
28-
false => delta_table_builder.build()?,
29-
};
30-
31-
Ok(DeltaOps::from(delta_table))
32-
}
5+
```rust
6+
use std::collections::HashMap;
337

348
#[tokio::main()]
359
async fn main() {
36-
configure_s3();
10+
// Register AWS S3 handlers for Delta Lake operations
11+
deltalake::aws::register_handlers(None);
3712

38-
let table_name = "employee";
39-
let load_state = false;
40-
let delta_ops = get_delta_ops(table_name, load_state).await.expect("Failed to create data_ops object");
13+
let mut storage_options = HashMap::new();
14+
storage_options.insert("AWS_ENDPOINT_URL".to_string(), "http://localhost:5561".to_string());
15+
storage_options.insert("AWS_REGION".to_string(), "us-east-1".to_string());
16+
storage_options.insert("AWS_ACCESS_KEY_ID".to_string(), "admin".to_string());
17+
storage_options.insert("AWS_SECRET_ACCESS_KEY".to_string(), "password".to_string());
18+
storage_options.insert("AWS_ALLOW_HTTP".to_string(), "true".to_string());
19+
storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());
20+
21+
let table = deltalake::open_table_with_storage_options("s3://data-lakehouse/employee", storage_options)
22+
.await
23+
.expect("Load failed");
4124
}
4225
```
4326

44-
If the table doesn't exist yet, the `load_state` parameter in `get_delta_ops` should be set to `false`, as setting it to `true` would attempt to read a non-existent state, resulting in an error. On the other hand, if you want to read from an existing table, `load_state` must be set to `true` to successfully load the data; otherwise, the load operation will fail.
27+
Alternatively, each of these storage options can be supplied as an environment variable instead.
28+
29+
Writing to S3 requires a locking provider by default ([more information](https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/)). If you don't want to use a locking provider, you can opt out of this safety check by setting the `AWS_S3_ALLOW_UNSAFE_RENAME` storage option to `true`.

0 commit comments

Comments
 (0)