Skip to content

Commit ef7dfef

Browse files
committed
feat(hydro_lang)!: require explicit failure policy for TCP channels
Currently, we only offer `fail_stop` as a policy, which was the implicit default for TCP guarantees thus far.
1 parent 771411e commit ef7dfef

File tree

21 files changed

+117
-76
lines changed

21 files changed

+117
-76
lines changed

hydro_lang/src/live_collections/keyed_stream/networking.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
6161
where
6262
T: Serialize + DeserializeOwned,
6363
{
64-
self.demux(other, TCP.bincode())
64+
self.demux(other, TCP.fail_stop().bincode())
6565
}
6666

6767
/// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
@@ -182,7 +182,7 @@ impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
182182
K: Serialize + DeserializeOwned,
183183
T: Serialize + DeserializeOwned,
184184
{
185-
self.demux(other, TCP.bincode())
185+
self.demux(other, TCP.fail_stop().bincode())
186186
}
187187

188188
/// Sends each group of this stream to a specific member of a cluster. The input stream has a
@@ -317,7 +317,7 @@ impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
317317
where
318318
T: Serialize + DeserializeOwned,
319319
{
320-
self.demux(other, TCP.bincode())
320+
self.demux(other, TCP.fail_stop().bincode())
321321
}
322322

323323
/// Sends each group of this stream at each source member to a specific member of a destination
@@ -475,7 +475,7 @@ impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
475475
K: Serialize + DeserializeOwned,
476476
V: Serialize + DeserializeOwned,
477477
{
478-
self.send(other, TCP.bincode())
478+
self.send(other, TCP.fail_stop().bincode())
479479
}
480480

481481
#[expect(clippy::type_complexity, reason = "compound key types with ordering")]

