Skip to content

Commit e2abfae

Browse files
committed
feat(hydro_test): websocket protocol demo
1 parent d5430c3 commit e2abfae

File tree

15 files changed

+777
-5
lines changed

15 files changed

+777
-5
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hydro_lang/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ fn init_rewrites() {
8888
vec!["tokio_util", "codec", "lines_codec"],
8989
vec!["tokio_util", "codec"],
9090
);
91+
92+
stageleft::add_private_reexport(
93+
vec!["tokio_util", "codec", "bytes_codec"],
94+
vec!["tokio_util", "codec"],
95+
);
96+
97+
stageleft::add_private_reexport(vec!["bytes", "buf", "iter"], vec!["bytes", "buf"]);
9198
}
9299

93100
#[cfg(test)]

hydro_test/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ colored = "3.0.0"
1818
palette = "0.7.6"
1919
tokio = "1.29.0"
2020
bytes = "1.1.0"
21+
sha1 = "0.10.6"
22+
base64 = "0.22.1"
2123

2224
# added to workaround `cargo smart-release` https://github.com/Byron/cargo-smart-release/issues/36
2325
example_test = { path = "../example_test", version = "^0.0.0", optional = true }

hydro_test/examples/http_counter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2424
let (port, input, _membership, output_ref) = process
2525
.bidi_external_many_bytes::<_, _, LinesCodec>(&external, NetworkHint::TcpPort(Some(4001)));
2626

27-
output_ref
28-
.complete(hydro_test::external_client::http_counter::http_counter_server(input, &process));
27+
output_ref.complete(
28+
hydro_test::external_client::http::http_counter::http_counter_server(input, &process),
29+
);
2930

3031
// Extract the IR BEFORE the builder is consumed by deployment methods
3132
let built = flow.finalize();

hydro_test/examples/http_hello.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2525
.bidi_external_many_bytes::<_, _, LinesCodec>(&external, NetworkHint::TcpPort(Some(4000)));
2626

2727
output_ref.complete(
28-
hydro_test::external_client::http_hello::http_hello_server(input).into_keyed_stream(),
28+
hydro_test::external_client::http::http_hello::http_hello_server(input).into_keyed_stream(),
2929
);
3030

