docs/docs/hydro/learn/quickstart/partitioned-counter.mdx
4 additions & 4 deletions
@@ -83,7 +83,7 @@ let sharded_increment_requests = increment_requests
         key.hash(&mut hasher);
         MemberId::from_raw_id(hasher.finish() as u32 % 5)
     }))
-    .demux(shard_servers, TCP.bincode());
+    .demux(shard_servers, TCP.fail_stop().bincode());
 `)}</CodeBlock>
The `prefix_key` method adds a new key to the front of the keyed stream. Here, you compute a `MemberId` by hashing the key name and taking the hash modulo 5 (assuming 5 shards). The result is a stream keyed by `(MemberId, client_id, key_name)`.
@@ -109,15 +109,15 @@ let sharded_increment_requests = increment_requests
         key.hash(&mut hasher);
         MemberId::from_raw_id(hasher.finish() as u32 % 5)
     }))
-    .demux(shard_servers, TCP.bincode());
+    .demux(shard_servers, TCP.fail_stop().bincode());

 let sharded_get_requests = get_requests
     .prefix_key(q!(|(_client, key)| {
         let mut hasher = DefaultHasher::new();
         key.hash(&mut hasher);
         MemberId::from_raw_id(hasher.finish() as u32 % 5)
     }))
-    .demux(shard_servers, TCP.bincode());
+    .demux(shard_servers, TCP.fail_stop().bincode());
 `), [7, 15])}</CodeBlock>
After `demux`, the stream is located at the cluster. The stream returned by `demux` preserves the remaining keys after the `MemberId` is consumed for routing. In this case, you are left with a `KeyedStream<u32, String, Cluster<'a, CounterShard>>`, a stream keyed by client ID and key name, located on the cluster.
@@ -143,7 +143,7 @@ Finally, you need to send the responses back to the leader process:
The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
@@ -82,7 +82,7 @@ let process: Process<()> = flow.process::<()>();
 let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
     .source_iter(q!(vec!["hello", "world"]))
     .map(q!(|x| x.to_string()))
-    .send(&process, TCP.bincode())
+    .send(&process, TCP.fail_stop().bincode())
     .values();

 let words_concat = all_words
@@ -109,7 +109,7 @@ To perform an aggregation with an unordered stream, you must add a **property an
 # let all_words: Stream<_, Process<_>, _, hydro_lang::live_collections::stream::NoOrder> = workers