diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 37dd25bf9029..34a6e02150e7 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 3 + "modification": 4 } diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json index 8fab48cc672a..5abe02fc09c7 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 5 + "modification": 1 } diff --git a/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json index 8fab48cc672a..5abe02fc09c7 100644 --- a/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json +++ b/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 5 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index 455144f02a35..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 6 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index 
5ac8a7f3f6ee..833fd9b0d174 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run ", - "modification": 4 + "modification": 2 } diff --git a/.github/workflows/build_wheels_manual.yml b/.github/workflows/build_wheels_manual.yml new file mode 100644 index 000000000000..8fa27161c9be --- /dev/null +++ b/.github/workflows/build_wheels_manual.yml @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: Build Python 3.11 and 3.13 wheels (Manual) + +on: + workflow_dispatch: + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.sha || github.head_ref || github.ref }}-${{ github.event.sender.login}}' + cancel-in-progress: true + +jobs: + build_source: + runs-on: ubuntu-latest + name: Build python source distribution + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Install python + uses: actions/setup-python@v5 + with: + python-version: 3.11 + - name: Build source + working-directory: ./sdks/python + run: pip install -U build && python -m build --sdist + - name: Add checksums + working-directory: ./sdks/python/dist + run: | + file=$(ls | grep .tar.gz | head -n 1) + sha512sum $file > ${file}.sha512 + - name: Unzip source + working-directory: ./sdks/python + run: tar -xzvf dist/$(ls dist | grep .tar.gz | head -n 1) + - name: Rename source directory + working-directory: ./sdks/python + # https://github.com/pypa/setuptools/issues/4300 changed naming. Match both old and new names. 
+ run: mv $(ls | grep "apache-beam-\|apache_beam-") apache-beam-source + - name: Upload source as artifact + uses: actions/upload-artifact@v4 + with: + name: source + path: sdks/python/apache-beam-source + - name: Upload compressed sources as artifacts + uses: actions/upload-artifact@v4 + with: + name: source_zip + path: sdks/python/dist + + build_wheels: + name: Build python ${{matrix.py_version}} wheels on ${{matrix.os_python.arch}} for ${{ matrix.os_python.os }} + needs: + - build_source + env: + CIBW_ARCHS_LINUX: ${{matrix.os_python.arch}} + runs-on: ${{ matrix.os_python.runner }} + timeout-minutes: 480 + strategy: + fail-fast: false + matrix: + os_python: [ + {"os": "ubuntu-latest", "runner": "ubuntu-latest", "arch": "x86_64" }, + {"os": "macos-latest", "runner": "macos-latest", "arch": "x86_64" }, + {"os": "ubuntu-latest", "runner": "ubuntu-latest", "arch": "aarch64" } + ] + # Only Python 3.11 and 3.13 + py_version: ["cp311-", "cp313-"] + steps: + - name: Download python source distribution from artifacts + uses: actions/download-artifact@v4.1.8 + with: + name: source + path: apache-beam-source + - name: Install Python + uses: actions/setup-python@v5 + with: + python-version: 3.11 + - uses: docker/setup-qemu-action@v1 + if: ${{matrix.os_python.arch == 'aarch64'}} + name: Set up QEMU + - name: Install cibuildwheel + run: pip install cibuildwheel==3.1.3 setuptools + - name: Fix distlib conflict on macOS + if: startsWith(matrix.os_python.os, 'macos') + working-directory: apache-beam-source + run: | + sed -i.bak 's/distlib==0.3.7/distlib>=0.3.7,<1.0.0/g' pyproject.toml + - name: Build wheel + working-directory: apache-beam-source + env: + CIBW_BUILD: ${{ matrix.py_version }}* + # TODO: https://github.com/apache/beam/issues/23048 + CIBW_SKIP: "*-musllinux_*" + CIBW_BEFORE_BUILD: pip install cython==0.29.36 numpy --config-settings=setup-args="-Dallow-noblas=true" && pip install --upgrade setuptools + run: cibuildwheel --print-build-identifiers && cibuildwheel 
--output-dir wheelhouse + shell: bash + - name: install sha512sum on MacOS + if: startsWith(matrix.os_python.os, 'macos') + run: brew install coreutils + - name: Add checksums + working-directory: apache-beam-source/wheelhouse/ + run: | + for file in *.whl; do + sha512sum $file > ${file}.sha512 + done + shell: bash + - name: Upload wheels as artifacts + uses: actions/upload-artifact@v4 + with: + name: wheelhouse-${{ matrix.py_version }}${{ matrix.os_python.os }}-${{ matrix.os_python.arch }} + path: apache-beam-source/wheelhouse/ diff --git a/CHANGES.md b/CHANGES.md index 73bc6850b57b..f74f633638de 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -70,6 +70,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Upgraded Iceberg dependency to 1.9.2 ([#35981](https://github.com/apache/beam/pull/35981)) ## New Features / Improvements @@ -98,6 +99,7 @@ improves support for BigQuery and other SQL dialects. Note: Minor behavior changes are observed such as output significant digits related to casting. * (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. +* Dropped Java 8 support for [IO expansion-service](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service). Cross-language pipelines using this expansion service will need a Java11+ runtime ([#35981](https://github.com/apache/beam/pull/35981)).
## Deprecations diff --git a/build.gradle.kts b/build.gradle.kts index f4174d6376d5..c1167f8f72e2 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -253,6 +253,7 @@ tasks.register("javaPreCommit") { dependsOn(":examples:java:sql:preCommit") dependsOn(":examples:java:twitter:build") dependsOn(":examples:java:twitter:preCommit") + dependsOn(":examples:java:iceberg:build") dependsOn(":examples:multi-language:build") dependsOn(":model:fn-execution:build") dependsOn(":model:job-management:build") @@ -380,6 +381,7 @@ tasks.register("sqlPreCommit") { dependsOn(":sdks:java:extensions:sql:datacatalog:build") dependsOn(":sdks:java:extensions:sql:expansion-service:build") dependsOn(":sdks:java:extensions:sql:hcatalog:build") + dependsOn(":sdks:java:extensions:sql:iceberg:build") dependsOn(":sdks:java:extensions:sql:jdbc:build") dependsOn(":sdks:java:extensions:sql:jdbc:preCommit") dependsOn(":sdks:java:extensions:sql:perf-tests:build") @@ -426,6 +428,7 @@ tasks.register("sqlPostCommit") { dependsOn(":sdks:java:extensions:sql:postCommit") dependsOn(":sdks:java:extensions:sql:jdbc:postCommit") dependsOn(":sdks:java:extensions:sql:datacatalog:postCommit") + dependsOn(":sdks:java:extensions:sql:iceberg:integrationTest") dependsOn(":sdks:java:extensions:sql:hadoopVersionsTest") } diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index d7ae0f60c2dd..103405a57931 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -617,7 +617,7 @@ class BeamModulePlugin implements Plugin { // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom def grpc_version = "1.71.0" def guava_version = "33.1.0-jre" - def hadoop_version = "3.4.1" + def hadoop_version = "3.4.2" def hamcrest_version = "2.1" def influxdb_version = "2.19" def 
httpclient_version = "4.5.13" diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 0f1a1f7ef7e8..6f35a109998c 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -71,9 +71,6 @@ dependencies { implementation project(":sdks:java:extensions:python") implementation project(":sdks:java:io:google-cloud-platform") implementation project(":sdks:java:io:kafka") - runtimeOnly project(":sdks:java:io:iceberg") - runtimeOnly project(":sdks:java:io:iceberg:bqms") - implementation project(":sdks:java:managed") implementation project(":sdks:java:extensions:ml") implementation library.java.avro implementation library.java.bigdataoss_util diff --git a/examples/java/iceberg/build.gradle b/examples/java/iceberg/build.gradle new file mode 100644 index 000000000000..09ef64d32ee3 --- /dev/null +++ b/examples/java/iceberg/build.gradle @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +plugins { + id 'java' + id 'org.apache.beam.module' + id 'com.gradleup.shadow' +} + +applyJavaNature( + exportJavadoc: false, + automaticModuleName: 'org.apache.beam.examples.iceberg', + // iceberg requires Java11+ + requireJavaVersion: JavaVersion.VERSION_11 +) + +description = "Apache Beam :: Examples :: Java :: Iceberg" +ext.summary = """Apache Beam Java SDK examples using IcebergIO.""" + +/** Define the list of runners which execute a precommit test. + * Some runners are run from separate projects, see the preCommit task below + * for details. + */ +def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"] +// The following runners have configuration created but not added to preCommit +def nonPreCommitRunners = ["dataflowRunner", "prismRunner"] +for (String runner : preCommitRunners) { + configurations.create(runner + "PreCommit") +} +for (String runner: nonPreCommitRunners) { + configurations.create(runner + "PreCommit") +} +configurations.sparkRunnerPreCommit { + // Ban certain dependencies to prevent a StackOverflow within Spark + // because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14 + exclude group: "org.slf4j", module: "jul-to-slf4j" + exclude group: "org.slf4j", module: "slf4j-jdk14" +} + +dependencies { + implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + runtimeOnly project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:bqms") + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:extensions:google-cloud-platform-core") + implementation project(":sdks:java:io:google-cloud-platform") + implementation project(":sdks:java:managed") + implementation library.java.google_auth_library_oauth2_http + implementation library.java.joda_time + runtimeOnly project(path: ":runners:direct-java", configuration: "shadow") + implementation library.java.vendored_guava_32_1_2_jre + runtimeOnly library.java.hadoop_client + 
runtimeOnly library.java.bigdataoss_gcs_connector + + // Add dependencies for the PreCommit configurations + // For each runner a project level dependency on the examples project. + for (String runner : preCommitRunners) { + delegate.add(runner + "PreCommit", project(path: ":examples:java", configuration: "testRuntimeMigration")) + } + directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow") + flinkRunnerPreCommit project(":runners:flink:${project.ext.latestFlinkVersion}") + sparkRunnerPreCommit project(":runners:spark:3") + sparkRunnerPreCommit project(":sdks:java:io:hadoop-file-system") + dataflowRunnerPreCommit project(":runners:google-cloud-dataflow-java") + dataflowRunnerPreCommit project(":runners:google-cloud-dataflow-java:worker") // v2 worker + dataflowRunnerPreCommit project(":sdks:java:harness") // v2 worker + prismRunnerPreCommit project(":runners:prism:java") + + // Add dependency if requested on command line for runner + if (project.hasProperty("runnerDependency")) { + runtimeOnly project(path: project.getProperty("runnerDependency")) + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java rename to examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java index 458f2b545450..2a5f85e524ed 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.examples.cookbook; +package org.apache.beam.examples.iceberg; import java.io.IOException; import java.util.Map; diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java rename to examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java index ecc047a56949..4229e401ab94 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.cookbook; +package org.apache.beam.examples.iceberg; import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC; diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java rename to examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java index 63dc4ff7056c..0ea73cdf0c87 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations 
under the License. */ -package org.apache.beam.examples.cookbook; +package org.apache.beam.examples.iceberg; import com.google.auth.oauth2.GoogleCredentials; import java.io.IOException; diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java similarity index 99% rename from examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java rename to examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java index 446d11d03be4..5b4fe1b9b913 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.cookbook; +package org.apache.beam.examples.iceberg; import java.util.Arrays; import java.util.Map; diff --git a/gradle.properties b/gradle.properties index beb498d11943..5ba364b01de6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. 
-version=2.68.0-SNAPSHOT -sdk_version=2.68.0.dev +version=2.68.0-post2 +sdk_version=2.68.0.post2 javaVersion=1.8 diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 05cb8417106d..8256804fb82b 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -52,8 +52,8 @@ evaluationDependsOn(":sdks:java:container:java11") ext.dataflowLegacyEnvironmentMajorVersion = '8' ext.dataflowFnapiEnvironmentMajorVersion = '8' -ext.dataflowLegacyContainerVersion = 'beam-master-20250811' -ext.dataflowFnapiContainerVersion = 'beam-master-20250811' +ext.dataflowLegacyContainerVersion = '2.68.0' +ext.dataflowFnapiContainerVersion = '2.68.0' ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3' processResources { diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index fe7e3b93dd0e..4068c5f88e4f 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -131,7 +131,7 @@ applyJavaNature( dependencies { // We have to include jetty-server/jetty-servlet and all of its transitive dependencies // which includes several org.eclipse.jetty artifacts + servlet-api - include(dependency("org.eclipse.jetty:.*:9.4.54.v20240208")) + include(dependency("org.eclipse.jetty:.*:9.4.57.v20241219")) include(dependency("javax.servlet:javax.servlet-api:3.1.0")) } relocate("org.eclipse.jetty", getWorkerRelocatedPath("org.eclipse.jetty")) @@ -200,8 +200,8 @@ dependencies { compileOnly "org.conscrypt:conscrypt-openjdk-uber:2.5.1" implementation "javax.servlet:javax.servlet-api:3.1.0" - implementation "org.eclipse.jetty:jetty-server:9.4.54.v20240208" - implementation "org.eclipse.jetty:jetty-servlet:9.4.54.v20240208" + implementation "org.eclipse.jetty:jetty-server:9.4.57.v20241219" + implementation 
"org.eclipse.jetty:jetty-servlet:9.4.57.v20241219" implementation library.java.avro implementation library.java.jackson_annotations implementation library.java.jackson_core diff --git a/sdks/go/pkg/beam/core/core.go b/sdks/go/pkg/beam/core/core.go index 843cfea07743..2ff8150a618f 100644 --- a/sdks/go/pkg/beam/core/core.go +++ b/sdks/go/pkg/beam/core/core.go @@ -27,7 +27,7 @@ const ( // SdkName is the human readable name of the SDK for UserAgents. SdkName = "Apache Beam SDK for Go" // SdkVersion is the current version of the SDK. - SdkVersion = "2.68.0.dev" + SdkVersion = "2.68.0" // DefaultDockerImage represents the associated image for this release. DefaultDockerImage = "apache/beam_go_sdk:" + SdkVersion diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/OneOfType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/OneOfType.java index 31b6c8db2fed..5c2e376e4bf4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/OneOfType.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/OneOfType.java @@ -155,12 +155,12 @@ public Value toInputType(Row base) { for (int i = 0; i < base.getFieldCount(); ++i) { Object value = base.getValue(i); if (value != null) { - checkArgument(caseType == null, "More than one field set in union " + this); + checkArgument(caseType == null, "More than one field set in union %s", this); caseType = enumerationType.valueOf(oneOfSchema.getField(i).getName()); oneOfValue = value; } } - checkNotNull(oneOfValue, "No value set in union" + this); + checkNotNull(oneOfValue, "No value set in union %s", this); return createValue(caseType, oneOfValue); } diff --git a/sdks/java/expansion-service/container/Dockerfile b/sdks/java/expansion-service/container/Dockerfile index 1b83ec68b994..2688a3176713 100644 --- a/sdks/java/expansion-service/container/Dockerfile +++ b/sdks/java/expansion-service/container/Dockerfile @@ -24,6 +24,8 @@ ARG TARGETARCH 
WORKDIR /opt/apache/beam # Copy dependencies generated by the Gradle build. +# TODO(https://github.com/apache/beam/issues/34098) remove when Beam moved to avro 1.12 +COPY target/avro.jar jars/ COPY target/beam-sdks-java-io-expansion-service.jar jars/ COPY target/beam-sdks-java-io-google-cloud-platform-expansion-service.jar jars/ COPY target/beam-sdks-java-extensions-schemaio-expansion-service.jar jars/ diff --git a/sdks/java/expansion-service/container/build.gradle b/sdks/java/expansion-service/container/build.gradle index cf81d462f08b..080eb68c3a2e 100644 --- a/sdks/java/expansion-service/container/build.gradle +++ b/sdks/java/expansion-service/container/build.gradle @@ -36,6 +36,8 @@ configurations { } dependencies { + // TODO(https://github.com/apache/beam/issues/34098) remove when Beam moved to avro 1.12 + dockerDependency "org.apache.avro:avro:1.12.0" dockerDependency project(path: ":sdks:java:extensions:schemaio-expansion-service", configuration: "shadow") dockerDependency project(path: ":sdks:java:io:expansion-service", configuration: "shadow") dockerDependency project(path: ":sdks:java:io:google-cloud-platform:expansion-service", configuration: "shadow") @@ -48,6 +50,8 @@ goBuild { task copyDockerfileDependencies(type: Copy) { from configurations.dockerDependency + // TODO(https://github.com/apache/beam/issues/34098) remove when Beam moved to avro 1.12 + rename 'avro-.*.jar', 'avro.jar' rename 'beam-sdks-java-extensions-schemaio-expansion-service-.*.jar', 'beam-sdks-java-extensions-schemaio-expansion-service.jar' rename 'beam-sdks-java-io-expansion-service-.*.jar', 'beam-sdks-java-io-expansion-service.jar' rename 'beam-sdks-java-io-google-cloud-platform-expansion-service-.*.jar', 'beam-sdks-java-io-google-cloud-platform-expansion-service.jar' diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index 2ad4ddf306bb..afbc87f8eeba 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle 
@@ -76,10 +76,6 @@ dependencies { fmppTask "org.freemarker:freemarker:2.3.31" fmppTemplates library.java.vendored_calcite_1_40_0 implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation project(":sdks:java:managed") - implementation project(":sdks:java:io:iceberg") - runtimeOnly project(":sdks:java:io:iceberg:bqms") - runtimeOnly project(":sdks:java:io:iceberg:hive") implementation project(":sdks:java:extensions:avro") implementation project(":sdks:java:extensions:join-library") permitUnusedDeclared project(":sdks:java:extensions:join-library") // BEAM-11761 @@ -120,9 +116,6 @@ dependencies { permitUnusedDeclared library.java.hadoop_client provided library.java.kafka_clients - implementation "org.apache.iceberg:iceberg-api:1.6.1" - permitUnusedDeclared "org.apache.iceberg:iceberg-api:1.6.1" // errorprone crash cannot find this transient dep - testImplementation "org.apache.iceberg:iceberg-core:1.6.1" testImplementation library.java.vendored_calcite_1_40_0 testImplementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit diff --git a/sdks/java/extensions/sql/hcatalog/build.gradle b/sdks/java/extensions/sql/hcatalog/build.gradle index e8abf21b7c3e..0a267a6f424e 100644 --- a/sdks/java/extensions/sql/hcatalog/build.gradle +++ b/sdks/java/extensions/sql/hcatalog/build.gradle @@ -26,7 +26,7 @@ applyJavaNature( ) def hive_version = "3.1.3" -def netty_version = "4.1.51.Final" +def netty_version = "4.1.110.Final" /* * We need to rely on manually specifying these evaluationDependsOn to ensure that diff --git a/sdks/java/extensions/sql/iceberg/build.gradle b/sdks/java/extensions/sql/iceberg/build.gradle new file mode 100644 index 000000000000..d5f9e74c53bd --- /dev/null +++ b/sdks/java/extensions/sql/iceberg/build.gradle @@ -0,0 +1,81 @@ +import groovy.json.JsonOutput + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.extensions.sql.meta.provider.iceberg', + // iceberg requires Java11+ + requireJavaVersion: JavaVersion.VERSION_11, +) + +dependencies { + implementation project(":sdks:java:extensions:sql") + implementation project(":sdks:java:core") + implementation project(":sdks:java:managed") + implementation project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:bqms") + runtimeOnly project(":sdks:java:io:iceberg:hive") + // TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency + provided "org.immutables:value:2.8.8" + permitUnusedDeclared "org.immutables:value:2.8.8" + implementation library.java.slf4j_api + implementation library.java.vendored_guava_32_1_2_jre + implementation library.java.vendored_calcite_1_40_0 + implementation library.java.jackson_databind + + testImplementation library.java.joda_time + testImplementation library.java.junit + testImplementation library.java.google_api_services_bigquery + testImplementation "org.apache.iceberg:iceberg-api:1.9.2" + testImplementation "org.apache.iceberg:iceberg-core:1.9.2" + testImplementation project(":sdks:java:io:google-cloud-platform") + testImplementation
project(":sdks:java:extensions:google-cloud-platform-core") +} + +task integrationTest(type: Test) { + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/' + + // Disable Gradle cache (it should not be used because the IT's won't run). + outputs.upToDateWhen { false } + + def pipelineOptions = [ + "--project=${gcpProject}", + "--tempLocation=${gcsTempRoot}", + "--blockOnRun=false"] + + systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions) + + include '**/*IT.class' + + maxParallelForks 4 + classpath = project(":sdks:java:extensions:sql:iceberg") + .sourceSets + .test + .runtimeClasspath + testClassesDirs = files(project(":sdks:java:extensions:sql:iceberg").sourceSets.test.output.classesDirs) + useJUnit { } +} + +configurations.all { + // iceberg-core needs avro:1.12.0 + resolutionStrategy.force 'org.apache.avro:avro:1.12.0' +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java new file mode 100644 index 000000000000..03c524f7b0fc --- /dev/null +++ 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogRegistrar; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +@AutoService(CatalogRegistrar.class) +public class IcebergCatalogRegistrar implements CatalogRegistrar { + @Override + public Iterable<Class<? extends Catalog>> getCatalogs() { + return ImmutableList.<Class<? extends Catalog>>builder().add(IcebergCatalog.class).build(); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java rename to
sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java similarity index 100% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java rename to sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java 
b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java similarity index 100% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java similarity index 100% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java similarity index 98% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java index a223194b8e91..a7b128b2bca3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java +++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java @@ -19,7 +19,6 @@ import static java.lang.String.format; import static java.util.Arrays.asList; -import static 
org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; @@ -64,6 +63,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.joda.time.Duration; +import org.joda.time.format.DateTimeFormat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -238,7 +238,9 @@ public void runSqlWriteAndRead(boolean withPartitionFields) (float) 1.0, 1.0, true, - parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZoneUTC() + .parseDateTime("2018-05-28 20:17:40.123"), "varchar", "char", asList("123", "456"), diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java similarity index 100% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java similarity index 98% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java rename to sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java index 7343c9b9a52f..bdd710c861e0 
100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java +++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql; +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.sql.SqlTransform; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java index 2d94e19c1689..afffa24e6cd7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java @@ -18,16 +18,12 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import com.google.auto.service.AutoService; -import org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergCatalog; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @AutoService(CatalogRegistrar.class) public class InMemoryCatalogRegistrar implements CatalogRegistrar { @Override public Iterable> getCatalogs() { - return ImmutableList.>builder() - .add(InMemoryCatalog.class) - 
.add(IcebergCatalog.class) - .build(); + return ImmutableList.>builder().add(InMemoryCatalog.class).build(); } } diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index c139315d925f..08c3f2b051dc 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -25,6 +25,8 @@ applyJavaNature( exportJavadoc: false, validateShadowJar: false, shadowClosure: {}, + // iceberg requires Java11+ + requireJavaVersion: JavaVersion.VERSION_11 ) // We don't want to use the latest version for the entire beam sdk since beam Java users can override it themselves. @@ -33,9 +35,8 @@ applyJavaNature( configurations.runtimeClasspath { // Pin kafka-clients version due to <3.4.0 missing auth callback classes. resolutionStrategy.force 'org.apache.kafka:kafka-clients:3.9.0' - // Pin avro to 1.11.4 due to https://github.com/apache/beam/issues/34968 - // cannot upgrade this to the latest version due to https://github.com/apache/beam/issues/34993 - resolutionStrategy.force 'org.apache.avro:avro:1.11.4' + // iceberg needs avro:1.12.0 + resolutionStrategy.force 'org.apache.avro:avro:1.12.0' // force parquet-avro:1.15.2 to fix CVE-2025-46762 resolutionStrategy.force 'org.apache.parquet:parquet-avro:1.15.2' @@ -66,18 +67,17 @@ dependencies { permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 implementation project(":sdks:java:managed") permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761 - implementation project(":sdks:java:io:iceberg") - permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761 implementation project(":sdks:java:io:kafka") permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761 implementation project(":sdks:java:io:kafka:upgrade") permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 - // **** IcebergIO catalogs **** - // HiveCatalog - runtimeOnly project(path: ":sdks:java:io:iceberg:hive") - // 
BigQueryMetastoreCatalog (Java 11+) - runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") + if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') { + // iceberg ended support for Java 8 in 1.7.0 + runtimeOnly project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:hive") + runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") + } runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index ba1d27b0e3e6..0f0fa0a2bb9f 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -23,6 +23,8 @@ import java.util.stream.Collectors plugins { id 'org.apache.beam.module' } applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.iceberg', + // iceberg ended support for Java 8 in 1.7.0 + requireJavaVersion: JavaVersion.VERSION_11, ) description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg" @@ -37,10 +39,7 @@ def hadoopVersions = [ hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} -// we cannot upgrade this since the newer iceberg requires Java 11 -// many other modules like examples/expansion use Java 8 and have the iceberg dependency -// def iceberg_version = "1.9.0" -def iceberg_version = "1.6.1" +def iceberg_version = "1.9.2" def parquet_version = "1.15.2" def orc_version = "1.9.2" def hive_version = "3.1.3" @@ -107,6 +106,11 @@ dependencies { } } +configurations.all { + // iceberg-core needs avro:1.12.0 + resolutionStrategy.force 'org.apache.avro:avro:1.12.0' +} + hadoopVersions.each {kv -> configurations."hadoopVersion$kv.key" { resolutionStrategy { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 
b240442deb6d..36b74967f0b2 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -34,8 +34,8 @@ import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,11 +59,15 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.joda.time.ReadableDateTime; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -532,13 +536,35 @@ public void testIdentityPartitioning() throws IOException { assertEquals(1, dataFile.getRecordCount()); // build this string: bool=true/int=1/long=1/float=1.0/double=1.0/str=str List expectedPartitions = new ArrayList<>(); - List dateTypes = Arrays.asList("date", "time", "datetime", "datetime_tz"); - for (Schema.Field field : primitiveTypeSchema.getFields()) { - Object val = checkStateNotNull(row.getValue(field.getName())); - if (dateTypes.contains(field.getName())) { - val = URLEncoder.encode(val.toString(), UTF_8.toString()); + + for (PartitionField field : spec.fields()) { + String name = field.name(); + Type type = spec.schema().findType(name); + Transform transform = (Transform) field.transform(); + String val; + switch (name) { + case "date": + LocalDate localDate = checkStateNotNull(row.getValue(name)); + Integer day = 
Integer.parseInt(String.valueOf(localDate.toEpochDay())); + val = transform.toHumanString(type, day); + break; + case "time": + LocalTime localTime = checkStateNotNull(row.getValue(name)); + val = transform.toHumanString(type, localTime.toNanoOfDay() / 1000); + break; + case "datetime": + LocalDateTime ldt = checkStateNotNull(row.getValue(name)); + val = transform.toHumanString(type, DateTimeUtil.microsFromTimestamp(ldt)); + break; + case "datetime_tz": + ReadableDateTime dt = checkStateNotNull(row.getDateTime(name)); + val = transform.toHumanString(type, dt.getMillis() * 1000); + break; + default: + val = transform.toHumanString(type, checkStateNotNull(row.getValue(name))); + break; } - expectedPartitions.add(field.getName() + "=" + val); + expectedPartitions.add(name + "=" + URLEncoder.encode(val, UTF_8.toString())); } String expectedPartitionPath = String.join("/", expectedPartitions); assertEquals(expectedPartitionPath, dataFile.getPartitionPath()); diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 79e6b3ce315e..df5d7f21a343 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -87,7 +87,6 @@ # pytype: skip-file import contextlib -import datetime import typing import numpy as np @@ -96,10 +95,11 @@ from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external import ExternalTransform from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder +from apache_beam.typehints.schemas import JdbcDateType # pylint: disable=unused-import +from apache_beam.typehints.schemas import JdbcTimeType # pylint: disable=unused-import from apache_beam.typehints.schemas import LogicalType from apache_beam.typehints.schemas import MillisInstant from apache_beam.typehints.schemas import typing_to_runner_api -from apache_beam.utils.timestamp import Timestamp __all__ = [ 'WriteToJdbc', @@ -399,91 +399,3 @@ def __init__( ), expansion_service or 
default_io_expansion_service(classpath), ) - - -@LogicalType.register_logical_type -class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): - """ - For internal use only; no backwards-compatibility guarantees. - - Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO - has been migrated to Beam portable logical types. - """ - def __init__(self, argument=""): - pass - - @classmethod - def representation_type(cls) -> type: - return MillisInstant - - @classmethod - def urn(cls): - return "beam:logical_type:javasdk_date:v1" - - @classmethod - def language_type(cls): - return datetime.date - - def to_representation_type(self, value: datetime.date) -> Timestamp: - return Timestamp.from_utc_datetime( - datetime.datetime.combine( - value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) - - def to_language_type(self, value: Timestamp) -> datetime.date: - return value.to_utc_datetime().date() - - @classmethod - def argument_type(cls): - return str - - def argument(self): - return "" - - @classmethod - def _from_typing(cls, typ): - return cls() - - -@LogicalType.register_logical_type -class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]): - """ - For internal use only; no backwards-compatibility guarantees. - - Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java - JDBCIO has been migrated to Beam portable logical types. 
- """ - def __init__(self, argument=""): - pass - - @classmethod - def representation_type(cls) -> type: - return MillisInstant - - @classmethod - def urn(cls): - return "beam:logical_type:javasdk_time:v1" - - @classmethod - def language_type(cls): - return datetime.time - - def to_representation_type(self, value: datetime.date) -> Timestamp: - return Timestamp.from_utc_datetime( - datetime.datetime.combine( - datetime.datetime.utcfromtimestamp(0), - value, - tzinfo=datetime.timezone.utc)) - - def to_language_type(self, value: Timestamp) -> datetime.date: - return value.to_utc_datetime().time() - - @classmethod - def argument_type(cls): - return str - - def argument(self): - return "" - - @classmethod - def _from_typing(cls, typ): - return cls() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index e7d88e8884ec..1bfc732d13a3 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3642,11 +3642,13 @@ class ApplyPartitionFnFn(DoFn): """A DoFn that applies a PartitionFn.""" def process(self, element, partitionfn, n, *args, **kwargs): partition = partitionfn.partition_for(element, n, *args, **kwargs) - if isinstance(partition, bool) or not isinstance(partition, int): + import numbers + if isinstance(partition, + bool) or not isinstance(partition, numbers.Integral): raise ValueError( f"PartitionFn yielded a '{type(partition).__name__}' " "when it should only yield integers") - if not 0 <= partition < n: + if not 0 <= int(partition) < n: raise ValueError( 'PartitionFn specified out-of-bounds partition index: ' '%d not in [0, %d)' % (partition, n)) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index a4fa3b528178..3e5e7670bf50 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -218,6 +218,61 @@ def test_partition_with_bools(self): p | 
beam.Create([input_value]) | beam.Partition(lambda x, _: x, 2)) + def test_partition_with_numpy_integers(self): + # Test that numpy integer types are correctly accepted by the + # ApplyPartitionFnFn class + import numpy as np + + # Create an instance of the ApplyPartitionFnFn class + apply_partition_fn = beam.Partition.ApplyPartitionFnFn() + + # Define a simple partition function + class SimplePartitionFn(beam.PartitionFn): + def partition_for(self, element, num_partitions): + return element % num_partitions + + partition_fn = SimplePartitionFn() + + # Test with numpy.int32 + # This should not raise an exception + outputs = list(apply_partition_fn.process(np.int32(1), partition_fn, 3)) + self.assertEqual(len(outputs), 1) + self.assertEqual(outputs[0].tag, '1') # 1 % 3 = 1 + + # Test with numpy.int64 + # This should not raise an exception + outputs = list(apply_partition_fn.process(np.int64(2), partition_fn, 3)) + self.assertEqual(len(outputs), 1) + self.assertEqual(outputs[0].tag, '2') # 2 % 3 = 2 + + def test_partition_fn_returning_numpy_integers(self): + # Test that partition functions can return numpy integer types + import numpy as np + + # Create an instance of the ApplyPartitionFnFn class + apply_partition_fn = beam.Partition.ApplyPartitionFnFn() + + # Define partition functions that return numpy integer types + class Int32PartitionFn(beam.PartitionFn): + def partition_for(self, element, num_partitions): + return np.int32(element % num_partitions) + + class Int64PartitionFn(beam.PartitionFn): + def partition_for(self, element, num_partitions): + return np.int64(element % num_partitions) + + # Test with partition function returning numpy.int32 + # This should not raise an exception + outputs = list(apply_partition_fn.process(1, Int32PartitionFn(), 3)) + self.assertEqual(len(outputs), 1) + self.assertEqual(outputs[0].tag, '1') # 1 % 3 = 1 + + # Test with partition function returning numpy.int64 + # This should not raise an exception + outputs = 
list(apply_partition_fn.process(2, Int64PartitionFn(), 3)) + self.assertEqual(len(outputs), 1) + self.assertEqual(outputs[0].tag, '2') # 2 % 3 = 2 + def test_partition_boundedness(self): def partition_fn(val, num_partitions): return val % num_partitions diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 0806a2e6624e..b6bf6d37fe02 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -164,7 +164,7 @@ def _match_is_exactly_sequence(user_type): def match_is_named_tuple(user_type): return ( _safe_issubclass(user_type, typing.Tuple) and - hasattr(user_type, '__annotations__')) + hasattr(user_type, '__annotations__') and hasattr(user_type, '_fields')) def _match_is_optional(user_type): diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 32dc2fd06ece..c21dde426fc7 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -66,6 +66,7 @@ # pytype: skip-file +import datetime import decimal import logging from typing import Any @@ -1189,3 +1190,94 @@ def argument_type(cls): def argument(self): return self.max_length + + +# TODO: A temporary fix for missing jdbc logical types. +# See the discussion in https://github.com/apache/beam/issues/35738 for +# more detail. +@LogicalType.register_logical_type +class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO + has been migrated to Beam portable logical types. 
+ """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls) -> type: + return MillisInstant + + @classmethod + def urn(cls): + return "beam:logical_type:javasdk_date:v1" + + @classmethod + def language_type(cls): + return datetime.date + + def to_representation_type(self, value: datetime.date) -> Timestamp: + return Timestamp.from_utc_datetime( + datetime.datetime.combine( + value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) + + def to_language_type(self, value: Timestamp) -> datetime.date: + return value.to_utc_datetime().date() + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls() + + +@LogicalType.register_logical_type +class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java + JDBCIO has been migrated to Beam portable logical types. 
+ """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls) -> type: + return MillisInstant + + @classmethod + def urn(cls): + return "beam:logical_type:javasdk_time:v1" + + @classmethod + def language_type(cls): + return datetime.time + + def to_representation_type(self, value: datetime.time) -> Timestamp: + return Timestamp.from_utc_datetime( + datetime.datetime.combine( + datetime.datetime.utcfromtimestamp(0), + value, + tzinfo=datetime.timezone.utc)) + + def to_language_type(self, value: Timestamp) -> datetime.time: + return value.to_utc_datetime().time() + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls() diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index 84848479430b..4fc613dcaf33 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -400,6 +400,8 @@ def path_to_beam_jar( return local_path maven_repo = cls.MAVEN_CENTRAL_REPOSITORY + if ".post" in version: + version = version.split(".post")[0] if 'rc' in version: # Release candidate version = version.split('rc')[0] diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py index 8b6c2550238a..fe78423f2979 100644 --- a/sdks/python/apache_beam/version.py +++ b/sdks/python/apache_beam/version.py @@ -17,4 +17,4 @@ """Apache Beam SDK version information and utilities.""" -__version__ = '2.68.0.dev' +__version__ = '2.68.0.post2' diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 97a9fe6141ea..7a8e42015e0f 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -27,7 +27,7 @@ requires = [ # Avoid https://github.com/pypa/virtualenv/issues/2006 "distlib==0.3.7", # Numpy headers - "numpy>=1.14.3,<2.3.0", # Update setup.py as well. 
+ "numpy>=1.14.3,<2.4.0", # Update setup.py as well. # having cython here will create wheels that are platform dependent. "cython>=3.0,<4", ## deps for generating external transform wrappers: diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 69a3555ec55f..17a99f7f96b6 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -369,7 +369,7 @@ def get_portability_package_data(): 'fastavro>=0.23.6,<2', 'fasteners>=0.3,<1.0', # TODO(https://github.com/grpc/grpc/issues/37710): Unpin grpc - 'grpcio>=1.33.1,<2,!=1.48.0,!=1.59.*,!=1.60.*,!=1.61.*,!=1.62.0,!=1.62.1,<1.66.0; python_version <= "3.12"', # pylint: disable=line-too-long + 'grpcio>=1.33.1,<2,!=1.48.0,!=1.59.*,!=1.60.*,!=1.61.*,!=1.62.0,!=1.62.1; python_version <= "3.12"', # pylint: disable=line-too-long 'grpcio>=1.67.0; python_version >= "3.13"', 'hdfs>=2.1.0,<3.0.0', 'httplib2>=0.8,<0.23.0', @@ -377,7 +377,7 @@ def get_portability_package_data(): 'jsonpickle>=3.0.0,<4.0.0', # numpy can have breaking changes in minor versions. # Use a strict upper bound. - 'numpy>=1.14.3,<2.3.0', # Update pyproject.toml as well. + 'numpy>=1.14.3,<2.4.0', # Update pyproject.toml as well. 'objsize>=0.6.1,<0.8.0', 'packaging>=22.0', 'pymongo>=3.8.0,<5.0.0', @@ -392,11 +392,11 @@ def get_portability_package_data(): # # 3. 
Exclude protobuf 4 versions that leak memory, see: # https://github.com/apache/beam/issues/28246 - 'protobuf>=3.20.3,<6.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long + 'protobuf>=3.20.3,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long 'pydot>=1.2.0,<2', 'python-dateutil>=2.8.0,<3', 'pytz>=2018.3', - 'redis>=5.0.0,<6', + 'redis>=5.0.0', 'regex>=2020.6.8', 'requests>=2.32.4,<3.0.0', 'sortedcontainers>=2.4.0', @@ -453,8 +453,7 @@ def get_portability_package_data(): 'gcp': [ 'cachetools>=3.1.0,<7', 'google-api-core>=2.0.0,<3', - 'google-apitools>=0.5.31,<0.5.32; python_version < "3.13"', - 'google-apitools>=0.5.32,<0.5.33; python_version >= "3.13"', + 'google-apitools>=0.5.31', # NOTE: Maintainers, please do not require google-auth>=2.x.x # Until this issue is closed # https://github.com/googleapis/google-cloud-python/issues/10566 diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index f7fa3e8e0b26..f325582cbf54 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -96,7 +96,7 @@ tasks.register("portableLocalRunnerJuliaSetWithSetupPy") { && python juliaset_main.py \\ --runner=PortableRunner \\ --job_endpoint=embed \\ - --setup_file=./setup.py \\ + --requirements_file=./requirements.txt \\ --coordinate_output=/tmp/juliaset \\ --grid_size=1 """ diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json index 0ea99eee3a6f..750e7685e6d7 100644 --- a/sdks/typescript/package.json +++ b/sdks/typescript/package.json @@ -1,6 +1,6 @@ { "name": "apache-beam", - "version": "2.68.0-SNAPSHOT", + "version": "2.68.0", "devDependencies": { "@google-cloud/bigquery": "^5.12.0", "@types/mocha": "^9.0.0", diff --git a/settings.gradle.kts b/settings.gradle.kts index 135d9da42b05..451c33f308ac 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -296,6 +296,12 @@ 
include(":sdks:python:container:distroless:py310") include(":sdks:python:container:distroless:py311") include(":sdks:python:container:distroless:py312") include(":sdks:python:container:distroless:py313") +include(":sdks:python:container:ml") +include(":sdks:python:container:ml:py39") +include(":sdks:python:container:ml:py310") +include(":sdks:python:container:ml:py311") +include(":sdks:python:container:ml:py312") +include(":sdks:python:container:ml:py313") include(":sdks:python:expansion-service-container") include(":sdks:python:test-suites:dataflow") include(":sdks:python:test-suites:dataflow:py39") @@ -372,3 +378,7 @@ include("sdks:java:io:iceberg:bqms") findProject(":sdks:java:io:iceberg:bqms")?.name = "bqms" include("it:clickhouse") findProject(":it:clickhouse")?.name = "clickhouse" +include("sdks:java:extensions:sql:iceberg") +findProject(":sdks:java:extensions:sql:iceberg")?.name = "iceberg" +include("examples:java:iceberg") +findProject(":examples:java:iceberg")?.name = "iceberg"