|
1 | | -use anyhow::Result; |
2 | | -use std::sync::Arc; |
3 | | -use std::{collections::HashSet, time::Instant}; |
4 | | -use tokio::task::JoinHandle; |
5 | | -use tokio_util::sync::CancellationToken; |
6 | | - |
7 | 1 | use crate::{ |
8 | 2 | app::config::Config, manager::Manager, redis::RedisClient, types::check_config::CheckConfig, |
9 | 3 | }; |
10 | | - |
| 4 | +use anyhow::Result; |
11 | 5 | use serde::{Deserialize, Serialize}; |
12 | 6 | use serde_with::serde_as; |
| 7 | +use std::sync::Arc; |
13 | 8 | use std::time::Duration; |
| 9 | +use std::{collections::HashSet, time::Instant}; |
| 10 | +use tokio::task::JoinHandle; |
14 | 11 | use tokio::time::interval; |
| 12 | +use tokio_util::sync::CancellationToken; |
15 | 13 | use uuid::Uuid; |
16 | 14 |
|
17 | 15 | #[derive(Debug)] |
@@ -73,6 +71,20 @@ pub struct RedisConfigProvider { |
73 | 71 | check_interval: Duration, |
74 | 72 | } |
75 | 73 |
|
| 74 | +fn deserialize_rmp(config_payload: &[u8]) -> Option<CheckConfig> { |
| 75 | + match rmp_serde::from_slice::<CheckConfig>(config_payload) { |
| 76 | + Ok(config) => Some(config), |
| 77 | + Err(err) => { |
| 78 | + if let Ok(config) = rmp_serde::from_slice::<serde_json::Value>(config_payload) { |
| 79 | + tracing::error!(?err, ?config, "config_consumer.invalid_config_message"); |
| 80 | + } else { |
| 81 | + tracing::error!(?err, "config_consumer.invalid_config_message"); |
| 82 | + } |
| 83 | + None |
| 84 | + } |
| 85 | + } |
| 86 | +} |
| 87 | + |
76 | 88 | impl RedisConfigProvider { |
77 | 89 | pub fn new( |
78 | 90 | redis_url: &str, |
@@ -158,14 +170,13 @@ impl RedisConfigProvider { |
158 | 170 | .set(config_payloads.len() as f64); |
159 | 171 |
|
160 | 172 | for config_payload in config_payloads { |
161 | | - match rmp_serde::from_slice(&config_payload) { |
162 | | - Ok(config) => manager |
| 173 | + if let Some(config) = deserialize_rmp(&config_payload) { |
| 174 | + manager |
163 | 175 | .get_service(partition.number) |
164 | 176 | .get_config_store() |
165 | 177 | .write() |
166 | 178 | .expect("lock not poisoned") |
167 | | - .add_config(config), |
168 | | - Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"), |
| 179 | + .add_config(config); |
169 | 180 | } |
170 | 181 | } |
171 | 182 | let partition_loading_time = partition_start_loading.elapsed().as_secs_f64(); |
@@ -250,21 +261,18 @@ impl RedisConfigProvider { |
250 | 261 | .await; |
251 | 262 |
|
252 | 263 | for config_payload in config_payloads { |
253 | | - match rmp_serde::from_slice::<CheckConfig>(&config_payload) { |
254 | | - Ok(config) => { |
255 | | - tracing::debug!( |
256 | | - partition = partition.number, |
257 | | - subscription_id = %config.subscription_id, |
258 | | - "redis_config_provider.upserting_config" |
259 | | - ); |
260 | | - manager |
261 | | - .get_service(partition.number) |
262 | | - .get_config_store() |
263 | | - .write() |
264 | | - .expect("lock not poisoned") |
265 | | - .add_config(config); |
266 | | - } |
267 | | - Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"), |
| 264 | + if let Some(config) = deserialize_rmp(&config_payload) { |
| 265 | + tracing::debug!( |
| 266 | + partition = partition.number, |
| 267 | + subscription_id = %config.subscription_id, |
| 268 | + "redis_config_provider.upserting_config" |
| 269 | + ); |
| 270 | + manager |
| 271 | + .get_service(partition.number) |
| 272 | + .get_config_store() |
| 273 | + .write() |
| 274 | + .expect("lock not poisoned") |
| 275 | + .add_config(config); |
268 | 276 | } |
269 | 277 | } |
270 | 278 | let partition_update_duration = partition_update_start.elapsed().as_secs_f64(); |
|
0 commit comments