|
16 | 16 |
|
17 | 17 | use std::os::unix::io::RawFd;
|
18 | 18 |
|
| 19 | +use async_trait::async_trait; |
19 | 20 | use containerd_shim_protos::{
|
20 |
| - api::Envelope, |
| 21 | + api::{Empty, Envelope}, |
21 | 22 | protobuf::MessageDyn,
|
22 | 23 | shim::events,
|
23 |
| - shim_async::{Client, EventsClient}, |
| 24 | + shim_async::{Client, Events, EventsClient}, |
24 | 25 | ttrpc,
|
25 |
| - ttrpc::context::Context, |
| 26 | + ttrpc::{context::Context, r#async::TtrpcContext}, |
26 | 27 | };
|
27 | 28 | use log::{debug, error, warn};
|
28 | 29 | use tokio::sync::mpsc;
|
@@ -169,6 +170,29 @@ impl RemotePublisher {
|
169 | 170 | }
|
170 | 171 | }
|
171 | 172 |
|
| 173 | +#[async_trait] |
| 174 | +impl Events for RemotePublisher { |
| 175 | + async fn forward( |
| 176 | + &self, |
| 177 | + _ctx: &TtrpcContext, |
| 178 | + req: events::ForwardRequest, |
| 179 | + ) -> ttrpc::Result<Empty> { |
| 180 | + let item = Item { |
| 181 | + ev: req.envelope().clone(), |
| 182 | + ctx: Context::default(), |
| 183 | + count: 0, |
| 184 | + }; |
| 185 | + |
| 186 | + //if channel is full and send fail ,release it after 3 seconds |
| 187 | + self.sender |
| 188 | + .send_timeout(item, tokio::time::Duration::from_secs(3)) |
| 189 | + .await |
| 190 | + .map_err(|e| error::Error::Ttrpc(ttrpc::error::Error::Others(e.to_string())))?; |
| 191 | + |
| 192 | + Ok(Empty::default()) |
| 193 | + } |
| 194 | +} |
| 195 | + |
172 | 196 | #[cfg(test)]
|
173 | 197 | mod tests {
|
174 | 198 | use std::{
|
|
0 commit comments