|
1 | | -use anyhow::{Context, Result}; |
2 | | -use api::{ |
3 | | - GetConfigRequest, HttpResponse, |
4 | | - relay_service_client::RelayServiceClient, |
5 | | -}; |
6 | | -use tokio_stream::StreamExt; |
7 | | -use tonic::{ |
8 | | - Request, |
9 | | - metadata::MetadataValue, |
10 | | - transport::Channel, |
11 | | -}; |
| 1 | +use api::HttpResponse; |
| 2 | +use common::RelayClient; |
12 | 3 |
|
13 | 4 | use crate::proxy::Proxy; |
14 | 5 |
|
15 | 6 | pub struct GrpcClient { |
16 | | - channel: Channel, |
17 | | - access_token: String, |
| 7 | + relay_client: RelayClient, |
18 | 8 | } |
19 | 9 |
|
20 | 10 | impl GrpcClient { |
21 | | - pub async fn connect(server_address: &str, access_token: String) -> Result<Self> { |
22 | | - let channel = Channel::from_shared(server_address.to_string())? |
23 | | - .connect() |
24 | | - .await |
25 | | - .context("Failed to connect to server")?; |
26 | | - |
27 | | - Ok(Self { |
28 | | - channel, |
29 | | - access_token, |
30 | | - }) |
31 | | - } |
32 | | - |
33 | | - fn create_client(&self) -> RelayServiceClient<Channel> { |
34 | | - RelayServiceClient::new(self.channel.clone()) |
| 11 | + pub async fn connect(server_address: &str, access_token: String) -> anyhow::Result<Self> { |
| 12 | + let relay_client = RelayClient::connect(server_address, access_token).await?; |
| 13 | + Ok(Self { relay_client }) |
35 | 14 | } |
36 | | - |
37 | | - fn add_auth<T>(&self, request: &mut Request<T>) -> Result<()> { |
38 | | - let token: MetadataValue<_> = format!("Bearer {}", self.access_token) |
39 | | - .parse() |
40 | | - .context("Invalid token format")?; |
41 | | - request.metadata_mut().insert("authorization", token); |
42 | | - Ok(()) |
43 | | - } |
44 | | - |
45 | | - pub async fn get_config(&self) -> Result<api::ClientConfig> { |
46 | | - let mut client = self.create_client(); |
47 | | - let mut request = Request::new(GetConfigRequest {}); |
48 | | - self.add_auth(&mut request)?; |
49 | | - |
50 | | - let response = client.get_config(request).await |
51 | | - .context("GetConfig RPC failed")?; |
52 | | - |
53 | | - response.into_inner().config |
54 | | - .context("Server returned empty config") |
| 15 | + |
| 16 | + pub async fn get_config(&mut self) -> anyhow::Result<api::ClientConfig> { |
| 17 | + self.relay_client.get_config().await |
55 | 18 | } |
56 | | - |
57 | | - pub async fn run_webhook_stream(&self, proxy: Proxy) -> Result<()> { |
58 | | - let mut client = self.create_client(); |
59 | | - |
60 | | - // Create channel for sending responses |
61 | | - let (response_tx, response_rx) = tokio::sync::mpsc::channel::<HttpResponse>(32); |
62 | | - let response_stream = tokio_stream::wrappers::ReceiverStream::new(response_rx); |
63 | | - |
64 | | - let mut request = Request::new(response_stream); |
65 | | - self.add_auth(&mut request)?; |
66 | | - |
67 | | - let response = client.do_webhook(request).await |
68 | | - .context("DoWebhook RPC failed")?; |
69 | | - |
70 | | - let mut request_stream = response.into_inner(); |
71 | | - |
72 | | - tracing::info!("Connected to webhook stream"); |
73 | | - |
74 | | - while let Some(result) = request_stream.next().await { |
75 | | - match result { |
76 | | - Ok(http_request) => { |
| 19 | + |
| 20 | + pub async fn run_webhook_stream(&self, proxy: Proxy) -> anyhow::Result<()> { |
| 21 | + self.relay_client |
| 22 | + .run_webhook_loop(move |http_request| { |
| 23 | + let proxy = proxy.clone(); |
| 24 | + async move { |
77 | 25 | let request_id = http_request.request_id.clone(); |
78 | | - tracing::info!( |
79 | | - request_id = %request_id, |
80 | | - method = %http_request.method, |
81 | | - path = %http_request.path, |
82 | | - "Received webhook request" |
83 | | - ); |
84 | | - |
85 | | - // Forward to local endpoint |
86 | | - let response = match proxy.forward(http_request).await { |
| 26 | + match proxy.forward(http_request).await { |
87 | 27 | Ok(resp) => resp, |
88 | 28 | Err(e) => { |
89 | 29 | tracing::error!( |
90 | 30 | request_id = %request_id, |
91 | 31 | error = %e, |
92 | 32 | "Failed to forward request" |
93 | 33 | ); |
94 | | - // Return error response |
95 | 34 | HttpResponse { |
96 | 35 | request_id, |
97 | 36 | status_code: 502, |
98 | 37 | headers: Default::default(), |
99 | 38 | body: format!("Failed to forward request: {}", e).into_bytes(), |
100 | 39 | } |
101 | 40 | } |
102 | | - }; |
103 | | - |
104 | | - // Send response back to server |
105 | | - if response_tx.send(response).await.is_err() { |
106 | | - tracing::error!("Failed to send response to server stream"); |
107 | | - break; |
108 | 41 | } |
109 | 42 | } |
110 | | - Err(e) => { |
111 | | - tracing::error!(error = %e, "Error receiving from server"); |
112 | | - break; |
113 | | - } |
114 | | - } |
115 | | - } |
116 | | - |
117 | | - tracing::info!("Webhook stream ended"); |
118 | | - Ok(()) |
| 43 | + }) |
| 44 | + .await |
119 | 45 | } |
120 | 46 | } |
0 commit comments