Skip to content

Commit bfcf998

Browse files
committed
spike feat flag
1 parent 2d70ac0 commit bfcf998

File tree

3 files changed

+38
-24
lines changed

3 files changed

+38
-24
lines changed

bin/tips-ingress-rpc/src/main.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,27 @@ async fn main() -> anyhow::Result<()> {
7373
let (audit_tx, audit_rx) = mpsc::unbounded_channel::<BundleEvent>();
7474
connect_audit_to_publisher(audit_rx, audit_publisher);
7575

76-
let user_op_properties_file = &config.user_operation_consumer_properties;
77-
78-
let mempool_engine = create_mempool_engine(
79-
user_op_properties_file,
80-
&config.user_operation_topic,
81-
&config.user_operation_consumer_group_id,
82-
None,
83-
)?;
84-
85-
let mempool_engine_handle = {
86-
let engine = mempool_engine.clone();
87-
tokio::spawn(async move { engine.run().await })
76+
let (mempool_engine, mempool_engine_handle) = if let Some(user_op_properties_file) =
77+
&config.user_operation_consumer_properties
78+
{
79+
let engine = create_mempool_engine(
80+
user_op_properties_file,
81+
&config.user_operation_topic,
82+
&config.user_operation_consumer_group_id,
83+
None,
84+
)?;
85+
86+
let handle = {
87+
let engine_clone = engine.clone();
88+
tokio::spawn(async move { engine_clone.run().await })
89+
};
90+
91+
(Some(engine), Some(handle))
92+
} else {
93+
info!(
94+
"User operation consumer properties not provided, skipping mempool engine initialization"
95+
);
96+
(None, None)
8897
};
8998

9099
let (builder_tx, _) =
@@ -126,7 +135,9 @@ async fn main() -> anyhow::Result<()> {
126135

127136
handle.stopped().await;
128137
health_handle.abort();
129-
mempool_engine_handle.abort();
138+
if let Some(engine_handle) = mempool_engine_handle {
139+
engine_handle.abort();
140+
}
130141

131142
Ok(())
132143
}

crates/ingress-rpc/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub struct Config {
9090
long,
9191
env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE"
9292
)]
93-
pub user_operation_consumer_properties: String,
93+
pub user_operation_consumer_properties: Option<String>,
9494

9595
/// Consumer group id for user operation topic (set uniquely per deployment)
9696
#[arg(

crates/ingress-rpc/src/service.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ pub struct IngressService<Q: MessageQueue, M: Mempool> {
7373
tx_submission_method: TxSubmissionMethod,
7474
bundle_queue_publisher: BundleQueuePublisher<Q>,
7575
user_op_queue_publisher: UserOpQueuePublisher<Q>,
76-
reputation_service: Arc<ReputationServiceImpl<M>>,
76+
reputation_service: Option<Arc<ReputationServiceImpl<M>>>,
7777
audit_channel: mpsc::UnboundedSender<BundleEvent>,
7878
send_transaction_default_lifetime_seconds: u64,
7979
metrics: Metrics,
@@ -93,9 +93,10 @@ impl<Q: MessageQueue, M: Mempool> IngressService<Q, M> {
9393
audit_channel: mpsc::UnboundedSender<BundleEvent>,
9494
builder_tx: broadcast::Sender<MeterBundleResponse>,
9595
builder_backrun_tx: broadcast::Sender<AcceptedBundle>,
96-
mempool_engine: Arc<MempoolEngine<M>>,
96+
mempool_engine: impl Into<Option<Arc<MempoolEngine<M>>>>,
9797
config: Config,
9898
) -> Self {
99+
let mempool_engine = mempool_engine.into();
99100
let mempool_provider = Arc::new(providers.mempool);
100101
let simulation_provider = Arc::new(providers.simulation);
101102
let raw_tx_forward_provider = providers.raw_tx_forward.map(Arc::new);
@@ -104,7 +105,9 @@ impl<Q: MessageQueue, M: Mempool> IngressService<Q, M> {
104105
config.validate_user_operation_timeout_ms,
105106
);
106107
let queue_connection = Arc::new(queue);
107-
let reputation_service = ReputationServiceImpl::new(mempool_engine.get_mempool());
108+
let reputation_service = mempool_engine
109+
.as_ref()
110+
.map(|engine| Arc::new(ReputationServiceImpl::new(engine.get_mempool())));
108111
Self {
109112
mempool_provider,
110113
simulation_provider,
@@ -119,7 +122,7 @@ impl<Q: MessageQueue, M: Mempool> IngressService<Q, M> {
119122
queue_connection.clone(),
120123
config.ingress_topic,
121124
),
122-
reputation_service: Arc::new(reputation_service),
125+
reputation_service,
123126
audit_channel,
124127
send_transaction_default_lifetime_seconds: config
125128
.send_transaction_default_lifetime_seconds,
@@ -384,11 +387,11 @@ impl<Q: MessageQueue + 'static, M: Mempool + 'static> IngressApiServer for Ingre
384387
chain_id: 1,
385388
};
386389

387-
// DO Nothing with reputation at the moment as this is scafolding
388-
let _ = self
389-
.reputation_service
390-
.get_reputation(&request.user_operation.sender())
391-
.await;
390+
if let Some(reputation_service) = &self.reputation_service {
391+
let _ = reputation_service
392+
.get_reputation(&request.user_operation.sender())
393+
.await;
394+
}
392395

393396
let user_op_hash = request.hash().map_err(|e| {
394397
warn!(message = "Failed to hash user operation", error = %e);
@@ -592,7 +595,7 @@ mod tests {
592595
ingress_topic: String::new(),
593596
audit_kafka_properties: String::new(),
594597
audit_topic: String::new(),
595-
user_operation_consumer_properties: String::new(),
598+
user_operation_consumer_properties: None,
596599
user_operation_consumer_group_id: "tips-user-operation".to_string(),
597600
log_level: String::from("info"),
598601
log_format: tips_core::logger::LogFormat::Pretty,

0 commit comments

Comments
 (0)