Skip to content

Commit fb0b1c1

Browse files
hongyunyanlidezhu
authored andcommitted
dispatcherManager: close dispatcher manager immediately even there is a long-time-cost ddl execution (#3634)
ref #3557
1 parent b081874 commit fb0b1c1

File tree

3 files changed

+61
-2
lines changed

3 files changed

+61
-2
lines changed

downstreamadapter/dispatchermanager/dispatcher_manager.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,17 +861,25 @@ func (e *DispatcherManager) close(removeChangefeed bool) {
861861
e.heartBeatTask.Cancel()
862862
}
863863

864+
// Cancel the context to signal all dependent components to stop.
865+
// This is important to prevent `e.sink.Close() / e.sharedInfo.Close()` from blocking,
866+
// especially when a long-running DDL is being executed by the sink.
867+
e.cancel()
868+
864869
if e.sharedInfo != nil {
865870
e.sharedInfo.Close()
866871
}
867872

873+
log.Info("shared info closed", zap.Stringer("changefeedID", e.changefeedID))
874+
868875
if e.RedoEnable {
869876
e.redoSink.Close(removeChangefeed)
870877
// FIXME: cleanup redo log when remove the changefeed
871878
e.closeRedoMeta(removeChangefeed)
872879
}
873880
e.sink.Close(removeChangefeed)
874-
e.cancel()
881+
log.Info("sink closed", zap.Stringer("changefeedID", e.changefeedID))
882+
875883
e.wg.Wait()
876884

877885
e.removeTaskHandles.Range(func(key, value interface{}) bool {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/bin/bash
2+
# this test case is used to test whether we can successfully pause a changefeed when writing a long-time-cost ddl.
3+
4+
set -eu
5+
6+
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
7+
source $CUR/../_utils/test_prepare
8+
WORK_DIR=$OUT_DIR/$TEST_NAME
9+
CDC_BINARY=cdc.test
10+
SINK_TYPE=$1
11+
12+
function run() {
13+
# No need to test kafka and storage sink.
14+
if [ "$SINK_TYPE" != "mysql" ]; then
15+
return
16+
fi
17+
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
18+
19+
start_tidb_cluster --workdir $WORK_DIR
20+
21+
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkExecDDLDelay=return("3600")'
22+
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
23+
24+
SINK_URI="mysql://root:@127.0.0.1:3306/"
25+
cdc_cli_changefeed create --sink-uri="$SINK_URI" -c "test"
26+
27+
run_sql "use test;create table t1 (a int primary key, b int, unique key uk(b));" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
28+
29+
cdc_cli_changefeed pause -c "test"
30+
31+
sleep 10
32+
33+
ensure 10 "check_logs_contains $WORK_DIR 'changefeed maintainer closed'"
34+
check_table_not_exists test.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
35+
36+
cdc_cli_changefeed resume -c "test"
37+
38+
cleanup_process $CDC_BINARY
39+
export GO_FAILPOINTS=''
40+
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
41+
42+
check_table_exists test.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
43+
44+
cleanup_process $CDC_BINARY
45+
stop_tidb_cluster
46+
}
47+
48+
trap 'stop_tidb_cluster; collect_logs $WORK_DIR' EXIT
49+
run $*
50+
check_logs $WORK_DIR
51+
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

tests/integration_tests/run_light_it_in_ci.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ mysql_groups=(
6060
# G13
6161
'cli_tls_with_auth cli_with_auth fail_over_ddl_N'
6262
# G14
63-
'batch_add_table batch_update_to_no_batch fail_over_ddl_O update_changefeed_check_config'
63+
'batch_add_table batch_update_to_no_batch fail_over_ddl_O update_changefeed_check_config pause_changefeed_with_long_time_ddl'
6464
# G15
6565
'split_region changefeed_resume_with_checkpoint_ts autorandom gc_safepoint foreign_key_check old_arch_compatibility'
6666
)

0 commit comments

Comments
 (0)