3131
// Extract the IR BEFORE the builder is consumed by deployment methods
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use bytes::BytesMut;
2+
use clap::Parser;
3+
use dfir_rs::tokio_util::codec::{BytesCodec, LinesCodec};
4+
use hydro_deploy::Deployment;
5+
use hydro_deploy::custom_service::ServerPort;
6+
use hydro_lang::deploy::TrybuildHost;
7+
use hydro_lang::graph::config::GraphConfig;
8+
use hydro_lang::location::{Location, NetworkHint};
9+
10+
#[derive(Parser, Debug)]
11+
struct Args {
12+
#[clap(flatten)]
13+
graph: GraphConfig,
14+
}
15+
16+
#[tokio::main]
17+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
18+
let args = Args::parse();
19+
let mut deployment = Deployment::new();
20+
let flow = hydro_lang::compile::builder::FlowBuilder::new();
21+
22+
let process = flow.process::<()>();
23+
let external = flow.external::<()>();
24+
25+
let (http_port, http_input, _http_membership, http_output_ref) = process
26+
.bidi_external_many_bytes::<_, _, LinesCodec>(&external, NetworkHint::TcpPort(Some(4000)));
27+
28+
http_output_ref.complete(
29+
hydro_test::external_client::http::http_static::http_serve_static(
30+
http_input,
31+
include_str!("./websocket_test_client.html"),
32+
)
33+
.into_keyed_stream(),
34+
);
35+
36+
let (port, input, _membership, output_ref) = process
37+
.bidi_external_many_bytes::<_, BytesMut, BytesCodec>(
38+
&external,
39+
NetworkHint::TcpPort(Some(8080)),
40+
);
41+
42+
output_ref.complete(hydro_test::external_client::websocket::echo::websocket_echo(input));
43+
44+
// Extract the IR BEFORE the builder is consumed by deployment methods
45+
let built = flow.finalize();
46+
47+
// Generate graph visualizations based on command line arguments
48+
built.generate_graph_with_config(&args.graph, None)?;
49+
50+
// Now use the built flow for deployment with optimization
51+
let nodes = built
52+
.with_default_optimize()
53+
.with_process(&process, TrybuildHost::new(deployment.Localhost()))
54+
.with_external(&external, deployment.Localhost())
55+
.deploy(&mut deployment);
56+
57+
deployment.deploy().await.unwrap();
58+
59+
let http_raw_port = nodes.raw_port(http_port);
60+
let http_server_port = http_raw_port.server_port().await;
61+
62+
let raw_port = nodes.raw_port(port);
63+
let server_port = raw_port.server_port().await;
64+
65+
deployment.start().await.unwrap();
66+
67+
let port = if let ServerPort::TcpPort(p) = server_port {
68+
p
69+
} else {
70+
panic!("Expected a TCP port");
71+
};
72+
println!("WebSocket echo server listening on: ws://{}", port);
73+
74+
let http_port = if let ServerPort::TcpPort(p) = http_server_port {
75+
p
76+
} else {
77+
panic!("Expected a TCP port");
78+
};
79+
println!("Browser Demo at: http://{}", http_port);
80+
81+
tokio::signal::ctrl_c().await.unwrap();
82+
Ok(())
83+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
<!DOCTYPE html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="UTF-8">
5+
<meta name="viewport" content="width=device-width, initial-scale=1.0">
6+
<title>WebSocket Echo Test Client</title>
7+
<style>
8+
body {
9+
font-family: Arial, sans-serif;
10+
max-width: 800px;
11+
margin: 0 auto;
12+
padding: 20px;
13+
}
14+
.container {
15+
border: 1px solid #ccc;
16+
border-radius: 8px;
17+
padding: 20px;
18+
margin: 10px 0;
19+
}
20+
.status {
21+
padding: 10px;
22+
border-radius: 4px;
23+
margin: 10px 0;
24+
}
25+
.connected { background-color: #d4edda; color: #155724; }
26+
.disconnected { background-color: #f8d7da; color: #721c24; }
27+
.connecting { background-color: #fff3cd; color: #856404; }
28+
#messages {
29+
height: 300px;
30+
overflow-y: auto;
31+
border: 1px solid #ddd;
32+
padding: 10px;
33+
background-color: #f8f9fa;
34+
font-family: monospace;
35+
white-space: pre-wrap;
36+
}
37+
input[type="text"] {
38+
width: 70%;
39+
padding: 8px;
40+
margin: 5px;
41+
}
42+
button {
43+
padding: 8px 16px;
44+
margin: 5px;
45+
cursor: pointer;
46+
}
47+
.controls {
48+
margin: 10px 0;
49+
}
50+
</style>
51+
</head>
52+
<body>
53+
<h1>🌊 Hydro WebSocket Echo Test Client</h1>
54+
55+
<div class="container">
56+
<h3>Connection</h3>
57+
<div>
58+
<label>WebSocket URL:</label>
59+
<input type="text" id="wsUrl" value="ws://localhost:8080" placeholder="ws://localhost:8080">
60+
<button onclick="connect()">Connect</button>
61+
<button onclick="disconnect()">Disconnect</button>
62+
</div>
63+
<div id="status" class="status disconnected">Disconnected</div>
64+
</div>
65+
66+
<div class="container">
67+
<h3>Send Messages</h3>
68+
<div class="controls">
69+
<input type="text" id="messageInput" placeholder="Type your message here..." onkeypress="handleKeyPress(event)">
70+
<button onclick="sendMessage()">Send Text</button>
71+
<button onclick="sendBinary()">Send Binary</button>
72+
</div>
73+
</div>
74+
75+
<div class="container">
76+
<h3>Messages</h3>
77+
<div id="messages"></div>
78+
<button onclick="clearMessages()">Clear Messages</button>
79+
</div>
80+
81+
<script>
82+
let ws = null;
83+
let messageCount = 0;
84+
85+
function updateStatus(message, className) {
86+
const status = document.getElementById('status');
87+
status.textContent = message;
88+
status.className = `status ${className}`;
89+
}
90+
91+
function addMessage(message, type = 'info') {
92+
const messages = document.getElementById('messages');
93+
const timestamp = new Date().toLocaleTimeString();
94+
const prefix = type === 'sent' ? '→ SENT' :
95+
type === 'received' ? '← RECV' :
96+
type === 'error' ? '✗ ERROR' :
97+
type === 'info' ? 'ℹ INFO' : '• ';
98+
99+
messages.textContent += `[${timestamp}] ${prefix}: ${message}\n`;
100+
messages.scrollTop = messages.scrollHeight;
101+
}
102+
103+
function connect() {
104+
const url = document.getElementById('wsUrl').value;
105+
106+
if (ws && ws.readyState === WebSocket.OPEN) {
107+
addMessage('Already connected', 'info');
108+
return;
109+
}
110+
111+
updateStatus('Connecting...', 'connecting');
112+
addMessage(`Connecting to ${url}`, 'info');
113+
114+
ws = new WebSocket(url);
115+
116+
ws.onopen = function(event) {
117+
updateStatus('Connected', 'connected');
118+
addMessage('WebSocket connection established', 'info');
119+
};
120+
121+
ws.onmessage = function(event) {
122+
messageCount++;
123+
if (event.data instanceof Blob) {
124+
// Handle binary data
125+
event.data.text().then(text => {
126+
addMessage(`Binary message: ${text}`, 'received');
127+
});
128+
} else {
129+
addMessage(event.data, 'received');
130+
}
131+
};
132+
133+
ws.onclose = function(event) {
134+
updateStatus('Disconnected', 'disconnected');
135+
addMessage(`Connection closed (code: ${event.code}, reason: ${event.reason})`, 'info');
136+
ws = null;
137+
};
138+
139+
ws.onerror = function(error) {
140+
updateStatus('Error', 'disconnected');
141+
addMessage(`WebSocket error: ${error}`, 'error');
142+
};
143+
}
144+
145+
function disconnect() {
146+
if (ws) {
147+
ws.close();
148+
addMessage('Disconnecting...', 'info');
149+
} else {
150+
addMessage('Not connected', 'info');
151+
}
152+
}
153+
154+
function sendMessage() {
155+
const input = document.getElementById('messageInput');
156+
const message = input.value.trim();
157+
158+
if (!message) {
159+
addMessage('Please enter a message', 'error');
160+
return;
161+
}
162+
163+
if (!ws || ws.readyState !== WebSocket.OPEN) {
164+
addMessage('Not connected to WebSocket server', 'error');
165+
return;
166+
}
167+
168+
ws.send(message);
169+
addMessage(message, 'sent');
170+
input.value = '';
171+
}
172+
173+
function sendBinary() {
174+
if (!ws || ws.readyState !== WebSocket.OPEN) {
175+
addMessage('Not connected to WebSocket server', 'error');
176+
return;
177+
}
178+
179+
// Send some binary data
180+
const binaryData = new Uint8Array([72, 101, 108, 108, 111, 32, 66, 105, 110, 97, 114, 121]); // "Hello Binary"
181+
ws.send(binaryData);
182+
addMessage('Binary data: [72, 101, 108, 108, 111, 32, 66, 105, 110, 97, 114, 121] ("Hello Binary")', 'sent');
183+
}
184+
185+
function clearMessages() {
186+
document.getElementById('messages').textContent = '';
187+
messageCount = 0;
188+
}
189+
190+
function handleKeyPress(event) {
191+
if (event.key === 'Enter') {
192+
sendMessage();
193+
}
194+
}
195+
196+
// Auto-connect on page load
197+
window.onload = function() {
198+
addMessage('WebSocket Echo Test Client loaded', 'info');
199+
addMessage('Click "Connect" to start testing the Hydro WebSocket server', 'info');
200+
};
201+
</script>
202+
</body>
203+
</html>
File renamed without changes.
File renamed without changes.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use hydro_lang::live_collections::keyed_singleton::{BoundedValue, KeyedSingleton};
2+
use hydro_lang::live_collections::stream::TotalOrder;
3+
use hydro_lang::prelude::*;
4+
5+
pub fn http_serve_static<'a, P>(
6+
in_stream: KeyedStream<u64, String, Process<'a, P>, Unbounded, TotalOrder>,
7+
content: &'static str,
8+
) -> KeyedSingleton<u64, String, Process<'a, P>, BoundedValue> {
9+
in_stream
10+
.fold_early_stop(
11+
q!(|| String::new()),
12+
q!(|buffer, line| {
13+
buffer.push_str(&line);
14+
buffer.push_str("\r\n");
15+
16+
// Check if this is an empty line (end of HTTP headers)
17+
line.trim().is_empty()
18+
}),
19+
)
20+
.map(q!(move |_| {
21+
format!(
22+
"HTTP/1.1 200 OK\r\n\
23+
Content-Type: text/html; charset=utf-8\r\n\
24+
Content-Length: {}\r\n\
25+
Connection: close\r\n\
26+
\r\n\
27+
{}",
28+
content.len(),
29+
content
30+
)
31+
}))
32+
}

0 commit comments

Comments
 (0)