Skip to content

Commit 1ad846d

Browse files
committed
feat: add bulk upsert methods for all database entities
Introduce efficient bulk upsert operations for orders, CJIT entries, Blocktank info, onchain activities, lightning activities, and closed channels. All bulk operations use transactions and prepared statements for optimal performance. **Blocktank Module:** - Add `upsert_orders()` for bulk upsert of IBtOrder records - Add `upsert_cjit_entries()` for bulk upsert of ICJitEntry records - Add `upsert_info()` method (existing, now exposed in bindings) - Refactor `upsert_order()` and `upsert_cjit_entry()` to use helper functions - Extract SQL constants to `models.rs` for reusability - Add helper functions `build_order_params()` and `build_cjit_params()` to reduce duplication **Activity Module:** - Add `upsert_onchain_activities()` for bulk upsert of OnchainActivity records - Add `upsert_lightning_activities()` for bulk upsert of LightningActivity records - Rename `insert_closed_channel()` to `upsert_closed_channel()` (uses INSERT OR REPLACE) - Add `upsert_closed_channels()` for bulk operations **FFI Bindings:** - Export `upsert_orders()` in lib.rs - Export `upsert_cjit_entries()` in lib.rs - Export `upsert_info()` in lib.rs - Export `upsert_onchain_activities()` in lib.rs - Export `upsert_lightning_activities()` in lib.rs - Export `upsert_closed_channels()` in lib.rs (single method already exported) **Testing:** - Add tests for all bulk upsert methods - Add tests for empty input handling
1 parent 1ba4859 commit 1ad846d

File tree

6 files changed

+957
-123
lines changed

6 files changed

+957
-123
lines changed

src/lib.rs

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,12 +402,39 @@ pub fn get_all_unique_tags() -> Result<Vec<String>, ActivityError> {
402402
}
403403

404404
#[uniffi::export]
405-
pub fn insert_closed_channel(channel: ClosedChannelDetails) -> Result<(), ActivityError> {
405+
pub fn upsert_closed_channel(channel: ClosedChannelDetails) -> Result<(), ActivityError> {
406406
let mut guard = get_activity_db()?;
407407
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
408408
error_details: "Database not initialized. Call init_db first.".to_string()
409409
})?;
410-
db.insert_closed_channel(&channel)
410+
db.upsert_closed_channel(&channel)
411+
}
412+
413+
#[uniffi::export]
414+
pub fn upsert_closed_channels(channels: Vec<ClosedChannelDetails>) -> Result<(), ActivityError> {
415+
let mut guard = get_activity_db()?;
416+
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
417+
error_details: "Database not initialized. Call init_db first.".to_string()
418+
})?;
419+
db.upsert_closed_channels(&channels)
420+
}
421+
422+
#[uniffi::export]
423+
pub fn upsert_onchain_activities(activities: Vec<OnchainActivity>) -> Result<(), ActivityError> {
424+
let mut guard = get_activity_db()?;
425+
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
426+
error_details: "Database not initialized. Call init_db first.".to_string()
427+
})?;
428+
db.upsert_onchain_activities(&activities)
429+
}
430+
431+
#[uniffi::export]
432+
pub fn upsert_lightning_activities(activities: Vec<LightningActivity>) -> Result<(), ActivityError> {
433+
let mut guard = get_activity_db()?;
434+
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
435+
error_details: "Database not initialized. Call init_db first.".to_string()
436+
})?;
437+
db.upsert_lightning_activities(&activities)
411438
}
412439

413440
#[uniffi::export]
@@ -1320,6 +1347,62 @@ pub async fn blocktank_wipe_all() -> Result<(), BlocktankError> {
13201347
}).await.unwrap()
13211348
}
13221349

