Skip to content

Commit 58a6205

Browse files
committed
feat: insert functionality added
1 parent cc83ad2 commit 58a6205

File tree

1 file changed

+74
-17
lines changed
  • docs/data_engineering/data_lakehouse/delta_lake

1 file changed

+74
-17
lines changed
Lines changed: 74 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,61 @@
11
# delta-rs
22

3-
This guide provides an introduction to using the Delta Lake Rust library (delta-rs).
3+
This guide provides an introduction to using the [Delta Lake Rust library (delta-rs)](https://github.com/delta-io/delta-rs).
44

55
## Install dependencies
66

77
```toml
88
[dependencies]
9-
deltalake = { version = "0.22.3", features = ["s3"] }
9+
deltalake = { version = "0.22.3", features = ["datafusion", "s3"] }
1010
```
1111

12-
## Create table
12+
## Initial setup
1313

1414
```rust
15-
use deltalake::kernel::DataType;
16-
use deltalake::{DeltaOps, DeltaTable, DeltaTableBuilder, DeltaTableError};
15+
use deltalake::{DeltaOps, DeltaTableBuilder, DeltaTableError};
1716
use std::env;
1817

18+
fn configure_s3() {
19+
// Set S3 configuration options using environment variables
20+
env::set_var("AWS_ENDPOINT_URL", "http://localhost:5561");
21+
env::set_var("AWS_REGION", "us-east-1");
22+
env::set_var("AWS_ACCESS_KEY_ID", "admin");
23+
env::set_var("AWS_SECRET_ACCESS_KEY", "password");
24+
env::set_var("AWS_ALLOW_HTTP", "true");
25+
env::set_var("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
26+
27+
// Register AWS S3 handlers for Delta Lake operations
28+
deltalake::aws::register_handlers(None);
29+
}
30+
1931
/// Builds a `DeltaOps` instance for the specified Delta table.
20-
/// Enabling operations such as creating, reading, and writing data in the Delta Lake format.
32+
/// Enabling operations such as creating, reading and writing data in the Delta Lake format.
2133
fn get_delta_ops(table_name: &str) -> Result<DeltaOps, DeltaTableError> {
2234
let delta_table = DeltaTableBuilder::from_uri(format!("s3://data-lakehouse/{}", table_name)).build()?;
2335

2436
Ok(DeltaOps::from(delta_table))
2537
}
2638

39+
#[tokio::main()]
40+
async fn main() {
41+
configure_s3()
42+
}
43+
```
44+
45+
## Create table
46+
47+
```rust
48+
use deltalake::kernel::DataType;
49+
use deltalake::{DeltaTable, DeltaTableError};
50+
51+
// ...
52+
2753
async fn create_table(table_name: &str) -> Result<DeltaTable, DeltaTableError> {
2854
let delta_ops = get_delta_ops(table_name)?;
2955

3056
let table = delta_ops
3157
.create()
32-
.with_table_name("employee")
58+
.with_table_name(table_name)
3359
.with_column("id", DataType::INTEGER, false, Default::default())
3460
.with_column("name", DataType::STRING, false, Default::default())
3561
.await?;
@@ -39,19 +65,50 @@ async fn create_table(table_name: &str) -> Result<DeltaTable, DeltaTableError> {
3965

4066
#[tokio::main()]
4167
async fn main() {
68+
configure_s3()
69+
4270
let table_name = "employee";
71+
create_table(&table_name).await.expect("Table creation failed");
72+
}
73+
```
4374

44-
// Set S3 configuration options using environment variables
45-
env::set_var("AWS_ENDPOINT_URL", "http://localhost:5561");
46-
env::set_var("AWS_REGION", "us-east-1");
47-
env::set_var("AWS_ACCESS_KEY_ID", "admin");
48-
env::set_var("AWS_SECRET_ACCESS_KEY", "password");
49-
env::set_var("AWS_ALLOW_HTTP", "true");
50-
env::set_var("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
75+
## Insert
5176

52-
// Register AWS S3 handlers for Delta Lake operations.
53-
deltalake::aws::register_handlers(None);
77+
```rust
78+
use deltalake::arrow::array::{Int32Array, RecordBatch, StringArray};
79+
use deltalake::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
80+
use deltalake::protocol::SaveMode;
81+
use deltalake::{DeltaTable, DeltaTableError};
82+
use std::sync::Arc;
5483

55-
create_table(&table_name).await.expect("Table creation failed");
84+
// ...
85+
86+
async fn insert(table_name: &str, save_mode: SaveMode) -> Result<DeltaTable, DeltaTableError> {
87+
// Define the schema for the record
88+
let schema = Arc::new(ArrowSchema::new(vec![
89+
Field::new("int", ArrowDataType::Int32, false),
90+
Field::new("string", ArrowDataType::Utf8, true),
91+
]));
92+
93+
// Create a employee record
94+
let ids = Int32Array::from(vec![1, 2, 3]);
95+
let names = StringArray::from(vec!["Tom", "Tim", "Titus"]);
96+
let employee_record = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)]).unwrap();
97+
98+
let delta_ops = get_delta_ops(table_name)?;
99+
// Insert record
100+
let table = delta_ops.write(vec![employee_record]).with_save_mode(save_mode).await?;
101+
102+
Ok(table)
103+
}
104+
105+
#[tokio::main()]
106+
async fn main() {
107+
// ...
108+
109+
let table_name = "employee";
110+
insert(&table_name, SaveMode::Append).await.expect("Insert failed");
56111
}
57112
```
113+
114+
The default save mode for the `delta_ops.write` function is `SaveMode::Append`. To overwrite existing data instead of appending, use `SaveMode::Overwrite`.

0 commit comments

Comments
 (0)