
Add integration tests to bulk spanner migration for checking MySQL data type support #3046

Merged
VardhanThigle merged 7 commits into GoogleCloudPlatform:main from nmemond:data-type-tests-bulk
Dec 24, 2025

Conversation

@nmemond
Contributor

@nmemond nmemond commented Dec 3, 2025

This adds some missing data type mappings to the existing data type integration test, and also adds a test for a migration to a PostgreSQL dialect Spanner instance.

Note that some of the type mappings fail to migrate as expected. The checks for those are still included for completeness, but they're commented out to avoid failing the tests.

@nmemond nmemond requested a review from a team as a code owner December 3, 2025 17:09
@gemini-code-assist

Summary of Changes

Hello @nmemond, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the integration test suite for bulk Spanner migrations by broadening the range of MySQL data types covered and introducing a dedicated test for migrations targeting PostgreSQL dialect Spanner instances. These changes aim to improve the robustness and reliability of the migration process by verifying data integrity across a wider array of data type conversions and different Spanner configurations.

Highlights

  • Expanded MySQL Data Type Coverage: The existing integration test for MySQL data types has been significantly expanded to include numerous new type mappings, such as various _to_string conversions (e.g., bigint_to_string, decimal_to_string, datetime_to_string), bit_to_bool, bit_to_int64, and json_to_string, among others. This ensures more comprehensive testing of data migration capabilities.
  • PostgreSQL Dialect Spanner Integration Test: A new integration test has been added specifically for migrating data from MySQL to a PostgreSQL dialect Spanner instance. This new test validates the data type conversions and migration process for Spanner instances configured with PostgreSQL compatibility.
  • Test Infrastructure Improvements: The test setup and teardown logic in MySQLDataTypesIT.java has been refined. Resource managers are now initialized only once per test class using a synchronized block and an initialized flag, improving efficiency. Cleanup now includes the newly introduced PostgreSQL dialect Spanner resource manager.
  • Handling of Failing Type Mappings: The pull request notes that some type mappings fail to migrate as expected. These checks are still included in the test for completeness but are commented out to prevent test failures, indicating areas for future investigation or known limitations.
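The once-per-class setup described in the infrastructure highlight above (a synchronized block guarded by a static flag, so expensive resource managers are created once per test class rather than once per test method) can be sketched as follows. This is an illustrative stand-in, not the actual code from MySQLDataTypesIT.java; the class and method names are hypothetical:

```java
// Sketch of a once-per-class setup guard; names are illustrative.
public class Main {
    private static final Object LOCK = new Object();
    private static boolean initialized = false;
    static int initCount = 0; // visible so the demo can verify single init

    // Stand-in for expensive resource-manager creation.
    private static void createResourceManagers() {
        initCount++;
    }

    // Called from every test method's setup; only the first call does work.
    public static void setUp() {
        synchronized (LOCK) {
            if (!initialized) {
                createResourceManagers();
                initialized = true;
            }
        }
    }

    public static void main(String[] args) {
        // Simulate three test methods each running setUp().
        setUp();
        setUp();
        setUp();
        System.out.println("initCount=" + initCount);
    }
}
```

The flag check happens inside the synchronized block, so concurrent test threads cannot both see `initialized == false` and double-create the managers.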

@nmemond nmemond force-pushed the data-type-tests-bulk branch from 04b284c to cd3ae7c Compare December 3, 2025 18:12
Also, add missing tables to PG dialect spanner schema
@VardhanThigle VardhanThigle added ignore-for-release integration testing Migration of integration tests to github labels Dec 8, 2025
@codecov

codecov bot commented Dec 8, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 56.13%. Comparing base (a16c225) to head (ed0b509).
⚠️ Report is 4 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3046      +/-   ##
============================================
+ Coverage     50.76%   56.13%   +5.36%     
+ Complexity     5101     1693    -3408     
============================================
  Files           974      471     -503     
  Lines         59967    26662   -33305     
  Branches       6551     2805    -3746     
============================================
- Hits          30445    14966   -15479     
+ Misses        27378    10800   -16578     
+ Partials       2144      896    -1248     
Components Coverage Δ
spanner-templates 72.15% <ø> (+1.21%) ⬆️
spanner-import-export ∅ <ø> (∅)
spanner-live-forward-migration 80.06% <ø> (ø)
spanner-live-reverse-replication 77.74% <ø> (ø)
spanner-bulk-migration 88.48% <ø> (ø)
see 522 files with indirect coverage changes

Contributor

@VardhanThigle VardhanThigle left a comment


In this particular case I might request adding a new test class rather than overloading the existing one. There could be cases where support is missing on the PG side that we have on the MySQL side (or vice versa).

For example, in the current PR, PG_FLOAT4 is not yet supported.

Can we add a new test class for the MySQL to PG dialect test (with its own copy of the schemas)?

When you add a new test class, for now, you can skip PG_FLOAT4 and we can fix that separately.

@nmemond
Contributor Author

nmemond commented Dec 8, 2025

In this particular case I might request adding a new test class rather than overloading the existing one. There could be cases where support is missing on the PG side that we have on the MySQL side (or vice versa).

For example, in the current PR, PG_FLOAT4 is not yet supported.

Can we add a new test class for the MySQL to PG dialect test (with its own copy of the schemas)?

When you add a new test class, for now, you can skip PG_FLOAT4 and we can fix that separately.

Sure, I can split it out into its own test class. I'll still keep the PG_FLOAT4 column but I'll make sure to keep the check on it commented out to avoid failing the test. This way, I can refer back to it when writing the parity report (and add additional details on why it failed to migrate) and when support is added, it's a simple uncomment to update the test.

@nmemond
Contributor Author

nmemond commented Dec 8, 2025

In this particular case I might request adding a new test class rather than overloading the existing one. There could be cases where support is missing on the PG side that we have on the MySQL side (or vice versa).

For example, in the current PR, PG_FLOAT4 is not yet supported.

Can we add a new test class for the MySQL to PG dialect test (with its own copy of the schemas)?

When you add a new test class, for now, you can skip PG_FLOAT4 and we can fix that separately.

@VardhanThigle I've split out the tests as requested, please have a look and let me know if there's anything else. Thanks!

VardhanThigle previously approved these changes Dec 9, 2025
Contributor

@VardhanThigle VardhanThigle left a comment


Overall looks good to me.
A couple of comments to clean up potentially unused code.

@nmemond nmemond force-pushed the data-type-tests-bulk branch from de1e4d2 to 9854b15 Compare December 9, 2025 15:40
@nmemond
Contributor Author

nmemond commented Dec 10, 2025

@VardhanThigle I see that the two data type tests I modified/added are failing, but I'm not able to access details about the actual failures (I can tell the job fails from the logs, but not why it failed). When I run these locally, they both pass without issue. Would it be possible to get the job logs so I can look into why these are failing?

@VardhanThigle
Contributor

VardhanThigle commented Dec 11, 2025

@VardhanThigle I see that the two data type tests I modified/added are failing, but I'm not able to access details about the actual failures (I can tell the job fails from the logs, but not why it failed). When I run these locally, they both pass without issue. Would it be possible to get the job logs so I can look into why these are failing?

This is the stack trace I see on the Dataflow worker. It's possible that we have many tables and the sheer parallelization of the graph is reaching the connection limit on the source.

Can you try reducing the max number of connections to a smaller value like 8 while spawning the Dataflow job for your tests? The default is 160 per thread, which is more than an IT scenario needs (but might be good for a production setting).
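To see why the default can exhaust the source, here is a quick back-of-the-envelope check. MySQL's stock max_connections is 151; the worker-thread count below is a made-up illustration, not a value measured from this job:

```java
public class Main {
    // Pool size figures from the comment above; the thread count is hypothetical.
    static final int MYSQL_MAX_CONNECTIONS = 151;  // MySQL server default
    static final int DEFAULT_POOL_PER_THREAD = 160;
    static final int TUNED_POOL_PER_THREAD = 8;
    static final int WORKER_THREADS = 4;           // illustrative only

    public static void main(String[] args) {
        int defaultDemand = DEFAULT_POOL_PER_THREAD * WORKER_THREADS;
        int tunedDemand = TUNED_POOL_PER_THREAD * WORKER_THREADS;
        // Even a single thread at the default (160) already exceeds the
        // server's default limit (151); multiple threads make it far worse.
        System.out.println("default demand: " + defaultDemand
            + " (exceeds limit: " + (defaultDemand > MYSQL_MAX_CONNECTIONS) + ")");
        System.out.println("tuned demand: " + tunedDemand
            + " (exceeds limit: " + (tunedDemand > MYSQL_MAX_CONNECTIONS) + ")");
    }
}
```

Capping the pool at 8 per thread keeps total demand well under the server limit even with several parallel worker threads.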

Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Could not create connection to database server. Attempted reconnect 10 times. Giving up.)
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:215)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:192)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:344)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:55)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:289)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:279)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:89)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:463)
	at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:465)
	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
	at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
	at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.output(DoFn.java:404)
	at org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151)
	at org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:215)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:192)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:344)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:55)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:289)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:279)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:89)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:438)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:418)
	at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
	at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:215)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:192)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:344)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:55)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.lambda$createRunner$0(GroupAlsoByWindowsParDoFn.java:174)
	... 21 more
Caused by: java.sql.SQLException: Cannot create PoolableConnectionFactory (Could not create connection to database server. Attempted reconnect 10 times. Giving up.)
	at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:653)
	at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:531)
	at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:731)
	at org.apache.commons.dbcp2.DataSourceConnectionFactory.createConnection(DataSourceConnectionFactory.java:83)
	at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:374)
	at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:571)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:298)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:223)
	at org.apache.commons.dbcp2.PoolingDataSource.getConnection(PoolingDataSource.java:141)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.getConnection(JdbcIO.java:1726)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:1766)
Caused by: java.sql.SQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 10 times. Giving up.
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
	at com.mysql.cj.jdbc.ConnectionImpl.connectWithRetries(ConnectionImpl.java:898)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:823)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
	at org.apache.commons.dbcp2.DriverConnectionFactory.createConnection(DriverConnectionFactory.java:52)
	at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:374)
	at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:106)
	at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:649)
	at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:531)
	at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:731)
	at org.apache.commons.dbcp2.DataSourceConnectionFactory.createConnection(DataSourceConnectionFactory.java:83)
	at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:374)
	at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:571)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:298)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:223)
	at org.apache.commons.dbcp2.PoolingDataSource.getConnection(PoolingDataSource.java:141)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.getConnection(JdbcIO.java:1726)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:1766)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:215)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:192)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:344)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:55)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:289)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:279)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:89)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:463)
	at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:465)
	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
	at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
	at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.output(DoFn.java:404)
	at org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151)
	at org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:215)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:192)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:344)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:55)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:289)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:279)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:89)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:438)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:418)
	at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
	at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:215)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:192)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:344)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:55)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.lambda$createRunner$0(GroupAlsoByWindowsParDoFn.java:174)
	at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
	at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:92)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:66)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:113)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:55)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
	at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:304)
	at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:276)
	at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:206)
	at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:148)
	at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:130)
	at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:117)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.mysql.cj.exceptions.CJException: Data source rejected establishment of connection,  message from server: "Too many connections"
	at jdk.internal.reflect.GeneratedConstructorAccessor52.newInstance(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:129)
	at com.mysql.cj.protocol.a.NativeProtocol.rejectProtocol(NativeProtocol.java:409)
	at com.mysql.cj.protocol.a.NativeProtocol.readServerCapabilities(NativeProtocol.java:540)
	at com.mysql.cj.protocol.a.NativeProtocol.beforeHandshake(NativeProtocol.java:423)
	at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1431)
	at com.mysql.cj.NativeSession.connect(NativeSession.java:133)
	at com.mysql.cj.jdbc.ConnectionImpl.connectWithRetries(ConnectionImpl.java:842)
	... 74 more
"
job: "2025-12-09_20_25_53-10206998930226706309"
logger: "org.apache.beam.runners.dataflow.worker.WorkItemStatusClient"
message: "Failure processing work item cloud-teleport-testing;2025-12-09_20_25_53-10206998930226706309;430587612135142885: Uncaught exception occurred during work unit execution. This will be retried."
stage: "s670"
thread: "32"
work: "430587612135142885"
worker: "my-s-q-l-data-types-i-t-2-12092028-6su8-harness-dlwj"
}
labels: {7}
logName: "projects/cloud-teleport-testing/logs/dataflow.googleapis.com%2Fworker"
payload: "jsonPayload"
receiveLocation: "us-central1"
receiveTimestamp: "2025-12-10T04:34:20.038741234Z"
resource: {2}
severity: "ERROR"
timestamp: "2025-12-10T04:34:18.682Z"
traceSampled: false
}

@nmemond
Contributor Author

nmemond commented Dec 11, 2025

@VardhanThigle I see that the two data type tests I modified/added are failing, but I'm not able to access details about the actual failures (I can tell the job fails from the logs, but not why it failed). When I run these locally, they both pass without issue. Would it be possible to get the job logs so I can look into why these are failing?

This is the stack trace I see on the Dataflow worker. It's possible that we have many tables and the sheer parallelization of the graph is reaching the connection limit on the source.

Can you try reducing the max number of connections to a smaller value like 8 while spawning the Dataflow job for your tests? The default is 160 per thread, which is more than an IT scenario needs (but might be good for a production setting).

Interesting, thanks for the stack trace. I'll try with your suggestion, it does sound like it could allow me to replicate the issue locally.

@nmemond
Contributor Author

nmemond commented Dec 11, 2025

@VardhanThigle I see that the two data type tests I modified/added are failing, but I'm not able to access details about the actual failures (I can tell the job fails from the logs, but not why it failed). When I run these locally, they both pass without issue. Would it be possible to get the job logs so I can look into why these are failing?

This is the stack trace I see on the Dataflow worker. It's possible that we have many tables and the sheer parallelization of the graph is reaching the connection limit on the source.

Can you try reducing the max number of connections to a smaller value like 8 while spawning the Dataflow job for your tests? The default is 160 per thread, which is more than an IT scenario needs (but might be good for a production setting).

@VardhanThigle I went with a max connections value of 4, which was about as low as I could go without causing timeout issues when fetching connections from the pool itself.

That said, when testing locally I still see quite a few connections coming into the DB from the job (almost 100), so I'm not sure this will end up resolving the problem. Potentially we'd need to reduce the number of threads for the job itself? Or are there other levers we can adjust if it still fails?

Alternatively, there's potentially a connection leak, but I didn't notice anything obvious when I went through the code other than the three connections used for discovering the tables/indexes/schema (I'm pretty sure `try (Statement stmt = dataSource.getConnection().createStatement())` closes only the statement, not the connection). But still, those only get called once, so that's at most 3 connections...
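The suspected leak pattern above can be demonstrated with plain AutoCloseable stand-ins (no real JDBC involved): a try-with-resources block closes only the resources declared in its header, so a connection obtained inline in the header expression is never closed. The Fake* classes here are illustrative substitutes for java.sql.Connection and Statement:

```java
public class Main {
    // Minimal stand-ins for java.sql.Connection and java.sql.Statement.
    static class FakeConnection implements AutoCloseable {
        boolean closed = false;
        FakeStatement createStatement() { return new FakeStatement(); }
        @Override public void close() { closed = true; }
    }
    static class FakeStatement implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        // Leaky pattern: only the Statement is declared in the header,
        // so only the Statement is auto-closed; the Connection leaks.
        FakeConnection leaked = new FakeConnection();
        try (FakeStatement stmt = leaked.createStatement()) {
            // use stmt ...
        }
        System.out.println("leaky pattern, connection closed: " + leaked.closed);

        // Fixed pattern: declare both resources so both get closed
        // (in reverse declaration order: statement first, then connection).
        FakeConnection conn = new FakeConnection();
        try (FakeConnection c = conn; FakeStatement stmt = c.createStatement()) {
            // use stmt ...
        }
        System.out.println("fixed pattern, connection closed: " + conn.closed);
    }
}
```

With the real JDBC types the fix is the same shape: bind the Connection to its own resource variable in the try header instead of chaining `dataSource.getConnection().createStatement()`.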

@VardhanThigle
Contributor

Could we please rebase this? I wanted to check whether the workaround in #3147 helps here.

@nmemond
Contributor Author

nmemond commented Dec 23, 2025

Could we please rebase this ? I wanted to try if the workaround in #3147 helps here.

Done.

@VardhanThigle VardhanThigle merged commit 1d3183c into GoogleCloudPlatform:main Dec 24, 2025
14 checks passed
@nmemond nmemond deleted the data-type-tests-bulk branch December 24, 2025 14:08
MnkyGns pushed a commit to MnkyGns/DataflowTemplates that referenced this pull request Feb 12, 2026
…ta type support (GoogleCloudPlatform#3046)

* Add missing data type mappings to data types integration test

* Add data types test for bulk migration from MySQL to a Postgres dialect Spanner DB

* Improve check for bit to string data type mapping

Also, add missing tables to PG dialect spanner schema

* Split PG dialect test into its own test class

* Remove unused code

* Reduce maxConnections to 4 in an attempt to avoid exceeding the max connection limit of the MySQL DB

Labels

ignore-for-release integration testing Migration of integration tests to github size/XXL
