Skip to content

Commit 67a7ab7

Browse files
committed
add event notification integration points
1 parent 81d56cc commit 67a7ab7

File tree

7 files changed

+246
-10
lines changed

7 files changed

+246
-10
lines changed

executors/src/eoa/events.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use std::fmt::Display;
2+
3+
use engine_core::error::EngineError;
4+
use serde::{Deserialize, Serialize};
5+
use twmq::job::RequeuePosition;
6+
7+
use crate::{
8+
eoa::{
9+
store::{SubmittedTransaction, TransactionData},
10+
worker::ConfirmedTransactionWithRichReceipt,
11+
},
12+
webhook::envelope::{
13+
BareWebhookNotificationEnvelope, SerializableNackData, SerializableSuccessData, StageEvent,
14+
},
15+
};
16+
17+
pub struct EoaExecutorEvent {
18+
pub transaction_data: TransactionData,
19+
}
20+
21+
#[derive(Debug, Clone, Serialize, Deserialize)]
22+
pub struct EoaSendAttemptNackData {
23+
pub nonce: u64,
24+
pub error: EngineError,
25+
}
26+
27+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28+
pub enum EoaExecutorStage {
29+
SendAttempt,
30+
SendAttemptNack,
31+
TransactionReplaced,
32+
TransactionConfirmed,
33+
}
34+
35+
impl Display for EoaExecutorStage {
36+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37+
match self {
38+
EoaExecutorStage::SendAttempt => write!(f, "send_attempt"),
39+
EoaExecutorStage::SendAttemptNack => write!(f, "send_attempt_nack"),
40+
EoaExecutorStage::TransactionReplaced => write!(f, "transaction_replaced"),
41+
EoaExecutorStage::TransactionConfirmed => write!(f, "transaction_confirmed"),
42+
}
43+
}
44+
}
45+
46+
const EXECUTOR_NAME: &str = "eoa";
47+
48+
impl EoaExecutorEvent {
49+
pub fn send_attempt_success_envelopes(
50+
&self,
51+
submitted_transaction: SubmittedTransaction,
52+
) -> BareWebhookNotificationEnvelope<SerializableSuccessData<SubmittedTransaction>> {
53+
BareWebhookNotificationEnvelope {
54+
transaction_id: self.transaction_data.transaction_id.clone(),
55+
executor_name: EXECUTOR_NAME.to_string(),
56+
stage_name: EoaExecutorStage::SendAttempt.to_string(),
57+
event_type: StageEvent::Success,
58+
payload: SerializableSuccessData {
59+
result: submitted_transaction.clone(),
60+
},
61+
}
62+
}
63+
64+
pub fn send_attempt_nack_envelopes(
65+
&self,
66+
nonce: u64,
67+
error: EngineError,
68+
attempt_number: u32,
69+
) -> BareWebhookNotificationEnvelope<SerializableNackData<EoaSendAttemptNackData>> {
70+
BareWebhookNotificationEnvelope {
71+
transaction_id: self.transaction_data.transaction_id.clone(),
72+
executor_name: EXECUTOR_NAME.to_string(),
73+
stage_name: EoaExecutorStage::SendAttemptNack.to_string(),
74+
event_type: StageEvent::Nack,
75+
payload: SerializableNackData {
76+
error: EoaSendAttemptNackData {
77+
nonce,
78+
error: error.clone(),
79+
},
80+
delay_ms: None,
81+
position: RequeuePosition::Last,
82+
attempt_number,
83+
max_attempts: None,
84+
next_retry_at: None,
85+
},
86+
}
87+
}
88+
89+
pub fn transaction_replaced_envelopes(
90+
&self,
91+
replaced_transaction: SubmittedTransaction,
92+
) -> BareWebhookNotificationEnvelope<SerializableSuccessData<SubmittedTransaction>> {
93+
BareWebhookNotificationEnvelope {
94+
transaction_id: self.transaction_data.transaction_id.clone(),
95+
executor_name: EXECUTOR_NAME.to_string(),
96+
stage_name: EoaExecutorStage::TransactionReplaced.to_string(),
97+
event_type: StageEvent::Success,
98+
payload: SerializableSuccessData {
99+
result: replaced_transaction.clone(),
100+
},
101+
}
102+
}
103+
104+
pub fn transaction_confirmed_envelopes(
105+
&self,
106+
confirmed_transaction: ConfirmedTransactionWithRichReceipt,
107+
) -> BareWebhookNotificationEnvelope<SerializableSuccessData<ConfirmedTransactionWithRichReceipt>>
108+
{
109+
BareWebhookNotificationEnvelope {
110+
transaction_id: self.transaction_data.transaction_id.clone(),
111+
executor_name: EXECUTOR_NAME.to_string(),
112+
stage_name: EoaExecutorStage::TransactionConfirmed.to_string(),
113+
event_type: StageEvent::Success,
114+
payload: SerializableSuccessData {
115+
result: confirmed_transaction.clone(),
116+
},
117+
}
118+
}
119+
}