1350+
#[uniffi::export]
1351+
pub async fn upsert_info(info: IBtInfo) -> Result<(), BlocktankError> {
1352+
let rt = ensure_runtime();
1353+
rt.spawn(async move {
1354+
let cell = ASYNC_DB.get().ok_or(BlocktankError::ConnectionError {
1355+
error_details: "Database not initialized. Call init_db first.".to_string()
1356+
})?;
1357+
let guard = cell.lock().await;
1358+
let db = guard.blocktank_db.as_ref().ok_or(BlocktankError::ConnectionError {
1359+
error_details: "Database not initialized. Call init_db first.".to_string()
1360+
})?;
1361+
let external_info: rust_blocktank_client::IBtInfo = info.into();
1362+
db.upsert_info(&external_info).await
1363+
}).await.unwrap_or_else(|e| Err(BlocktankError::ConnectionError {
1364+
error_details: format!("Runtime error: {}", e)
1365+
}))
1366+
}
1367+
1368+
#[uniffi::export]
1369+
pub async fn upsert_orders(orders: Vec<IBtOrder>) -> Result<(), BlocktankError> {
1370+
let rt = ensure_runtime();
1371+
rt.spawn(async move {
1372+
let cell = ASYNC_DB.get().ok_or(BlocktankError::ConnectionError {
1373+
error_details: "Database not initialized. Call init_db first.".to_string()
1374+
})?;
1375+
let guard = cell.lock().await;
1376+
let db = guard.blocktank_db.as_ref().ok_or(BlocktankError::ConnectionError {
1377+
error_details: "Database not initialized. Call init_db first.".to_string()
1378+
})?;
1379+
1380+
let external_orders: Vec<rust_blocktank_client::IBtOrder> = orders.into_iter().map(|order| order.into()).collect();
1381+
db.upsert_orders(&external_orders).await
1382+
}).await.unwrap_or_else(|e| Err(BlocktankError::ConnectionError {
1383+
error_details: format!("Runtime error: {}", e)
1384+
}))
1385+
}
1386+
1387+
#[uniffi::export]
1388+
pub async fn upsert_cjit_entries(entries: Vec<ICJitEntry>) -> Result<(), BlocktankError> {
1389+
let rt = ensure_runtime();
1390+
rt.spawn(async move {
1391+
let cell = ASYNC_DB.get().ok_or(BlocktankError::ConnectionError {
1392+
error_details: "Database not initialized. Call init_db first.".to_string()
1393+
})?;
1394+
let guard = cell.lock().await;
1395+
let db = guard.blocktank_db.as_ref().ok_or(BlocktankError::ConnectionError {
1396+
error_details: "Database not initialized. Call init_db first.".to_string()
1397+
})?;
1398+
1399+
let external_entries: Vec<rust_blocktank_client::ICJitEntry> = entries.into_iter().map(|e| e.into()).collect();
1400+
db.upsert_cjit_entries(&external_entries).await
1401+
}).await.unwrap_or_else(|e| Err(BlocktankError::ConnectionError {
1402+
error_details: format!("Runtime error: {}", e)
1403+
}))
1404+
}
1405+
13231406
#[uniffi::export]
13241407
pub async fn wipe_all_databases() -> Result<String, DbError> {
13251408
let rt = ensure_runtime();

src/modules/activity/implementation.rs

Lines changed: 194 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,17 @@ const CREATE_CLOSED_CHANNELS_TABLE: &str = "
8080
channel_closure_reason TEXT NOT NULL
8181
)";
8282

