Skip to content

Commit e310c94

Browse files
danwtclaude
andauthored
claude: feat(relayer): add deposit-force endpoint (#460)
Add POST /kaspa/deposit-force endpoint to manually process Kaspa deposits that fell outside the normal lookback window due to relayer DB being wiped or other issues. The endpoint: - Accepts a kaspa_tx ID - Fetches the transaction from Kaspa REST API - Queues it for processing via the existing deposit pipeline Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 1da10c9 commit e310c94

File tree

6 files changed

+202
-6
lines changed

6 files changed

+202
-6
lines changed

rust/main/agents/relayer/src/relayer.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,27 @@ impl BaseAgent for Relayer {
585585
.map(|(key, origin)| (key.id(), origin.prover_sync.clone()))
586586
.collect();
587587

588+
// Build deposit-force config if available (requires sender AND REST URL)
589+
let deposit_force = if let Some(dym_args) = self.dymension_kaspa_args.as_ref() {
590+
let sender_guard = dym_args.force_sender.read().unwrap();
591+
let rest_url = dym_args
592+
.kas_provider
593+
.conf()
594+
.kaspa_urls_rest
595+
.first()
596+
.map(|u| u.to_string());
597+
598+
match (sender_guard.as_ref(), rest_url) {
599+
(Some(sender), Some(url)) => Some(relayer_server::DepositForceConfig {
600+
sender: sender.clone(),
601+
rest_api_url: url,
602+
}),
603+
_ => None,
604+
}
605+
} else {
606+
None
607+
};
608+
588609
let relayer_router = relayer_server::Server::new(self.destinations.len())
589610
.with_op_retry(sender.clone())
590611
.with_message_queue(prep_queues)
@@ -596,7 +617,8 @@ impl BaseAgent for Relayer {
596617
self.dymension_kaspa_args
597618
.as_ref()
598619
.and_then(|dym_args| dym_args.kas_provider.kaspa_db().cloned()),
599-
) // Set kaspa_db to server_builder from dymension_args provider if available
620+
)
621+
.with_deposit_force(deposit_force)
600622
.router();
601623

602624
let server = self
@@ -1126,6 +1148,8 @@ impl Relayer {
11261148
struct DymensionKaspaArgs {
11271149
kas_provider: Box<KaspaProvider>,
11281150
dym_mailbox: Arc<CosmosNativeMailbox>,
1151+
/// Sender for force-deposit requests, populated when Foo is created
1152+
force_sender: Arc<std::sync::RwLock<Option<hyperlane_base::kas_hack::DepositForceSender>>>,
11291153
}
11301154

11311155
// Manual Debug since KaspaMailbox now has a trait object
@@ -1135,6 +1159,7 @@ impl std::fmt::Debug for DymensionKaspaArgs {
11351159
.field("kas_provider", &self.kas_provider)
11361160
.field("kas_mailbox", &"KaspaMailbox")
11371161
.field("dym_mailbox", &self.dym_mailbox)
1162+
.field("force_sender", &"<RwLock>")
11381163
.finish()
11391164
}
11401165
}
@@ -1197,6 +1222,7 @@ impl Relayer {
11971222
Ok(Some(DymensionKaspaArgs {
11981223
kas_provider,
11991224
dym_mailbox,
1225+
force_sender: Arc::new(std::sync::RwLock::new(None)),
12001226
}))
12011227
}
12021228

@@ -1218,6 +1244,12 @@ impl Relayer {
12181244

12191245
let b = KaspaBridgeFoo::new(kas_provider.clone(), hub_mailbox.clone(), metadata_getter);
12201246

1247+
// Store the force sender for use by the server endpoint
1248+
{
1249+
let mut sender_guard = args.force_sender.write().unwrap();
1250+
*sender_guard = Some(b.force_sender());
1251+
}
1252+
12211253
// sync relayer before starting other tasks
12221254
b.sync_hub_if_needed().await.unwrap();
12231255

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use axum::{extract::State, http::StatusCode, Json};
2+
use dymension_kaspa::dym_kas_core::api::client::Deposit;
3+
use serde::{Deserialize, Serialize};
4+
5+
use hyperlane_base::server::utils::{
6+
ServerErrorBody, ServerErrorResponse, ServerResult, ServerSuccessResponse,
7+
};
8+
9+
use super::ServerState;
10+
11+
#[derive(Clone, Debug, Deserialize)]
12+
pub struct RequestBody {
13+
pub kaspa_tx: String,
14+
}
15+
16+
#[derive(Clone, Debug, Serialize)]
17+
pub struct ResponseBody {
18+
pub message: String,
19+
pub deposit_id: String,
20+
}
21+
22+
/// Force processing of a Kaspa deposit by fetching it from the REST API.
23+
/// Useful for deposits that fell outside the normal lookback window.
24+
///
25+
/// POST /kaspa/deposit-force
26+
/// Body: { "kaspa_tx": "242b5987..." }
27+
pub async fn handler(
28+
State(state): State<ServerState>,
29+
Json(body): Json<RequestBody>,
30+
) -> ServerResult<ServerSuccessResponse<ResponseBody>> {
31+
let RequestBody { kaspa_tx } = body;
32+
tracing::info!(%kaspa_tx, "Received deposit force request");
33+
34+
let (sender, client) = match (&state.force_sender, &state.http_client) {
35+
(Some(s), Some(c)) => (s, c),
36+
_ => {
37+
return Err(ServerErrorResponse::new(
38+
StatusCode::SERVICE_UNAVAILABLE,
39+
ServerErrorBody {
40+
message: "Deposit force is not enabled on this relayer".to_string(),
41+
},
42+
));
43+
}
44+
};
45+
46+
let tx = client.get_tx_by_id(&kaspa_tx).await.map_err(|e| {
47+
tracing::error!(%kaspa_tx, error = ?e, "Failed to fetch transaction from Kaspa API");
48+
ServerErrorResponse::new(
49+
StatusCode::NOT_FOUND,
50+
ServerErrorBody {
51+
message: format!("Transaction not found or API error: {}", e),
52+
},
53+
)
54+
})?;
55+
56+
let deposit: Deposit = tx.try_into().map_err(|e: eyre::Error| {
57+
tracing::error!(%kaspa_tx, error = ?e, "Failed to convert transaction to deposit");
58+
ServerErrorResponse::new(
59+
StatusCode::BAD_REQUEST,
60+
ServerErrorBody {
61+
message: format!("Invalid deposit transaction: {}", e),
62+
},
63+
)
64+
})?;
65+
66+
let deposit_id = deposit.id.to_string();
67+
68+
sender.send(deposit).await.map_err(|e| {
69+
tracing::error!(%kaspa_tx, error = ?e, "Failed to send deposit to processing channel");
70+
ServerErrorResponse::new(
71+
StatusCode::INTERNAL_SERVER_ERROR,
72+
ServerErrorBody {
73+
message: "Failed to queue deposit for processing".to_string(),
74+
},
75+
)
76+
})?;
77+
78+
tracing::info!(%kaspa_tx, %deposit_id, "Deposit queued for processing");
79+
80+
Ok(ServerSuccessResponse::new(ResponseBody {
81+
message: "Deposit queued for processing".to_string(),
82+
deposit_id,
83+
}))
84+
}

rust/main/agents/relayer/src/server/kaspa/mod.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,50 @@
11
use std::sync::Arc;
22

3-
use axum::{routing::get, Router};
4-
use derive_new::new;
3+
use axum::{
4+
routing::{get, post},
5+
Router,
6+
};
7+
use dymension_kaspa::dym_kas_core::api::{base::RateLimitConfig, client::HttpClient};
8+
use hyperlane_base::kas_hack::DepositForceSender;
59
use hyperlane_core::KaspaDb;
610
use tower_http::cors::{Any, CorsLayer};
711

12+
pub mod deposit_force;
813
pub mod list_deposits;
914
pub mod list_withdrawals;
1015

11-
#[derive(Clone, Debug, new)]
16+
#[derive(Clone)]
1217
pub struct ServerState {
1318
pub kaspa_db: Arc<dyn KaspaDb>,
19+
pub force_sender: Option<DepositForceSender>,
20+
pub http_client: Option<HttpClient>,
21+
}
22+
23+
impl std::fmt::Debug for ServerState {
24+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25+
f.debug_struct("ServerState")
26+
.field("kaspa_db", &"<dyn KaspaDb>")
27+
.field("force_sender", &self.force_sender.is_some())
28+
.field("http_client", &self.http_client.is_some())
29+
.finish()
30+
}
1431
}
1532

1633
impl ServerState {
34+
pub fn new(kaspa_db: Arc<dyn KaspaDb>) -> Self {
35+
Self {
36+
kaspa_db,
37+
force_sender: None,
38+
http_client: None,
39+
}
40+
}
41+
42+
pub fn with_deposit_force(mut self, sender: DepositForceSender, rest_api_url: String) -> Self {
43+
self.force_sender = Some(sender);
44+
self.http_client = Some(HttpClient::new(rest_api_url, RateLimitConfig::default()));
45+
self
46+
}
47+
1748
pub fn router(self) -> Router {
1849
let cors = CorsLayer::new()
1950
.allow_origin(Any)
@@ -23,6 +54,7 @@ impl ServerState {
2354
Router::new()
2455
.route("/kaspa/deposit", get(list_deposits::handler))
2556
.route("/kaspa/withdrawal", get(list_withdrawals::handler))
57+
.route("/kaspa/deposit-force", post(deposit_force::handler))
2658
.layer(cors)
2759
.with_state(self)
2860
}

rust/main/agents/relayer/src/server/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::sync::Arc;
44

55
use axum::Router;
66
use derive_new::new;
7+
use hyperlane_base::kas_hack::DepositForceSender;
78
use hyperlane_core::HyperlaneDomain;
89
use tokio::sync::broadcast::Sender;
910

@@ -27,6 +28,12 @@ pub mod messages;
2728
pub mod operations;
2829
pub mod proofs;
2930

31+
/// Config for deposit-force endpoint
32+
pub struct DepositForceConfig {
33+
pub sender: DepositForceSender,
34+
pub rest_api_url: String,
35+
}
36+
3037
#[derive(new)]
3138
pub struct Server {
3239
destination_chains: usize,
@@ -45,6 +52,8 @@ pub struct Server {
4552
prover_syncs: Option<HashMap<u32, Arc<RwLock<MerkleTreeBuilder>>>>,
4653
#[new(default)]
4754
kaspa_db: Option<Arc<dyn KaspaDb>>,
55+
#[new(default)]
56+
deposit_force: Option<DepositForceConfig>,
4857
}
4958

5059
impl Server {
@@ -92,6 +101,11 @@ impl Server {
92101
self
93102
}
94103

104+
pub fn with_deposit_force(mut self, config: Option<DepositForceConfig>) -> Self {
105+
self.deposit_force = config;
106+
self
107+
}
108+
95109
// return a custom router that can be used in combination with other routers
96110
pub fn router(self) -> Router {
97111
let mut router = Router::new();
@@ -127,7 +141,13 @@ impl Server {
127141
router = router.merge(proofs::ServerState::new(prover_syncs).router());
128142
}
129143
if let Some(kaspa_db) = self.kaspa_db {
130-
router = router.merge(kaspa::ServerState::new(kaspa_db).router());
144+
let kaspa_state = kaspa::ServerState::new(kaspa_db);
145+
let kaspa_state = if let Some(df) = self.deposit_force {
146+
kaspa_state.with_deposit_force(df.sender, df.rest_api_url)
147+
} else {
148+
kaspa_state
149+
};
150+
router = router.merge(kaspa_state.router());
131151
}
132152

133153
let expose_environment_variable_endpoint =

rust/main/hyperlane-base/src/kas_hack/logic_loop.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ use hyperlane_core::{
1313
};
1414
use hyperlane_cosmos::native::{h512_to_cosmos_hash, CosmosNativeMailbox};
1515
use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc, time::Duration};
16-
use tokio::{sync::Mutex, task::JoinHandle, time};
16+
use tokio::{
17+
sync::{mpsc, Mutex},
18+
task::JoinHandle,
19+
time,
20+
};
1721
use tokio_metrics::TaskMonitor;
1822
use tracing::{debug, error, info, info_span, Instrument};
1923

@@ -23,6 +27,9 @@ use super::{
2327
};
2428
use dymension_kaspa::conf::RelayerDepositTimings;
2529

30+
pub type DepositForceSender = mpsc::Sender<Deposit>;
31+
pub type DepositForceReceiver = mpsc::Receiver<Deposit>;
32+
2633
enum DepositRelayResult {
2734
Success {
2835
deposit_id: String,
@@ -51,6 +58,8 @@ pub struct Foo<C: MetadataConstructor> {
5158
metadata_constructor: C,
5259
deposit_tracker: Mutex<DepositTracker>,
5360
config: RelayerDepositTimings,
61+
force_sender: DepositForceSender,
62+
force_receiver: Mutex<DepositForceReceiver>,
5463
}
5564

5665
impl<C: MetadataConstructor> Foo<C>
@@ -64,15 +73,23 @@ where
6473
) -> Self {
6574
// Get config from provider, or use defaults if not available
6675
let config = provider.must_relayer_stuff().deposit_timings.clone();
76+
let (force_sender, force_receiver) = mpsc::channel(100);
6777
Self {
6878
provider,
6979
hub_mailbox,
7080
metadata_constructor,
7181
deposit_tracker: Mutex::new(DepositTracker::new()),
7282
config,
83+
force_sender,
84+
force_receiver: Mutex::new(force_receiver),
7385
}
7486
}
7587

88+
/// Get a sender for submitting deposits to be recovered/reprocessed
89+
pub fn force_sender(&self) -> DepositForceSender {
90+
self.force_sender.clone()
91+
}
92+
7693
/// Run deposit and progress indication loops
7794
pub fn run_loops(self, task_monitor: TaskMonitor) -> JoinHandle<()> {
7895
let foo = Arc::new(self);
@@ -122,6 +139,7 @@ where
122139

123140
loop {
124141
self.process_deposit_queue().await;
142+
self.process_force_requests().await;
125143

126144
let now = std::time::SystemTime::now()
127145
.duration_since(std::time::UNIX_EPOCH)
@@ -189,6 +207,15 @@ where
189207
}
190208
}
191209

210+
/// Process deposits submitted via the force channel
211+
async fn process_force_requests(&self) {
212+
let mut receiver = self.force_receiver.lock().await;
213+
while let Ok(deposit) = receiver.try_recv() {
214+
info!(deposit_id = %deposit.id, "Processing forced deposit");
215+
self.queue_new_deposits(vec![deposit]).await;
216+
}
217+
}
218+
192219
/// Process the retry queue for failed deposit operations
193220
async fn process_deposit_queue(&self) {
194221
loop {

rust/main/hyperlane-base/src/kas_hack/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ pub mod migration;
66
pub mod sync;
77

88
pub use kaspa_db::KaspaRocksDB;
9+
pub use logic_loop::DepositForceSender;
910
pub use migration::run_migration_with_sync;
1011
pub use sync::{ensure_hub_synced, format_ad_hoc_signatures};

0 commit comments

Comments
 (0)