Skip to content

Commit 7768b28

Browse files
Merge pull request #109 from reddevilmidzy/result
SSE 실시간 통신 마지막에 전체 결과 반환
2 parents 4ab38dd + 62fa852 commit 7768b28

File tree

3 files changed

+186
-34
lines changed

3 files changed

+186
-34
lines changed

rook/Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rook/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ once_cell = "1.18"
2020
tracing = "0.1"
2121
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2222
futures = "0.3"
23-
tokio-stream = "0.1"
2423

2524
# shuttle 의존성 지우기
2625
shuttle-axum = "0.55.0"

rook/src/link_checker/sse.rs

Lines changed: 186 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
use crate::git;
2-
use crate::link_checker::link::{LinkCheckResult, check_link};
3-
41
use axum::response::sse::{Event, KeepAlive, Sse};
52
use futures::stream::{self, Stream, StreamExt};
6-
use std::{convert::Infallible, pin::Pin};
3+
use serde::{Deserialize, Serialize};
4+
use std::convert::Infallible;
5+
use std::pin::Pin;
6+
use std::sync::Arc;
7+
use std::sync::atomic::{AtomicUsize, Ordering};
78
use tracing::{error, info, instrument};
89

9-
#[derive(Debug, serde::Serialize)]
10+
use crate::{LinkCheckResult, check_link, git};
11+
12+
#[derive(Debug, Serialize, Deserialize)]
1013
pub struct LinkCheckEvent {
1114
pub url: String,
1215
pub file_path: String,
@@ -15,6 +18,66 @@ pub struct LinkCheckEvent {
1518
pub message: Option<String>,
1619
}
1720

21+
#[derive(Debug, Serialize, Deserialize)]
22+
pub struct LinkCheckSummaryEvent {
23+
pub total: usize,
24+
pub valid: usize,
25+
pub invalid: usize,
26+
pub redirect: usize,
27+
pub moved: usize,
28+
}
29+
30+
#[derive(Debug)]
31+
struct LinkCheckCounters {
32+
total: AtomicUsize,
33+
valid: AtomicUsize,
34+
invalid: AtomicUsize,
35+
redirect: AtomicUsize,
36+
moved: AtomicUsize,
37+
}
38+
39+
impl LinkCheckCounters {
40+
fn new() -> Self {
41+
Self {
42+
total: AtomicUsize::new(0),
43+
valid: AtomicUsize::new(0),
44+
invalid: AtomicUsize::new(0),
45+
redirect: AtomicUsize::new(0),
46+
moved: AtomicUsize::new(0),
47+
}
48+
}
49+
50+
fn increment_total(&self) {
51+
self.total.fetch_add(1, Ordering::Relaxed);
52+
}
53+
54+
fn increment_valid(&self) {
55+
self.valid.fetch_add(1, Ordering::Relaxed);
56+
}
57+
58+
fn increment_invalid(&self) {
59+
self.invalid.fetch_add(1, Ordering::Relaxed);
60+
}
61+
62+
fn increment_redirect(&self) {
63+
self.redirect.fetch_add(1, Ordering::Relaxed);
64+
}
65+
66+
fn increment_moved(&self) {
67+
self.moved.fetch_add(1, Ordering::Relaxed);
68+
}
69+
70+
fn to_summary(&self) -> LinkCheckSummaryEvent {
71+
LinkCheckSummaryEvent {
72+
total: self.total.load(Ordering::Relaxed),
73+
valid: self.valid.load(Ordering::Relaxed),
74+
invalid: self.invalid.load(Ordering::Relaxed),
75+
redirect: self.redirect.load(Ordering::Relaxed),
76+
moved: self.moved.load(Ordering::Relaxed),
77+
}
78+
}
79+
}
80+
1881
#[instrument(skip(), fields(repo_url = repo_url))]
1982
pub async fn stream_link_checks(
2083
repo_url: String,
@@ -29,40 +92,72 @@ pub async fn stream_link_checks(
2992
Ok(links) => {
3093
info!("Found {} links to check", links.len());
3194

95+
let counters = Arc::new(LinkCheckCounters::new());
96+
3297
let links_stream = stream::iter(links);
3398
let events_stream = links_stream
34-
.map(move |link| async move {
35-
let result = check_link(&link.url).await;
36-
let event = LinkCheckEvent {
37-
url: link.url,
38-
file_path: link.file_path,
39-
line_number: link.line_number as u32,
40-
status: match &result {
41-
LinkCheckResult::Valid => "valid".to_string(),
42-
LinkCheckResult::Invalid(_) => "invalid".to_string(),
43-
LinkCheckResult::Redirect(_) => "redirect".to_string(),
44-
LinkCheckResult::GitHubFileMoved(_) => "file_moved".to_string(),
45-
},
46-
message: match result {
47-
LinkCheckResult::Valid => None,
48-
LinkCheckResult::Invalid(msg) => Some(msg),
49-
LinkCheckResult::Redirect(url) => {
50-
Some(format!("Redirected to: {}", url))
51-
}
52-
LinkCheckResult::GitHubFileMoved(msg) => {
53-
Some(format!("Moved to: {}", msg))
99+
.map({
100+
let counters = Arc::clone(&counters);
101+
move |link| {
102+
let counters = Arc::clone(&counters);
103+
async move {
104+
let result = check_link(&link.url).await;
105+
106+
counters.increment_total();
107+
match &result {
108+
LinkCheckResult::Valid => counters.increment_valid(),
109+
LinkCheckResult::Invalid(_) => counters.increment_invalid(),
110+
LinkCheckResult::Redirect(_) => counters.increment_redirect(),
111+
LinkCheckResult::GitHubFileMoved(_) => counters.increment_moved(),
112+
};
113+
114+
let event = LinkCheckEvent {
115+
url: link.url,
116+
file_path: link.file_path,
117+
line_number: link.line_number as u32,
118+
status: match &result {
119+
LinkCheckResult::Valid => "valid".to_string(),
120+
LinkCheckResult::Invalid(_) => "invalid".to_string(),
121+
LinkCheckResult::Redirect(_) => "redirect".to_string(),
122+
LinkCheckResult::GitHubFileMoved(_) => "file_moved".to_string(),
123+
},
124+
message: match result {
125+
LinkCheckResult::Valid => None,
126+
LinkCheckResult::Invalid(msg) => Some(msg),
127+
LinkCheckResult::Redirect(url) => {
128+
Some(format!("Redirected to: {}", url))
129+
}
130+
LinkCheckResult::GitHubFileMoved(msg) => {
131+
Some(format!("Moved to: {}", msg))
132+
}
133+
},
134+
};
135+
136+
match Event::default().json_data(event) {
137+
Ok(event) => Ok(event),
138+
Err(e) => {
139+
error!("Failed to serialize event: {}", e);
140+
Ok(Event::default()
141+
.data(format!("Error serializing event: {}", e)))
142+
}
54143
}
55-
},
56-
};
57-
match Event::default().json_data(event) {
144+
}
145+
}
146+
})
147+
.buffer_unordered(10)
148+
.chain(stream::once(async move {
149+
let counters = Arc::clone(&counters);
150+
let summary = counters.to_summary();
151+
152+
match Event::default().json_data(summary) {
58153
Ok(event) => Ok(event),
59154
Err(e) => {
60-
error!("Failed to serialize event: {}", e);
61-
Ok(Event::default().data(format!("Error serializing event: {}", e)))
155+
error!("Failed to serialize summary event: {}", e);
156+
Ok(Event::default().data(format!("Error serializing summary: {}", e)))
62157
}
63158
}
64-
})
65-
.buffer_unordered(10);
159+
}));
160+
66161
Box::pin(events_stream) as Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>
67162
}
68163
Err(e) => {
@@ -75,3 +170,62 @@ pub async fn stream_link_checks(
75170

76171
Sse::new(stream).keep_alive(KeepAlive::default())
77172
}
173+
174+
#[cfg(test)]
175+
mod tests {
176+
use super::*;
177+
use axum::response::IntoResponse;
178+
use futures::StreamExt;
179+
use serde_json::Value;
180+
181+
#[tokio::test]
182+
async fn test_stream_link_checks() {
183+
let repo_url = "https://github.com/reddevilmidzy/kingsac".to_string();
184+
let branch = Some("main".to_string());
185+
let sse = stream_link_checks(repo_url, branch).await;
186+
let mut stream = sse.into_response().into_body().into_data_stream();
187+
188+
// 스트림에서 이벤트를 수집
189+
let mut events = Vec::new();
190+
let mut buffer = String::new();
191+
192+
while let Some(chunk) = stream.next().await {
193+
if let Ok(chunk) = chunk {
194+
if let Ok(text) = String::from_utf8(chunk.to_vec()) {
195+
buffer.push_str(&text);
196+
197+
if let Some(event_end) = buffer.find("\n\n") {
198+
let event_str = buffer[..event_end].to_string();
199+
buffer = buffer[event_end + 2..].to_string();
200+
// "data: " 접두사를 제거하고 JSON 파싱
201+
if let Some(json_str) = event_str.strip_prefix("data: ") {
202+
if let Ok(json) = serde_json::from_str::<Value>(json_str) {
203+
events.push(json);
204+
}
205+
}
206+
}
207+
}
208+
}
209+
}
210+
211+
assert!(!events.is_empty(), "No events were received");
212+
213+
assert!(events.last().is_some(), "Last event should exist");
214+
if let Some(last_event) = events.last() {
215+
assert!(last_event.get("total").is_some());
216+
assert!(last_event.get("total").unwrap().as_u64().unwrap() > 0);
217+
assert!(last_event.get("valid").is_some());
218+
assert!(last_event.get("invalid").is_some());
219+
assert!(last_event.get("redirect").is_some());
220+
assert!(last_event.get("moved").is_some());
221+
}
222+
223+
assert!(events.first().is_some(), "First event should exist");
224+
if let Some(first_event) = events.first() {
225+
assert!(first_event.get("url").is_some());
226+
assert!(first_event.get("file_path").is_some());
227+
assert!(first_event.get("line_number").is_some());
228+
assert!(first_event.get("status").is_some());
229+
}
230+
}
231+
}

0 commit comments

Comments
 (0)