Skip to content

Commit 6f6b9f1

Browse files
committed
Add Windows Implementation for sync server and client
Adds the windows functionality for PipeListener, PipeConnection, and ClientConnection and does the few other changes required to build and run the example projects. This includes adding feature support to the examples so they wouldn't build the async projects (as the unix specific code hasn't been removed yet). Namedpipes are used as Containerd is one of the main use cases for this project on Windows and containerd only supports namedpipes. Signed-off-by: James Sturtevant <[email protected]>
1 parent e735bb3 commit 6f6b9f1

File tree

23 files changed

+521
-113
lines changed

23 files changed

+521
-113
lines changed

.github/workflows/bvt.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ jobs:
66
runs-on: ${{ matrix.os }}
77
strategy:
88
matrix:
9-
os: [ubuntu-latest, macos-latest]
9+
os: [ubuntu-latest, macos-latest, windows-latest]
1010
steps:
1111
- name: Checkout
1212
uses: actions/checkout@v3
@@ -22,7 +22,7 @@ jobs:
2222
runs-on: ${{ matrix.os }}
2323
strategy:
2424
matrix:
25-
os: [ubuntu-latest, macos-latest]
25+
os: [ubuntu-latest, macos-latest, windows-latest]
2626
steps:
2727
- name: Checkout
2828
uses: actions/checkout@v3

Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,22 @@ nix = "0.23.0"
1717
log = "0.4"
1818
byteorder = "1.3.2"
1919
thiserror = "1.0"
20-
2120
async-trait = { version = "0.1.31", optional = true }
2221
tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true }
2322
futures = { version = "0.3", optional = true }
2423

24+
[target.'cfg(windows)'.dependencies]
25+
windows-sys = {version = "0.45", features = [ "Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_System_Pipes", "Win32_Security", "Win32_System_Threading"]}
26+
2527
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
2628
tokio-vsock = { version = "0.3.1", optional = true }
2729

2830
[build-dependencies]
2931
protobuf-codegen = "3.1.0"
3032

33+
[dev-dependencies]
34+
assert_cmd = "2.0.7"
35+
3136
[features]
3237
default = ["sync"]
3338
async = ["async-trait", "tokio", "futures", "tokio-vsock"]

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,13 @@ build: debug
2323

2424
.PHONY: test
2525
test:
26+
ifeq ($OS,Windows_NT)
27+
# async isn't enabled for windows, don't test that feature
28+
cargo test --verbose
29+
else
2630
cargo test --all-features --verbose
27-
31+
endif
32+
2833
.PHONY: check
2934
check:
3035
cargo fmt --all -- --check

example/async-client.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,18 @@
55

66
mod protocols;
77
mod utils;
8-
8+
#[cfg(unix)]
99
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
1010
use ttrpc::context::{self, Context};
11+
#[cfg(unix)]
1112
use ttrpc::r#async::Client;
1213

14+
#[cfg(windows)]
15+
fn main() {
16+
println!("This example only works on Unix-like OSes");
17+
}
18+
19+
#[cfg(unix)]
1320
#[tokio::main(flavor = "current_thread")]
1421
async fn main() {
1522
let c = Client::connect(utils::SOCK_ADDR).unwrap();

example/async-server.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,22 @@ use std::sync::Arc;
1313

1414
use log::LevelFilter;
1515

16+
#[cfg(unix)]
1617
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types};
18+
#[cfg(unix)]
1719
use ttrpc::asynchronous::Server;
1820
use ttrpc::error::{Error, Result};
1921
use ttrpc::proto::{Code, Status};
2022

23+
#[cfg(unix)]
2124
use async_trait::async_trait;
25+
#[cfg(unix)]
2226
use tokio::signal::unix::{signal, SignalKind};
2327
use tokio::time::sleep;
2428

2529
struct HealthService;
2630

