Skip to content

Commit ce2a7a5

Browse files
committed
feature complete logservice
1 parent 2ca8a5f commit ce2a7a5

File tree

16 files changed

+310
-321
lines changed

16 files changed

+310
-321
lines changed

examples/delete_helloworld.sh

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/common/src/etcd.rs

Lines changed: 82 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@ lazy_static::lazy_static! {
1616
};
1717
}
1818

19+
const DEV: bool = false;
20+
1921
/// Put a key-value pair into the gRPC RocksDB service
2022
pub async fn put(key: &str, value: &str) -> Result<(), String> {
21-
logd!(
22-
2,
23-
"[RocksDB] Putting key '{}' to service: {}",
24-
key,
25-
*ROCKSDB_SERVICE_URL
26-
);
23+
if DEV {
24+
logd!(
25+
1,
26+
"[RocksDB] Putting key '{}' to service: {}",
27+
key,
28+
*ROCKSDB_SERVICE_URL
29+
);
30+
}
2731

2832
match RocksDbServiceClient::connect(ROCKSDB_SERVICE_URL.clone()).await {
2933
Ok(mut client) => {
@@ -60,12 +64,14 @@ pub async fn put(key: &str, value: &str) -> Result<(), String> {
6064

6165
/// Get a value by key from the gRPC RocksDB service
6266
pub async fn get(key: &str) -> Result<String, String> {
63-
logd!(
64-
2,
65-
"[RocksDB] Getting key '{}' from service: {}",
66-
key,
67-
*ROCKSDB_SERVICE_URL
68-
);
67+
if DEV {
68+
logd!(
69+
1,
70+
"[RocksDB] Getting key '{}' from service: {}",
71+
key,
72+
*ROCKSDB_SERVICE_URL
73+
);
74+
}
6975

7076
match RocksDbServiceClient::connect(ROCKSDB_SERVICE_URL.clone()).await {
7177
Ok(mut client) => {
@@ -77,12 +83,14 @@ pub async fn get(key: &str) -> Result<String, String> {
7783
Ok(response) => {
7884
let get_response = response.into_inner();
7985
if get_response.success {
80-
logd!(
81-
2,
82-
"[RocksDB] Successfully retrieved key: {} (value length: {})",
83-
key,
84-
get_response.value.len()
85-
);
86+
if DEV {
87+
logd!(
88+
1,
89+
"[RocksDB] Successfully retrieved key: {} (value length: {})",
90+
key,
91+
get_response.value.len()
92+
);
93+
}
8694
Ok(get_response.value)
8795
} else {
8896
logd!(5, "[RocksDB] Key not found: {}", key);
@@ -106,12 +114,14 @@ pub async fn get(key: &str) -> Result<String, String> {
106114

107115
/// Get all key-value pairs with the specified prefix using gRPC RocksDB service
108116
pub async fn get_all_with_prefix(prefix: &str) -> Result<Vec<(String, String)>, String> {
109-
logd!(
110-
2,
111-
"[RocksDB] Getting all keys with prefix '{}' from service: {}",
112-
prefix,
113-
*ROCKSDB_SERVICE_URL
114-
);
117+
if DEV {
118+
logd!(
119+
1,
120+
"[RocksDB] Getting all keys with prefix '{}' from service: {}",
121+
prefix,
122+
*ROCKSDB_SERVICE_URL
123+
);
124+
}
115125

116126
match RocksDbServiceClient::connect(ROCKSDB_SERVICE_URL.clone()).await {
117127
Ok(mut client) => {
@@ -129,12 +139,14 @@ pub async fn get_all_with_prefix(prefix: &str) -> Result<Vec<(String, String)>,
129139
.into_iter()
130140
.map(|kv| (kv.key, kv.value))
131141
.collect();
132-
logd!(
133-
2,
134-
"[RocksDB] Successfully retrieved {} keys with prefix '{}'",
135-
result.len(),
136-
prefix
137-
);
142+
if DEV {
143+
logd!(
144+
1,
145+
"[RocksDB] Successfully retrieved {} keys with prefix '{}'",
146+
result.len(),
147+
prefix
148+
);
149+
}
138150
Ok(result)
139151
} else {
140152
logd!(5, "[RocksDB] Error from service: {}", get_response.error);
@@ -158,12 +170,14 @@ pub async fn get_all_with_prefix(prefix: &str) -> Result<Vec<(String, String)>,
158170

159171
/// Delete a key from the gRPC RocksDB service
160172
pub async fn delete(key: &str) -> Result<(), String> {
161-
logd!(
162-
2,
163-
"[RocksDB] Deleting key '{}' from service: {}",
164-
key,
165-
*ROCKSDB_SERVICE_URL
166-
);
173+
if DEV {
174+
logd!(
175+
1,
176+
"[RocksDB] Deleting key '{}' from service: {}",
177+
key,
178+
*ROCKSDB_SERVICE_URL
179+
);
180+
}
167181

168182
match RocksDbServiceClient::connect(ROCKSDB_SERVICE_URL.clone()).await {
169183
Ok(mut client) => {
@@ -175,7 +189,9 @@ pub async fn delete(key: &str) -> Result<(), String> {
175189
Ok(response) => {
176190
let delete_response = response.into_inner();
177191
if delete_response.success {
178-
logd!(2, "[RocksDB] Successfully deleted key: {}", key);
192+
if DEV {
193+
logd!(1, "[RocksDB] Successfully deleted key: {}", key);
194+
}
179195
Ok(())
180196
} else {
181197
let error_msg = delete_response.error;
@@ -200,12 +216,14 @@ pub async fn delete(key: &str) -> Result<(), String> {
200216

201217
/// Batch put operation to store multiple key-value pairs using gRPC RocksDB service
202218
pub async fn batch_put(items: Vec<(String, String)>) -> Result<(), String> {
203-
logd!(
204-
3,
205-
"[RocksDB] Batch putting {} items to service: {}",
206-
items.len(),
207-
*ROCKSDB_SERVICE_URL
208-
);
219+
if DEV {
220+
logd!(
221+
1,
222+
"[RocksDB] Batch putting {} items to service: {}",
223+
items.len(),
224+
*ROCKSDB_SERVICE_URL
225+
);
226+
}
209227

210228
match RocksDbServiceClient::connect(ROCKSDB_SERVICE_URL.clone()).await {
211229
Ok(mut client) => {
@@ -220,11 +238,13 @@ pub async fn batch_put(items: Vec<(String, String)>) -> Result<(), String> {
220238
Ok(response) => {
221239
let batch_response = response.into_inner();
222240
if batch_response.success {
223-
logd!(
224-
2,
225-
"[RocksDB] Successfully stored {} items in batch",
226-
batch_response.processed_count
227-
);
241+
if DEV {
242+
logd!(
243+
1,
244+
"[RocksDB] Successfully stored {} items in batch",
245+
batch_response.processed_count
246+
);
247+
}
228248
Ok(())
229249
} else {
230250
let error_msg = batch_response.error;
@@ -249,11 +269,13 @@ pub async fn batch_put(items: Vec<(String, String)>) -> Result<(), String> {
249269

250270
/// Health check for the gRPC RocksDB service
251271
pub async fn health_check() -> Result<bool, String> {
252-
logd!(
253-
1,
254-
"[RocksDB] Health check for service: {}",
255-
*ROCKSDB_SERVICE_URL
256-
);
272+
if DEV {
273+
logd!(
274+
1,
275+
"[RocksDB] Health check for service: {}",
276+
*ROCKSDB_SERVICE_URL
277+
);
278+
}
257279

258280
match RocksDbServiceClient::connect(ROCKSDB_SERVICE_URL.clone()).await {
259281
Ok(mut client) => {
@@ -263,11 +285,13 @@ pub async fn health_check() -> Result<bool, String> {
263285
Ok(response) => {
264286
let health_response = response.into_inner();
265287
let is_healthy = health_response.status == "healthy";
266-
logd!(
267-
2,
268-
"[RocksDB] Health check result: {}",
269-
health_response.status
270-
);
288+
if DEV {
289+
logd!(
290+
1,
291+
"[RocksDB] Health check result: {}",
292+
health_response.status
293+
);
294+
}
271295
Ok(is_healthy)
272296
}
273297
Err(e) => {

src/server/apiserver/src/artifact/data.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
//! Read/Write/Delete artifact data in etcd
77
8+
use common::logd;
9+
810
/// Read yaml string of artifacts from etcd
911
///
1012
/// ### Parameters
@@ -43,7 +45,7 @@ pub async fn write_to_etcd(key: &str, artifact_str: &str) -> common::Result<()>
4345
let result = common::etcd::put(key, artifact_str).await;
4446
let elapsed = start.elapsed();
4547

46-
println!("write_to_etcd: elapsed = {:?}", elapsed);
48+
logd!(1, "write_to_etcd: elapsed = {:?}", elapsed);
4749

4850
result?;
4951
Ok(())
@@ -121,7 +123,7 @@ spec:
121123
#[tokio::test]
122124
async fn test_read_from_etcd_positive() {
123125
let result = read_from_etcd(TEST_KEY).await;
124-
println!("read_from_etcd (positive) result = {:?}", result);
126+
logd!(1, "read_from_etcd (positive) result = {:?}", result);
125127

126128
//we accept both Ok and Err depending on etcd state
127129
assert!(
@@ -135,7 +137,8 @@ spec:
135137
#[tokio::test]
136138
async fn test_read_all_scenario_from_etcd_positive() {
137139
let result = read_all_scenario_from_etcd().await;
138-
println!(
140+
logd!(
141+
2,
139142
"read_all_scenario_from_etcd (positive) result = {:?}",
140143
result
141144
);
@@ -155,9 +158,11 @@ spec:
155158
let start = Instant::now();
156159
let result = write_to_etcd(TEST_KEY, TEST_YAML).await;
157160
let duration = start.elapsed();
158-
println!(
161+
logd!(
162+
2,
159163
"write_to_etcd (positive) result = {:?}, elapsed = {:?}",
160-
result, duration
164+
result,
165+
duration
161166
);
162167
assert!(
163168
result.is_ok() || result.is_err(),
@@ -170,7 +175,7 @@ spec:
170175
#[tokio::test]
171176
async fn test_delete_at_etcd_positive() {
172177
let result = delete_at_etcd(TEST_KEY).await;
173-
println!("delete_at_etcd (positive) result = {:?}", result);
178+
logd!(2, "delete_at_etcd (positive) result = {:?}", result);
174179
// We accept Ok (key deleted) or Err (key not found) as valid outcomes
175180
assert!(
176181
result.is_ok() || result.is_err(),

src/server/apiserver/src/artifact/mod.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
88
pub mod data;
99

10+
use common::logd;
1011
use common::spec::artifact::{Artifact, Model, Network, Node, Package, Scenario, Volume};
1112
use common::spec::k8s::Pod;
1213

@@ -67,18 +68,27 @@ async fn notify_scenario_state(scenario_name: &str, target_state: &str) {
6768
source: "apiserver".to_string(),
6869
};
6970

70-
println!("🔄 SCENARIO STATE INITIALIZATION: ApiServer Setting Initial State");
71-
println!(" 📋 Scenario: {}", scenario_name);
72-
println!(" 🔄 Initial State: → {}", target_state);
73-
println!(" 📤 Sending StateChange to StateManager");
71+
logd!(
72+
1,
73+
"🔄 SCENARIO STATE INITIALIZATION: ApiServer Setting Initial State"
74+
);
75+
logd!(1, " 📋 Scenario: {}", scenario_name);
76+
logd!(1, " 🔄 Initial State: → {}", target_state);
77+
logd!(1, " 📤 Sending StateChange to StateManager");
7478

7579
let mut state_sender = crate::grpc::sender::statemanager::StateManagerSender::new();
7680
match state_sender.send_state_change(state_change).await {
77-
Ok(_) => println!(
81+
Ok(_) => logd!(
82+
2,
7883
" ✅ Successfully set scenario {} to {} state",
79-
scenario_name, target_state
84+
scenario_name,
85+
target_state
86+
),
87+
Err(e) => logd!(
88+
5,
89+
" ❌ Failed to send state change to StateManager: {:?}",
90+
e
8091
),
81-
Err(e) => println!(" ❌ Failed to send state change to StateManager: {:?}", e),
8292
}
8393
}
8494

@@ -89,15 +99,16 @@ async fn process_artifact_document(doc: &str) -> common::Result<Option<(String,
8999
let parse_start = Instant::now();
90100
let value: serde_yaml::Value = serde_yaml::from_str(doc)?;
91101
let artifact_str = serde_yaml::to_string(&value)?;
92-
println!(
102+
logd!(
103+
1,
93104
"process_artifact: YAML parse elapsed = {:?}",
94105
parse_start.elapsed()
95106
);
96107

97108
let (kind, name) = match parse_artifact_info(&value) {
98109
Some(info) => info,
99110
None => {
100-
println!("Unknown or invalid artifact");
111+
logd!(5, "Unknown or invalid artifact");
101112
return Ok(None);
102113
}
103114
};
@@ -106,7 +117,8 @@ async fn process_artifact_document(doc: &str) -> common::Result<Option<(String,
106117

107118
let etcd_start = Instant::now();
108119
data::write_to_etcd(&key, &artifact_str).await?;
109-
println!(
120+
logd!(
121+
1,
110122
"process_artifact: etcd write elapsed for {} = {:?}",
111123
key,
112124
etcd_start.elapsed()
@@ -145,7 +157,7 @@ pub async fn apply(body: &str) -> common::Result<String> {
145157
}
146158
}
147159

148-
println!("apply: total elapsed = {:?}", total_start.elapsed());
160+
logd!(1, "apply: total elapsed = {:?}", total_start.elapsed());
149161

150162
if scenario_str.is_empty() {
151163
Err("There is not any scenario in yaml string".into())

0 commit comments

Comments
 (0)