83+
const UPSERT_CLOSED_CHANNEL_SQL: &str = "
84+
INSERT OR REPLACE INTO closed_channels (
85+
channel_id, counterparty_node_id, funding_txo_txid, funding_txo_index,
86+
channel_value_sats, closed_at, outbound_capacity_msat, inbound_capacity_msat,
87+
counterparty_unspendable_punishment_reserve, unspendable_punishment_reserve,
88+
forwarding_fee_proportional_millionths, forwarding_fee_base_msat,
89+
channel_name, channel_closure_reason
90+
) VALUES (
91+
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14
92+
)";
93+
8394
const INDEX_STATEMENTS: &[&str] = &[
8495
// Activity indexes
8596
"CREATE INDEX IF NOT EXISTS idx_activities_type_timestamp ON activities(activity_type, timestamp DESC)",
@@ -379,6 +390,138 @@ impl ActivityDB {
379390
Ok(())
380391
}
381392

393+
pub fn upsert_onchain_activities(&mut self, activities: &[OnchainActivity]) -> Result<(), ActivityError> {
394+
if activities.is_empty() {
395+
return Ok(());
396+
}
397+
398+
let tx = self.conn.transaction().map_err(|e| ActivityError::DataError {
399+
error_details: format!("Failed to start transaction: {}", e),
400+
})?;
401+
402+
{
403+
let mut stmt_act = tx.prepare(
404+
"INSERT OR REPLACE INTO activities (id, activity_type, tx_type, timestamp) VALUES (?1, 'onchain', ?2, ?3)"
405+
).map_err(|e| ActivityError::DataError {
406+
error_details: format!("Failed to prepare activities statement: {}", e),
407+
})?;
408+
let mut stmt_onchain = tx.prepare(
409+
"INSERT OR REPLACE INTO onchain_activity (
410+
id, tx_id, address, confirmed, value, fee, fee_rate, is_boosted,
411+
boost_tx_ids, is_transfer, does_exist, confirm_timestamp,
412+
channel_id, transfer_tx_id
413+
) VALUES (
414+
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14
415+
)"
416+
).map_err(|e| ActivityError::DataError {
417+
error_details: format!("Failed to prepare onchain statement: {}", e),
418+
})?;
419+
420+
for activity in activities {
421+
if activity.id.is_empty() {
422+
return Err(ActivityError::DataError {
423+
error_details: "Activity ID cannot be empty".to_string(),
424+
});
425+
}
426+
427+
stmt_act.execute((
428+
&activity.id,
429+
Self::payment_type_to_string(&activity.tx_type),
430+
activity.timestamp,
431+
)).map_err(|e| ActivityError::InsertError {
432+
error_details: format!("Failed to upsert activities: {}", e),
433+
})?;
434+
435+
let boost_tx_ids_str = activity.boost_tx_ids.join(",");
436+
stmt_onchain.execute((
437+
&activity.id,
438+
&activity.tx_id,
439+
&activity.address,
440+
activity.confirmed,
441+
activity.value,
442+
activity.fee,
443+
activity.fee_rate,
444+
activity.is_boosted,
445+
&boost_tx_ids_str,
446+
activity.is_transfer,
447+
activity.does_exist,
448+
activity.confirm_timestamp,
449+
&activity.channel_id,
450+
&activity.transfer_tx_id,
451+
)).map_err(|e| ActivityError::InsertError {
452+
error_details: format!("Failed to upsert onchain_activity: {}", e),
453+
})?;
454+
}
455+
}
456+
457+
tx.commit().map_err(|e| ActivityError::DataError {
458+
error_details: format!("Failed to commit transaction: {}", e),
459+
})?;
460+
461+
Ok(())
462+
}
463+
464+
pub fn upsert_lightning_activities(&mut self, activities: &[LightningActivity]) -> Result<(), ActivityError> {
465+
if activities.is_empty() {
466+
return Ok(());
467+
}
468+
469+
let tx = self.conn.transaction().map_err(|e| ActivityError::DataError {
470+
error_details: format!("Failed to start transaction: {}", e),
471+
})?;
472+
473+
{
474+
let mut stmt_act = tx.prepare(
475+
"INSERT OR REPLACE INTO activities (id, activity_type, tx_type, timestamp) VALUES (?1, 'lightning', ?2, ?3)"
476+
).map_err(|e| ActivityError::DataError {
477+
error_details: format!("Failed to prepare activities statement: {}", e),
478+
})?;
479+
let mut stmt_ln = tx.prepare(
480+
"INSERT OR REPLACE INTO lightning_activity (
481+
id, invoice, value, status, fee, message, preimage
482+
) VALUES (
483+
?1, ?2, ?3, ?4, ?5, ?6, ?7
484+
)"
485+
).map_err(|e| ActivityError::DataError {
486+
error_details: format!("Failed to prepare lightning statement: {}", e),
487+
})?;
488+
489+
for activity in activities {
490+
if activity.id.is_empty() {
491+
return Err(ActivityError::DataError {
492+
error_details: "Activity ID cannot be empty".to_string(),
493+
});
494+
}
495+
496+
stmt_act.execute((
497+
&activity.id,
498+
Self::payment_type_to_string(&activity.tx_type),
499+
activity.timestamp,
500+
)).map_err(|e| ActivityError::InsertError {
501+
error_details: format!("Failed to upsert activities: {}", e),
502+
})?;
503+
504+
stmt_ln.execute((
505+
&activity.id,
506+
&activity.invoice,
507+
activity.value,
508+
Self::payment_state_to_string(&activity.status),
509+
activity.fee,
510+
&activity.message,
511+
&activity.preimage,
512+
)).map_err(|e| ActivityError::InsertError {
513+
error_details: format!("Failed to upsert lightning_activity: {}", e),
514+
})?;
515+
}
516+
}
517+
518+
tx.commit().map_err(|e| ActivityError::DataError {
519+
error_details: format!("Failed to commit transaction: {}", e),
520+
})?;
521+
522+
Ok(())
523+
}
524+
382525
pub fn get_activities(
383526
&self,
384527
filter: Option<ActivityFilter>,
@@ -1019,26 +1162,15 @@ impl ActivityDB {
10191162
Ok(tags)
10201163
}
10211164

1022-
pub fn insert_closed_channel(&mut self, channel: &ClosedChannelDetails) -> Result<(), ActivityError> {
1165+
pub fn upsert_closed_channel(&mut self, channel: &ClosedChannelDetails) -> Result<(), ActivityError> {
10231166
if channel.channel_id.is_empty() {
10241167
return Err(ActivityError::DataError {
10251168
error_details: "Channel ID cannot be empty".to_string(),
10261169
});
10271170
}
10281171

1029-
let sql = "
1030-
INSERT INTO closed_channels (
1031-
channel_id, counterparty_node_id, funding_txo_txid, funding_txo_index,
1032-
channel_value_sats, closed_at, outbound_capacity_msat, inbound_capacity_msat,
1033-
counterparty_unspendable_punishment_reserve, unspendable_punishment_reserve,
1034-
forwarding_fee_proportional_millionths, forwarding_fee_base_msat,
1035-
channel_name, channel_closure_reason
1036-
) VALUES (
1037-
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14
1038-
)";
1039-
10401172
self.conn.execute(
1041-
sql,
1173+
UPSERT_CLOSED_CHANNEL_SQL,
10421174
rusqlite::params![
10431175
&channel.channel_id,
10441176
&channel.counterparty_node_id,
@@ -1062,6 +1194,55 @@ impl ActivityDB {
10621194
Ok(())
10631195
}
10641196

1197+
pub fn upsert_closed_channels(&mut self, channels: &[ClosedChannelDetails]) -> Result<(), ActivityError> {
1198+
if channels.is_empty() {
1199+
return Ok(());
1200+
}
1201+
1202+
let tx = self.conn.transaction().map_err(|e| ActivityError::DataError {
1203+
error_details: format!("Failed to start transaction: {}", e),
1204+
})?;
1205+
1206+
{
1207+
let mut stmt = tx.prepare(UPSERT_CLOSED_CHANNEL_SQL).map_err(|e| ActivityError::DataError {
1208+
error_details: format!("Failed to prepare statement: {}", e),
1209+
})?;
1210+
1211+
for channel in channels {
1212+
if channel.channel_id.is_empty() {
1213+
return Err(ActivityError::DataError {
1214+
error_details: "Channel ID cannot be empty".to_string(),
1215+
});
1216+
}
1217+
1218+
stmt.execute(rusqlite::params![
1219+
&channel.channel_id,
1220+
&channel.counterparty_node_id,
1221+
&channel.funding_txo_txid,
1222+
channel.funding_txo_index as i64,
1223+
channel.channel_value_sats as i64,
1224+
channel.closed_at as i64,
1225+
channel.outbound_capacity_msat as i64,
1226+
channel.inbound_capacity_msat as i64,
1227+
channel.counterparty_unspendable_punishment_reserve as i64,
1228+
channel.unspendable_punishment_reserve as i64,
1229+
channel.forwarding_fee_proportional_millionths as i64,
1230+
channel.forwarding_fee_base_msat as i64,
1231+
&channel.channel_name,
1232+
&channel.channel_closure_reason,
1233+
]).map_err(|e| ActivityError::InsertError {
1234+
error_details: format!("Failed to insert closed channel {}: {}", channel.channel_id, e),
1235+
})?;
1236+
}
1237+
}
1238+
1239+
tx.commit().map_err(|e| ActivityError::DataError {
1240+
error_details: format!("Failed to commit transaction: {}", e),
1241+
})?;
1242+
1243+
Ok(())
1244+
}
1245+
10651246
pub fn get_closed_channel_by_id(&self, channel_id: &str) -> Result<Option<ClosedChannelDetails>, ActivityError> {
10661247
let sql = "
10671248
SELECT

0 commit comments

Comments
 (0)