Skip to content

Commit 9a3c720

Browse files
authored
Merge pull request #46 from synonymdev/feat/bulk-upserts
feat: add bulk upsert methods for all database entities
2 parents 1ba4859 + 6f144f4 commit 9a3c720

File tree

6 files changed

+984
-123
lines changed

6 files changed

+984
-123
lines changed

src/lib.rs

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,12 +402,66 @@ pub fn get_all_unique_tags() -> Result<Vec<String>, ActivityError> {
402402
}
403403

404404
#[uniffi::export]
405-
pub fn insert_closed_channel(channel: ClosedChannelDetails) -> Result<(), ActivityError> {
405+
pub fn upsert_closed_channel(channel: ClosedChannelDetails) -> Result<(), ActivityError> {
406406
let mut guard = get_activity_db()?;
407407
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
408408
error_details: "Database not initialized. Call init_db first.".to_string()
409409
})?;
410-
db.insert_closed_channel(&channel)
410+
db.upsert_closed_channel(&channel)
411+
}
412+
413+
#[uniffi::export]
414+
pub fn upsert_closed_channels(channels: Vec<ClosedChannelDetails>) -> Result<(), ActivityError> {
415+
let mut guard = get_activity_db()?;
416+
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
417+
error_details: "Database not initialized. Call init_db first.".to_string()
418+
})?;
419+
db.upsert_closed_channels(&channels)
420+
}
421+
422+
#[uniffi::export]
423+
pub fn upsert_onchain_activities(activities: Vec<OnchainActivity>) -> Result<(), ActivityError> {
424+
let mut guard = get_activity_db()?;
425+
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
426+
error_details: "Database not initialized. Call init_db first.".to_string()
427+
})?;
428+
db.upsert_onchain_activities(&activities)
429+
}
430+
431+
#[uniffi::export]
432+
pub fn upsert_lightning_activities(activities: Vec<LightningActivity>) -> Result<(), ActivityError> {
433+
let mut guard = get_activity_db()?;
434+
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
435+
error_details: "Database not initialized. Call init_db first.".to_string()
436+
})?;
437+
db.upsert_lightning_activities(&activities)
438+
}
439+
440+
#[uniffi::export]
441+
pub fn upsert_activities(activities: Vec<Activity>) -> Result<(), ActivityError> {
442+
let mut guard = get_activity_db()?;
443+
let db = guard.activity_db.as_mut().ok_or(ActivityError::ConnectionError {
444+
error_details: "Database not initialized. Call init_db first.".to_string()
445+
})?;
446+
447+
let mut onchain_list: Vec<OnchainActivity> = Vec::new();
448+
let mut lightning_list: Vec<LightningActivity> = Vec::new();
449+
450+
for activity in activities {
451+
match activity {
452+
Activity::Onchain(a) => onchain_list.push(a),
453+
Activity::Lightning(a) => lightning_list.push(a),
454+
}
455+
}
456+
457+
if !onchain_list.is_empty() {
458+
db.upsert_onchain_activities(&onchain_list)?;
459+
}
460+
if !lightning_list.is_empty() {
461+
db.upsert_lightning_activities(&lightning_list)?;
462+
}
463+
464+
Ok(())
411465
}
412466

413467
#[uniffi::export]
@@ -1320,6 +1374,62 @@ pub async fn blocktank_wipe_all() -> Result<(), BlocktankError> {
13201374
}).await.unwrap()
13211375
}
13221376