hydro_lang/src/live_collections/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2480,7 +2480,7 @@ mod tests {
24802480
let numbers = first_node.source_iter(q!(0..10));
24812481
let out_port = numbers
24822482
.map(q!(|n| SendOverNetwork { n }))
2483-
.send(&second_node, TCP.bincode())
2483+
.send(&second_node, TCP.fail_stop().bincode())
24842484
.send_bincode_external(&external);
24852485

24862486
let nodes = flow

hydro_lang/src/live_collections/stream/networking.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>
126126
where
127127
T: Serialize + DeserializeOwned,
128128
{
129-
self.send(other, TCP.bincode())
129+
self.send(other, TCP.fail_stop().bincode())
130130
}
131131

132132
/// "Moves" elements of this stream to a new distributed location by sending them over the network,
@@ -239,7 +239,7 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>
239239
where
240240
T: Clone + Serialize + DeserializeOwned,
241241
{
242-
self.broadcast(other, TCP.bincode(), nondet_membership)
242+
self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
243243
}
244244

245245
/// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
@@ -429,7 +429,7 @@ impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
429429
where
430430
T: Serialize + DeserializeOwned,
431431
{
432-
self.demux(other, TCP.bincode())
432+
self.demux(other, TCP.fail_stop().bincode())
433433
}
434434

435435
/// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
@@ -535,7 +535,7 @@ impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyO
535535
where
536536
T: Serialize + DeserializeOwned,
537537
{
538-
self.round_robin(other, TCP.bincode(), nondet_membership)
538+
self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
539539
}
540540

541541
/// Distributes elements of this stream to cluster members in a round-robin fashion, using
@@ -672,7 +672,7 @@ impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyO
672672
where
673673
T: Serialize + DeserializeOwned,
674674
{
675-
self.round_robin(other, TCP.bincode(), nondet_membership)
675+
self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
676676
}
677677

678678
/// Distributes elements of this stream to cluster members in a round-robin fashion, using
@@ -816,7 +816,7 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>
816816
where
817817
T: Serialize + DeserializeOwned,
818818
{
819-
self.send(other, TCP.bincode())
819+
self.send(other, TCP.fail_stop().bincode())
820820
}
821821

822822
/// "Moves" elements of this stream from a cluster to a process by sending them over the network,
@@ -968,7 +968,7 @@ impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>
968968
where
969969
T: Clone + Serialize + DeserializeOwned,
970970
{
971-
self.broadcast(other, TCP.bincode(), nondet_membership)
971+
self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
972972
}
973973

974974
/// Broadcasts elements of this stream at each source member to all members of a destination
@@ -1095,7 +1095,7 @@ impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
10951095
where
10961096
T: Serialize + DeserializeOwned,
10971097
{
1098-
self.demux(other, TCP.bincode())
1098+
self.demux(other, TCP.fail_stop().bincode())
10991099
}
11001100

11011101
/// Sends elements of this stream at each source member to specific members of a destination
@@ -1182,7 +1182,7 @@ mod tests {
11821182
let (in_send, input) = node.sim_input();
11831183

11841184
let out_recv = input
1185-
.send(&node2, TCP.bincode())
1185+
.send(&node2, TCP.fail_stop().bincode())
11861186
.batch(&node2.tick(), nondet!(/** test */))
11871187
.count()
11881188
.all_ticks()
@@ -1210,7 +1210,7 @@ mod tests {
12101210
let input = cluster.source_iter(q!(vec![1]));
12111211

12121212
let out_recv = input
1213-
.send(&node, TCP.bincode())
1213+
.send(&node, TCP.fail_stop().bincode())
12141214
.entries()
12151215
.batch(&node.tick(), nondet!(/** test */))
12161216
.all_ticks()
@@ -1243,13 +1243,13 @@ mod tests {
12431243

12441244
let out_recv_1 = cluster1
12451245
.source_iter(q!(vec![1]))
1246-
.send(&node, TCP.bincode())
1246+
.send(&node, TCP.fail_stop().bincode())
12471247
.entries()
12481248
.sim_output();
12491249

12501250
let out_recv_2 = cluster2
12511251
.source_iter(q!(vec![2]))
1252-
.send(&node, TCP.bincode())
1252+
.send(&node, TCP.fail_stop().bincode())
12531253
.entries()
12541254
.sim_output();
12551255

@@ -1292,9 +1292,9 @@ mod tests {
12921292
]));
12931293

12941294
let out_recv = input
1295-
.demux(&cluster, TCP.bincode())
1295+
.demux(&cluster, TCP.fail_stop().bincode())
12961296
.map(q!(|x| x + 1))
1297-
.send(&node, TCP.bincode())
1297+
.send(&node, TCP.fail_stop().bincode())
12981298
.entries()
12991299
.sim_output();
13001300

@@ -1320,9 +1320,9 @@ mod tests {
13201320
let input = node.source_iter(q!(vec![123, 456]));
13211321

13221322
let out_recv = input
1323-
.broadcast(&cluster, TCP.bincode(), nondet!(/** test */))
1323+
.broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
13241324
.map(q!(|x| x + 1))
1325-
.send(&node, TCP.bincode())
1325+
.send(&node, TCP.fail_stop().bincode())
13261326
.entries()
13271327
.sim_output();
13281328

@@ -1362,15 +1362,15 @@ mod tests {
13621362
]));
13631363

13641364
let out_recv = input
1365-
.demux(&cluster, TCP.bincode())
1365+
.demux(&cluster, TCP.fail_stop().bincode())
13661366
.map(q!(|x| x + 1))
13671367
.flat_map_ordered(q!(|x| vec![
13681368
(MemberId::from_raw_id(0), x),
13691369
(MemberId::from_raw_id(1), x),
13701370
]))
1371-
.demux(&cluster, TCP.bincode())
1371+
.demux(&cluster, TCP.fail_stop().bincode())
13721372
.entries()
1373-
.send(&node, TCP.bincode())
1373+
.send(&node, TCP.fail_stop().bincode())
13741374
.entries()
13751375
.sim_output();
13761376

hydro_lang/src/location/cluster.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,12 @@ mod tests {
195195

196196
let out_recv = cluster1
197197
.source_iter(q!(vec![CLUSTER_SELF_ID]))
198-
.send(&node, TCP.bincode())
198+
.send(&node, TCP.fail_stop().bincode())
199199
.values()
200200
.interleave(
201201
cluster2
202202
.source_iter(q!(vec![CLUSTER_SELF_ID]))
203-
.send(&node, TCP.bincode())
203+
.send(&node, TCP.fail_stop().bincode())
204204
.values(),
205205
)
206206
.sim_output();
@@ -229,7 +229,7 @@ mod tests {
229229
.batch(&cluster.tick(), nondet!(/** test */))
230230
.count()
231231
.all_ticks()
232-
.send(&node, TCP.bincode())
232+
.send(&node, TCP.fail_stop().bincode())
233233
.entries()
234234
.map(q!(|(id, v)| (id, v)))
235235
.sim_output();

hydro_lang/src/networking/mod.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,24 @@ pub enum NoSer {}
3434
#[sealed::sealed]
3535
trait TransportKind {}
3636

37+
#[sealed::sealed]
38+
#[diagnostic::on_unimplemented(
39+
message = "TCP transport requires a failure policy. For example, `TCP.fail_stop()` stops sending messages after a failed connection."
40+
)]
41+
trait TcpFailPolicy {}
42+
43+
/// A TCP failure policy that stops sending messages after a failed connection.
44+
pub enum FailStop {}
45+
#[sealed::sealed]
46+
impl TcpFailPolicy for FailStop {}
47+
3748
/// Send items across a length-delimited TCP channel.
38-
pub enum Tcp {}
49+
pub struct Tcp<F> {
50+
_phantom: PhantomData<F>,
51+
}
3952

4053
#[sealed::sealed]
41-
impl TransportKind for Tcp {}
54+
impl<F: TcpFailPolicy> TransportKind for Tcp<F> {}
4255

4356
/// A networking backend implementation that supports items of type `T`.
4457
#[sealed::sealed]
@@ -78,6 +91,22 @@ impl<Tr: ?Sized, S: ?Sized> NetworkingConfig<Tr, S> {
7891
}
7992
}
8093

