Skip to content

Commit 2bd497f

Browse files
authored
Merge pull request #36524: Drain - model part + windowedValue changes
2 parents f517b01 + fc4b29c commit 2bd497f

File tree

8 files changed

+150
-38
lines changed

8 files changed

+150
-38
lines changed

model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,10 +740,18 @@ message Elements {
740740
bool is_last = 4;
741741
}
742742

743+
message DrainMode {
744+
enum Enum {
745+
UNSPECIFIED = 0;
746+
NOT_DRAINING = 1;
747+
DRAINING = 2;
748+
}
749+
}
750+
743751
// Element metadata passed as part of WindowedValue to make WindowedValue
744752
// extensible and backward compatible
745753
message ElementMetadata {
746-
// empty message - add drain, kind, tracing metadata in the future
754+
optional DrainMode.Enum drain = 1;
747755
}
748756

749757
// Represent the encoded user timer for a given instruction, transform and

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,6 +1378,11 @@ public T getValue() {
13781378
return value;
13791379
}
13801380

1381+
@Override
1382+
public boolean causedByDrain() {
1383+
return false;
1384+
}
1385+
13811386
@Override
13821387
public Instant getTimestamp() {
13831388
return BoundedWindow.TIMESTAMP_MIN_VALUE;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public PaneInfo getPaneInfo() {
5959
return null;
6060
}
6161

62+
@Override
63+
public boolean causedByDrain() {
64+
return false;
65+
}
66+
6267
@Override
6368
public Iterable<WindowedValue<T>> explodeWindows() {
6469
return Collections.emptyList();

runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public PaneInfo getPaneInfo() {
115115
return null;
116116
}
117117

118+
@Override
119+
public boolean causedByDrain() {
120+
return false;
121+
}
122+
118123
@Override
119124
public @Nullable Long getRecordOffset() {
120125
return null;

sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,7 @@ public interface OutputBuilder<T> extends WindowedValue<T> {
4848

4949
OutputBuilder<T> setRecordOffset(@Nullable Long recordOffset);
5050

51+
OutputBuilder<T> setCausedByDrain(boolean causedByDrain);
52+
5153
void output();
5254
}

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public interface WindowedValue<T> {
5252
@Nullable
5353
Long getRecordOffset();
5454

55+
boolean causedByDrain();
56+
5557
/**
5658
* A representation of each of the actual values represented by this compressed {@link
5759
* WindowedValue}, one per window.

0 commit comments

Comments
 (0)