1377+
#[uniffi::export]
1378+
pub async fn upsert_info(info: IBtInfo) -> Result<(), BlocktankError> {
1379+
let rt = ensure_runtime();
1380+
rt.spawn(async move {
1381+
let cell = ASYNC_DB.get().ok_or(BlocktankError::ConnectionError {
1382+
error_details: "Database not initialized. Call init_db first.".to_string()
1383+
})?;
1384+
let guard = cell.lock().await;
1385+
let db = guard.blocktank_db.as_ref().ok_or(BlocktankError::ConnectionError {
1386+
error_details: "Database not initialized. Call init_db first.".to_string()
1387+
})?;
1388+
let external_info: rust_blocktank_client::IBtInfo = info.into();
1389+
db.upsert_info(&external_info).await
1390+
}).await.unwrap_or_else(|e| Err(BlocktankError::ConnectionError {
1391+
error_details: format!("Runtime error: {}", e)
1392+
}))
1393+
}
1394+
1395+
#[uniffi::export]
1396+
pub async fn upsert_orders(orders: Vec<IBtOrder>) -> Result<(), BlocktankError> {
1397+
let rt = ensure_runtime();
1398+
rt.spawn(async move {
1399+
let cell = ASYNC_DB.get().ok_or(BlocktankError::ConnectionError {
1400+
error_details: "Database not initialized. Call init_db first.".to_string()
1401+
})?;
1402+
let guard = cell.lock().await;
1403+
let db = guard.blocktank_db.as_ref().ok_or(BlocktankError::ConnectionError {
1404+
error_details: "Database not initialized. Call init_db first.".to_string()
1405+
})?;
1406+
1407+
let external_orders: Vec<rust_blocktank_client::IBtOrder> = orders.into_iter().map(|order| order.into()).collect();
1408+
db.upsert_orders(&external_orders).await
1409+
}).await.unwrap_or_else(|e| Err(BlocktankError::ConnectionError {
1410+
error_details: format!("Runtime error: {}", e)
1411+
}))
1412+
}
1413+
1414+
#[uniffi::export]
1415+
pub async fn upsert_cjit_entries(entries: Vec<ICJitEntry>) -> Result<(), BlocktankError> {
1416+
let rt = ensure_runtime();
1417+
rt.spawn(async move {
1418+
let cell = ASYNC_DB.get().ok_or(BlocktankError::ConnectionError {
1419+
error_details: "Database not initialized. Call init_db first.".to_string()
1420+
})?;
1421+
let guard = cell.lock().await;
1422+
let db = guard.blocktank_db.as_ref().ok_or(BlocktankError::ConnectionError {
1423+
error_details: "Database not initialized. Call init_db first.".to_string()
1424+
})?;
1425+
1426+
let external_entries: Vec<rust_blocktank_client::ICJitEntry> = entries.into_iter().map(|e| e.into()).collect();
1427+
db.upsert_cjit_entries(&external_entries).await
1428+
}).await.unwrap_or_else(|e| Err(BlocktankError::ConnectionError {
1429+
error_details: format!("Runtime error: {}", e)
1430+
}))
1431+
}
1432+
13231433
#[uniffi::export]
13241434
pub async fn wipe_all_databases() -> Result<String, DbError> {
13251435
let rt = ensure_runtime();

src/modules/activity/implementation.rs

Lines changed: 194 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,17 @@ const CREATE_CLOSED_CHANNELS_TABLE: &str = "
8080
channel_closure_reason TEXT NOT NULL
8181
)";
8282

