diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index 7c7f6d3533..6b89684e30 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -3394,70 +3394,70 @@ "show": false } ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, - { - "datasource": "${DS_TEST-CLUSTER}", - "description": "Build metadata of each TiCDC server instance.", - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "gridPos": { - "h": 9, - "w": 12, - "x": 12, - "y": 31 - }, - "id": 22473, - "options": { - "showHeader": true, - "sortBy": [] - }, - "pluginVersion": "7.5.17", - "transformations": [ - { - "id": "labelsToFields", - "options": {} - }, - { - "id": "organize", - "options": { - "excludeByName": { - "Metric": true, - "Time": true, - "Value": true, - "__name__": true - }, - "indexByName": { - "instance": 0, - "kernel_type": 1, - "git_hash": 2, - "release_version": 3, - "utc_build_time": 4 - }, - "renameByName": {} - } - } - ], - "targets": [ - { - "expr": "max by (instance, kernel_type, git_hash, release_version, utc_build_time) (ticdc_server_build_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*ticdc.*\", instance=~\"$ticdc_instance\"})", - "format": "time_series", - "instant": true, - "refId": "A" - } - ], - "title": "Build Info", - "type": "table" - } - ], - "title": "Server", - "type": "row" - }, + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Build metadata of each TiCDC server instance.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 31 + }, + "id": 22473, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "instance": 0, + "kernel_type": 1, + "git_hash": 2, + "release_version": 3, + "utc_build_time": 4 + }, + "renameByName": {} + } + } + ], + "targets": [ + { + "expr": "max by (instance, kernel_type, git_hash, release_version, utc_build_time) (ticdc_server_build_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*ticdc.*\", instance=~\"$ticdc_instance\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Build Info", + "type": "table" + } + ], + "title": "Server", + "type": "row" + }, { "collapsed": true, "datasource": null, @@ -14135,6 +14135,105 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of dml rows affected that sink flushes to downstream per second.", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 32 + }, + "hiddenSeries": false, + "id": 636, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_sink_dml_event_affected_row_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}[1m])) by (namespace, changefeed, type, row_type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{namespace}}-{{changefeed}}-{{type}}-{{row_type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Row Affected Count / s", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "logBase": 1, + "min": "0", + "show": true + }, + { + "format": "none", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false + } } ], "title": "Sink - Transaction Sink", @@ -23961,4 +24060,4 @@ "title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch", "uid": "YiGL8hBZ0aac", "version": 36 -} +} \ No newline at end of file diff --git a/metrics/grafana/ticdc_new_arch_next_gen.json b/metrics/grafana/ticdc_new_arch_next_gen.json index 46d38be2ab..2d1b767660 100644 --- a/metrics/grafana/ticdc_new_arch_next_gen.json +++ b/metrics/grafana/ticdc_new_arch_next_gen.json @@ -3394,70 +3394,70 @@ "show": false } ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, - { - "datasource": "${DS_TEST-CLUSTER}", - "description": "Build metadata of each TiCDC server instance.", - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "gridPos": { - "h": 9, - "w": 12, - "x": 12, - "y": 31 - }, - "id": 22473, - "options": { - "showHeader": true, - "sortBy": [] - }, - "pluginVersion": "7.5.17", - "transformations": [ - { - "id": "labelsToFields", - "options": {} - }, - { - "id": "organize", - "options": { - "excludeByName": { - "Metric": true, - "Time": true, - "Value": true, - "__name__": true - }, - "indexByName": { - "instance": 0, - "kernel_type": 1, - "git_hash": 2, - "release_version": 3, - "utc_build_time": 4 - }, - "renameByName": {} - } - } - ], - "targets": [ - { - "expr": "max by (instance, kernel_type, git_hash, release_version, utc_build_time) (ticdc_server_build_info{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", job=~\".*ticdc.*\", instance=~\"$ticdc_instance\"})", - "format": "time_series", - "instant": true, - "refId": "A" - } - ], - "title": "Build Info", - "type": "table" - } - ], - "title": "Server", - "type": "row" - }, + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Build metadata of each TiCDC server instance.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 31 + }, + "id": 22473, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "instance": 0, + "kernel_type": 1, + "git_hash": 2, + "release_version": 3, + "utc_build_time": 4 + }, + "renameByName": {} + } + } + ], + "targets": [ + { + "expr": "max by (instance, kernel_type, git_hash, release_version, utc_build_time) (ticdc_server_build_info{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", job=~\".*ticdc.*\", instance=~\"$ticdc_instance\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Build Info", + "type": "table" + } + ], + "title": "Server", + "type": "row" + }, { "collapsed": true, "datasource": null, @@ -14135,6 +14135,105 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of dml rows affected that sink flushes to downstream per second.", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 32 + }, + "hiddenSeries": false, + "id": 636, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_sink_dml_event_affected_row_count{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\",keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}[1m])) by (keyspace_name, changefeed, type, row_type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{keyspace_name}}-{{changefeed}}-{{type}}-{{row_type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Row Affected Count / s", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "logBase": 1, + "min": "0", + "show": true + }, + { + "format": "none", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false + } } ], "title": "Sink - Transaction Sink", @@ -23961,4 +24060,4 @@ "title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch", "uid": "YiGL8hBZ0aac", "version": 36 -} +} \ No newline at end of file diff --git a/metrics/grafana/ticdc_new_arch_with_keyspace_name.json b/metrics/grafana/ticdc_new_arch_with_keyspace_name.json index 2c7143cdae..43c4e35c6f 100644 --- a/metrics/grafana/ticdc_new_arch_with_keyspace_name.json +++ b/metrics/grafana/ticdc_new_arch_with_keyspace_name.json @@ -5429,6 +5429,105 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of dml rows affected that sink flushes to downstream per second.", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 32 + }, + "hiddenSeries": false, + "id": 636, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_sink_dml_event_affected_row_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}[1m])) by (keyspace_name, changefeed, type, row_type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{keyspace_name}}-{{changefeed}}-{{type}}-{{row_type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Row Affected Count / s", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "logBase": 1, + "min": "0", + "show": true + }, + { + "format": "none", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false + } } ], "title": "Sink - Transaction Sink", diff --git a/pkg/metrics/sink.go b/pkg/metrics/sink.go index ac36bfbfb5..8f5ed89868 100644 --- a/pkg/metrics/sink.go +++ b/pkg/metrics/sink.go @@ -66,6 +66,14 @@ var ( Help: "Total count of DML events.", }, []string{getKeyspaceLabel(), "changefeed"}) + ExecDMLEventRowsAffectedCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "dml_event_affected_row_count", + Help: "Total count of affected rows.", + }, []string{getKeyspaceLabel(), "changefeed", "type", "row_type"}) + // ExecDDLHistogram records the exexution time of a DDL. ExecDDLHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -246,6 +254,7 @@ func initSinkMetrics(registry *prometheus.Registry) { registry.MustRegister(EventSizeHistogram) registry.MustRegister(ExecutionErrorCounter) registry.MustRegister(ExecDMLEventCounter) + registry.MustRegister(ExecDMLEventRowsAffectedCounter) // txn sink metrics registry.MustRegister(ConflictDetectDuration) diff --git a/pkg/metrics/statistics.go b/pkg/metrics/statistics.go index 6c636290f4..410109ea74 100644 --- a/pkg/metrics/statistics.go +++ b/pkg/metrics/statistics.go @@ -94,6 +94,21 @@ func (b *Statistics) RecordDDLExecution(executor func() error) error { return nil } +func (b *Statistics) RecordTotalRowsAffected(actualRowsAffected, expectedRowsAffected int64) { + keyspace := b.changefeedID.Keyspace() + changefeedID := b.changefeedID.Name() + ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "actual", "total").Add(float64(actualRowsAffected)) + ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "expected", "total").Add(float64(expectedRowsAffected)) +} + +func (b *Statistics) RecordRowsAffected(rowsAffected int64, rowType common.RowType) { + keyspace := b.changefeedID.Keyspace() + changefeedID := b.changefeedID.Name() + ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "actual", rowType.String()).Add(float64(rowsAffected)) + ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "expected", rowType.String()).Add(1) + b.RecordTotalRowsAffected(rowsAffected, 1) +} + // Close release some internal resources. func (b *Statistics) Close() { keyspace := b.changefeedID.Keyspace() diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index 61683dccca..11508366c2 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -102,19 +102,21 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err // Step 2: prepare the dmls for each group var ( - queryList []string - argsList [][]interface{} + queryList []string + argsList [][]interface{} + rowTypesList []common.RowType ) for _, sortedEventGroups := range eventsGroupSortedByUpdateTs { for _, eventsInGroup := range sortedEventGroups { tableInfo := eventsInGroup[0].TableInfo if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) { - queryList, argsList = w.generateNormalSQLs(eventsInGroup) + queryList, argsList, rowTypesList = w.generateNormalSQLs(eventsInGroup) } else { - queryList, argsList = w.generateBatchSQL(eventsInGroup) + queryList, argsList, rowTypesList = w.generateBatchSQL(eventsInGroup) } dmls.sqls = append(dmls.sqls, queryList...) dmls.values = append(dmls.values, argsList...) + dmls.rowTypes = append(dmls.rowTypes, rowTypesList...) } } // Pre-check log level to avoid dmls.String() being called unnecessarily @@ -203,34 +205,38 @@ func allRowInSameSafeMode(safemode bool, events []*commonEvent.DMLEvent) bool { // // Considering the batch algorithm in safe mode is O(n^3), which n is the number of rows. // So we need to limit the number of rows in one batch to avoid performance issues. -func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { if len(events) == 0 { - return []string{}, [][]interface{}{} + return []string{}, [][]interface{}{}, []common.RowType{} } sqlList := make([]string, 0) argsList := make([][]interface{}, 0) + rowTypesList := make([]common.RowType, 0) batchSQL := func(events []*commonEvent.DMLEvent) { inSafeMode := w.cfg.SafeMode || w.isInErrorCausedSafeMode || events[0].CommitTs < events[0].ReplicatingTs if len(events) == 1 { // only one event, we don't need to do batch - sql, args := w.generateSQLForSingleEvent(events[0], inSafeMode) + sql, args, rowTypes := w.generateSQLForSingleEvent(events[0], inSafeMode) sqlList = append(sqlList, sql...) argsList = append(argsList, args...) + rowTypesList = append(rowTypesList, rowTypes...) return } if inSafeMode { // Insert will translate to Replace - sql, args := w.generateBatchSQLInSafeMode(events) + sql, args, rowTypes := w.generateBatchSQLInSafeMode(events) sqlList = append(sqlList, sql...) argsList = append(argsList, args...) + rowTypesList = append(rowTypesList, rowTypes...) } else { - sql, args := w.generateBatchSQLInUnSafeMode(events) + sql, args, rowTypes := w.generateBatchSQLInUnSafeMode(events) sqlList = append(sqlList, sql...) argsList = append(argsList, args...) + rowTypesList = append(rowTypesList, rowTypes...) } } @@ -249,10 +255,10 @@ func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][ } batchSQL(events[beginIndex:]) - return sqlList, argsList + return sqlList, argsList, rowTypesList } -func (w *Writer) generateSQLForSingleEvent(event *commonEvent.DMLEvent, inDataSafeMode bool) ([]string, [][]interface{}) { +func (w *Writer) generateSQLForSingleEvent(event *commonEvent.DMLEvent, inDataSafeMode bool) ([]string, [][]interface{}, []common.RowType) { tableInfo := event.TableInfo rowLists := make([]*commonEvent.RowChange, 0, event.Len()) for { @@ -266,24 +272,26 @@ func (w *Writer) generateSQLForSingleEvent(event *commonEvent.DMLEvent, inDataSa return w.batchSingleTxnDmls(rowLists, tableInfo, inDataSafeMode) } -func (w *Writer) generateBatchSQLsPerEvent(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateBatchSQLsPerEvent(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { var ( - queries []string - args [][]interface{} + queries []string + args [][]interface{} + rowTypesList []common.RowType ) for _, event := range events { if event.Len() == 0 { continue } inSafeMode := w.cfg.SafeMode || w.isInErrorCausedSafeMode || event.CommitTs < event.ReplicatingTs - sqls, vals := w.generateSQLForSingleEvent(event, inSafeMode) + sqls, vals, rowTypes := w.generateSQLForSingleEvent(event, inSafeMode) queries = append(queries, sqls...) args = append(args, vals...) + rowTypesList = append(rowTypesList, rowTypes...) } - return queries, args + return queries, args, rowTypesList } -func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { tableInfo := events[0].TableInfo type RowChangeWithKeys struct { RowChange *commonEvent.RowChange @@ -346,8 +354,8 @@ func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([ rowKey := rowLists[i].RowKeys if nextRowType == common.RowTypeInsert { if compareKeys(rowKey, rowLists[j].RowKeys) { - sql, values := w.generateBatchSQLsPerEvent(events) - log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Int("writerID", w.id)) + sql, values, rowTypes := w.generateBatchSQLsPerEvent(events) + log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id)) log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id)) } } else if nextRowType == common.RowTypeDelete { @@ -385,8 +393,8 @@ func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([ } if nextRowType == common.RowTypeInsert { if compareKeys(rowKey, rowLists[j].RowKeys) { - sql, values := w.generateBatchSQLsPerEvent(events) - log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Int("writerID", w.id)) + sql, values, rowTypes := w.generateBatchSQLsPerEvent(events) + log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id)) log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id)) } } else if nextRowType == common.RowTypeDelete { @@ -456,14 +464,14 @@ func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([ return w.batchSingleTxnDmls(finalRowLists, tableInfo, false) } -func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { tableInfo := events[0].TableInfo // step 1. divide update row to delete row and insert row, and set into map based on the key hash rowsMap := make(map[uint64][]*commonEvent.RowChange) hashToKeyMap := make(map[uint64][]byte) - addRowToMap := func(row *commonEvent.RowChange, rowData *chunk.Row, event *commonEvent.DMLEvent) ([]string, [][]interface{}, bool) { + addRowToMap := func(row *commonEvent.RowChange, rowData *chunk.Row, event *commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType, bool) { hashValue, keyValue := genKeyAndHash(rowData, tableInfo) if _, ok := hashToKeyMap[hashValue]; !ok { hashToKeyMap[hashValue] = keyValue @@ -472,12 +480,12 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s log.Warn("the key hash is equal, but the keys is not the same; so we don't use batch generate sql, but use the normal generated sql instead") event.Rewind() // reset event // fallback to per-event batch sql - sql, args := w.generateBatchSQLsPerEvent(events) - return sql, args, false + sql, args, rowTypes := w.generateBatchSQLsPerEvent(events) + return sql, args, rowTypes, false } } rowsMap[hashValue] = append(rowsMap[hashValue], row) - return nil, nil, true + return nil, nil, nil, true } for _, event := range events { @@ -491,28 +499,28 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s case common.RowTypeUpdate: { deleteRow := commonEvent.RowChange{RowType: common.RowTypeDelete, PreRow: row.PreRow} - sql, args, ok := addRowToMap(&deleteRow, &row.PreRow, event) + sql, args, rowTypes, ok := addRowToMap(&deleteRow, &row.PreRow, event) if !ok { - return sql, args + return sql, args, rowTypes } } { insertRow := commonEvent.RowChange{RowType: common.RowTypeInsert, Row: row.Row} - sql, args, ok := addRowToMap(&insertRow, &row.Row, event) + sql, args, rowTypes, ok := addRowToMap(&insertRow, &row.Row, event) if !ok { - return sql, args + return sql, args, rowTypes } } case common.RowTypeDelete: - sql, args, ok := addRowToMap(&row, &row.PreRow, event) + sql, args, rowTypes, ok := addRowToMap(&row, &row.PreRow, event) if !ok { - return sql, args + return sql, args, rowTypes } case common.RowTypeInsert: - sql, args, ok := addRowToMap(&row, &row.Row, event) + sql, args, rowTypes, ok := addRowToMap(&row, &row.Row, event) if !ok { - return sql, args + return sql, args, rowTypes } } } @@ -535,8 +543,8 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s for i := 1; i < len(rowChanges); i++ { rowType := rowChanges[i].RowType if rowType == prevType { - sql, values := w.generateBatchSQLsPerEvent(events) - log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Int("writerID", w.id)) + sql, values, rowTypes := w.generateBatchSQLsPerEvent(events) + log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id)) log.Panic("invalid row changes", zap.String("schemaName", tableInfo.GetSchemaName()), zap.Any("PKIndex", tableInfo.GetPKIndex()), zap.String("tableName", tableInfo.GetTableName()), zap.Any("rowChanges", rowChanges), zap.Any("prevType", prevType), zap.Any("currentType", rowType), zap.Int("writerID", w.id)) @@ -549,10 +557,11 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s return w.batchSingleTxnDmls(rowsList, tableInfo, true) } -func (w *Writer) generateNormalSQLs(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateNormalSQLs(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { var ( - queries []string - args [][]interface{} + queries []string + args [][]interface{} + rowTypes []common.RowType ) for _, event := range events { @@ -560,14 +569,15 @@ func (w *Writer) generateNormalSQLs(events []*commonEvent.DMLEvent) ([]string, [ continue } - queryList, argsList := w.generateNormalSQL(event) + queryList, argsList, rowTypesList := w.generateNormalSQL(event) queries = append(queries, queryList...) args = append(args, argsList...) + rowTypes = append(rowTypes, rowTypesList...) } - return queries, args + return queries, args, rowTypes } -func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { inSafeMode := w.cfg.SafeMode || w.isInErrorCausedSafeMode || event.CommitTs < event.ReplicatingTs log.Debug("inSafeMode", @@ -580,8 +590,9 @@ func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]i ) var ( - queries []string - argsList [][]interface{} + queries []string + argsList [][]interface{} + rowTypesList []common.RowType ) for { row, ok := event.GetNextRow() @@ -589,8 +600,9 @@ func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]i break } var ( - query string - args []interface{} + query string + args []interface{} + rowType common.RowType ) switch row.RowType { case common.RowTypeUpdate: @@ -601,23 +613,29 @@ func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]i if query != "" { queries = append(queries, query) argsList = append(argsList, args) + rowTypesList = append(rowTypesList, common.RowTypeDelete) } query, args = buildInsert(event.TableInfo, row, inSafeMode) + rowType = common.RowTypeInsert } else { query, args = buildUpdate(event.TableInfo, row) + rowType = common.RowTypeUpdate } case common.RowTypeDelete: query, args = buildDelete(event.TableInfo, row) + rowType = common.RowTypeDelete case common.RowTypeInsert: query, args = buildInsert(event.TableInfo, row, inSafeMode) + rowType = common.RowTypeInsert } if query != "" { queries = append(queries, query) argsList = append(argsList, args) + rowTypesList = append(rowTypesList, rowType) } } - return queries, argsList + return queries, argsList, rowTypesList } func (w *Writer) execDMLWithMaxRetries(dmls *preparedDMLs) error { @@ -720,12 +738,15 @@ func (w *Writer) sequenceExecute( } } - var execError error + var ( + res sql.Result + execError error + ) if prepStmt == nil { - _, execError = tx.ExecContext(ctx, query, args...) + res, execError = tx.ExecContext(ctx, query, args...) } else { //nolint:sqlclosecheck - _, execError = tx.Stmt(prepStmt).ExecContext(ctx, args...) + res, execError = tx.Stmt(prepStmt).ExecContext(ctx, args...) } if execError != nil { @@ -738,6 +759,12 @@ func (w *Writer) sequenceExecute( cancelFunc() return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(execError, fmt.Sprintf("Failed to execute DMLs, query info:%s, args:%v; ", query, args))) } + rowsAffected, err := res.RowsAffected() + if err != nil { + log.Warn("get rows affected rows failed", zap.Error(err)) + } else { + w.statistics.RecordRowsAffected(rowsAffected, dmls.rowTypes[i]) + } cancelFunc() } return nil @@ -768,10 +795,16 @@ func (w *Writer) multiStmtExecute( // conn.ExecContext only use one RTT, while db.Begin + tx.ExecContext + db.Commit need three RTTs. // when some error occurs, we just need to close the conn to avoid the session to be reuse unexpectedly. // The txn can ensure the atomicity of the transaction. - _, err = conn.ExecContext(ctx, multiStmtSQLWithTxn, multiStmtArgs...) + res, err := conn.ExecContext(ctx, multiStmtSQLWithTxn, multiStmtArgs...) if err != nil { return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Failed to execute DMLs, query info:%s, args:%v; ", multiStmtSQLWithTxn, multiStmtArgs))) } + rowsAffected, err := res.RowsAffected() + if err != nil { + log.Warn("get rows affected rows failed", zap.Error(err)) + } else { + w.statistics.RecordTotalRowsAffected(rowsAffected, dmls.RowsAffected()) + } return nil } @@ -809,7 +842,7 @@ func (w *Writer) batchSingleTxnDmls( rows []*commonEvent.RowChange, tableInfo *common.TableInfo, inSafeMode bool, -) (sqls []string, values [][]interface{}) { +) (sqls []string, values [][]interface{}, rowTypes []common.RowType) { insertRows, updateRows, deleteRows := w.groupRowsByType(rows, tableInfo) // handle delete @@ -818,6 +851,7 @@ func (w *Writer) batchSingleTxnDmls( sql, value := sqlmodel.GenDeleteSQL(rows...) sqls = append(sqls, sql) values = append(values, value) + rowTypes = append(rowTypes, common.RowTypeDelete) } } @@ -828,6 +862,7 @@ func (w *Writer) batchSingleTxnDmls( s, v := w.genUpdateSQL(rows...) sqls = append(sqls, s...) values = append(values, v...) + rowTypes = append(rowTypes, common.RowTypeUpdate) } // The behavior of update statement differs between TiDB and MySQL. // So we don't use batch update statement when downstream is MySQL. @@ -838,6 +873,7 @@ func (w *Writer) batchSingleTxnDmls( sql, value := row.GenSQL(sqlmodel.DMLUpdate) sqls = append(sqls, sql) values = append(values, value) + rowTypes = append(rowTypes, common.RowTypeUpdate) } } } @@ -854,8 +890,8 @@ func (w *Writer) batchSingleTxnDmls( sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, rows...) sqls = append(sqls, sql) values = append(values, value) - } + rowTypes = append(rowTypes, common.RowTypeInsert) } } diff --git a/pkg/sink/mysql/mysql_writer_dml_test.go b/pkg/sink/mysql/mysql_writer_dml_test.go index b7a3eb321d..5b795ada94 100644 --- a/pkg/sink/mysql/mysql_writer_dml_test.go +++ b/pkg/sink/mysql/mysql_writer_dml_test.go @@ -146,9 +146,10 @@ func TestGenerateBatchSQL(t *testing.T) { dmlInsertEvent2 := helper.DML2Event("test", "t", "insert into t values (17, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 17") dmlInsertEvent3 := helper.DML2Event("test", "t", "insert into t values (18, 'test')") - sql, args := writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, rowTypes := writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?)", sql[0]) require.Equal(t, []interface{}{int64(16), "test", int64(17), "test"}, args[0]) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1]) @@ -156,7 +157,7 @@ func TestGenerateBatchSQL(t *testing.T) { writer.cfg.SafeMode = true writer.cfg.MaxTxnRow = 3 - sql, args = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) @@ -191,7 +192,7 @@ func TestGenerateBatchSQL(t *testing.T) { // Measure execution time start := time.Now() - sql, args = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlEvent}) + sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlEvent}) duration := time.Since(start) // Verify performance requirement @@ -227,9 +228,10 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Delete A + insert A dmlDeleteEvent := helper.DML2DeleteEvent("test", "t", "insert into t values (1, 'test')", "delete from t where id = 1") dmlInsertEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')") - sql, args := writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent}) + sql, args, rowTypes := writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(1)}, args[0]) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1]) @@ -238,9 +240,10 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Delete A + Update A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (2, 'test')", "delete from t where id = 2") dmlUpdateEvent, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (2, 'test')", "update t set name = 'test2' where id = 2") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(2)}, args[0]) require.Equal(t, "UPDATE `test`.`t` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", sql[1]) @@ -248,7 +251,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Insert A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (3, 'test')", "delete from t where id = 3") dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (3, 'test')") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -258,7 +261,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (4, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 4") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (4, 'test')", "update t set name = 'test4' where id = 4") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -267,7 +270,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Update A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (5, 'test5')", "delete from t where id = 5") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (5, 'test')", "update t set name = 'test5' where id = 5") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -277,7 +280,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test')", "update t set name = 'test6' where id = 6") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 6") dmlUpdateEvent2, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test6')", "update t set name = 'test7' where id = 6") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "UPDATE `test`.`t` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", sql[0]) @@ -288,9 +291,10 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (7, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 7") dmlInsertEvent2 := helper.DML2Event("test", "t", "insert into t values (7, 'test2')") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) OR (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(7), int64(7)}, args[0]) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1]) @@ -303,7 +307,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (8, 'test')", "update t set name = 'test8' where id = 8") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 8") dmlDeleteEvent2 := helper.DML2DeleteEvent("test", "t", "insert into t values (8, 'test8')", "delete from t where id = 8") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) OR (`id` = ?)", sql[0]) @@ -318,7 +322,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (9, 'test9')", "update t set name = 'test10' where id = 9") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 9") dmlDeleteEvent2 = helper.DML2DeleteEvent("test", "t", "insert into t values (9, 'test10')", "delete from t where id = 9") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) OR (`id` = ?)", sql[0]) @@ -329,9 +333,10 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 10") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (10, 'test')", "delete from t where id = 10") dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (10, 'test2')") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(10)}, args[0]) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1]) @@ -343,7 +348,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (11, 'test')", "update t set name = 'test11' where id = 11") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 11") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (11, 'test11')", "delete from t where id = 11") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -355,7 +360,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test')", "update t set name = 'test12' where id = 12") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 12") dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test12')", "update t set name = 'test13' where id = 12") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -366,7 +371,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 13") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (14, 'test')", "delete from t where id = 14") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (15, 'test')", "update t set name = 'test15' where id = 15") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 3, len(sql)) require.Equal(t, 3, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -382,7 +387,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (17, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 17") dmlInsertEvent3 := helper.DML2Event("test", "t", "insert into t values (18, 'test')") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) @@ -403,7 +408,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { // Delete A + insert A dmlDeleteEvent := helper.DML2DeleteEvent("test", "t", "insert into t values (1, 'test')", "delete from t where id = 1") dmlInsertEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')") - sql, args := writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent}) + sql, args, rowTypes := writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -412,7 +417,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { // Insert A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (3, 'test')", "delete from t where id = 3") dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (3, 'test')") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -422,7 +427,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (4, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 4") dmlUpdateEvent, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (4, 'test')", "update t set name = 'test4' where id = 4") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -431,7 +436,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { // Update A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (5, 'test5')", "delete from t where id = 5") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (5, 'test')", "update t set name = 'test5' where id = 5") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -441,7 +446,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test')", "update t set name = 'test6' where id = 6") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 6") dmlUpdateEvent2, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test6')", "update t set name = 'test7' where id = 6") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -452,7 +457,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (7, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 7") dmlInsertEvent2 := helper.DML2Event("test", "t", "insert into t values (7, 'test2')") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -465,7 +470,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (8, 'test')", "update t set name = 'test8' where id = 8") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 8") dmlDeleteEvent2 := helper.DML2DeleteEvent("test", "t", "insert into t values (8, 'test8')", "delete from t where id = 8") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -480,7 +485,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (9, 'test9')", "update t set name = 'test10' where id = 9") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 9") dmlDeleteEvent2 = helper.DML2DeleteEvent("test", "t", "insert into t values (9, 'test10')", "delete from t where id = 9") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -491,7 +496,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 10") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (10, 'test')", "delete from t where id = 10") dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (10, 'test2')") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -503,7 +508,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (11, 'test')", "update t set name = 'test11' where id = 11") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 11") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (11, 'test11')", "delete from t where id = 11") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -515,7 +520,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test')", "update t set name = 'test12' where id = 12") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 12") dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test12')", "update t set name = 'test13' where id = 12") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -526,9 +531,10 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 13") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (14, 'test')", "delete from t where id = 14") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (15, 'test')", "update t set name = 'test15' where id = 15") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(14)}, args[0]) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?)", sql[1]) @@ -546,7 +552,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (17, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 17") dmlInsertEvent3 := helper.DML2Event("test", "t", "insert into t values (18, 'test')") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) diff --git a/pkg/sink/mysql/sql_builder.go b/pkg/sink/mysql/sql_builder.go index 17606d4e03..74d6ed3c2f 100644 --- a/pkg/sink/mysql/sql_builder.go +++ b/pkg/sink/mysql/sql_builder.go @@ -33,6 +33,7 @@ type tsPair struct { type preparedDMLs struct { sqls []string values [][]interface{} + rowTypes []common.RowType rowCount int approximateSize int64 tsPairs []tsPair @@ -128,6 +129,20 @@ func (d *preparedDMLs) fmtSqls() string { return builder.String() } +func (d *preparedDMLs) RowsAffected() int64 { + var count int64 + for _, rowType := range d.rowTypes { + switch rowType { + case common.RowTypeInsert, common.RowTypeDelete: + count += 1 + case common.RowTypeUpdate: + count += 2 + default: + } + } + return count +} + var dmlsPool = sync.Pool{ New: func() interface{} { return &preparedDMLs{