|
4 | 4 |
|
5 | 5 | //! A transport implementation which uses Zircon channels. |
6 | 6 |
|
7 | | -use core::future::Future; |
8 | 7 | use core::mem::replace; |
9 | 8 | use core::pin::Pin; |
10 | 9 | use core::ptr::NonNull; |
@@ -123,111 +122,21 @@ impl HandleEncoder for Buffer { |
123 | 122 | } |
124 | 123 | } |
125 | 124 |
|
126 | | -/// A channel send future. |
127 | | -#[must_use = "futures do nothing unless polled"] |
128 | | -pub struct SendFuture<'s> { |
129 | | - shared: &'s Shared, |
| 125 | +/// The state for a channel send future. |
| 126 | +pub struct SendFutureState { |
130 | 127 | buffer: Buffer, |
131 | 128 | } |
132 | 129 |
|
133 | | -impl Future for SendFuture<'_> { |
134 | | - type Output = Result<(), Status>; |
135 | | - |
136 | | - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { |
137 | | - let this = Pin::into_inner(self); |
138 | | - |
139 | | - let result = unsafe { |
140 | | - zx_channel_write( |
141 | | - this.shared.channel.get_ref().raw_handle(), |
142 | | - 0, |
143 | | - this.buffer.chunks.as_ptr().cast::<u8>(), |
144 | | - (this.buffer.chunks.len() * CHUNK_SIZE) as u32, |
145 | | - this.buffer.handles.as_ptr().cast(), |
146 | | - this.buffer.handles.len() as u32, |
147 | | - ) |
148 | | - }; |
149 | | - |
150 | | - if result == ZX_OK { |
151 | | - // Handles were written to the channel, so we must not drop them. |
152 | | - unsafe { |
153 | | - this.buffer.handles.set_len(0); |
154 | | - } |
155 | | - Poll::Ready(Ok(())) |
156 | | - } else { |
157 | | - Poll::Ready(Err(Status::from_raw(result))) |
158 | | - } |
159 | | - } |
160 | | -} |
161 | | - |
162 | 130 | /// A channel receiver. |
163 | 131 | pub struct Receiver { |
164 | 132 | shared: Arc<Shared>, |
165 | 133 | } |
166 | 134 |
|
167 | | -/// A channel receive future. |
168 | | -#[must_use = "futures do nothing unless polled"] |
169 | | -pub struct RecvFuture<'r> { |
170 | | - shared: &'r Shared, |
| 135 | +/// The state for a channel receive future. |
| 136 | +pub struct RecvFutureState { |
171 | 137 | buffer: Option<Buffer>, |
172 | 138 | } |
173 | 139 |
|
174 | | -impl Future for RecvFuture<'_> { |
175 | | - type Output = Result<Option<RecvBuffer>, Status>; |
176 | | - |
177 | | - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
178 | | - let this = Pin::into_inner(self); |
179 | | - let buffer = this.buffer.as_mut().unwrap(); |
180 | | - |
181 | | - let mut actual_bytes = 0; |
182 | | - let mut actual_handles = 0; |
183 | | - |
184 | | - loop { |
185 | | - let result = unsafe { |
186 | | - zx_channel_read( |
187 | | - this.shared.channel.get_ref().raw_handle(), |
188 | | - 0, |
189 | | - buffer.chunks.as_mut_ptr().cast(), |
190 | | - buffer.handles.as_mut_ptr().cast(), |
191 | | - (buffer.chunks.capacity() * CHUNK_SIZE) as u32, |
192 | | - buffer.handles.capacity() as u32, |
193 | | - &mut actual_bytes, |
194 | | - &mut actual_handles, |
195 | | - ) |
196 | | - }; |
197 | | - |
198 | | - match result { |
199 | | - ZX_OK => { |
200 | | - unsafe { |
201 | | - buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE); |
202 | | - buffer.handles.set_len(actual_handles as usize); |
203 | | - } |
204 | | - return Poll::Ready(Ok(Some(RecvBuffer { |
205 | | - buffer: this.buffer.take().unwrap(), |
206 | | - chunks_taken: 0, |
207 | | - handles_taken: 0, |
208 | | - }))); |
209 | | - } |
210 | | - ZX_ERR_PEER_CLOSED => return Poll::Ready(Ok(None)), |
211 | | - ZX_ERR_BUFFER_TOO_SMALL => { |
212 | | - let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE); |
213 | | - buffer.chunks.reserve(min_chunks - buffer.chunks.capacity()); |
214 | | - buffer.handles.reserve(actual_handles as usize - buffer.handles.capacity()); |
215 | | - } |
216 | | - ZX_ERR_SHOULD_WAIT => { |
217 | | - if matches!(this.shared.channel.need_readable(cx)?, Poll::Pending) { |
218 | | - this.shared.closed_waker.register(cx.waker()); |
219 | | - if this.shared.is_closed.load(Ordering::Relaxed) { |
220 | | - return Poll::Ready(Ok(None)); |
221 | | - } |
222 | | - return Poll::Pending; |
223 | | - } |
224 | | - } |
225 | | - raw => return Poll::Ready(Err(Status::from_raw(raw))), |
226 | | - } |
227 | | - } |
228 | | - } |
229 | | -} |
230 | | - |
231 | 140 | /// A channel receive buffer. |
232 | 141 | pub struct RecvBuffer { |
233 | 142 | buffer: Buffer, |
@@ -311,26 +220,109 @@ impl Transport for Channel { |
311 | 220 |
|
312 | 221 | type Sender = Sender; |
313 | 222 | type SendBuffer = Buffer; |
314 | | - type SendFuture<'s> = SendFuture<'s>; |
| 223 | + type SendFutureState = SendFutureState; |
315 | 224 |
|
316 | 225 | fn acquire(_: &Self::Sender) -> Self::SendBuffer { |
317 | 226 | Buffer::new() |
318 | 227 | } |
319 | 228 |
|
320 | | - fn send(sender: &Self::Sender, buffer: Self::SendBuffer) -> Self::SendFuture<'_> { |
321 | | - SendFuture { shared: &sender.shared, buffer } |
| 229 | + fn begin_send(_: &Self::Sender, buffer: Self::SendBuffer) -> Self::SendFutureState { |
| 230 | + SendFutureState { buffer } |
| 231 | + } |
| 232 | + |
| 233 | + fn poll_send( |
| 234 | + mut future_state: Pin<&mut Self::SendFutureState>, |
| 235 | + _: &mut Context<'_>, |
| 236 | + sender: &Self::Sender, |
| 237 | + ) -> Poll<Result<(), Self::Error>> { |
| 238 | + let result = unsafe { |
| 239 | + zx_channel_write( |
| 240 | + sender.shared.channel.get_ref().raw_handle(), |
| 241 | + 0, |
| 242 | + future_state.buffer.chunks.as_ptr().cast::<u8>(), |
| 243 | + (future_state.buffer.chunks.len() * CHUNK_SIZE) as u32, |
| 244 | + future_state.buffer.handles.as_ptr().cast(), |
| 245 | + future_state.buffer.handles.len() as u32, |
| 246 | + ) |
| 247 | + }; |
| 248 | + |
| 249 | + if result == ZX_OK { |
| 250 | + // Handles were written to the channel, so we must not drop them. |
| 251 | + unsafe { |
| 252 | + future_state.buffer.handles.set_len(0); |
| 253 | + } |
| 254 | + Poll::Ready(Ok(())) |
| 255 | + } else { |
| 256 | + Poll::Ready(Err(Status::from_raw(result))) |
| 257 | + } |
322 | 258 | } |
323 | 259 |
|
324 | 260 | fn close(sender: &Self::Sender) { |
325 | 261 | sender.shared.close(); |
326 | 262 | } |
327 | 263 |
|
328 | 264 | type Receiver = Receiver; |
329 | | - type RecvFuture<'r> = RecvFuture<'r>; |
| 265 | + type RecvFutureState = RecvFutureState; |
330 | 266 | type RecvBuffer = RecvBuffer; |
331 | 267 |
|
332 | | - fn recv(receiver: &mut Self::Receiver) -> Self::RecvFuture<'_> { |
333 | | - RecvFuture { shared: &receiver.shared, buffer: Some(Buffer::new()) } |
| 268 | + fn begin_recv(_: &mut Self::Receiver) -> Self::RecvFutureState { |
| 269 | + RecvFutureState { buffer: Some(Buffer::new()) } |
| 270 | + } |
| 271 | + |
| 272 | + fn poll_recv( |
| 273 | + mut future_state: Pin<&mut Self::RecvFutureState>, |
| 274 | + cx: &mut Context<'_>, |
| 275 | + receiver: &mut Self::Receiver, |
| 276 | + ) -> Poll<Result<Option<Self::RecvBuffer>, Self::Error>> { |
| 277 | + let buffer = future_state.buffer.as_mut().unwrap(); |
| 278 | + |
| 279 | + let mut actual_bytes = 0; |
| 280 | + let mut actual_handles = 0; |
| 281 | + |
| 282 | + loop { |
| 283 | + let result = unsafe { |
| 284 | + zx_channel_read( |
| 285 | + receiver.shared.channel.get_ref().raw_handle(), |
| 286 | + 0, |
| 287 | + buffer.chunks.as_mut_ptr().cast(), |
| 288 | + buffer.handles.as_mut_ptr().cast(), |
| 289 | + (buffer.chunks.capacity() * CHUNK_SIZE) as u32, |
| 290 | + buffer.handles.capacity() as u32, |
| 291 | + &mut actual_bytes, |
| 292 | + &mut actual_handles, |
| 293 | + ) |
| 294 | + }; |
| 295 | + |
| 296 | + match result { |
| 297 | + ZX_OK => { |
| 298 | + unsafe { |
| 299 | + buffer.chunks.set_len(actual_bytes as usize / CHUNK_SIZE); |
| 300 | + buffer.handles.set_len(actual_handles as usize); |
| 301 | + } |
| 302 | + return Poll::Ready(Ok(Some(RecvBuffer { |
| 303 | + buffer: future_state.buffer.take().unwrap(), |
| 304 | + chunks_taken: 0, |
| 305 | + handles_taken: 0, |
| 306 | + }))); |
| 307 | + } |
| 308 | + ZX_ERR_PEER_CLOSED => return Poll::Ready(Ok(None)), |
| 309 | + ZX_ERR_BUFFER_TOO_SMALL => { |
| 310 | + let min_chunks = (actual_bytes as usize).div_ceil(CHUNK_SIZE); |
| 311 | + buffer.chunks.reserve(min_chunks - buffer.chunks.capacity()); |
| 312 | + buffer.handles.reserve(actual_handles as usize - buffer.handles.capacity()); |
| 313 | + } |
| 314 | + ZX_ERR_SHOULD_WAIT => { |
| 315 | + if matches!(receiver.shared.channel.need_readable(cx)?, Poll::Pending) { |
| 316 | + receiver.shared.closed_waker.register(cx.waker()); |
| 317 | + if receiver.shared.is_closed.load(Ordering::Relaxed) { |
| 318 | + return Poll::Ready(Ok(None)); |
| 319 | + } |
| 320 | + return Poll::Pending; |
| 321 | + } |
| 322 | + } |
| 323 | + raw => return Poll::Ready(Err(Status::from_raw(raw))), |
| 324 | + } |
| 325 | + } |
334 | 326 | } |
335 | 327 | } |
336 | 328 |
|
|
0 commit comments