Skip to content

Commit 2fa7a81

Browse files
authored
Merge branch 'master' into bvolpato/bump-postgres-jdbc-42.7.10
2 parents c510158 + b6d8f42 commit 2fa7a81

File tree

23 files changed

+2508
-24
lines changed

23 files changed

+2508
-24
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 4
3+
"modification": 1
44
}

.github/workflows/finalize_release.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ jobs:
8080
docker buildx imagetools create --tag "${IMAGE}:latest" "${IMAGE}:${RELEASE}"
8181
done
8282
83+
# Flink 2 images have different tagging, add latest tag to latest supported Flink version
84+
echo "================Confirming Runner container Release and RC version==========="
85+
BEAM_FLINK_REPO=apache/beam_flink_job_server
86+
LATEST_FLINK_VERSION=$(wget -qO- https://raw.githubusercontent.com/apache/beam/refs/tags/v${RELEASE}-RC${RC_NUM}/gradle.properties | grep -E flink_versions | tr ',' '\n' | tail -1)
87+
docker buildx imagetools create --tag "${BEAM_FLINK_REPO}:latest" "${BEAM_FLINK_REPO}:${RELEASE}${RC_VERSION}-flink${LATEST_FLINK_VERSION}"
88+
8389
publish_python_artifacts:
8490
if: ${{github.event.inputs.PUBLISH_PYTHON_ARTIFACTS == 'yes'}}
8591
runs-on: [self-hosted, ubuntu-20.04, main]

CHANGES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
## Security Fixes
8989

9090
* Fixed [CVE-2024-1597](https://www.cve.org/CVERecord?id=CVE-2024-1597), [CVE-2022-31197](https://www.cve.org/CVERecord?id=CVE-2022-31197), and [CVE-2022-21724](https://www.cve.org/CVERecord?id=CVE-2022-21724) by upgrading PostgreSQL JDBC Driver from 42.2.16 to 42.7.10 (Java) ([#37942](https://github.com/apache/beam/issues/37942)).
91+
* Fixed [CVE-2023-46604](https://www.cve.org/CVERecord?id=CVE-2023-46604) (CVSS 10.0) and [CVE-2022-41678](https://www.cve.org/CVERecord?id=CVE-2022-41678) by upgrading ActiveMQ from 5.14.5 to 5.19.2 (Java) ([#37943](https://github.com/apache/beam/issues/37943)).
9192

9293
## Known Issues
9394

@@ -2386,4 +2387,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss
23862387

23872388
## Highlights
23882389

2389-
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
2390+
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamDockerPlugin.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class BeamDockerPlugin implements Plugin<Project> {
4848
String dockerComposeFile = 'docker-compose.yml'
4949
Set<Task> dependencies = [] as Set
5050
Set<String> tags = [] as Set
51+
String tagSuffix = null
5152
Map<String, String> namedTags = [:]
5253
Map<String, String> labels = [:]
5354
Map<String, String> buildArgs = [:]
@@ -100,7 +101,11 @@ class BeamDockerPlugin implements Plugin<Project> {
100101
}
101102

102103
Set<String> getTags() {
103-
return this.tags + project.getVersion().toString()
104+
def allTags = this.tags + project.getVersion().toString()
105+
if (tagSuffix) {
106+
allTags = allTags.collect { it.endsWith(tagSuffix) ? it : it + tagSuffix }.toSet()
107+
}
108+
return allTags
104109
}
105110

106111
Set<String> getPlatform() {

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ class BeamModulePlugin implements Plugin<Project> {
597597
//
598598
// There are a few versions are determined by the BOMs by running scripts/tools/bomupgrader.py
599599
// marked as [bomupgrader]. See the documentation of that script for detail.
600-
def activemq_version = "5.14.5"
600+
def activemq_version = "5.19.2"
601601
def autovalue_version = "1.9"
602602
def autoservice_version = "1.0.1"
603603
def aws_java_sdk2_version = "2.20.162"

runners/flink/job-server-container/flink_job_server_container.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ task copyDockerfileDependencies(type: Copy) {
5555
def pushContainers = project.rootProject.hasProperty(["isRelease"]) || project.rootProject.hasProperty("push-containers")
5656
def containerName = project.parent.name.startsWith("2") ? "flink_job_server" : "flink${project.parent.name}_job_server"
5757
def containerTag = project.rootProject.hasProperty(["docker-tag"]) ? project.rootProject["docker-tag"] : project.sdk_version
58+
String verInSuffix = null
5859
if (project.parent.name.startsWith("2")) {
5960
containerTag += "-flink${project.parent.name}"
61+
verInSuffix = "-flink${project.parent.name}"
6062
}
6163

6264
docker {
@@ -68,6 +70,7 @@ docker {
6870
tag: containerTag)
6971
// tags used by dockerTag task
7072
tags containerImageTags()
73+
tagSuffix verInSuffix
7174
files "./build/"
7275
buildx project.useBuildx()
7376
platform(*project.containerPlatforms())

sdks/java/core/src/main/java/org/apache/beam/sdk/util/StreamUtils.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.io.IOException;
2323
import java.io.InputStream;
2424
import java.lang.ref.SoftReference;
25+
import java.util.Arrays;
26+
import javax.annotation.Nullable;
2527
import org.apache.beam.sdk.annotations.Internal;
2628

2729
/** Utility functions for stream operations. */
@@ -35,34 +37,67 @@ private StreamUtils() {}
3537

3638
private static final int BUF_SIZE = 8192;
3739

38-
private static ThreadLocal<SoftReference<byte[]>> threadLocalBuffer = new ThreadLocal<>();
40+
private static final ThreadLocal<SoftReference<byte[]>> threadLocalBuffer = new ThreadLocal<>();
3941

4042
/** Efficient converting stream to bytes. */
4143
public static byte[] getBytesWithoutClosing(InputStream stream) throws IOException {
44+
// Unwrap the stream so the below optimizations based upon class type function properly.
45+
// We don't use mark or reset in this function.
46+
while (stream instanceof UnownedInputStream) {
47+
stream = ((UnownedInputStream) stream).getWrappedStream();
48+
}
49+
4250
if (stream instanceof ExposedByteArrayInputStream) {
4351
// Fast path for the exposed version.
4452
return ((ExposedByteArrayInputStream) stream).readAll();
45-
} else if (stream instanceof ByteArrayInputStream) {
53+
}
54+
if (stream instanceof ByteArrayInputStream) {
4655
// Fast path for ByteArrayInputStream.
4756
byte[] ret = new byte[stream.available()];
4857
stream.read(ret);
4958
return ret;
5059
}
51-
// Falls back to normal stream copying.
60+
61+
// Most inputs are fully available so we attempt to first read directly
62+
// into a buffer of the right size, assuming available reflects all the bytes.
63+
int available = stream.available();
64+
@Nullable ByteArrayOutputStream outputStream = null;
65+
if (available > 0 && available < 1024 * 1024) {
66+
byte[] initialBuffer = new byte[available];
67+
int initialReadSize = stream.read(initialBuffer);
68+
if (initialReadSize == -1) {
69+
return new byte[0];
70+
}
71+
int nextByte = stream.read();
72+
if (nextByte == -1) {
73+
if (initialReadSize == available) {
74+
// Available reflected the full buffer and we copied directly to the
75+
// right size.
76+
return initialBuffer;
77+
}
78+
return Arrays.copyOf(initialBuffer, initialReadSize);
79+
}
80+
outputStream = new ByteArrayOutputStream();
81+
outputStream.write(initialBuffer, 0, initialReadSize);
82+
outputStream.write(nextByte);
83+
} else {
84+
outputStream = new ByteArrayOutputStream();
85+
}
86+
87+
// Normal stream copying using the thread-local buffer.
5288
SoftReference<byte[]> refBuffer = threadLocalBuffer.get();
5389
byte[] buffer = refBuffer == null ? null : refBuffer.get();
5490
if (buffer == null) {
5591
buffer = new byte[BUF_SIZE];
5692
threadLocalBuffer.set(new SoftReference<>(buffer));
5793
}
58-
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
5994
while (true) {
6095
int r = stream.read(buffer);
6196
if (r == -1) {
6297
break;
6398
}
64-
outStream.write(buffer, 0, r);
99+
outputStream.write(buffer, 0, r);
65100
}
66-
return outStream.toByteArray();
101+
return outputStream.toByteArray();
67102
}
68103
}

sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnownedInputStream.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ public UnownedInputStream(InputStream delegate) {
3535
super(delegate);
3636
}
3737

38+
InputStream getWrappedStream() {
39+
return in;
40+
}
41+
3842
@Override
3943
public void close() throws IOException {
4044
throw new UnsupportedOperationException(

sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.util;
1919

2020
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertSame;
2122
import static org.junit.Assert.assertThrows;
2223
import static org.mockito.ArgumentMatchers.any;
2324
import static org.mockito.ArgumentMatchers.anyString;
@@ -26,7 +27,9 @@
2627

2728
import java.io.InputStream;
2829
import java.io.OutputStream;
30+
import java.nio.charset.StandardCharsets;
2931
import org.apache.beam.sdk.coders.AtomicCoder;
32+
import org.apache.beam.sdk.coders.ByteArrayCoder;
3033
import org.apache.beam.sdk.coders.Coder;
3134
import org.apache.beam.sdk.coders.Coder.Context;
3235
import org.apache.beam.sdk.coders.CoderException;
@@ -142,4 +145,11 @@ public void testDecodeFromByteStringWithExtraDataThrows() throws Exception {
142145
CoderException.class,
143146
() -> CoderUtils.decodeFromByteString(StringUtf8Coder.of(), byteString, Context.NESTED));
144147
}
148+
149+
@Test
150+
public void testDecodeByteArrayWithoutCopy() throws Exception {
151+
byte[] data = "test data".getBytes(StandardCharsets.UTF_8);
152+
byte[] result = CoderUtils.decodeFromByteArray(ByteArrayCoder.of(), data);
153+
assertSame(data, result);
154+
}
145155
}

sdks/java/core/src/test/java/org/apache/beam/sdk/util/StreamUtilsTest.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323

2424
import java.io.BufferedInputStream;
2525
import java.io.ByteArrayInputStream;
26+
import java.io.FilterInputStream;
2627
import java.io.IOException;
2728
import java.io.InputStream;
2829
import java.util.Arrays;
30+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
2931
import org.junit.Before;
3032
import org.junit.Test;
3133
import org.junit.runner.RunWith;
@@ -68,4 +70,97 @@ public void testGetBytesFromInputStream() throws IOException {
6870
assertArrayEquals(testData, bytes);
6971
assertEquals(0, stream.available());
7072
}
73+
74+
@Test
75+
public void testGetBytesFromUnownedInputStreamAroundExposed() throws IOException {
76+
InputStream stream = new UnownedInputStream(new ExposedByteArrayInputStream(testData));
77+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
78+
assertArrayEquals(testData, bytes);
79+
assertSame(testData, bytes);
80+
assertEquals(0, stream.available());
81+
}
82+
83+
@Test
84+
public void testGetBytesFromUnownedInputStreamAroundArray() throws IOException {
85+
InputStream stream = new UnownedInputStream(new ByteArrayInputStream(testData));
86+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
87+
assertArrayEquals(testData, bytes);
88+
assertEquals(0, stream.available());
89+
}
90+
91+
@Test
92+
public void testGetBytesFromLimitedInputStream() throws IOException {
93+
InputStream stream = ByteStreams.limit(new ByteArrayInputStream(testData), Integer.MAX_VALUE);
94+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
95+
assertArrayEquals(testData, bytes);
96+
assertEquals(0, stream.available());
97+
}
98+
99+
@Test
100+
public void testGetBytesFromEmptyLimitedInputStream() throws IOException {
101+
InputStream stream = ByteStreams.limit(new ByteArrayInputStream(testData), 0);
102+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
103+
assertArrayEquals(new byte[0], bytes);
104+
assertEquals(0, stream.available());
105+
}
106+
107+
@Test
108+
public void testGetBytesFromRepeatedInputStream() throws IOException {
109+
byte[] largeBytes = new byte[2 * 1024 * 1024];
110+
Arrays.fill(largeBytes, (byte) 1);
111+
InputStream stream = ByteStreams.limit(new ByteArrayInputStream(largeBytes), Integer.MAX_VALUE);
112+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
113+
assertArrayEquals(largeBytes, bytes);
114+
assertEquals(0, stream.available());
115+
}
116+
117+
public static class LyingInputStream extends FilterInputStream {
118+
private final int availableLie;
119+
120+
public LyingInputStream(InputStream in, int availableLie) {
121+
super(in);
122+
this.availableLie = availableLie;
123+
}
124+
125+
@Override
126+
public int available() throws IOException {
127+
return availableLie;
128+
}
129+
}
130+
131+
@Test
132+
public void testGetBytesFromHugeAvailable() throws IOException {
133+
InputStream wrappedStream = new ByteArrayInputStream(testData);
134+
InputStream stream = new LyingInputStream(wrappedStream, Integer.MAX_VALUE - 1);
135+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
136+
assertArrayEquals(testData, bytes);
137+
assertEquals(0, wrappedStream.available());
138+
}
139+
140+
@Test
141+
public void testGetBytesFromZeroAvailable() throws IOException {
142+
InputStream wrappedStream = new ByteArrayInputStream(testData);
143+
InputStream stream = new LyingInputStream(wrappedStream, 0);
144+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
145+
assertArrayEquals(testData, bytes);
146+
assertEquals(0, wrappedStream.available());
147+
}
148+
149+
@Test
150+
public void testGetBytesFromOneExtraAvailable() throws IOException {
151+
InputStream wrappedStream = new ByteArrayInputStream(testData);
152+
InputStream stream = new LyingInputStream(wrappedStream, wrappedStream.available() + 1);
153+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
154+
assertArrayEquals(testData, bytes);
155+
assertEquals(0, wrappedStream.available());
156+
}
157+
158+
@Test
159+
public void testGetBytesFromOneLessAvailable() throws IOException {
160+
InputStream wrappedStream = new ByteArrayInputStream(testData);
161+
InputStream stream = new LyingInputStream(wrappedStream, wrappedStream.available() - 1);
162+
byte[] bytes = StreamUtils.getBytesWithoutClosing(stream);
163+
assertArrayEquals(testData, bytes);
164+
assertEquals(0, wrappedStream.available());
165+
}
71166
}

0 commit comments

Comments
 (0)