Skip to content

Commit 7824edd

Browse files
feat(graph-proxy): add logs subscription
1 parent 88d013e commit 7824edd

File tree

1 file changed

+119
-15
lines changed

1 file changed

+119
-15
lines changed

backend/graph-proxy/src/graphql/subscription.rs

Lines changed: 119 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use argo_workflows_openapi::IoArgoprojWorkflowV1alpha1WorkflowWatchEvent;
2-
use async_graphql::{Context, Subscription};
2+
use async_graphql::{Context, SimpleObject, Subscription};
3+
use async_stream::stream;
34
use axum_extra::headers::{authorization::Bearer, Authorization};
45
use eventsource_stream::Eventsource;
56
use futures_util::{Stream, StreamExt};
@@ -18,19 +19,131 @@ use crate::{
1819
#[derive(Debug, Clone, Default)]
1920
pub struct WorkflowsSubscription;
2021

22+
/// A single log line streamed from a pod
23+
#[derive(Debug, Clone, SimpleObject)]
24+
pub struct LogEntry {
25+
/// The log line content
26+
content: String,
27+
/// The name of the pod producing the log
28+
pod_name: String,
29+
}
30+
31+
/// A log response returned by the Argo logs API
32+
#[derive(Debug, Deserialize)]
33+
struct LogResponse {
34+
/// The result of the log response
35+
result: Option<LogContent>,
36+
}
37+
38+
/// The data from the log result returned by the Argo logs API
39+
#[derive(Debug, Deserialize)]
40+
struct LogContent {
41+
/// The log content
42+
content: String,
43+
/// The name of the pod producing the log
44+
#[serde(rename = "podName")]
45+
pod_name: String,
46+
}
47+
48+
/// Succees/fail events from Workflows API
49+
#[derive(Debug, Deserialize)]
50+
struct WatchEvent {
51+
/// Successful event
52+
result: Option<IoArgoprojWorkflowV1alpha1WorkflowWatchEvent>,
53+
/// Error returned by API
54+
error: Option<StreamError>,
55+
}
56+
fn get_auth_token(ctx: &Context<'_>) -> anyhow::Result<String> {
57+
ctx.data_unchecked::<Option<Authorization<Bearer>>>()
58+
.as_ref()
59+
.map(|auth| auth.token().to_string())
60+
.ok_or_else(|| WorkflowParsingError::MissingAuthToken.into())
61+
}
62+
2163
#[Subscription]
2264
impl WorkflowsSubscription {
65+
/// Processing to subscribe to logs for a single pod of a workflow
66+
async fn logs(
67+
&self,
68+
ctx: &Context<'_>,
69+
visit: VisitInput,
70+
workflow_name: String,
71+
task_id: String,
72+
) -> anyhow::Result<impl Stream<Item = Result<LogEntry, String>>> {
73+
let auth_token = get_auth_token(ctx)?;
74+
75+
let namespace = visit.to_string();
76+
let server_url = ctx.data_unchecked::<ArgoServerUrl>().deref().clone();
77+
let mut url = server_url;
78+
79+
url.path_segments_mut().expect("Invalid base URL").extend([
80+
"api",
81+
"v1",
82+
"workflows",
83+
&namespace,
84+
&workflow_name,
85+
"log",
86+
]);
87+
88+
url.query_pairs_mut()
89+
.append_pair("podName", &task_id)
90+
.append_pair("logOptions.container", "main")
91+
.append_pair("logOptions.follow", "true");
92+
93+
let client = reqwest::Client::new();
94+
let response = client
95+
.get(url)
96+
.bearer_auth(auth_token)
97+
.header("Accept", "text/plain")
98+
.send()
99+
.await?;
100+
101+
let status = response.status();
102+
let byte_stream = response.bytes_stream();
103+
let log_stream = stream! {
104+
for await chunk_result in byte_stream {
105+
match chunk_result {
106+
Ok(chunk) if status.is_success() => {
107+
let text = String::from_utf8_lossy(&chunk).to_string();
108+
for line in text.lines() {
109+
match serde_json::from_str::<LogResponse>(line) {
110+
Ok(parsed) => {
111+
if let Some(result) = parsed.result {
112+
yield Ok(LogEntry {
113+
content: result.content,
114+
pod_name: result.pod_name,
115+
});
116+
} else {
117+
yield Err("Missing result in log response".to_string());
118+
}
119+
}
120+
Err(_) => {
121+
yield Ok(LogEntry {
122+
content: line.trim().to_string(),
123+
pod_name: task_id.clone(),
124+
});
125+
}
126+
}
127+
}
128+
}
129+
Ok(_) | Err(_) => {
130+
yield Err("Failed to read log chunk".to_string());
131+
}
132+
}
133+
}
134+
};
135+
136+
Ok(log_stream)
137+
}
138+
23139
/// Processing to subscribe to data for all workflows in a session
24140
async fn workflow(
25141
&self,
26142
ctx: &Context<'_>,
27143
visit: VisitInput,
28144
name: String,
29145
) -> anyhow::Result<impl Stream<Item = Result<Workflow, String>>> {
30-
let auth_token = ctx
31-
.data_unchecked::<Option<Authorization<Bearer>>>()
32-
.as_ref()
33-
.ok_or(WorkflowParsingError::MissingAuthToken)?;
146+
let auth_token = get_auth_token(ctx)?;
34147

35148
let session = visit.to_string();
36149
let server_url = ctx.data_unchecked::<ArgoServerUrl>().deref();
@@ -51,7 +164,7 @@ impl WorkflowsSubscription {
51164
let client = reqwest::Client::new();
52165
let response = client
53166
.get(url)
54-
.bearer_auth(auth_token.token())
167+
.bearer_auth(auth_token)
55168
.header("Accept", "text/event-stream")
56169
.send()
57170
.await?
@@ -96,13 +209,4 @@ struct StreamError {
96209
message: String,
97210
}
98211

99-
/// Succees/fail events from Workflows API
100-
#[derive(Debug, Deserialize)]
101-
struct WatchEvent {
102-
/// Successful event
103-
result: Option<IoArgoprojWorkflowV1alpha1WorkflowWatchEvent>,
104-
/// Error returned by API
105-
error: Option<StreamError>,
106-
}
107-
108212
// TODO! Write tests for this

0 commit comments

Comments
 (0)