advance-practice/bridging-with-sync #900
Replies: 2 comments 2 replies
-
我才意识到运行时是runtime... |
Beta Was this translation helpful? Give feedback.
0 replies
-
发现个奇怪的问题: 用 block_on 请求 chunked 的 HTTP 响应, 读到中间就断了。如果用单纯的异步是可以的, 谁知道怎么回事吗?
use std::sync::mpsc::{self, Sender};
use std::thread;
use futures_util::{StreamExt, Stream, TryStreamExt};
use http::{request::Builder, Method};
use hyper::body::HttpBody;
use hyper::{Body};
use hyper::client::Client;
use serde_derive::{Serialize, Deserialize};
use http::header::CONTENT_TYPE;
use futures_util::future::FutureExt;
use tokio::sync::mpsc as tokio_mpsc;
/**
cargo.toml文件:
[package]
name = "stream-test"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
hyper = { version = "0.14", features = [ "full" ]}
tokio = { version = "1", features = ["full"] }
http = "0.2"
serde = "1.0"
serde_json = "1.0"
base64 = "0.13"
serde_derive = "1.0"
futures-util = "0.3"
*/
fn main() {
// println!("{:?}", response.body());
// let (tx, mut rx) = tokio_mpsc::channel::<String>(1000);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let handle = thread::spawn(move || {
rt.block_on(async {
let client = Client::new();
let req_builder = Builder::new().method(Method::POST).header("X-Registry-Auth", base64_url_encode(&get_payload()));
let req = req_builder
.uri("http://127.0.0.1:2375/v1.40/images/create?fromImage=ibmcom%2Fhelloworld&fromSrc=&repo=&tag=latest&platform=")
.header(CONTENT_TYPE, "application/json")
.body(Body::empty()).unwrap();
let mut response = client.request(req).await.unwrap();
// let mut response = client.request(req).await.unwrap();
let body = response.body_mut();
let mut data = body.data().into_stream();
while let Some(data) = data.next().await {
println!("aaaaaaaaaaaaaaaaaaaaaaa: {:?}", data);
if let Some(Ok(d)) = data {
let string = String::from_utf8(d.into_iter().collect::<Vec<u8>>()).unwrap_or_default();
println!("{}", string);
// tx.send(string);
} else {
println!("data_stream next value {:?}", data);
}
}
});
});
// for x in tokio::runtime::Builder::new_current_thread().enable_io().build().unwrap().block_on(rx.recv()) {
// println!("x: {}", x);
// }
handle.join().unwrap();
} |
Beta Was this translation helpful? Give feedback.
2 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
advance-practice/bridging-with-sync
https://course.rs/advance-practice/bridging-with-sync.html
Beta Was this translation helpful? Give feedback.
All reactions