94+
impl<S: ?Sized> NetworkingConfig<Tcp<()>, S> {
95+
/// Configures the TCP transport to stop sending messages after a failed connection.
96+
///
97+
/// Note that the Hydro simulator will not simulate connection failures that impact the
98+
/// *liveness* of a program. If an output assertion depends on a `fail_stop` channel
99+
/// making progress, that channel will not experience a failure that would cause the test to
100+
/// block indefinitely. However, any *safety* issues caused by connection failures will still
101+
/// be caught, such as a race condition between a failed connection and some other message.
102+
pub const fn fail_stop(self) -> NetworkingConfig<Tcp<FailStop>, S> {
103+
NetworkingConfig {
104+
name: self.name,
105+
_phantom: (PhantomData, PhantomData),
106+
}
107+
}
108+
}
109+
81110
#[sealed::sealed]
82111
impl<Tr: ?Sized, S: ?Sized, T: ?Sized> NetworkFor<T> for NetworkingConfig<Tr, S>
83112
where
@@ -117,7 +146,7 @@ where
117146
}
118147

119148
/// A network channel that uses length-delimited TCP for transport.
120-
pub const TCP: NetworkingConfig<Tcp, NoSer> = NetworkingConfig {
149+
pub const TCP: NetworkingConfig<Tcp<()>, NoSer> = NetworkingConfig {
121150
name: None,
122151
_phantom: (PhantomData, PhantomData),
123152
};

hydro_std/src/bench_client/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ pub fn aggregate_bench_results<'a, Client: 'a, Aggregator>(
226226
let keyed_throughputs = results
227227
.throughput
228228
.sample_every(q!(Duration::from_millis(interval_millis)), nondet_sampling)
229-
.send(aggregator, TCP.bincode());
229+
.send(aggregator, TCP.fail_stop().bincode());
230230

231231
let latest_throughputs = keyed_throughputs.reduce(q!(
232232
|combined, new| {
@@ -291,7 +291,7 @@ pub fn aggregate_bench_results<'a, Client: 'a, Aggregator>(
291291
histogram: latencies,
292292
}
293293
}))
294-
.send(aggregator, TCP.bincode());
294+
.send(aggregator, TCP.fail_stop().bincode());
295295

296296
let most_recent_histograms = keyed_latencies
297297
.map(q!(|histogram| histogram.histogram.borrow().clone()))

hydro_std/src/compartmentalize.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ impl<'a, T, C1, C2, Order: Ordering> PartitionStream<'a, T, C1, C2, Order>
2828
where
2929
T: Clone + Serialize + DeserializeOwned,
3030
{
31-
self.map(dist_policy).demux(other, TCP.bincode()).values()
31+
self.map(dist_policy)
32+
.demux(other, TCP.fail_stop().bincode())
33+
.values()
3234
}
3335
}
3436

@@ -60,7 +62,7 @@ where
6062
MemberId::from_tagless(CLUSTER_SELF_ID.clone().into_tagless()), // this is a seemingly round about way to convert from one member id tag to another.
6163
b
6264
)))
63-
.demux(other, TCP.bincode())
65+
.demux(other, TCP.fail_stop().bincode())
6466
.values();
6567