83+
const UPSERT_CLOSED_CHANNEL_SQL: &str = "
84+
INSERT OR REPLACE INTO closed_channels (
85+
channel_id, counterparty_node_id, funding_txo_txid, funding_txo_index,
86+
channel_value_sats, closed_at, outbound_capacity_msat, inbound_capacity_msat,
87+
counterparty_unspendable_punishment_reserve, unspendable_punishment_reserve,
88+
forwarding_fee_proportional_millionths, forwarding_fee_base_msat,
89+
channel_name, channel_closure_reason
90+
) VALUES (
91+
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14
92+
)";
93+
8394
const INDEX_STATEMENTS: &[&str] = &[
8495
// Activity indexes
8596
"CREATE INDEX IF NOT EXISTS idx_activities_type_timestamp ON activities(activity_type, timestamp DESC)",
@@ -379,6 +390,138 @@ impl ActivityDB {
379390
Ok(())
380391
}
381392

393+
pub fn upsert_onchain_activities(&mut self, activities: &[OnchainActivity]) -> Result<(), ActivityError> {
394+
if activities.is_empty() {
395+
return Ok(());
396+
}
397+
398+
let tx = self.conn.transaction().map_err(|e| ActivityError::DataError {
399+
error_details: format!("Failed to start transaction: {}", e),
400+
})?;
401+
402+
{
403+
let mut stmt_act = tx.prepare(
404+
"INSERT OR REPLACE INTO activities (id, activity_type, tx_type, timestamp) VALUES (?1, 'onchain', ?2, ?3)"
405+
).map_err(|e| ActivityError::DataError {
406+
error_details: format!("Failed to prepare activities statement: {}", e),
407+
})?;
408+
let mut stmt_onchain = tx.prepare(
409+
"INSERT OR REPLACE INTO onchain_activity (
410+
id, tx_id, address, confirmed, value, fee, fee_rate, is_boosted,
411+
boost_tx_ids, is_transfer, does_exist, confirm_timestamp,
412+
channel_id, transfer_tx_id
413+
) VALUES (
414+
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14
415+
)"
416+
).map_err(|e| ActivityError::DataError {
417+
error_details: format!("Failed to prepare onchain statement: {}", e),
418+
})?;
419+
420+
for activity in activities {
421+
if activity.id.is_empty() {
422+
return Err(ActivityError::DataError {
423+
error_details: "Activity ID cannot be empty".to_string(),
424+
});
425+
}
426+
427+
stmt_act.execute((
428+
&activity.id,
429+
Self::payment_type_to_string(&activity.tx_type),
430+
activity.timestamp,
431+
)).map_err(|e| ActivityError::InsertError {
432+
error_details: format!("Failed to upsert activities: {}", e),
433+
})?;
434+
435+
let boost_tx_ids_str = activity.boost_tx_ids.join(",");
436+
stmt_onchain.execute((
437+
&activity.id,
438+
&activity.tx_id,
439+
&activity.address,
440+
activity.confirmed,
441+
activity.value,
442+
activity.fee,
443+
activity.fee_rate,
444+
activity.is_boosted,
445+
&boost_tx_ids_str,
446+
activity.is_transfer,
447+
activity.does_exist,
448+
activity.confirm_timestamp,
449+
&activity.channel_id,
450+
&activity.transfer_tx_id,
451+
)).map_err(|e| ActivityError::InsertError {
452+
error_details: format!("Failed to upsert onchain_activity: {}", e),
453+
})?;
454+
}
455+
}
456+
457+
tx.commit().map_err(|e| ActivityError::DataError {
458+
error_details: format!("Failed to commit transaction: {}", e),
459+
})?;
460+
461+
Ok(())
462+
}
463+
464+
pub fn upsert_lightning_activities(&mut self, activities: &[LightningActivity]) -> Result<(), ActivityError> {
465+
if activities.is_empty() {
466+
return Ok(());
467+
}
468+
469+
let tx = self.conn.transaction().map_err(|e| ActivityError::DataError {
470+
error_details: format!("Failed to start transaction: {}", e),
471+
})?;
472+
473+
{
474+
let mut stmt_act = tx.prepare(
475+
"INSERT OR REPLACE INTO activities (id, activity_type, tx_type, timestamp) VALUES (?1, 'lightning', ?2, ?3)"
476+
).map_err(|e| ActivityError::DataError {
477+
error_details: format!("Failed to prepare activities statement: {}", e),
478+
})?;
479+
let mut stmt_ln = tx.prepare(
480+
"INSERT OR REPLACE INTO lightning_activity (
481+
id, invoice, value, status, fee, message, preimage
482+
) VALUES (
483+
?1, ?2, ?3, ?4, ?5, ?6, ?7
484+
)"
485+
).map_err(|e| ActivityError::DataError {
486+
error_details: format!("Failed to prepare lightning statement: {}", e),
487+
})?;
488+
489+
for activity in activities {
490+
if activity.id.is_empty() {
491+
return Err(ActivityError::DataError {
492+
error_details: "Activity ID cannot be empty".to_string(),
493+
});
494+
}
495+
496+
stmt_act.execute((
497+
&activity.id,
498+
Self::payment_type_to_string(&activity.tx_type),
499+
activity.timestamp,
500+
)).map_err(|e| ActivityError::InsertError {
501+
error_details: format!("Failed to upsert activities: {}", e),
502+
})?;
503+
504+
stmt_ln.execute((
505+
&activity.id,
506+
&activity.invoice,
507+
activity.value,
508+
Self::payment_state_to_string(&activity.status),
509+
activity.fee,
510+
&activity.message,
511+
&activity.preimage,
512+
)).map_err(|e| ActivityError::InsertError {
513+
error_details: format!("Failed to upsert lightning_activity: {}", e),
514+
})?;
515+
}
516+
}
517+
518+
tx.commit().map_err(|e| ActivityError::DataError {
519+
error_details: format!("Failed to commit transaction: {}", e),
520+
})?;
521+
522+
Ok(())
523+
}
524+
382525
pub fn get_activities(
383526
&self,
384527
filter: Option<ActivityFilter>,
@@ -1019,26 +1162,15 @@ impl ActivityDB {
10191162
Ok(tags)
10201163
}
10211164

1022-
pub fn insert_closed_channel(&mut self, channel: &ClosedChannelDetails) -> Result<(), ActivityError> {
1165+
pub fn upsert_closed_channel(&mut self, channel: &ClosedChannelDetails) -> Result<(), ActivityError> {
10231166
if channel.channel_id.is_empty() {
10241167
return Err(ActivityError::DataError {
10251168
error_details: "Channel ID cannot be empty".to_string(),
10261169
});
10271170
}
10281171

1029-
let sql = "
1030-
INSERT INTO closed_channels (
1031-
channel_id, counterparty_node_id, funding_txo_txid, funding_txo_index,
1032-
channel_value_sats, closed_at, outbound_capacity_msat, inbound_capacity_msat,
1033-
counterparty_unspendable_punishment_reserve, unspendable_punishment_reserve,
1034-
forwarding_fee_proportional_millionths, forwarding_fee_base_msat,
1035-
channel_name, channel_closure_reason
1036-
) VALUES (
1037-
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14
1038-
)";
1039-
10401172
self.conn.execute(
1041-
sql,
1173+
UPSERT_CLOSED_CHANNEL_SQL,
10421174
rusqlite::params![
10431175
&channel.channel_id,
10441176
&channel.counterparty_node_id,
@@ -1062,6 +1194,55 @@ impl ActivityDB {
10621194
Ok(())
10631195
}
10641196

1197+
pub fn upsert_closed_channels(&mut self, channels: &[ClosedChannelDetails]) -> Result<(), ActivityError> {
1198+
if channels.is_empty() {
1199+
return Ok(());
1200+
}
1201+
1202+
let tx = self.conn.transaction().map_err(|e| ActivityError::DataError {
1203+
error_details: format!("Failed to start transaction: {}", e),
1204+
})?;
1205+
1206+
{
1207+
let mut stmt = tx.prepare(UPSERT_CLOSED_CHANNEL_SQL).map_err(|e| ActivityError::DataError {
1208+
error_details: format!("Failed to prepare statement: {}", e),
1209+
})?;
1210+
1211+
for channel in channels {
1212+
if channel.channel_id.is_empty() {
1213+
return Err(ActivityError::DataError {
1214+
error_details: "Channel ID cannot be empty".to_string(),
1215+
});
1216+
}
1217+
1218+
stmt.execute(rusqlite::params![
1219+
&channel.channel_id,
1220+
&channel.counterparty_node_id,
1221+
&channel.funding_txo_txid,
1222+
channel.funding_txo_index as i64,
1223+
channel.channel_value_sats as i64,
1224+
channel.closed_at as i64,
1225+
channel.outbound_capacity_msat as i64,
1226+
channel.inbound_capacity_msat as i64,
1227+
channel.counterparty_unspendable_punishment_reserve as i64,
1228+
channel.unspendable_punishment_reserve as i64,
1229+
channel.forwarding_fee_proportional_millionths as i64,
1230+
channel.forwarding_fee_base_msat as i64,
1231+
&channel.channel_name,
1232+
&channel.channel_closure_reason,
1233+
]).map_err(|e| ActivityError::InsertError {
1234+
error_details: format!("Failed to insert closed channel {}: {}", channel.channel_id, e),
1235+
})?;
1236+
}
1237+
}
1238+
1239+
tx.commit().map_err(|e| ActivityError::DataError {
1240+
error_details: format!("Failed to commit transaction: {}", e),
1241+
})?;
1242+
1243+
Ok(())
1244+
}
1245+
10651246
pub fn get_closed_channel_by_id(&self, channel_id: &str) -> Result<Option<ClosedChannelDetails>, ActivityError> {
10661247
let sql = "
10671248
SELECT

0 commit comments

Comments
 (0)