31+
#[cfg(unix)]
2732
#[async_trait]
2833
impl health_ttrpc::Health for HealthService {
2934
async fn check(
@@ -58,7 +63,7 @@ impl health_ttrpc::Health for HealthService {
5863
}
5964

6065
struct AgentService;
61-
66+
#[cfg(unix)]
6267
#[async_trait]
6368
impl agent_ttrpc::AgentService for AgentService {
6469
async fn list_interfaces(
@@ -82,6 +87,12 @@ impl agent_ttrpc::AgentService for AgentService {
8287
}
8388
}
8489

90+
#[cfg(windows)]
91+
fn main() {
92+
println!("This example only works on Unix-like OSes");
93+
}
94+
95+
#[cfg(unix)]
8596
#[tokio::main(flavor = "current_thread")]
8697
async fn main() {
8798
simple_logging::log_to_stderr(LevelFilter::Trace);

example/async-stream-client.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,18 @@
55

66
mod protocols;
77
mod utils;
8-
8+
#[cfg(unix)]
99
use protocols::r#async::{empty, streaming, streaming_ttrpc};
1010
use ttrpc::context::{self, Context};
11+
#[cfg(unix)]
1112
use ttrpc::r#async::Client;
1213

14+
#[cfg(windows)]
15+
fn main() {
16+
println!("This example only works on Unix-like OSes");
17+
}
18+
19+
#[cfg(unix)]
1320
#[tokio::main(flavor = "current_thread")]
1421
async fn main() {
1522
simple_logging::log_to_stderr(log::LevelFilter::Info);
@@ -48,6 +55,7 @@ fn default_ctx() -> Context {
4855
ctx
4956
}
5057

58+
#[cfg(unix)]
5159
async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
5260
let echo1 = streaming::EchoPayload {
5361
seq: 1,
@@ -59,6 +67,7 @@ async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
5967
assert_eq!(resp.seq, echo1.seq + 1);
6068
}
6169

70+
#[cfg(unix)]
6271
async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
6372
let mut stream = cli.echo_stream(default_ctx()).await.unwrap();
6473

@@ -81,6 +90,7 @@ async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
8190
assert!(matches!(ret, Err(ttrpc::Error::Eof)));
8291
}
8392

93+
#[cfg(unix)]
8494
async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
8595
let mut stream = cli.sum_stream(default_ctx()).await.unwrap();
8696

@@ -108,6 +118,7 @@ async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
108118
assert_eq!(ssum.num, sum.num);
109119
}
110120

121+
#[cfg(unix)]
111122
async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
112123
let expected = streaming::Sum {
113124
sum: 392,
@@ -127,6 +138,7 @@ async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
127138
assert_eq!(actual.num, expected.num);
128139
}
129140

141+
#[cfg(unix)]
130142
async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
131143
let mut stream = cli.echo_null(default_ctx()).await.unwrap();
132144

@@ -142,6 +154,7 @@ async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
142154
assert_eq!(res, empty::Empty::new());
143155
}
144156

157+
#[cfg(unix)]
145158
async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
146159
let stream = cli.echo_null_stream(default_ctx()).await.unwrap();
147160

example/async-stream-server.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,20 @@ use std::sync::Arc;
1010

1111
use log::{info, LevelFilter};
1212

13+
#[cfg(unix)]
1314
use protocols::r#async::{empty, streaming, streaming_ttrpc};
15+
#[cfg(unix)]
1416
use ttrpc::asynchronous::Server;
1517

18+
#[cfg(unix)]
1619
use async_trait::async_trait;
20+
#[cfg(unix)]
1721
use tokio::signal::unix::{signal, SignalKind};
1822
use tokio::time::sleep;
1923

2024
struct StreamingService;
2125

26+
#[cfg(unix)]
2227
#[async_trait]
2328
impl streaming_ttrpc::Streaming for StreamingService {
2429
async fn echo(
@@ -131,6 +136,12 @@ impl streaming_ttrpc::Streaming for StreamingService {
131136
}
132137
}
133138

139+
#[cfg(windows)]
140+
fn main() {
141+
println!("This example only works on Unix-like OSes");
142+
}
143+
144+
#[cfg(unix)]
134145
#[tokio::main(flavor = "current_thread")]
135146
async fn main() {
136147
simple_logging::log_to_stderr(LevelFilter::Info);

example/build.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ fn main() {
4545
async_all: true,
4646
..Default::default()
4747
})
48-
.rust_protobuf_customize(protobuf_customized.clone())
48+
.rust_protobuf_customize(protobuf_customized)
4949
.run()
5050
.expect("Gen async code failed.");
5151

@@ -75,7 +75,7 @@ fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std
7575

7676
let new_contents = contents.replace(from, to);
7777

78-
let mut dst = File::create(&file_name)?;
78+
let mut dst = File::create(file_name)?;
7979
dst.write(new_contents.as_bytes())?;
8080

8181
Ok(())

example/protocols/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
5-
5+
#[cfg(unix)]
66
pub mod asynchronous;
7-
pub mod sync;
7+
#[cfg(unix)]
88
pub use asynchronous as r#async;
9+
pub mod sync;

example/utils.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
11
#![allow(dead_code)]
2-
use std::fs;
32
use std::io::Result;
4-
use std::path::Path;
53

6-
pub const SOCK_ADDR: &str = "unix:///tmp/ttrpc-test";
4+
#[cfg(unix)]
5+
pub const SOCK_ADDR: &str = r"unix:///tmp/ttrpc-test";
76

7+
#[cfg(windows)]
8+
pub const SOCK_ADDR: &str = r"\\.\pipe\ttrpc-test";
9+
10+
#[cfg(unix)]
811
pub fn remove_if_sock_exist(sock_addr: &str) -> Result<()> {
912
let path = sock_addr
1013
.strip_prefix("unix://")
1114
.expect("socket address is not expected");
1215

13-
if Path::new(path).exists() {
14-
fs::remove_file(&path)?;
16+
if std::path::Path::new(path).exists() {
17+
std::fs::remove_file(path)?;
1518
}
1619

1720
Ok(())
1821
}
22+
23+
#[cfg(windows)]
24+
pub fn remove_if_sock_exist(_sock_addr: &str) -> Result<()> {
25+
//todo force close file handle?
26+
27+
Ok(())
28+
}

0 commit comments

Comments
 (0)