|
| 1 | +use chrono::{NaiveDateTime, Utc}; |
1 | 2 | use dolphindb::{ |
2 | | - client::ClientBuilder, |
3 | | - error::Error, |
4 | | - types::{ConstantImpl, DoubleArrayVector, Int, IntVector, TableBuilder, VectorImpl}, |
| 3 | + client::{ClientBuilder, TableWriter}, |
| 4 | + types::PrimitiveType, |
5 | 5 | }; |
| 6 | +use tokio::{ |
| 7 | + sync::mpsc, |
| 8 | + time::{sleep, Duration}, |
| 9 | +}; |
| 10 | + |
| 11 | +#[derive(Clone)] |
| 12 | +struct TickerEvent { |
| 13 | + event_time: i64, |
| 14 | + event_time2: NaiveDateTime, |
| 15 | + symbol: String, |
| 16 | + event_id: i64, |
| 17 | + prices: Vec<f64>, |
| 18 | +} |
| 19 | + |
| 20 | +fn build_table_row(event: &TickerEvent) -> Vec<PrimitiveType> { |
| 21 | + vec![ |
| 22 | + event.event_time.into(), |
| 23 | + event.event_time2.into(), |
| 24 | + event.symbol.clone().into(), |
| 25 | + event.event_id.into(), |
| 26 | + event.prices.clone().into(), |
| 27 | + ] |
| 28 | +} |
6 | 29 |
|
7 | | -async fn table_writer() -> Result<(), Error> { |
| 30 | +#[tokio::main] |
| 31 | +async fn main() { |
| 32 | + // connect to DolphinDB |
8 | 33 | let mut builder = ClientBuilder::new("127.0.0.1:8848"); |
9 | 34 | builder.with_auth(("admin", "123456")); |
10 | 35 | let mut client = builder.connect().await.unwrap(); |
11 | 36 |
|
12 | | - let mut prices = DoubleArrayVector::new(); |
13 | | - let price1 = vec![1.1, 2.2, 3.3]; |
14 | | - prices.push(price1); |
15 | | - println!("{prices}"); |
16 | | - |
17 | | - // write one row |
18 | | - let c_int = ConstantImpl::from(Int::new(1)); |
19 | | - let c_double_array_vector = ConstantImpl::Vector(VectorImpl::from(prices.clone())); |
20 | | - let res = client |
21 | | - .run_function("tableInsert{testTable}", &[c_int, c_double_array_vector]) |
22 | | - .await |
23 | | - .unwrap() |
24 | | - .unwrap(); |
25 | | - println!("{res}"); |
26 | | - |
27 | | - // write a table |
28 | | - let price2 = vec![4.4, 5.5]; |
29 | | - prices.push(price2); |
30 | | - let v_int = IntVector::from_raw(&[2, 3]).into(); |
31 | | - let v_double_array_vector = VectorImpl::from(prices); |
32 | | - println!("{v_double_array_vector}"); |
33 | | - let mut builder = TableBuilder::new(); |
34 | | - builder.with_name("my_table".to_string()); |
35 | | - builder.with_contents( |
36 | | - vec![v_int, v_double_array_vector], |
37 | | - vec!["volume".to_string(), "price".to_string()], |
| 37 | + // create a stream table |
| 38 | + let stream_table = "depthStreamTable"; |
| 39 | + let script = format!( |
| 40 | + r#" |
| 41 | + colNames = ["event_time", "event_time2", "symbol", "event_id", "prices"] |
| 42 | + colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, LONG, DOUBLE[]] |
| 43 | +
|
| 44 | + if (!existsStreamTable("{stream_table}")) {{ |
| 45 | + enableTableShareAndCachePurge(streamTable(1000000:0, colNames, colTypes), "{stream_table}", 1000000) |
| 46 | + }} |
| 47 | + "# |
38 | 48 | ); |
39 | | - let table = builder.build().unwrap(); |
40 | | - println!("{table}"); |
41 | | - let res = client |
42 | | - .run_function("tableInsert{testTable}", &[table.into()]) |
43 | | - .await? |
44 | | - .unwrap(); |
45 | | - println!("{res}"); |
46 | | - Ok(()) |
47 | | -} |
| 49 | + client.run_script(&script).await.unwrap(); |
48 | 50 |
|
49 | | -#[tokio::main] |
50 | | -async fn main() { |
51 | | - table_writer().await.unwrap(); |
| 51 | + // generate data in rust |
| 52 | + let event = TickerEvent { |
| 53 | + event_time: Utc::now().timestamp_millis(), |
| 54 | + event_time2: Utc::now().naive_utc(), |
| 55 | + symbol: "BTCUSDT".into(), |
| 56 | + event_id: 1000, |
| 57 | + prices: vec![5000.0; 100], |
| 58 | + }; |
| 59 | + |
| 60 | + // TableWriter is NOT thread safe. |
| 61 | + // This example show how to insert data from multiple sources. |
| 62 | + let (tx, mut rx) = mpsc::unbounded_channel::<TickerEvent>(); |
| 63 | + let symbol_number = 500; |
| 64 | + let tx1 = tx.clone(); |
| 65 | + let tx2 = tx.clone(); |
| 66 | + let event1 = event.clone(); |
| 67 | + let event2 = event.clone(); |
| 68 | + |
| 69 | + // This data source generates 500 * 20 rows every second. |
| 70 | + tokio::spawn(async move { |
| 71 | + loop { |
| 72 | + for _ in 0..symbol_number { |
| 73 | + let _ = tx1.send(event1.clone()); |
| 74 | + } |
| 75 | + sleep(Duration::from_millis(1000 / 20)).await; |
| 76 | + } |
| 77 | + }); |
| 78 | + // This data source generates 500 * 10 rows every second. |
| 79 | + tokio::spawn(async move { |
| 80 | + loop { |
| 81 | + for _ in 0..symbol_number { |
| 82 | + let _ = tx2.send(event2.clone()); |
| 83 | + } |
| 84 | + sleep(Duration::from_millis(1000 / 10)).await; |
| 85 | + } |
| 86 | + }); |
| 87 | + |
| 88 | + tokio::spawn(async move { |
| 89 | + let mut inserted = 0usize; |
| 90 | + let mut writer = TableWriter::new(client, stream_table, 512).await; |
| 91 | + while let Some(event) = rx.recv().await { |
| 92 | + let mut row = build_table_row(&event); |
| 93 | + let res = writer.append_row(&mut row).await; |
| 94 | + |
| 95 | + match res { |
| 96 | + Ok(_) => { |
| 97 | + inserted += 1; |
| 98 | + if inserted % 10000 == 0 { |
| 99 | + println!("{} rows inserted and {} rows in buffer", inserted, rx.len()); |
| 100 | + } |
| 101 | + } |
| 102 | + Err(e) => { |
| 103 | + eprintln!("Insertion failed: {:?}", e); |
| 104 | + inserted += 1; |
| 105 | + } |
| 106 | + } |
| 107 | + } |
| 108 | + }); |
| 109 | + |
| 110 | + println!("Insertion started."); |
| 111 | + futures::future::pending::<()>().await; |
52 | 112 | } |
0 commit comments