Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,15 @@ jobs:
- name: Test doc
run: cargo test --locked --all-features --doc

examples:
runs-on: ubuntu-latest
name: ubuntu / examples
steps:
- uses: actions/checkout@v5
- name: Install stable
uses: dtolnay/rust-toolchain@5d458579430fc14a04a08a1e7d3694f545e91ce6
with:
toolchain: stable
- name: Test axum-server-unary example
working-directory: ./examples/axum-server-unary
run: cargo test --locked --all-features --verbose
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = [ "connectrpc" , "connectrpc-build", "examples/axum-server", "examples/reqwest-client"]
members = [ "connectrpc" , "connectrpc-build", "examples/axum-server-unary", "examples/reqwest-client"]

[workspace.package]
edition = "2024"
Expand Down
20 changes: 12 additions & 8 deletions connectrpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ impl CommonServer {
/// This parses only headers so if the request is not valid,
/// the body doesn't need to be read.
pub fn parse_unary_headers(&self, headers: &HeaderMap) -> Result<Codec> {
let connect_version = headers.get(CONNECT_PROTOCOL_VERSION).ok_or_else(|| {
Error::invalid_request(format!("missing {CONNECT_PROTOCOL_VERSION} header"))
})?;
if connect_version != CONNECT_PROTOCOL_VERSION_1 {
return Err(Error::invalid_request(format!(
"unsupported {CONNECT_PROTOCOL_VERSION} version: {connect_version:?}"
// Do not require version to be specified: https://connectrpc.com/docs/curl-and-other-clients#curl
let version = headers
.get(CONNECT_PROTOCOL_VERSION)
.cloned()
.unwrap_or(CONNECT_PROTOCOL_VERSION_1);
if version != CONNECT_PROTOCOL_VERSION_1 {
return Err(Error::unsupported_media_type(format!(
"unsupported connect-protocol-version version: {:?}",
version
)));
}

Expand Down Expand Up @@ -146,14 +149,15 @@ mod tests {
}

#[test]
fn test_parse_unary_headers_missing_version() {
fn test_parse_unary_headers_wrong_version() {
let srv = CommonServer::new();
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
headers.insert(CONNECT_PROTOCOL_VERSION, "2".parse().unwrap());
let err = srv.parse_unary_headers(&headers).unwrap_err();
assert!(
err.to_string()
.contains("missing connect-protocol-version header")
.contains("unsupported connect-protocol-version version")
);
}

Expand Down
87 changes: 87 additions & 0 deletions examples/axum-reqwest/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use axum_reqwest::{
client,
};
use connectrpc::{Error, Result, UnaryRequest, UnaryResponse, http::Uri};
use std::{
collections::BTreeMap, str::FromStr, sync::{Arc, Mutex}
};

const ADDR: &str = "127.0.0.1:50051";

#[derive(Clone, Debug)]
struct State {
cache: Arc<Mutex<BTreeMap<String, String>>>,
}

async fn say_hello(
state: State,
req: UnaryRequest<HelloRequest>,
) -> Result<UnaryResponse<HelloResponse>> {
let name = req
.into_message()
.name
.ok_or_else(|| Error::internal("Name is required"))?;

let message = {
let mut cache = state.cache.lock().unwrap();
match cache.get(&name) {
Some(cached_message) => cached_message.clone(),
None => {
let message = format!("Hello, {}!", name);
cache.insert(name.clone(), message.clone());
message
}
}
};

let response = HelloResponse { message };
Ok(UnaryResponse::new(response))
}

async fn start_server() {
// Create the Axum router with the generated server and your handler
//
// There is a big reason why we don't use `axum::Router::new()` directly, but rather
// as using the generated server struct:
// 1. You can swap handlers easily, without using a single struct that implements
// the whole service trait.
//
// 2. It issues a compile-time error when you re-generate the code and forget to
// implement a new method.
let router = axum_reqwest::server::HelloWorldServiceAxumServer {
// The server uses fields to store state and handlers
// Store the state directly in the server struct
state: State {
cache: Arc::new(Mutex::new(BTreeMap::new())),
},
// Provide the handler function for the SayHello RPC
say_hello,
}
.into_router();

let listener = tokio::net::TcpListener::bind("127.0.0.1:50051")
.await
.unwrap();

println!("Axum server listening on 127.0.0.1:50051");
axum::serve(listener, router).await.unwrap();
}

#[tokio::main]
async fn main() {
let server_handle = tokio::spawn(async {
start_server().await;
});

let reqwest = reqwest::Client::new(); // Create reqwest client. You can customize it as needed.
let json_client = client::HelloWorldServiceReqwestJsonClient::new(
reqwest,
Uri::from_str(&format!("http://{}", ADDR)).unwrap(),
)
.expect("Failed to create client");

json_client.say_hello(UnaryRequest::new())

// Wait for the server to finish
let _ = server_handle.await;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "axum-server"
name = "axum-server-unary"
edition.workspace = true
version.workspace = true
authors.workspace = true
Expand All @@ -12,7 +12,7 @@ serde = { version = "1", features = ["derive"] }
connectrpc = { path = "../../connectrpc", features = ["axum"] }
axum = { version = "0.8" }
prost = "0.14"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net"] }
tokio = { version = "1", features = ["full"] }

[build-dependencies]
connectrpc-build = { path = "../../connectrpc-build" }
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use axum_server::{HelloRequest, HelloResponse};
use axum_server_unary::{HelloRequest, HelloResponse};
use connectrpc::{Error, Result, UnaryRequest, UnaryResponse};
use std::{
collections::BTreeMap,
Expand Down Expand Up @@ -46,7 +46,7 @@ async fn main() {
//
// 2. It issues a compile-time error when you re-generate the code and forget to
// implement a new method.
let router = axum_server::HelloWorldServiceAxumServer {
let router = axum_server_unary::HelloWorldServiceAxumServer {
// The server uses fields to store state and handlers
// Store the state directly in the server struct
state: State {
Expand All @@ -61,5 +61,23 @@ async fn main() {
.await
.unwrap();

axum::serve(listener, router).await.unwrap();
let sh = tokio::spawn(async move {
println!("Axum server listening on 127.0.0.1:50051");
axum::serve(listener, router).await.unwrap();
});

let mut cmd = tokio::process::Command::new("curl")
.arg("-X")
.arg("POST")
.arg("--header")
.arg("Content-Type: application/json")
.arg("-d")
.arg("{\"name\":\"World\"}")
.arg("http://127.0.0.1:50051/hello.HelloWorldService/SayHello")
.spawn()
.expect("Failed to start curl");

assert!(cmd.wait().await.expect("Failed to wait on curl").success());

sh.abort();
}