6668
sent.assume_ordering(
@@ -88,6 +90,6 @@ impl<'a, T, L, B: Boundedness, Order: Ordering>
8890
where
8991
T: Clone + Serialize + DeserializeOwned,
9092
{
91-
self.send(other, TCP.bincode())
93+
self.send(other, TCP.fail_stop().bincode())
9294
}
9395
}

hydro_test/src/cluster/compartmentalized_paxos.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ fn sequence_payload<'a, P: PaxosPayload>(
277277
((slot, ballot), payload)
278278
))))
279279
.all_ticks()
280-
.demux(proxy_leaders, TCP.bincode())
280+
.demux(proxy_leaders, TCP.fail_stop().bincode())
281281
.values()
282282
.atomic(proxy_leader_tick);
283283

@@ -305,7 +305,7 @@ fn sequence_payload<'a, P: PaxosPayload>(
305305
p2as
306306
}))
307307
.end_atomic()
308-
.demux(acceptors, TCP.bincode())
308+
.demux(acceptors, TCP.fail_stop().bincode())
309309
.values();
310310

311311
let (a_log, a_to_proxy_leaders_p2b) = acceptor_p2(
@@ -336,7 +336,7 @@ fn sequence_payload<'a, P: PaxosPayload>(
336336
let pl_failed_p2b_to_proposer = fails
337337
.map(q!(|(_, ballot)| (ballot.proposer_id.clone(), ballot)))
338338
.inspect(q!(|(_, ballot)| println!("Failed P2b: {:?}", ballot)))
339-
.demux(proposers, TCP.bincode())
339+
.demux(proposers, TCP.fail_stop().bincode())
340340
.values();
341341

342342
(

hydro_test/src/cluster/compute_pi.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,16 @@ pub fn compute_pi<'a>(
2929
)
3030
.all_ticks();
3131

32-
let estimate = trials.send(&process, TCP.bincode()).values().reduce(q!(
33-
|(inside, total), (inside_batch, total_batch)| {
34-
*inside += inside_batch;
35-
*total += total_batch;
36-
},
37-
commutative = ManualProof(/* int addition is commutative */)
38-
));
32+
let estimate = trials
33+
.send(&process, TCP.fail_stop().bincode())
34+
.values()
35+
.reduce(q!(
36+
|(inside, total), (inside_batch, total_batch)| {
37+
*inside += inside_batch;
38+
*total += total_batch;
39+
},
40+
commutative = ManualProof(/* int addition is commutative */)
41+
));
3942

4043
estimate
4144
.sample_every(

hydro_test/src/cluster/many_to_many.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub fn many_to_many<'a>(flow: &mut FlowBuilder<'a>) -> Cluster<'a, ()> {
66
.source_iter(q!(0..2))
77
.broadcast(
88
&cluster,
9-
TCP.bincode().name("m2m_broadcast"),
9+
TCP.fail_stop().bincode().name("m2m_broadcast"),
1010
nondet!(/** test */),
1111
)
1212
.entries()

0 commit comments

Comments
 (0)