executors/src/eoa/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod error_classifier;
2+
pub mod events;
23
pub mod store;
34
pub mod worker;
45

executors/src/eoa/store/submitted.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use std::collections::{BTreeMap, HashMap, HashSet};
22

3+
use serde::{Deserialize, Serialize};
34
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
45

56
use crate::eoa::store::{
67
ConfirmedTransaction, EoaExecutorStoreKeys, TransactionStoreError, atomic::SafeRedisTransaction,
78
};
89

9-
#[derive(Debug, Clone)]
10+
#[derive(Debug, Clone, Serialize, Deserialize)]
11+
#[serde(rename_all = "camelCase")]
1012
pub struct SubmittedTransaction {
1113
pub nonce: u64,
1214
pub hash: String,

executors/src/eoa/worker.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use hex;
2121
use serde::{Deserialize, Serialize};
2222
use std::{sync::Arc, time::Duration};
2323
use tokio::time::sleep;
24+
use twmq::Queue;
2425
use twmq::redis::AsyncCommands;
2526
use twmq::redis::aio::ConnectionManager;
2627
use twmq::{
@@ -35,6 +36,7 @@ use crate::eoa::store::{
3536
EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, EoaTransactionRequest, ReplacedTransaction,
3637
SubmittedTransaction, TransactionData, TransactionStoreError,
3738
};
39+
use crate::webhook::WebhookJobHandler;
3840

3941
// ========== SPEC-COMPLIANT CONSTANTS ==========
4042
const MAX_INFLIGHT_PER_EOA: u64 = 100; // Default from spec
@@ -236,8 +238,9 @@ fn is_retryable_rpc_error(kind: &RpcErrorKind) -> bool {
236238
}
237239
}
238240

239-
#[derive(Debug, Clone)]
240-
struct ConfirmedTransactionWithRichReceipt {
241+
#[derive(Debug, Clone, Serialize, Deserialize)]
242+
#[serde(rename_all = "camelCase")]
243+
pub struct ConfirmedTransactionWithRichReceipt {
241244
pub nonce: u64,
242245
pub hash: String,
243246
pub transaction_id: String,
@@ -273,6 +276,8 @@ where
273276
CS: ChainService + Send + Sync + 'static,
274277
{
275278
pub chain_service: Arc<CS>,
279+
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
280+
276281
pub redis: ConnectionManager,
277282
pub namespace: Option<String>,
278283

executors/src/webhook/envelope.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,35 @@ pub struct WebhookNotificationEnvelope<T> {
4141
pub delivery_target_url: Option<String>,
4242
}
4343

44+
#[derive(Serialize, Deserialize, Debug, Clone)]
45+
#[serde(rename_all = "camelCase")]
46+
pub struct BareWebhookNotificationEnvelope<T: Clone> {
47+
pub transaction_id: String,
48+
pub event_type: StageEvent,
49+
pub executor_name: String,
50+
pub stage_name: String,
51+
pub payload: T,
52+
}
53+
54+
impl<T: Serialize + Clone> BareWebhookNotificationEnvelope<T> {
55+
pub fn into_webhook_notification_envelope(
56+
self,
57+
timestamp: u64,
58+
delivery_target_url: String,
59+
) -> WebhookNotificationEnvelope<T> {
60+
WebhookNotificationEnvelope {
61+
notification_id: Uuid::new_v4().to_string(),
62+
transaction_id: self.transaction_id,
63+
timestamp,
64+
executor_name: self.executor_name,
65+
stage_name: self.stage_name,
66+
event_type: self.event_type,
67+
payload: self.payload,
68+
delivery_target_url: Some(delivery_target_url),
69+
}
70+
}
71+
}
72+
4473
// --- Serializable Hook Data Wrappers ---
4574
// These wrap the hook data to make them serializable (removing lifetimes)
4675
#[derive(Serialize, Deserialize, Debug, Clone)]

executors/src/webhook/mod.rs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
use std::collections::HashMap;
2+
use std::env;
23
use std::sync::Arc;
34
use std::time::{Duration, SystemTime, UNIX_EPOCH};
45

6+
use engine_core::execution_options::WebhookOptions;
57
use hex;
68
use hmac::{Hmac, Mac};
79
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
810
use serde::{Deserialize, Serialize};
911
use twmq::error::TwmqError;
1012
use twmq::hooks::TransactionContext;
1113
use twmq::job::{BorrowedJob, JobError, JobResult, RequeuePosition, ToJobResult};
12-
use twmq::{DurableExecution, FailHookData, NackHookData, SuccessHookData, UserCancellable};
14+
use twmq::{DurableExecution, FailHookData, NackHookData, Queue, SuccessHookData, UserCancellable};
15+
use uuid::Uuid;
16+
17+
use crate::webhook::envelope::{BareWebhookNotificationEnvelope, WebhookNotificationEnvelope};
1318

1419
pub mod envelope;
1520

@@ -113,7 +118,10 @@ impl DurableExecution for WebhookJobHandler {
113118
type JobData = WebhookJobPayload;
114119

115120
#[tracing::instrument(skip_all, fields(queue = "webhook", job_id = job.job.id))]
116-
async fn process(&self, job: &BorrowedJob<Self::JobData>) -> JobResult<Self::Output, Self::ErrorData> {
121+
async fn process(
122+
&self,
123+
job: &BorrowedJob<Self::JobData>,
124+
) -> JobResult<Self::Output, Self::ErrorData> {
117125
let payload = &job.job.data;
118126
let mut request_headers = HeaderMap::new();
119127

@@ -423,3 +431,77 @@ impl DurableExecution for WebhookJobHandler {
423431
);
424432
}
425433
}
434+
435+
pub fn queue_webhook_envelopes<T: Serialize + Clone>(
436+
envelope: BareWebhookNotificationEnvelope<T>,
437+
webhook_options: Vec<WebhookOptions>,
438+
tx: &mut TransactionContext<'_>,
439+
webhook_queue: Arc<Queue<WebhookJobHandler>>,
440+
) -> Result<(), TwmqError> {
441+
let now = chrono::Utc::now().timestamp().min(0) as u64;
442+
let serialised_webhook_envelopes =
443+
webhook_options
444+
.iter()
445+
.map(|webhook_option| {
446+
let webhook_notification_envelope = envelope
447+
.clone()
448+
.into_webhook_notification_envelope(now, webhook_option.url.clone());
449+
let serialised_envelope = serde_json::to_string(&webhook_notification_envelope)?;
450+
Ok((
451+
serialised_envelope,
452+
webhook_notification_envelope,
453+
webhook_option.clone(),
454+
))
455+
})
456+
.collect::<Result<
457+
Vec<(String, WebhookNotificationEnvelope<T>, WebhookOptions)>,
458+
serde_json::Error,
459+
>>()?;
460+
461+
let webhook_payloads = serialised_webhook_envelopes
462+
.into_iter()
463+
.map(
464+
|(serialised_envelope, webhook_notification_envelope, webhook_option)| {
465+
let payload = WebhookJobPayload {
466+
url: webhook_option.url,
467+
body: serialised_envelope,
468+
headers: Some(
469+
[
470+
("Content-Type".to_string(), "application/json".to_string()),
471+
(
472+
"User-Agent".to_string(),
473+
format!("{}/{}", envelope.executor_name, envelope.stage_name),
474+
),
475+
]
476+
.into_iter()
477+
.collect(),
478+
),
479+
hmac_secret: webhook_option.secret, // TODO: Add HMAC support if needed
480+
http_method: Some("POST".to_string()),
481+
};
482+
return (payload, webhook_notification_envelope);
483+
},
484+
)
485+
.collect::<Vec<_>>();
486+
487+
for (payload, webhook_notification_envelope) in webhook_payloads {
488+
let mut webhook_job = webhook_queue.clone().job(payload);
489+
webhook_job.options.id = format!(
490+
"{}_{}_webhook",
491+
webhook_notification_envelope.transaction_id,
492+
webhook_notification_envelope.notification_id
493+
);
494+
495+
tx.queue_job(webhook_job)?;
496+
tracing::info!(
497+
transaction_id = %webhook_notification_envelope.transaction_id,
498+
executor = %webhook_notification_envelope.executor_name,
499+
stage = %webhook_notification_envelope.stage_name,
500+
event = ?webhook_notification_envelope.event_type,
501+
notification_id = %webhook_notification_envelope.notification_id,
502+
"Queued webhook notification"
503+
);
504+
}
505+
506+
Ok(())
507+
}

server/src/queue/manager.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use alloy::transports::http::reqwest;
55
use engine_core::error::EngineError;
66
use engine_executors::{
77
eip7702_executor::{confirm::Eip7702ConfirmationHandler, send::Eip7702SendHandler},
8-
eoa::{EoaExecutorStore, EoaExecutorWorker},
8+
eoa::EoaExecutorWorker,
99
external_bundler::{
1010
confirm::UserOpConfirmationHandler,
1111
deployment::{RedisDeploymentCache, RedisDeploymentLock},
@@ -16,10 +16,7 @@ use engine_executors::{
1616
};
1717
use twmq::{Queue, queue::QueueOptions, shutdown::ShutdownHandle};
1818

19-
use crate::{
20-
chains::ThirdwebChainService,
21-
config::{QueueConfig, RedisConfig},
22-
};
19+
use crate::{chains::ThirdwebChainService, config::QueueConfig};
2320

2421
pub struct QueueManager {
2522
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
@@ -212,6 +209,7 @@ impl QueueManager {
212209
let eoa_executor_handler = EoaExecutorWorker {
213210
chain_service: chain_service.clone(),
214211
eoa_signer: eoa_signer.clone(),
212+
webhook_queue: webhook_queue.clone(),
215213
namespace: queue_config.execution_namespace.clone(),
216214
redis: redis_client.get_connection_manager().await?,
217215
max_inflight: 100,

0 commit comments

Comments
 (0)