Skip to content

Commit e5ab97c

Browse files
committed
feat: read instructions added
1 parent 58a6205 commit e5ab97c

File tree

1 file changed

+41
-4
lines changed
  • docs/data_engineering/data_lakehouse/delta_lake

1 file changed

+41
-4
lines changed

docs/data_engineering/data_lakehouse/delta_lake/rust.md

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ This guide provides an introduction to using the [Delta Lake Rust library (delta
77
```toml
88
[dependencies]
99
deltalake = { version = "0.22.3", features = ["datafusion", "s3"] }
10+
tokio = "1.42.0"
1011
```
1112

1213
## Initial setup
@@ -30,8 +31,14 @@ fn configure_s3() {
3031

3132
/// Builds a `DeltaOps` instance for the specified Delta table.
3233
/// Enabling operations such as creating, reading and writing data in the Delta Lake format.
33-
fn get_delta_ops(table_name: &str) -> Result<DeltaOps, DeltaTableError> {
34-
let delta_table = DeltaTableBuilder::from_uri(format!("s3://data-lakehouse/{}", table_name)).build()?;
34+
async fn get_delta_ops(table_name: &str, load_state: bool) -> Result<DeltaOps, DeltaTableError> {
35+
let delta_table_builder = DeltaTableBuilder::from_uri(format!("s3://data-lakehouse/{}", table_name));
36+
let delta_table = match load_state {
37+
// Load the existing table state
38+
true => delta_table_builder.load().await?,
39+
// Build the table without loading existing state
40+
false => delta_table_builder.build()?,
41+
};
3542

3643
Ok(DeltaOps::from(delta_table))
3744
}
@@ -42,6 +49,8 @@ async fn main() {
4249
}
4350
```
4451

52+
If the table doesn't exist yet, the `load_state` parameter in `get_delta_ops` should be set to `false`, as setting it to `true` would attempt to read a non-existent state, resulting in an error. On the other hand, if you want to read from an existing table, `load_state` must be set to `true` to successfully load the data; otherwise, the load operation will fail.
53+
4554
## Create table
4655

4756
```rust
@@ -51,7 +60,7 @@ use deltalake::{DeltaTable, DeltaTableError};
5160
// ...
5261

5362
async fn create_table(table_name: &str) -> Result<DeltaTable, DeltaTableError> {
54-
let delta_ops = get_delta_ops(table_name)?;
63+
let delta_ops = get_delta_ops(table_name, false)?;
5564

5665
let table = delta_ops
5766
.create()
@@ -95,7 +104,7 @@ async fn insert(table_name: &str, save_mode: SaveMode) -> Result<DeltaTable, Del
95104
let names = StringArray::from(vec!["Tom", "Tim", "Titus"]);
96105
let employee_record = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)]).unwrap();
97106

98-
let delta_ops = get_delta_ops(table_name)?;
107+
let delta_ops = get_delta_ops(table_name, false)?;
99108
// Insert record
100109
let table = delta_ops.write(vec![employee_record]).with_save_mode(save_mode).await?;
101110

@@ -112,3 +121,31 @@ async fn main() {
112121
```
113122

114123
The default save mode for the `delta_ops.write` function is `SaveMode::Append`. To overwrite existing data instead of appending, use `SaveMode::Overwrite`.
124+
125+
## Read
126+
127+
```rust
128+
use deltalake::arrow::array::RecordBatch;
129+
use deltalake::operations::collect_sendable_stream;
130+
use deltalake::DeltaTableError;
131+
132+
// ...
133+
134+
async fn read(table_name: &str) -> Result<Vec<RecordBatch>, DeltaTableError> {
135+
let delta_ops = get_delta_ops(table_name, true).await?;
136+
137+
let (_, stream) = delta_ops.load().await?;
138+
let employee_records: Vec<RecordBatch> = collect_sendable_stream(stream).await?;
139+
140+
Ok(employee_records)
141+
}
142+
143+
#[tokio::main()]
144+
async fn main() {
145+
// ...
146+
147+
let table_name = "employee";
148+
let employee_records = read(&table_name).await.expect("Read failed");
149+
println!("employee_records: {:?}", employee_records);
150+
}
151+
```

0 commit comments

Comments
 (0)