
Commit 24a5a51

[Flink] merge again
2 parents dd23ef0 + 5b03c0c commit 24a5a51

File tree

15 files changed: +108 −126 lines changed
Lines changed: 1 addition & 1 deletion

@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
 }

.github/workflows/beam_PerformanceTests_Kafka_IO.yml

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ jobs:
       github.event_name == 'workflow_dispatch' ||
       (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
       github.event.comment.body == 'Run Java KafkaIO Performance Test'
-    runs-on: [self-hosted, ubuntu-20.04, main]
+    runs-on: [self-hosted, ubuntu-20.04, highmem]
     timeout-minutes: 120
     name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
     strategy:

.github/workflows/finalize_release.yml

Lines changed: 25 additions & 0 deletions
@@ -138,6 +138,7 @@ jobs:
         env:
           VERSION_TAG: "v${{ github.event.inputs.RELEASE }}"
           RC_TAG: "v${{ github.event.inputs.RELEASE }}-RC${{ github.event.inputs.RC }}"
+          POST_RELEASE_BRANCH: "release-${{ github.event.inputs.RELEASE }}-postrelease"
         run: |
           # Ensure local tags are in sync. If there's a mismatch, it will tell you.
           git fetch --all --tags --prune
@@ -152,3 +153,27 @@
           # Tag for repo root.
           git tag "$VERSION_TAG" "$RC_TAG"^{} -m "Tagging release" --local-user="${{steps.import_gpg.outputs.name}}"
           git push https://github.com/apache/beam "$VERSION_TAG"
+
+          git checkout -b "$POST_RELEASE_BRANCH" "$VERSION_TAG"
+          git push https://github.com/apache/beam "$POST_RELEASE_BRANCH"
+
+  update_master:
+    needs: push_git_tags
+    runs-on: ubuntu-latest
+    env:
+      POST_RELEASE_BRANCH: "release-${{ github.event.inputs.RELEASE }}-postrelease"
+    steps:
+      - name: Check out code
+        uses: actions/checkout@v4
+      - name: Set git config
+        run: |
+          git config user.name $GITHUB_ACTOR
+          git config user.email actions@"$RUNNER_NAME".local
+      - name: Update .asf.yaml to protect new postrelease branch from force push
+        run: |
+          sed -i -e "s/master: {}/master: {}\n ${POST_RELEASE_BRANCH}: {}/g" .asf.yaml
+      - name: Commit and Push to master branch files with Next Version
+        run: |
+          git add .asf.yaml
+          git commit -m "Moving to ${NEXT_VERSION_IN_BASE_BRANCH}-SNAPSHOT on master branch."
+          git push origin ${MASTER_BRANCH}

.github/workflows/republish_released_docker_containers.yml

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ jobs:
       - name: Checkout
        uses: actions/checkout@v4
        with:
-          ref: "v${{ env.release }}-RC${{ env.rc }}"
+          ref: "release-${{ env.release }}-postrelease"
           repository: apache/beam
       - name: Free Disk Space (Ubuntu)
        uses: jlumbroso/[email protected]

runners/flink/flink_runner.gradle

Lines changed: 3 additions & 1 deletion
@@ -239,7 +239,9 @@ def sickbayTests = [
     // Flink errors are not deterministic. Exception may just be
     // org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FAILED
     // instead of the actual cause. Real cause is visible in the logs.
-    'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests'
+    'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests',
+    'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
+    // TODO(https://github.com/apache/beam/issues/18198)
 ]

 def createValidatesRunnerTask(Map m) {

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java

Lines changed: 0 additions & 73 deletions
This file was deleted.

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java

Lines changed: 7 additions & 6 deletions
@@ -18,27 +18,28 @@
 package org.apache.beam.runners.dataflow.worker.util.common.worker;

 import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterBackedElementByteSizeObserver;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.nullness.qual.Nullable;

 /** An {@link ElementCounter} that counts output objects, bytes, and mean bytes. */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class OutputObjectAndByteCounter implements ElementCounter {
+
   // Might be null, e.g., undeclared outputs will not have an
   // elementByteSizeObservable.
   private final ElementByteSizeObservable<Object> elementByteSizeObservable;
   private final CounterFactory counterFactory;

-  private Random randomGenerator = new Random();
-
   // Lowest sampling probability: 0.001%.
   private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
   private static final int SAMPLING_CUTOFF = 10;
@@ -163,12 +164,12 @@ protected boolean sampleElement() {
     // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
     // later.
     samplingToken = Math.min(samplingToken + 1, samplingTokenUpperBound);
-    return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
+    return getRandom().nextInt(samplingToken) < SAMPLING_CUTOFF;
   }

-  public OutputObjectAndByteCounter setRandom(Random random) {
-    this.randomGenerator = random;
-    return this;
+  @VisibleForTesting
+  protected Random getRandom() {
+    return ThreadLocalRandom.current();
   }

   private CounterName getCounterName(String name) {
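
The change above drops the mutable, injectable Random field in favor of a protected getRandom() hook that defaults to ThreadLocalRandom.current(), so production sampling no longer shares one Random instance while tests can still pin a seed by overriding the hook (as the test change below does). A minimal sketch of that pattern, using hypothetical class names rather than the Beam classes themselves:

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the overridable-random pattern adopted above (not part of this commit).
class SamplingCounter {
  // Production path: a per-thread generator, no shared mutable Random field.
  protected Random getRandom() {
    return ThreadLocalRandom.current();
  }

  // Keep an element with probability cutoff / bound.
  boolean sampleElement(int bound, int cutoff) {
    return getRandom().nextInt(bound) < cutoff;
  }
}

// A test can subclass and pin a seed to make sampling deterministic.
class SeededSamplingCounter extends SamplingCounter {
  private final Random random = new Random(42);

  @Override
  protected Random getRandom() {
    return random;
  }
}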

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java

Lines changed: 11 additions & 5 deletions
@@ -92,13 +92,19 @@ public void testAddingCountersIntoCounterSet() throws Exception {
   }

   private OutputObjectAndByteCounter makeCounter(String name, int samplingPeriod, int seed) {
-    return new OutputObjectAndByteCounter(
+    OutputObjectAndByteCounter outputObjectAndByteCounter =
+        new OutputObjectAndByteCounter(
             new ElementByteSizeObservableCoder<>(StringUtf8Coder.of()),
             counterSet,
-            NameContextsForTests.nameContextForTest())
-        .setSamplingPeriod(samplingPeriod)
-        .setRandom(new Random(seed))
-        .countBytes(name);
+            NameContextsForTests.nameContextForTest()) {
+          private final Random random = new Random(seed);
+
+          @Override
+          protected Random getRandom() {
+            return random;
+          }
+        };
+    return outputObjectAndByteCounter.setSamplingPeriod(samplingPeriod).countBytes(name);
   }

   @Test

sdks/java/io/iceberg/build.gradle

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ dependencies {
     implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
     implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
     implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
+    implementation "org.apache.iceberg:iceberg-data:$iceberg_version"
     implementation library.java.hadoop_common
     runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java

Lines changed: 4 additions & 29 deletions
@@ -50,14 +50,11 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.expressions.Literal;
-import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
-import org.apache.iceberg.types.Types;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,12 +103,14 @@ class DestinationState {
     @VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
     private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
     private final List<Exception> exceptions = Lists.newArrayList();
+    private final InternalRecordWrapper wrapper; // wrapper that facilitates partitioning

     DestinationState(IcebergDestination icebergDestination, Table table) {
       this.icebergDestination = icebergDestination;
       this.schema = table.schema();
       this.spec = table.spec();
       this.routingPartitionKey = new PartitionKey(spec, schema);
+      this.wrapper = new InternalRecordWrapper(schema.asStruct());
       this.table = table;
       for (PartitionField partitionField : spec.fields()) {
         partitionFieldMap.put(partitionField.name(), partitionField);
@@ -156,7 +155,7 @@ class DestinationState {
      * can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
      */
     boolean write(Record record) {
-      routingPartitionKey.partition(getPartitionableRecord(record));
+      routingPartitionKey.partition(wrapper.wrap(record));

       @Nullable RecordWriter writer = writers.getIfPresent(routingPartitionKey);
       if (writer == null && openWriters >= maxNumWriters) {
@@ -207,30 +206,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
             e);
       }
     }
-
-    /**
-     * Resolves an input {@link Record}'s partition values and returns another {@link Record} that
-     * can be applied to the destination's {@link PartitionSpec}.
-     */
-    private Record getPartitionableRecord(Record record) {
-      if (spec.isUnpartitioned()) {
-        return record;
-      }
-      Record output = GenericRecord.create(schema);
-      for (PartitionField partitionField : spec.fields()) {
-        Transform<?, ?> transform = partitionField.transform();
-        Types.NestedField field = schema.findField(partitionField.sourceId());
-        String name = field.name();
-        Object value = record.getField(name);
-        @Nullable Literal<Object> literal = Literal.of(value.toString()).to(field.type());
-        if (literal == null || transform.isVoid() || transform.isIdentity()) {
-          output.setField(name, value);
-        } else {
-          output.setField(name, literal.value());
-        }
-      }
-      return output;
-    }
   }

   /**
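
The removed getPartitionableRecord helper converted each Record by hand before partitioning; the commit instead relies on Iceberg's InternalRecordWrapper (from the iceberg-data module, which is likely why the iceberg-data dependency is added in sdks/java/io/iceberg/build.gradle above) to adapt a record's values to the internal representation that partition transforms expect. A minimal sketch of the new routing path, with a hypothetical helper name mirroring the calls made in DestinationState.write above:

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;

final class PartitionRouting {
  private PartitionRouting() {}

  // Hypothetical helper: wrap the record so PartitionKey can evaluate the spec's
  // partition transforms against Iceberg-internal value types, then fill the key.
  static PartitionKey routeToPartition(Schema schema, PartitionSpec spec, Record record) {
    InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
    PartitionKey partitionKey = new PartitionKey(spec, schema);
    partitionKey.partition(wrapper.wrap(record));
    return partitionKey;
  }
}

In the actual DestinationState, the wrapper and routing key are constructed once per destination and reused across write() calls, which avoids the per-record GenericRecord copies made by the removed helper.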
