
Commit e3af23b

Merge remote-tracking branch 'upstream/master' into style/whitespaces
2 parents 6a61b62 + 4f9d560

7 files changed: +72 -33 lines

.github/workflows/beam_PostCommit_Python_ValidatesContainer_Dataflow_With_RC.yml

Lines changed: 3 additions & 1 deletion
@@ -56,7 +56,7 @@ jobs:
       github.event_name == 'pull_request_target' ||
       startsWith(github.event.comment.body, 'Run Python RC Dataflow ValidatesContainer')
     runs-on: [self-hosted, ubuntu-20.04, main]
-    timeout-minutes: 100
+    timeout-minutes: 300
     name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }})
     strategy:
       fail-fast: false
@@ -74,6 +74,8 @@ jobs:
           github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }})
       - name: Install libsnappy-dev
         run: sudo apt-get update && sudo apt-get install -y libsnappy-dev
+      - name: Install libpq-dev
+        run: sudo apt-get update && sudo apt-get install -y libpq-dev
       - name: Setup environment
         uses: ./.github/actions/setup-environment-action
         with:

README.md

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@
 
 # Apache Beam
 
-[Apache Beam](http://beam.apache.org/) is a unified model for defining both batch and streaming data-parallel processing pipelines, as well as a set of language-specific SDKs for constructing pipelines and Runners for executing them on distributed processing backends, including [Apache Flink](http://flink.apache.org/), [Apache Spark](http://spark.apache.org/), [Google Cloud Dataflow](http://cloud.google.com/dataflow/), and [Hazelcast Jet](https://jet.hazelcast.org/).
+[Apache Beam](http://beam.apache.org/) is a unified model for defining both batch and streaming data-parallel processing pipelines, as well as a set of language-specific SDKs for constructing pipelines and Runners for executing them on distributed processing backends, including [Apache Flink](http://flink.apache.org/), [Apache Spark](http://spark.apache.org/), [Google Cloud Dataflow](http://cloud.google.com/dataflow/), and [Hazelcast Jet](https://hazelcast.com/).
 
 ## Status
 

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java

Lines changed: 62 additions & 24 deletions
@@ -26,7 +26,9 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -212,12 +214,21 @@ public void testCommitWorkItem_retryOnNewStream() throws Exception {
     }
     Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take();
     assertEquals(5, request.getCommitChunkCount());
-    for (int i = 0; i < 5; ++i) {
-      assertEquals(i + 1, request.getCommitChunk(i).getRequestId());
-      Windmill.WorkItemCommitRequest parsedRequest =
-          Windmill.WorkItemCommitRequest.parseFrom(
-              request.getCommitChunk(i).getSerializedWorkItemCommit());
-      assertEquals(parsedRequest.getWorkToken(), i);
+    {
+      // Check if request ids and work tokens match.
+      Map<Long, Long> requestIdWorkTokenMap = new HashMap<>();
+      Map<Long, Long> expectedRequestIdWorkTokenMap = new HashMap<>();
+      for (int i = 0; i < 5; ++i) {
+        Windmill.WorkItemCommitRequest parsedRequest =
+            Windmill.WorkItemCommitRequest.parseFrom(
+                request.getCommitChunk(i).getSerializedWorkItemCommit());
+        requestIdWorkTokenMap.put(
+            request.getCommitChunk(i).getRequestId(), parsedRequest.getWorkToken());
+      }
+      for (int i = 1; i <= 5; ++i) {
+        expectedRequestIdWorkTokenMap.put((long) i, (long) (i - 1));
+      }
+      assertThat(requestIdWorkTokenMap).containsExactlyEntriesIn(expectedRequestIdWorkTokenMap);
     }
     // Send back that 1 and 5 finished.
     streamInfo.responseObserver.onNext(
@@ -232,12 +243,21 @@ public void testCommitWorkItem_retryOnNewStream() throws Exception {
     waitForConnectionAndConsumeHeader();
     Windmill.StreamingCommitWorkRequest reconnectRequest = reconnectStreamInfo.requests.take();
     assertEquals(3, reconnectRequest.getCommitChunkCount());
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(i + 2, reconnectRequest.getCommitChunk(i).getRequestId());
-      Windmill.WorkItemCommitRequest parsedRequest =
-          Windmill.WorkItemCommitRequest.parseFrom(
-              reconnectRequest.getCommitChunk(i).getSerializedWorkItemCommit());
-      assertEquals(i + 1, parsedRequest.getWorkToken());
+    {
+      // Check if request ids and work tokens match.
+      Map<Long, Long> requestIdWorkTokenMap = new HashMap<>();
+      Map<Long, Long> expectedRequestIdWorkTokenMap = new HashMap<>();
+      for (int i = 0; i < 3; ++i) {
+        Windmill.WorkItemCommitRequest parsedRequest =
+            Windmill.WorkItemCommitRequest.parseFrom(
+                reconnectRequest.getCommitChunk(i).getSerializedWorkItemCommit());
+        requestIdWorkTokenMap.put(
+            reconnectRequest.getCommitChunk(i).getRequestId(), parsedRequest.getWorkToken());
+      }
+      for (int i = 2; i <= 4; ++i) {
+        expectedRequestIdWorkTokenMap.put((long) i, (long) (i - 1));
+      }
+      assertThat(requestIdWorkTokenMap).containsExactlyEntriesIn(expectedRequestIdWorkTokenMap);
     }
     // Send back that 2 and 3 finished.
     reconnectStreamInfo.responseObserver.onNext(
@@ -281,12 +301,21 @@ public void testCommitWorkItem_retryOnNewStreamHalfClose() throws Exception {
     }
     Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take();
     assertEquals(5, request.getCommitChunkCount());
-    for (int i = 0; i < 5; ++i) {
-      assertEquals(i + 1, request.getCommitChunk(i).getRequestId());
-      Windmill.WorkItemCommitRequest parsedRequest =
-          Windmill.WorkItemCommitRequest.parseFrom(
-              request.getCommitChunk(i).getSerializedWorkItemCommit());
-      assertEquals(parsedRequest.getWorkToken(), i);
+    {
+      // Check if request ids and work tokens match.
+      Map<Long, Long> requestIdWorkTokenMap = new HashMap<>();
+      Map<Long, Long> expectedRequestIdWorkTokenMap = new HashMap<>();
+      for (int i = 0; i < 5; ++i) {
+        Windmill.WorkItemCommitRequest parsedRequest =
+            Windmill.WorkItemCommitRequest.parseFrom(
+                request.getCommitChunk(i).getSerializedWorkItemCommit());
+        requestIdWorkTokenMap.put(
+            request.getCommitChunk(i).getRequestId(), parsedRequest.getWorkToken());
+      }
+      for (int i = 1; i <= 5; ++i) {
+        expectedRequestIdWorkTokenMap.put((long) i, (long) (i - 1));
+      }
+      assertThat(requestIdWorkTokenMap).containsExactlyEntriesIn(expectedRequestIdWorkTokenMap);
     }
     // Half-close the logical stream. This shouldn't prevent reconnection of the physical stream
     // from succeeding.
@@ -310,12 +339,21 @@ public void testCommitWorkItem_retryOnNewStreamHalfClose() throws Exception {
 
     Windmill.StreamingCommitWorkRequest reconnectRequest = reconnectStreamInfo.requests.take();
     assertEquals(3, reconnectRequest.getCommitChunkCount());
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(i + 2, reconnectRequest.getCommitChunk(i).getRequestId());
-      Windmill.WorkItemCommitRequest parsedRequest =
-          Windmill.WorkItemCommitRequest.parseFrom(
-              reconnectRequest.getCommitChunk(i).getSerializedWorkItemCommit());
-      assertEquals(i + 1, parsedRequest.getWorkToken());
+    {
+      // Check if request ids and work tokens match.
+      Map<Long, Long> requestIdWorkTokenMap = new HashMap<>();
+      Map<Long, Long> expectedRequestIdWorkTokenMap = new HashMap<>();
+      for (int i = 0; i < 3; ++i) {
+        Windmill.WorkItemCommitRequest parsedRequest =
+            Windmill.WorkItemCommitRequest.parseFrom(
+                reconnectRequest.getCommitChunk(i).getSerializedWorkItemCommit());
+        requestIdWorkTokenMap.put(
+            reconnectRequest.getCommitChunk(i).getRequestId(), parsedRequest.getWorkToken());
+      }
+      for (int i = 2; i <= 4; ++i) {
+        expectedRequestIdWorkTokenMap.put((long) i, (long) (i - 1));
+      }
+      assertThat(requestIdWorkTokenMap).containsExactlyEntriesIn(expectedRequestIdWorkTokenMap);
     }
     assertNull(streamInfo.onDone.get());
 
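
The rewritten assertions above make the checks order-insensitive: instead of asserting that chunk i carries request id i + 1, the test collects every chunk's (request id, work token) pair into a map and compares it against the expected mapping in one step, so it no longer depends on the order in which chunks appear on the stream. A minimal standalone sketch of the same pattern, using Google Truth's containsExactlyEntriesIn (the assertion the new test code relies on); the values are hypothetical stand-ins for the chunk data:

import static com.google.common.truth.Truth.assertThat;

import java.util.HashMap;
import java.util.Map;

public class OrderInsensitiveAssertionSketch {
  public static void main(String[] args) {
    // Hypothetical chunk data: request ids 1..5 paired with work tokens 0..4,
    // inserted in reverse order to mimic out-of-order arrival.
    Map<Long, Long> actual = new HashMap<>();
    for (long requestId = 5; requestId >= 1; --requestId) {
      actual.put(requestId, requestId - 1);
    }
    Map<Long, Long> expected = new HashMap<>();
    for (long requestId = 1; requestId <= 5; ++requestId) {
      expected.put(requestId, requestId - 1);
    }
    // containsExactlyEntriesIn compares entry sets, so insertion (arrival)
    // order is irrelevant; a missing, extra, or mismatched entry still fails.
    assertThat(actual).containsExactlyEntriesIn(expected);
  }
}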

sdks/go.mod

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ require (
 	github.com/johannesboyne/gofakes3 v0.0.0-20250106100439-5c39aecd6999
 	github.com/lib/pq v1.10.9
 	github.com/linkedin/goavro/v2 v2.14.0
-	github.com/nats-io/nats-server/v2 v2.11.5
+	github.com/nats-io/nats-server/v2 v2.11.6
 	github.com/nats-io/nats.go v1.43.0
 	github.com/proullon/ramsql v0.1.4
 	github.com/spf13/cobra v1.9.1

sdks/go.sum

Lines changed: 2 additions & 2 deletions
@@ -1323,8 +1323,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
 github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
 github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
 github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
-github.com/nats-io/nats-server/v2 v2.11.5 h1:yxwFASM5VrbHky6bCCame6g6fXZaayLoh7WFPWU9EEg=
-github.com/nats-io/nats-server/v2 v2.11.5/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs=
+github.com/nats-io/nats-server/v2 v2.11.6 h1:4VXRjbTUFKEB+7UoaKL3F5Y83xC7MxPoIONOnGgpkHw=
+github.com/nats-io/nats-server/v2 v2.11.6/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs=
 github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug=
 github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
 github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 2 additions & 3 deletions
@@ -494,11 +494,10 @@ synchronized long approxBacklogInBytes() {
   }
 
   synchronized long backlogMessageCount() {
-    if (latestOffset < 0 || nextOffset < 0) {
+    if (latestOffset < 0 || nextOffset < 0 || latestOffset < nextOffset) {
       return UnboundedReader.BACKLOG_UNKNOWN;
     }
-    double remaining = latestOffset - nextOffset;
-    return Math.max(0, (long) Math.ceil(remaining));
+    return latestOffset - nextOffset;
   }
 
   synchronized TimestampPolicyContext mkTimestampPolicyContext() {
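
Two things change in backlogMessageCount above: the long-to-double round trip is dropped (Math.ceil of a difference of two longs is always a whole number, so the cast and ceil were dead weight), and a guard for latestOffset < nextOffset is added. The guard matters when the reader has consumed past a stale end-offset estimate: the old code clamped the negative difference to 0, silently reporting an empty backlog, while the new code reports the backlog as unknown. A self-contained sketch of the new behavior; the offset values are made up, and BACKLOG_UNKNOWN mirrors the -1L sentinel defined on Beam's UnboundedReader:

public class BacklogSketch {
  // Assumption: mirrors UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN (-1L).
  static final long BACKLOG_UNKNOWN = -1L;

  // The patched logic: treat a stale estimate (latestOffset < nextOffset)
  // as "unknown" rather than clamping the negative difference to zero.
  static long backlogMessageCount(long latestOffset, long nextOffset) {
    if (latestOffset < 0 || nextOffset < 0 || latestOffset < nextOffset) {
      return BACKLOG_UNKNOWN;
    }
    return latestOffset - nextOffset;
  }

  public static void main(String[] args) {
    System.out.println(backlogMessageCount(100, 40)); // 60: normal backlog
    System.out.println(backlogMessageCount(-1, 40));  // -1: no estimate yet
    System.out.println(backlogMessageCount(40, 100)); // -1: previously reported 0
  }
}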

website/www/site/data/en/documentation_runners.yaml

Lines changed: 1 addition & 1 deletion
@@ -32,6 +32,6 @@
     icon: icons/documentation/runners/nemo-icon.svg
     description: Runs on <a target="_blank" href="https://nemo.apache.org">Apache Nemo</a>.
 - name: { text: "JetRunner:", link: /documentation/runners/jet/ }
-  description: Runs on <a target="_blank" href="https://jet.hazelcast.org/">Hazelcast Jet</a>.
+  description: Runs on <a target="_blank" href="https://hazelcast.com/">Hazelcast Jet</a>.
 - name: { text: "Twister2Runner:", link: /documentation/runners/twister2/ }
   description: Runs on <a target="_blank" href="https://twister2.org/">Twister2</a>.
