Skip to content

Commit 474a5d5

Browse files
author
Devdutt Shenoi
committed
refactor: merge init and initialize
1 parent 7a1aeda commit 474a5d5

File tree

3 files changed

+190
-201
lines changed

3 files changed

+190
-201
lines changed

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 57 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl ParseableServer for IngestServer {
8484
.service(Server::get_ingest_otel_factory());
8585
}
8686

87-
/// implement the init method will just invoke the initialize method
87+
/// configure the server and start an instance to ingest data
8888
async fn init(&self) -> anyhow::Result<()> {
8989
if CONFIG.get_storage_mode_string() == "Local drive" {
9090
return Err(anyhow::Error::msg(
@@ -103,7 +103,62 @@ impl ParseableServer for IngestServer {
103103
rbac::map::init(&metadata);
104104
// set the info in the global metadata
105105
metadata.set_global();
106-
self.initialize().await
106+
107+
// ! Undefined and Untested behaviour
108+
if let Some(cache_manager) = LocalCacheManager::global() {
109+
cache_manager
110+
.validate(CONFIG.parseable.local_cache_size)
111+
.await?;
112+
};
113+
114+
let prometheus = metrics::build_metrics_handler();
115+
CONFIG.storage().register_store_metrics(&prometheus);
116+
117+
migration::run_migration(&CONFIG).await?;
118+
119+
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
120+
sync::run_local_sync().await;
121+
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
122+
sync::object_store_sync().await;
123+
124+
tokio::spawn(airplane::server());
125+
126+
// set the ingestor metadata
127+
self.set_ingestor_metadata().await?;
128+
129+
// Ingestors shouldn't have to deal with OpenId auth flow
130+
let app = self.start(prometheus, None);
131+
132+
tokio::pin!(app);
133+
loop {
134+
tokio::select! {
135+
e = &mut app => {
136+
// actix server finished .. stop other threads and stop the server
137+
remote_sync_inbox.send(()).unwrap_or(());
138+
localsync_inbox.send(()).unwrap_or(());
139+
if let Err(e) = localsync_handler.await {
140+
log::error!("Error joining remote_sync_handler: {:?}", e);
141+
}
142+
if let Err(e) = remote_sync_handler.await {
143+
log::error!("Error joining remote_sync_handler: {:?}", e);
144+
}
145+
return e
146+
},
147+
_ = &mut localsync_outbox => {
148+
// crash the server if localsync fails for any reason
149+
// panic!("Local Sync thread died. Server will fail now!")
150+
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
151+
},
152+
_ = &mut remote_sync_outbox => {
153+
// remote_sync failed, this is recoverable by just starting remote_sync thread again
154+
if let Err(e) = remote_sync_handler.await {
155+
log::error!("Error joining remote_sync_handler: {:?}", e);
156+
}
157+
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
158+
}
159+
160+
};
161+
}
107162
}
108163
}
109164

@@ -346,62 +401,4 @@ impl IngestServer {
346401

347402
Ok(())
348403
}
349-
350-
async fn initialize(&self) -> anyhow::Result<()> {
351-
// ! Undefined and Untested behaviour
352-
if let Some(cache_manager) = LocalCacheManager::global() {
353-
cache_manager
354-
.validate(CONFIG.parseable.local_cache_size)
355-
.await?;
356-
};
357-
358-
let prometheus = metrics::build_metrics_handler();
359-
CONFIG.storage().register_store_metrics(&prometheus);
360-
361-
migration::run_migration(&CONFIG).await?;
362-
363-
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
364-
sync::run_local_sync().await;
365-
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
366-
sync::object_store_sync().await;
367-
368-
tokio::spawn(airplane::server());
369-
370-
// set the ingestor metadata
371-
self.set_ingestor_metadata().await?;
372-
373-
// Ingestors shouldn't have to deal with OpenId auth flow
374-
let app = self.start(prometheus, None);
375-
376-
tokio::pin!(app);
377-
loop {
378-
tokio::select! {
379-
e = &mut app => {
380-
// actix server finished .. stop other threads and stop the server
381-
remote_sync_inbox.send(()).unwrap_or(());
382-
localsync_inbox.send(()).unwrap_or(());
383-
if let Err(e) = localsync_handler.await {
384-
log::error!("Error joining remote_sync_handler: {:?}", e);
385-
}
386-
if let Err(e) = remote_sync_handler.await {
387-
log::error!("Error joining remote_sync_handler: {:?}", e);
388-
}
389-
return e
390-
},
391-
_ = &mut localsync_outbox => {
392-
// crash the server if localsync fails for any reason
393-
// panic!("Local Sync thread died. Server will fail now!")
394-
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
395-
},
396-
_ = &mut remote_sync_outbox => {
397-
// remote_sync failed, this is recoverable by just starting remote_sync thread again
398-
if let Err(e) = remote_sync_handler.await {
399-
log::error!("Error joining remote_sync_handler: {:?}", e);
400-
}
401-
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
402-
}
403-
404-
};
405-
}
406-
}
407404
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 66 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl ParseableServer for QueryServer {
6767
.service(Server::get_generated());
6868
}
6969

70-
/// implementation of init should just invoke a call to initialize
70+
/// initialize the server, run migrations as needed and start an instance
7171
async fn init(&self) -> anyhow::Result<()> {
7272
if CONFIG.get_storage_mode_string() == "Local drive" {
7373
return Err(anyhow::anyhow!(
@@ -84,7 +84,71 @@ impl ParseableServer for QueryServer {
8484
rbac::map::init(&metadata);
8585
// keep metadata info in mem
8686
metadata.set_global();
87-
self.initialize().await
87+
88+
let prometheus = metrics::build_metrics_handler();
89+
CONFIG.storage().register_store_metrics(&prometheus);
90+
91+
migration::run_migration(&CONFIG).await?;
92+
93+
//create internal stream at server start
94+
create_internal_stream_if_not_exists().await?;
95+
96+
FILTERS.load().await?;
97+
DASHBOARDS.load().await?;
98+
// track all parquet files already in the data directory
99+
storage::retention::load_retention_from_global();
100+
101+
// all internal data structures populated now.
102+
// start the analytics scheduler if enabled
103+
if CONFIG.parseable.send_analytics {
104+
analytics::init_analytics_scheduler()?;
105+
}
106+
107+
if matches!(init_cluster_metrics_schedular(), Ok(())) {
108+
log::info!("Cluster metrics scheduler started successfully");
109+
}
110+
if let Some(hot_tier_manager) = HotTierManager::global() {
111+
hot_tier_manager.put_internal_stream_hot_tier().await?;
112+
hot_tier_manager.download_from_s3()?;
113+
};
114+
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
115+
sync::run_local_sync().await;
116+
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
117+
sync::object_store_sync().await;
118+
119+
tokio::spawn(airplane::server());
120+
let app = self.start(prometheus, CONFIG.parseable.openid.clone());
121+
122+
tokio::pin!(app);
123+
loop {
124+
tokio::select! {
125+
e = &mut app => {
126+
// actix server finished .. stop other threads and stop the server
127+
remote_sync_inbox.send(()).unwrap_or(());
128+
localsync_inbox.send(()).unwrap_or(());
129+
if let Err(e) = localsync_handler.await {
130+
log::error!("Error joining localsync_handler: {:?}", e);
131+
}
132+
if let Err(e) = remote_sync_handler.await {
133+
log::error!("Error joining remote_sync_handler: {:?}", e);
134+
}
135+
return e
136+
},
137+
_ = &mut localsync_outbox => {
138+
// crash the server if localsync fails for any reason
139+
// panic!("Local Sync thread died. Server will fail now!")
140+
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
141+
},
142+
_ = &mut remote_sync_outbox => {
143+
// remote_sync failed, this is recoverable by just starting remote_sync thread again
144+
if let Err(e) = remote_sync_handler.await {
145+
log::error!("Error joining remote_sync_handler: {:?}", e);
146+
}
147+
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
148+
}
149+
150+
};
151+
}
88152
}
89153
}
90154

@@ -327,72 +391,4 @@ impl QueryServer {
327391
),
328392
)
329393
}
330-
331-
/// initialize the server, run migrations as needed and start the server
332-
async fn initialize(&self) -> anyhow::Result<()> {
333-
let prometheus = metrics::build_metrics_handler();
334-
CONFIG.storage().register_store_metrics(&prometheus);
335-
336-
migration::run_migration(&CONFIG).await?;
337-
338-
//create internal stream at server start
339-
create_internal_stream_if_not_exists().await?;
340-
341-
FILTERS.load().await?;
342-
DASHBOARDS.load().await?;
343-
// track all parquet files already in the data directory
344-
storage::retention::load_retention_from_global();
345-
346-
// all internal data structures populated now.
347-
// start the analytics scheduler if enabled
348-
if CONFIG.parseable.send_analytics {
349-
analytics::init_analytics_scheduler()?;
350-
}
351-
352-
if matches!(init_cluster_metrics_schedular(), Ok(())) {
353-
log::info!("Cluster metrics scheduler started successfully");
354-
}
355-
if let Some(hot_tier_manager) = HotTierManager::global() {
356-
hot_tier_manager.put_internal_stream_hot_tier().await?;
357-
hot_tier_manager.download_from_s3()?;
358-
};
359-
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
360-
sync::run_local_sync().await;
361-
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
362-
sync::object_store_sync().await;
363-
364-
tokio::spawn(airplane::server());
365-
let app = self.start(prometheus, CONFIG.parseable.openid.clone());
366-
367-
tokio::pin!(app);
368-
loop {
369-
tokio::select! {
370-
e = &mut app => {
371-
// actix server finished .. stop other threads and stop the server
372-
remote_sync_inbox.send(()).unwrap_or(());
373-
localsync_inbox.send(()).unwrap_or(());
374-
if let Err(e) = localsync_handler.await {
375-
log::error!("Error joining localsync_handler: {:?}", e);
376-
}
377-
if let Err(e) = remote_sync_handler.await {
378-
log::error!("Error joining remote_sync_handler: {:?}", e);
379-
}
380-
return e
381-
},
382-
_ = &mut localsync_outbox => {
383-
// crash the server if localsync fails for any reason
384-
// panic!("Local Sync thread died. Server will fail now!")
385-
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
386-
},
387-
_ = &mut remote_sync_outbox => {
388-
// remote_sync failed, this is recoverable by just starting remote_sync thread again
389-
if let Err(e) = remote_sync_handler.await {
390-
log::error!("Error joining remote_sync_handler: {:?}", e);
391-
}
392-
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
393-
}
394-
395-
};
396-
}
397-
}
398394
}

0 commit comments

Comments
 (0)