Skip to content

Commit 09695a0

Browse files
committed
fix: fixed publish in webservice
Signed-off-by: Jan Zachmann <50990105+JanZachmann@users.noreply.github.com>
1 parent f7371c5 commit 09695a0

File tree

3 files changed

+39
-32
lines changed

3 files changed

+39
-32
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
88
name = "omnect-device-service"
99
readme = "README.md"
1010
repository = "https://github.com/omnect/omnect-device-service.git"
11-
version = "0.41.8"
11+
version = "0.41.9"
1212

1313
[dependencies]
1414
actix-server = { version = "2.6", default-features = false }

src/web_service.rs

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,25 @@ use crate::{
55
use actix_server::ServerHandle;
66
use actix_web::{App, HttpResponse, HttpServer, http::StatusCode, web};
77
use anyhow::{Context, Result};
8-
use lazy_static::lazy_static;
98
use log::{debug, error, info};
109
use reqwest::{Client, header};
1110
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
1211
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
1312
use serde::{Deserialize, Serialize};
1413
use serde_json::json;
15-
use std::{collections::HashMap, env, path::Path, str::FromStr, sync::LazyLock};
14+
use std::{
15+
collections::HashMap,
16+
env,
17+
path::Path,
18+
str::FromStr,
19+
sync::{LazyLock, OnceLock},
20+
};
1621
use tokio::{
1722
sync::{Mutex, mpsc, oneshot},
1823
time::Duration,
1924
};
2025

26+
static IS_WEBSERVICE_DISABLED: OnceLock<bool> = OnceLock::new();
2127
static PUBLISH_CHANNEL_MAP: LazyLock<Mutex<serde_json::Map<String, serde_json::Value>>> =
2228
LazyLock::new(|| Mutex::new(serde_json::Map::default()));
2329
static PUBLISH_STATUS_MAP: LazyLock<Mutex<serde_json::Map<String, serde_json::Value>>> =
@@ -28,7 +34,6 @@ static PUBLISH_CLIENT: LazyLock<Mutex<ClientWithMiddleware>> = LazyLock::new(||
2834
Mutex::new(
2935
ClientBuilder::new(
3036
Client::builder()
31-
.timeout(Duration::from_secs(3))
3237
.danger_accept_invalid_certs(true)
3338
.build()
3439
.expect("building ClientWithMiddleware failed"),
@@ -88,30 +93,27 @@ struct PublishEndpointRequest {
8893
endpoint: PublishEndpoint,
8994
}
9095

91-
lazy_static! {
92-
static ref IS_WEBSERVICE_ENABLED: bool = {
93-
env::var("DISABLE_WEBSERVICE")
94-
.unwrap_or("false".to_string())
95-
.to_lowercase()
96-
!= "true"
97-
};
98-
}
99-
10096
pub struct WebService {
10197
srv_handle: ServerHandle,
10298
}
10399

104100
impl WebService {
105101
pub async fn run(tx_request: mpsc::Sender<CommandRequest>) -> Result<Option<Self>> {
106102
// we only start web service feature if not explicitly disabled by 'DISABLE_WEBSERVICE="true"' env var
107-
if !(*IS_WEBSERVICE_ENABLED) {
103+
if *IS_WEBSERVICE_DISABLED.get_or_init(|| {
104+
env::var("DISABLE_WEBSERVICE")
105+
.unwrap_or("false".to_string())
106+
.to_lowercase()
107+
== "true"
108+
}) {
108109
info!("WebService is disabled");
109110
return Ok(None);
110111
};
111112

112113
info!("WebService is enabled");
113114

114115
if matches!(Path::new(&publish_endpoints_path!()).try_exists(), Ok(true)) {
116+
debug!("restore publish endpoints");
115117
*PUBLISH_ENDPOINTS.lock().await = from_json_file(publish_endpoints_path!())?;
116118
}
117119

@@ -396,7 +398,7 @@ impl WebService {
396398
}
397399

398400
pub async fn publish(channel: PublishChannel, value: serde_json::Value) {
399-
if !(*IS_WEBSERVICE_ENABLED) {
401+
if *IS_WEBSERVICE_DISABLED.wait() {
400402
debug!("publish: skip since feature not enabled");
401403
return;
402404
}
@@ -416,7 +418,7 @@ pub async fn publish(channel: PublishChannel, value: serde_json::Value) {
416418
.insert(channel.to_status_string(), value.clone());
417419

418420
for endpoint in PUBLISH_ENDPOINTS.lock().await.values() {
419-
if let Err(e) = publish_to_endpoint(&msg, endpoint).await {
421+
if let Err(e) = publish_to_endpoint(&msg, endpoint) {
420422
error!("publish: {e:#}");
421423
}
422424
}
@@ -426,7 +428,7 @@ async fn republish_to_endpoint(endpoint: &PublishEndpoint) -> HttpResponse {
426428
for (channel, value) in PUBLISH_CHANNEL_MAP.lock().await.iter() {
427429
let msg = json!({"channel": channel, "data": value});
428430

429-
if let Err(e) = publish_to_endpoint(&msg, endpoint).await {
431+
if let Err(e) = publish_to_endpoint(&msg, endpoint) {
430432
error!("republish_to_endpoint: {e:#}");
431433
return HttpResponse::InternalServerError().body(e.to_string());
432434
}
@@ -435,10 +437,12 @@ async fn republish_to_endpoint(endpoint: &PublishEndpoint) -> HttpResponse {
435437
HttpResponse::Ok().finish()
436438
}
437439

438-
async fn publish_to_endpoint(msg: &serde_json::Value, endpoint: &PublishEndpoint) -> Result<()> {
440+
fn publish_to_endpoint(msg: &serde_json::Value, endpoint: &PublishEndpoint) -> Result<()> {
441+
let msg = msg.to_string();
442+
let url = endpoint.url.clone();
439443
let mut headers = header::HeaderMap::new();
440444

441-
info!("try to send {msg} to {}.", endpoint.url);
445+
info!("try to send {msg} to {url}");
442446

443447
for h in &endpoint.headers {
444448
headers.insert(
@@ -447,18 +451,21 @@ async fn publish_to_endpoint(msg: &serde_json::Value, endpoint: &PublishEndpoint
447451
);
448452
}
449453

450-
PUBLISH_CLIENT
451-
.lock()
452-
.await
453-
.post(&endpoint.url)
454-
.headers(headers)
455-
.body(msg.to_string())
456-
.send()
457-
.await
458-
.context("sending request failed")?
459-
.error_for_status()
460-
.context("response error")
461-
.map(|_| ())
454+
tokio::spawn(async move {
455+
if let Err(e) = PUBLISH_CLIENT
456+
.lock()
457+
.await
458+
.post(url)
459+
.headers(headers)
460+
.body(msg)
461+
.send()
462+
.await
463+
{
464+
error!("publish_to_endpoint: {e:#}")
465+
}
466+
});
467+
468+
Ok(())
462469
}
463470

464471
async fn save_publish_endpoints() -> Result<()> {

0 commit comments

Comments
 (0)