diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index e6970cf6c9..31bd3f4ddc 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -250,22 +250,21 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single return 0, 0, inErr } - defer func() { - closeErr := writer.Close(ctx) - if inErr != nil { - log.Error("failed to close writer", zap.Error(closeErr), - zap.Int("workerID", d.id), - zap.Any("table", task.tableInfo.TableName), - zap.String("namespace", d.changeFeedID.Namespace), - zap.String("changefeed", d.changeFeedID.ID)) - if inErr == nil { - inErr = closeErr - } - } - }() if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil { return 0, 0, inErr } + + // We have to wait the writer to close to complete the upload + // If failed to close writer, some DMLs may not be upload successfully + if inErr = writer.Close(ctx); inErr != nil { + log.Error("failed to close writer", zap.Error(inErr), + zap.Int("workerID", d.id), + zap.Any("table", task.tableInfo.TableName), + zap.String("namespace", d.changeFeedID.Namespace), + zap.String("changefeed", d.changeFeedID.ID)) + return 0, 0, inErr + } + return rowsCnt, bytesCnt, nil }); err != nil { return err