Skip to content

Commit 06b4f82

Browse files
authored
feat: retry write points and print the retried points (#14)
Signed-off-by: shilinlee <836160610@qq.com>
1 parent e0de62b commit 06b4f82

File tree

9 files changed

+50
-36
lines changed

9 files changed

+50
-36
lines changed

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
#!/bin/bash
22
go mod tidy
3-
go build -o dataMigrate ./src/
3+
go build .

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module dataMigrate
1+
module github.com/openGemini/dataMigrate
22

33
go 1.16
44

src/main.go renamed to main.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,22 @@ package main
1616

1717
import (
1818
"os"
19-
)
2019

21-
var logger *Logger
20+
"github.com/openGemini/dataMigrate/src"
21+
)
2222

2323
func main() {
24-
logger = NewLogger()
25-
defer logger.close()
26-
logger.LogString("Data migrate tool starting", TOCONSOLE, LEVEL_INFO)
24+
defer src.Logger.Close()
25+
src.Logger.LogString("Data migrate tool starting", src.TOCONSOLE, src.LEVEL_INFO)
2726
if err := Run(os.Args[1:]...); err != nil {
28-
logger.LogError(err)
27+
src.Logger.LogError(err)
2928
os.Exit(1)
3029
}
3130
}
3231

3332
func Run(args ...string) error {
3433
if len(args) > 0 {
35-
cmd := NewDataMigrateCommand()
34+
cmd := src.NewDataMigrateCommand()
3635
if err := cmd.Run(args...); err != nil {
3736
return err
3837
}

src/cursor.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ the implementations of the "location" and "KeyCursor" classes in the above file
1212
copyright 2023 Qizhi Huang(flaggyellow@qq.com)
1313
*/
1414

15-
package main
15+
package src
1616

1717
import (
1818
"container/heap"
@@ -348,35 +348,41 @@ func (s *Scanner) writeBatches(c client.Client, cmd Migrator) error {
348348
}
349349

350350
pt, err := s.nextPoint(cmd)
351-
352351
if err != nil {
353352
logger.LogString("point read error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
354353
return err
355354
}
356355

357356
if pt == nil {
358357
rowsNum := len(bp.Points())
359-
err := c.Write(bp)
360-
if err != nil {
361-
logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
362-
return err
363-
}
358+
s.retryWrite(c, bp)
364359
cmd.getStat().rowsRead += rowsNum
365360
break
366361
}
367362

368363
bp.AddPoint(pt)
369364
count = count + 1
370365
if count == cmd.getBatchSize() {
371-
err := c.Write(bp)
372-
if err != nil {
373-
logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
374-
return err
375-
}
366+
s.retryWrite(c, bp)
376367
cmd.getStat().rowsRead += cmd.getBatchSize()
377368
flag = true
378369
count = 0
379370
}
380371
}
381372
return nil
382373
}
374+
375+
func (s *Scanner) retryWrite(c client.Client, bp client.BatchPoints) {
376+
for {
377+
err := c.Write(bp)
378+
if err == nil {
379+
break
380+
}
381+
logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
382+
points := bp.Points()
383+
if len(points) > 0 {
384+
logger.LogString("retry for points like:"+points[0].String(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
385+
}
386+
time.Sleep(3 * time.Second)
387+
}
388+
}

src/dataMigrate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Add insert values into openGemini
1111
copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.
1212
*/
1313

14-
package main
14+
package src
1515

1616
import (
1717
"bytes"

src/dataMigrate_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Remove write data to local file
77
Add insert values into openGemini
88
copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.
99
*/
10-
package main
10+
package src
1111

1212
import (
1313
"fmt"

src/geminiservice.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package main
1+
package src
22

33
import (
44
client "github.com/influxdata/influxdb1-client/v2"

src/log.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
copyright 2023 Qizhi Huang(flaggyellow@qq.com)
33
*/
44

5-
package main
5+
package src
66

77
import (
88
"log"
@@ -30,7 +30,7 @@ var LevelPrefixDict map[int]string = map[int]string{
3030
LEVEL_ERROR: "ERROR: ",
3131
}
3232

33-
type Logger struct {
33+
type Log struct {
3434
logDir string
3535
logName string
3636
fileWriter *os.File
@@ -40,12 +40,20 @@ type Logger struct {
4040
debug bool
4141
}
4242

43-
func NewLogger() *Logger {
43+
var Logger *Log
44+
var logger *Log
45+
46+
func init() {
47+
Logger = NewLogger()
48+
logger = Logger
49+
}
50+
51+
func NewLogger() *Log {
4452
var err error
4553
tm := time.Unix(0, time.Now().UnixNano())
4654
timestr := tm.Format("2006-01-02_15-04-05")
4755
filename := "migrate_log_" + timestr + ".log"
48-
l := &Logger{
56+
l := &Log{
4957
logDir: "./logs",
5058
logName: filename,
5159
debug: false,
@@ -69,15 +77,15 @@ func NewLogger() *Logger {
6977
return l
7078
}
7179

72-
func (l *Logger) SetDebug() {
80+
func (l *Log) SetDebug() {
7381
l.debug = true
7482
}
7583

76-
func (l *Logger) IsDebug() bool {
84+
func (l *Log) IsDebug() bool {
7785
return l.debug
7886
}
7987

80-
func (l *Logger) LogString(str string, target int, level int) {
88+
func (l *Log) LogString(str string, target int, level int) {
8189
if level == LEVEL_DEBUG {
8290
if !l.debug {
8391
return
@@ -96,12 +104,12 @@ func (l *Logger) LogString(str string, target int, level int) {
96104
}
97105
}
98106

99-
func (l *Logger) LogError(err error) {
107+
func (l *Log) LogError(err error) {
100108
l.fileLogger.Println("ERROR: ", err.Error())
101109
l.errorLogger.Println("ERROR: ", err)
102110
}
103111

104-
func (l *Logger) close() {
112+
func (l *Log) Close() {
105113
l.fileLogger = nil
106114
l.fileWriter.Close()
107115
l.consoleLogger = nil

src/migrator.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package main
1+
package src
22

33
import (
44
"fmt"
@@ -201,7 +201,8 @@ func (m *migrator) writeCurrentFiles() error {
201201
if measurement, ok = m.mstCache.Get(series); !ok {
202202
measurement, tags, err = splitMeasurementAndTag(series)
203203
if err != nil {
204-
return err
204+
logger.LogString(fmt.Sprintf("split measurement name and tag from %s, err: %s", series, err), TOLOGFILE, LEVEL_ERROR)
205+
continue
205206
}
206207
m.mstCache.Add(series, measurement)
207208
m.tagsCache.Add(series, tags)
@@ -226,7 +227,7 @@ func (m *migrator) writeCurrentFiles() error {
226227
key: key,
227228
seeks: m.locations(key, m.startTime, m.endTime),
228229
}
229-
if err := newCursor.init(); err != nil {
230+
if err = newCursor.init(); err != nil {
230231
return err
231232
}
232233
scanner.fields[f] = newCursor

0 commit comments

Comments
 (0)