Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,18 @@ Flink SQL> SELECT * FROM orders;
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
是否尝试解析由 <a href="https://github.com/github/gh-ost">gh-ost</a> 或 <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> 工具生成的表结构变更事件。
这些工具会在变更表结构时,将变更语句应用到“影子表”之上,并稍后将其与主表进行交换,以达到表结构变更的目的。
<br>
这是一项实验性功能。
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
12 changes: 12 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,18 @@ pipeline:
scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
是否尝试解析由 <a href="https://github.com/github/gh-ost">gh-ost</a> 或 <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> 工具生成的表结构变更事件。
这些工具会在变更表结构时,将变更语句应用到“影子表”之上,并稍后将其与主表进行交换,以达到表结构变更的目的。
<br>
这是一项实验性功能。
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,18 @@ During a snapshot operation, the connector will query each included table to pro
hex: The binary data type is converted to a hexadecimal string and transmitted.
The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make it easier to handle during transmission.</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to parse "online" schema changes generated by <a href="https://github.com/github/gh-ost">gh-ost</a> or <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a>.
Schema changes are first applied to a "shadow" table, which is then swapped with the original table.
<br>
This is an experimental feature and is subject to change in the future.
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ pipeline:
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to parse "online" schema changes generated by <a href="https://github.com/github/gh-ost">gh-ost</a> or <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a>.
Schema changes are first applied to a "shadow" table, which is then swapped with the original table.
<br>
This is an experimental feature and is subject to change in the future.
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.utils;

import org.apache.flink.util.function.SupplierWithException;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Some utility methods for creating repeated-checking test cases. */
public class TestCaseUtils {

    public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
    public static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(1);

    /** Utility class; not meant to be instantiated. */
    private TestCaseUtils() {}

    /** Repeatedly checks {@code fetcher} with the default timeout and interval. */
    public static void repeatedCheck(Supplier<Boolean> fetcher) {
        repeatedCheck(fetcher, DEFAULT_TIMEOUT);
    }

    /** Repeatedly checks {@code fetcher} within {@code timeout}, at the default interval. */
    public static void repeatedCheck(Supplier<Boolean> fetcher, Duration timeout) {
        repeatedCheck(fetcher, timeout, DEFAULT_INTERVAL);
    }

    /** Repeatedly checks {@code fetcher} within {@code timeout}, every {@code interval}. */
    public static void repeatedCheck(
            Supplier<Boolean> fetcher, Duration timeout, Duration interval) {
        repeatedCheck(fetcher::get, timeout, interval, Collections.emptyList());
    }

    /**
     * Repeatedly fetches a value and validates it with {@code validator}, within {@code timeout},
     * every {@code interval}.
     */
    public static <T> void repeatedCheck(
            Supplier<T> fetcher, Predicate<T> validator, Duration timeout, Duration interval) {
        repeatedCheckAndValidate(
                fetcher::get, validator, timeout, interval, Collections.emptyList());
    }

    /**
     * Repeatedly checks a throwing {@code fetcher} within {@code timeout}, every {@code interval},
     * tolerating only the throwable types listed in {@code allowedThrowsList}.
     */
    public static void repeatedCheck(
            SupplierWithException<Boolean, Throwable> fetcher,
            Duration timeout,
            Duration interval,
            List<Class<? extends Throwable>> allowedThrowsList) {
        repeatedCheckAndValidate(fetcher, b -> b, timeout, interval, allowedThrowsList);
    }

    /**
     * Repeatedly fetches a value and validates it, within {@code timeout}, every {@code interval}.
     *
     * <p>Throwables raised by {@code fetcher} whose type is assignable to an entry of {@code
     * allowedThrowsList} are treated as transient and retried; any other throwable fails
     * immediately.
     *
     * @param fetcher supplier producing the value under test; may throw
     * @param validator predicate deciding whether the fetched value is acceptable
     * @param timeout maximum total time to keep retrying
     * @param interval sleep duration between attempts
     * @param allowedThrowsList throwable types that are tolerated and retried
     * @throws RuntimeException if the fetcher throws an unexpected exception, if the waiting
     *     thread is interrupted, or if the timeout elapses without a successful validation
     */
    public static <T> void repeatedCheckAndValidate(
            SupplierWithException<T, Throwable> fetcher,
            Predicate<T> validator,
            Duration timeout,
            Duration interval,
            List<Class<? extends Throwable>> allowedThrowsList) {

        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        // do/while guarantees at least one attempt even for very small timeouts, and a
        // deadline avoids re-reading the clock against a moving start point.
        do {
            try {
                if (validator.test(fetcher.get())) {
                    return;
                }
            } catch (Throwable t) {
                // Only tolerate throwable types the caller explicitly allowed; anything
                // else is a genuine failure and must surface immediately with its cause.
                if (allowedThrowsList.stream()
                        .noneMatch(clazz -> clazz.isAssignableFrom(t.getClass()))) {
                    throw new RuntimeException("Fetcher has thrown an unexpected exception: ", t);
                }
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and abort: continuing would busy-loop, since
                // every subsequent sleep on an interrupted thread throws immediately.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for state to be ready.", e);
            }
        } while (System.currentTimeMillis() < deadline);
        throw new RuntimeException("Timeout when waiting for state to be ready.");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
Expand Down Expand Up @@ -139,6 +140,7 @@ public DataSource createDataSource(Context context) {
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean scanBinlogNewlyAddedTableEnabled =
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -175,7 +177,8 @@ public DataSource createDataSource(Context context) {
.includeSchemaChanges(includeSchemaChanges)
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,12 @@ public class MySqlDataSourceOptions {
.withDescription(
"List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
+ "Available readable metadata are: op_ts.");

/**
 * Experimental option controlling whether schema change events produced by online-DDL tools
 * (gh-ost / pt-osc) should be parsed. Defaults to {@code false}, i.e. such events are not
 * parsed unless explicitly enabled via {@code scan.parse.online.schema.changes.enabled}.
 */
@Experimental
public static final ConfigOption<Boolean> PARSE_ONLINE_SCHEMA_CHANGES =
ConfigOptions.key("scan.parse.online.schema.changes.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
}
Loading