# delta-rs

## Create table

```rust
use deltalake::{kernel::DataType, DeltaOps};

#[tokio::main]
async fn main() {
    let delta_ops = DeltaOps::new_in_memory();

    let table = delta_ops
        .create()
        .with_table_name("table_01")
        .with_column("id", DataType::INTEGER, false, Default::default())
        .with_column("name", DataType::STRING, false, Default::default())
        .await
        .expect("Table creation failed");
}
```
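
`new_in_memory` keeps the table in memory only, so it vanishes with the process. For a table backed by real storage, the same builder can start from a URI via `DeltaOps::try_from_uri`, which accepts local paths as well as object-store URIs like `s3://...`. A minimal sketch; the `/tmp/employee` path and `employee` table name are placeholders:

```rust
use deltalake::{kernel::DataType, DeltaOps};

#[tokio::main]
async fn main() {
    // "/tmp/employee" is a placeholder storage location for this sketch
    let delta_ops = DeltaOps::try_from_uri("/tmp/employee")
        .await
        .expect("Invalid table location");

    let table = delta_ops
        .create()
        .with_table_name("employee")
        .with_column("id", DataType::INTEGER, false, Default::default())
        .with_column("name", DataType::STRING, false, Default::default())
        .await
        .expect("Table creation failed");

    println!("Created table version {:?}", table.version());
}
```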

## Insert data

```rust
use deltalake::arrow::array::{Int32Array, RecordBatch, StringArray};
use deltalake::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
use deltalake::DeltaOps;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // ... create `table` as shown above

    // The Arrow schema must match the table schema, including nullability
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", ArrowDataType::Int32, false),
        Field::new("name", ArrowDataType::Utf8, false),
    ]));

    // Create employee records
    let ids = Int32Array::from(vec![1, 2, 3]);
    let names = StringArray::from(vec!["Tom", "Tim", "Titus"]);
    let employee_record =
        RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)]).unwrap();

    // Insert records
    let table = DeltaOps(table).write(vec![employee_record]).await.expect("Insert failed");
}
```

> The Arrow Rust array primitives are _very_ fickle and so creating a direct transformation is quite tricky in Rust, whereas in Python or another loosely typed language it might be simpler.

[(source)](https://github.com/delta-io/delta-rs/blob/99e39ca1ca372211cf7b90b62d33878fa961881c/crates/deltalake/examples/recordbatch-writer.rs#L156)

The default save mode for the `write` operation is `SaveMode::Append`. To overwrite existing data instead of appending, use `with_save_mode(SaveMode::Overwrite)`:

```rust
use deltalake::protocol::SaveMode;
use deltalake::DeltaOps;

#[tokio::main]
async fn main() {
    // ... create `table` and `employee_record` as shown above

    let table = DeltaOps(table)
        .write(vec![employee_record])
        .with_save_mode(SaveMode::Overwrite)
        .await
        .expect("Insert failed");
}
```

## Load table

Open table:

```rust
#[tokio::main]
async fn main() {
    // ...

    let table = deltalake::open_table("s3://data-lakehouse/employee").await.expect("Load failed");
}
```

Load table data:

```rust
use deltalake::operations::collect_sendable_stream;
use deltalake::DeltaOps;

#[tokio::main]
async fn main() {
    // ...

    // load() returns the table handle and a stream of record batches
    let (_, stream) = DeltaOps(table).load().await.expect("Load failed");
    let records = collect_sendable_stream(stream).await.unwrap();

    println!("{:?}", records);
}
```
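
The `Debug` output above gets hard to read beyond a few rows. Arrow's pretty printer, re-exported under `deltalake::arrow`, renders the batches as an ASCII table instead; a small sketch, assuming the bundled `arrow` crate was built with its pretty-print support:

```rust
use deltalake::arrow::util::pretty::print_batches;

// Render the collected record batches as an ASCII table on stdout
print_batches(&records).expect("Pretty print failed");
```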

### S3 storage

```rust
use std::collections::HashMap;

#[tokio::main]
async fn main() {
    // Register AWS S3 handlers for Delta Lake operations
    deltalake::aws::register_handlers(None);

    let mut storage_options = HashMap::new();
    storage_options.insert("AWS_ENDPOINT_URL".to_string(), "http://localhost:5561".to_string());
    storage_options.insert("AWS_REGION".to_string(), "us-east-1".to_string());
    storage_options.insert("AWS_ACCESS_KEY_ID".to_string(), "admin".to_string());
    storage_options.insert("AWS_SECRET_ACCESS_KEY".to_string(), "password".to_string());
    storage_options.insert("AWS_ALLOW_HTTP".to_string(), "true".to_string());
    storage_options.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string());

    let table = deltalake::open_table_with_storage_options("s3://data-lakehouse/employee", storage_options)
        .await
        .expect("Load failed");
}
```
You can also supply these storage options as environment variables instead of passing them in code.
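
For example, setting the same keys in the process environment before opening the table should have the same effect, since delta-rs picks the AWS variables up from the environment when no explicit options are given. A sketch reusing the placeholder values from above:

```rust
// Same placeholder endpoint and credentials as in the storage_options map above
std::env::set_var("AWS_ENDPOINT_URL", "http://localhost:5561");
std::env::set_var("AWS_REGION", "us-east-1");
std::env::set_var("AWS_ACCESS_KEY_ID", "admin");
std::env::set_var("AWS_SECRET_ACCESS_KEY", "password");
std::env::set_var("AWS_ALLOW_HTTP", "true");
std::env::set_var("AWS_S3_ALLOW_UNSAFE_RENAME", "true");

let table = deltalake::open_table("s3://data-lakehouse/employee").await.expect("Load failed");
```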

Writing to S3 requires a locking provider by default, because S3 offers no atomic rename ([more information](https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/)). If you have only a single writer and don't want a locking provider, you can opt out by setting `AWS_S3_ALLOW_UNSAFE_RENAME` to `true`, as in the examples above.
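
To keep multi-writer safety instead, the delta-rs docs linked above describe a DynamoDB-based locking provider. A hedged sketch of enabling it through storage options; the key names follow that page, and the DynamoDB table (here the documented default name `delta_log`) must already exist with the schema those docs describe:

```rust
use std::collections::HashMap;

let mut storage_options = HashMap::new();
// Use the DynamoDB locking provider instead of unsafe renames
storage_options.insert("AWS_S3_LOCKING_PROVIDER".to_string(), "dynamodb".to_string());
// Name of the pre-created DynamoDB table backing the lock
storage_options.insert("DELTA_DYNAMO_TABLE_NAME".to_string(), "delta_log".to_string());

let table = deltalake::open_table_with_storage_options("s3://data-lakehouse/employee", storage_options)
    .await
    .expect("Load failed");
```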
| 129 | + |
| 130 | +## Time travel |
| 131 | + |
| 132 | +To load the previous state of a table, you can use the `open_table_with_version` function: |
| 133 | + |
| 134 | +```rust |
| 135 | +let version = 1; |
| 136 | +let mut table = deltalake::open_table_with_version("s3://data-lakehouse/employee", version).await.expect("Load failed"); |
| 137 | +``` |
| 138 | + |
| 139 | +If the table is already loaded and you want to change the version number, just use the `load_version` function. |
| 140 | + |
| 141 | +```rust |
| 142 | +table.load_version(2).await.expect("Load failed"); |
| 143 | +``` |