diff --git a/.gitignore b/.gitignore index 43f1e36..72afad8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,8 +2,9 @@ .DS_Store # ide -.idea/** -.vscode/** +**/.idea/ +.idea/ +.vscode/ target/* diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 98d1e73..5327bb7 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -45,6 +45,32 @@ builds: - -trimpath - -gcflags=-l -B + - main: ./cmd/ts-cli + id: ts-cli-full + binary: ts-cli-full + tags: [full] + env: + - CGO_ENABLED=0 + goos: + - linux + - windows + - darwin + goarch: + - amd64 + - arm64 + ignore: + - goos: windows + goarch: arm64 + ldflags: + - -s -w + - -X github.com/openGemini/openGemini-cli/common.Version={{.Tag}} + - -X github.com/openGemini/openGemini-cli/common.GitCommit={{.ShortCommit}} + - -X github.com/openGemini/openGemini-cli/common.BuildTime={{.Date}} + - -X github.com/openGemini/openGemini-cli/common.GitBranch={{.Branch}} + flags: + - -trimpath + - -gcflags=-l -B + archives: - formats: [ 'zip' ] # this name template makes the OS and Arch compatible with the results of `uname`. diff --git a/cmd/subcmd/export_dispatcher_default.go b/cmd/subcmd/export_dispatcher_default.go new file mode 100644 index 0000000..4bf6be8 --- /dev/null +++ b/cmd/subcmd/export_dispatcher_default.go @@ -0,0 +1,23 @@ +// Copyright 2025 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !full +// +build !full + +package subcmd + +// Run executes the export command (online mode only in default build) +func (c *ExportCommand) Run(config *ExportConfig) error { + return c.runOnlineExport(config) +} diff --git a/cmd/subcmd/export_dispatcher_full.go b/cmd/subcmd/export_dispatcher_full.go new file mode 100644 index 0000000..d0b0b69 --- /dev/null +++ b/cmd/subcmd/export_dispatcher_full.go @@ -0,0 +1,28 @@ +// Copyright 2025 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build full +// +build full + +package subcmd + +// Run executes the export command (supports both online and offline modes in full build) +func (c *ExportCommand) Run(config *ExportConfig) error { + useOffline := config.DataDir != "" || config.WalDir != "" + + if useOffline { + return c.runOfflineExport(config) + } + return c.runOnlineExport(config) +} diff --git a/cmd/subcmd/export.go b/cmd/subcmd/export_offline.go similarity index 68% rename from cmd/subcmd/export.go rename to cmd/subcmd/export_offline.go index fac066f..b42772c 100644 --- a/cmd/subcmd/export.go +++ b/cmd/subcmd/export_offline.go @@ -12,33 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build full +// +build full + package subcmd import ( - "bufio" "bytes" "compress/gzip" - "context" - "crypto/tls" "encoding/binary" - "encoding/json" "flag" "fmt" "io" "io/fs" - "log" - "math" - "net" "os" "path/filepath" - "sort" "strconv" "strings" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/golang/snappy" - "github.com/openGemini/openGemini-cli/core" + "github.com/klauspost/compress/snappy" + influx "github.com/openGemini/openGemini-cli/lib/influxparser" "github.com/openGemini/openGemini/engine" "github.com/openGemini/openGemini/engine/immutable" "github.com/openGemini/openGemini/engine/index/tsi" @@ -49,439 +44,73 @@ import ( "github.com/openGemini/openGemini/lib/index" "github.com/openGemini/openGemini/lib/record" "github.com/openGemini/openGemini/lib/util" - "github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx" "github.com/openGemini/opengemini-client-go/opengemini" "github.com/vbauerster/mpb/v7" "github.com/vbauerster/mpb/v7/decor" ) -const ( - tsspFileExtension = "tssp" - walFileExtension = "wal" - csvFormatExporter = "csv" - txtFormatExporter = "txt" - remoteFormatExporter = "remote" - resumeFilePrefix = "resume_" - dirNameSeparator = "_" -) +// Helper functions for offline mode -var ( - MpbProgress = mpb.New(mpb.WithWidth(100)) - ResumeJsonPath string - ProgressedFilesPath string +const ( + dirNameSeparator = "_" ) -type ExportConfig struct { - *core.CommandLineConfig - Export bool - Format string `json:"format"` - Out string `json:"out"` - DataDir string `json:"data"` - WalDir string `json:"wal"` - Remote string `json:"remote"` - RemoteUsername string `json:"-"` - RemotePassword string `json:"-"` - RemoteSsl bool `json:"remotessl"` - DBFilter string `json:"dbfilter"` - RetentionFilter string `json:"retentionfilter"` - MeasurementFilter string `json:"mstfilter"` - TimeFilter string `json:"timefilter"` - Compress bool `json:"compress"` - Resume bool -} - -type ExportCommand struct { - cfg *ExportConfig - exportCmd *Exporter -} - -func (c *ExportCommand) Run(config *ExportConfig) error { - if err := flag.CommandLine.Parse([]string{"-loggerLevel=ERROR"}); err != nil { - return err - } - c.cfg = config - c.exportCmd = NewExporter() - - return c.process() -} - -func (c *ExportCommand) process() error { - if c.cfg.Resume { - if err := ReadLatestProgressFile(); err != nil { - return err - } - oldConfig, err := getResumeConfig(c.cfg) - if err != nil { - return err - } - progressedFiles, err := getProgressedFiles() - if err != nil { - return err - } - return c.exportCmd.Export(oldConfig, progressedFiles) - } else { - if err := CreateNewProgressFolder(); err != nil { - return err - } - return c.exportCmd.Export(c.cfg, nil) - } -} - -func getResumeConfig(options *ExportConfig) (*ExportConfig, error) { - 
jsonData, err := os.ReadFile(ResumeJsonPath) - if err != nil { - return nil, err +func parseShardDir(shardDirName string) (uint64, int64, int64, uint64, error) { + shardDir := strings.Split(shardDirName, dirNameSeparator) + if len(shardDir) != 4 { + return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) } - var config ExportConfig - err = json.Unmarshal(jsonData, &config) + shardID, err := strconv.ParseUint(shardDir[0], 10, 64) if err != nil { - return nil, err + return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) } - config.Resume = true - config.RemoteUsername = options.RemoteUsername - config.RemotePassword = options.RemotePassword - return &config, nil -} - -func getProgressedFiles() (map[string]struct{}, error) { - file, err := os.Open(ProgressedFilesPath) + dirStartTime, err := strconv.ParseInt(shardDir[1], 10, 64) if err != nil { - return nil, err - } - defer file.Close() - - scanner := bufio.NewScanner(file) - lineSet := make(map[string]struct{}) - - for scanner.Scan() { - line := scanner.Text() - lineSet[line] = struct{}{} - } - - if err := scanner.Err(); err != nil { - return nil, err + return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) } - return lineSet, nil -} - -// CreateNewProgressFolder init ResumeJsonPath and ProgressedFilesPath -func CreateNewProgressFolder() error { - home, err := os.UserHomeDir() + dirEndTime, err := strconv.ParseInt(shardDir[2], 10, 64) if err != nil { - return err + return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) } - targetPath := filepath.Join(home, ".ts-cli", time.Now().Format("2006-01-02_15-04-05.000000000")) - err = os.MkdirAll(targetPath, os.ModePerm) + indexID, err := strconv.ParseUint(shardDir[3], 10, 64) if err != nil { - return err + return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) } - // create progress.json - progressJson := filepath.Join(targetPath, "progress.json") - ResumeJsonPath = progressJson - // create progressedFiles - progressedFiles := filepath.Join(targetPath, "progressedFiles") - ProgressedFilesPath = progressedFiles - return nil + return shardID, dirStartTime, dirEndTime, indexID, nil } -// ReadLatestProgressFile reads and processes the latest folder -func ReadLatestProgressFile() error { - home, err := os.UserHomeDir() - if err != nil { - return err - } - baseDir := filepath.Join(home, ".ts-cli") - var dirs []string - err = filepath.Walk(baseDir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() || path == baseDir { - return nil - } - dirs = append(dirs, path) - return nil - }) - if err != nil { - return err +func parseIndexDir(indexDirName string) (uint64, error) { + indexDir := strings.Split(indexDirName, dirNameSeparator) + if len(indexDir) != 3 { + return 0, errno.NewError(errno.InvalidDataDir) } - sort.Strings(dirs) - latestDir := dirs[len(dirs)-1] - // read progress.json - ResumeJsonPath = filepath.Join(latestDir, "progress.json") - // read progressedFiles - ProgressedFilesPath = filepath.Join(latestDir, "progressedFiles") - return nil -} - -type dataFilter struct { - database string - retention string - measurement string - startTime int64 - endTime int64 -} -func newDataFilter() *dataFilter { - return &dataFilter{ - database: "", - measurement: "", - startTime: math.MinInt64, - endTime: math.MaxInt64, + indexID, err := strconv.ParseUint(indexDir[0], 10, 64) + if err != nil { + return 0, errno.NewError(errno.InvalidDataDir) } + return indexID, nil } -func (d *dataFilter) parseTime(clc *ExportConfig) error { - var start, end string - 
timeSlot := strings.Split(clc.TimeFilter, "~") - if len(timeSlot) == 2 { - start = timeSlot[0] - end = timeSlot[1] - } else if clc.TimeFilter != "" { - return fmt.Errorf("invalid time filter %q", clc.TimeFilter) - } - - if start != "" { - st, err := convertTime(start) - if err != nil { - return err - } - d.startTime = st - } - - if end != "" { - ed, err := convertTime(end) - if err != nil { - return err +func getFieldNameIndexFromRecord(slice []record.Field, str string) (int, bool) { + for i, v := range slice { + if v.Name == str { + return i, true } - d.endTime = ed - } - - if d.startTime > d.endTime { - return fmt.Errorf("start time `%q` > end time `%q`", start, end) - } - - return nil -} - -func (d *dataFilter) parseDatabase(dbFilter string) { - if dbFilter == "" { - return - } - d.database = dbFilter -} - -func (d *dataFilter) parseRetention(retentionFilter string) { - if retentionFilter == "" { - return - } - d.retention = retentionFilter -} - -func (d *dataFilter) parseMeasurement(mstFilter string) error { - if mstFilter == "" { - return nil - } - if mstFilter != "" && d.database == "" { - return fmt.Errorf("measurement filter %q requires database filter", mstFilter) - } - d.measurement = mstFilter - return nil -} - -// timeFilter [startTime, endTime] -func (d *dataFilter) timeFilter(t int64) bool { - return t >= d.startTime && t <= d.endTime -} - -func (d *dataFilter) isBelowMinTimeFilter(t int64) bool { - return t < d.startTime -} - -func (d *dataFilter) isAboveMaxTimeFilter(t int64) bool { - return t > d.endTime -} - -type DatabaseDiskInfo struct { - dbName string // ie. "NOAA_water_database" - rps map[string]struct{} // ie. ["0:autogen","1:every_one_day"] - dataDir string // ie. "/tmp/openGemini/data/data/NOAA_water_database" - walDir string // ie. "/tmp/openGemini/data/wal/NOAA_water_database" - rpToTsspDirMap map[string]string // ie. {"0:autogen", "/tmp/openGemini/data/data/NOAA_water_database/0/autogen"} - rpToWalDirMap map[string]string // ie. {"0:autogen", "/tmp/openGemini/data/wal/NOAA_water_database/0/autogen"} - rpToIndexDirMap map[string]string // ie. {"0:autogen", "/tmp/openGemini/data/data/NOAA_water_database/0/autogen/index"} -} - -func newDatabaseDiskInfo() *DatabaseDiskInfo { - return &DatabaseDiskInfo{ - rps: make(map[string]struct{}), - rpToTsspDirMap: make(map[string]string), - rpToWalDirMap: make(map[string]string), - rpToIndexDirMap: make(map[string]string), } + return 0, false } -func (d *DatabaseDiskInfo) init(actualDataDir string, actualWalDir string, databaseName string, retentionPolicy string) error { - d.dbName = databaseName - - // check whether the database is in actualDataPath - dataDir := filepath.Join(actualDataDir, databaseName) - if _, err := os.Stat(dataDir); err != nil { - return err - } - // check whether the database is in actualWalPath - walDir := filepath.Join(actualWalDir, databaseName) - if _, err := os.Stat(walDir); err != nil { - return err - } - - // ie. /tmp/openGemini/data/data/my_db /tmp/openGemini/data/wal/my_db - d.dataDir, d.walDir = dataDir, walDir - - ptDirs, err := os.ReadDir(d.dataDir) - if err != nil { - return err - } - for _, ptDir := range ptDirs { - // ie. /tmp/openGemini/data/data/my_db/0 - ptTsspPath := filepath.Join(d.dataDir, ptDir.Name()) - // ie. /tmp/openGemini/data/wal/my_db/0 - ptWalPath := filepath.Join(d.walDir, ptDir.Name()) - - if retentionPolicy != "" { - ptWithRp := ptDir.Name() + ":" + retentionPolicy - // ie. 
/tmp/openGemini/data/data/my_db/0/autogen - rpTsspPath := filepath.Join(ptTsspPath, retentionPolicy) - if _, err := os.Stat(rpTsspPath); err != nil { - return fmt.Errorf("retention policy %q invalid : %s", retentionPolicy, err) - } else { - d.rps[ptWithRp] = struct{}{} - d.rpToTsspDirMap[ptWithRp] = rpTsspPath - d.rpToIndexDirMap[ptWithRp] = filepath.Join(rpTsspPath, "index") - } - // ie. /tmp/openGemini/data/wal/my_db/0/autogen - rpWalPath := filepath.Join(ptWalPath, retentionPolicy) - if _, err := os.Stat(rpWalPath); err != nil { - return fmt.Errorf("retention policy %q invalid : %s", retentionPolicy, err) - } else { - d.rpToWalDirMap[ptWithRp] = rpWalPath - } - continue - } - - rpTsspDirs, err1 := os.ReadDir(ptTsspPath) - if err1 != nil { - return err1 - } - for _, rpDir := range rpTsspDirs { - if !rpDir.IsDir() { - continue - } - ptWithRp := ptDir.Name() + ":" + rpDir.Name() - rpPath := filepath.Join(ptTsspPath, rpDir.Name()) - d.rps[ptWithRp] = struct{}{} - d.rpToTsspDirMap[ptWithRp] = rpPath - d.rpToIndexDirMap[ptWithRp] = filepath.Join(rpPath, "index") - } - - rpWalDirs, err2 := os.ReadDir(ptWalPath) - if err2 != nil { - return err2 - } - for _, rpDir := range rpWalDirs { - ptWithRp := ptDir.Name() + ":" + rpDir.Name() - if !rpDir.IsDir() { - continue - } - rpPath := filepath.Join(ptWalPath, rpDir.Name()) - d.rpToWalDirMap[ptWithRp] = rpPath +func getFieldNameIndexFromRowOffline(slice []influx.Field, str string) (int, bool) { + for i, v := range slice { + if v.Key == str { + return i, true } } - return nil -} - -type Exporter struct { - exportFormat string - databaseDiskInfos []*DatabaseDiskInfo - filesTotalCount int - actualDataPath string - actualWalPath string - outPutPath string - filter *dataFilter - compress bool - lineCount uint64 - resume bool - progress map[string]struct{} - remote string - remoteExporter *remoteExporter - parser - - stderrLogger *log.Logger - stdoutLogger *log.Logger - defaultLogger *log.Logger - - manifest map[string]struct{} // {dbName:rpName, struct{}{}} - rpNameToMeasurementTsspFilesMap map[string]map[string][]string // {dbName:rpName, {measurementName, tssp file absolute path}} - rpNameToIdToIndexMap map[string]map[uint64]*tsi.MergeSetIndex // {dbName:rpName, {indexId, *mergeSetIndex}} - rpNameToWalFilesMap map[string][]string // {dbName:rpName:shardDurationRange, wal file absolute path} - - Stderr io.Writer - Stdout io.Writer - bar *mpb.Bar -} - -func NewExporter() *Exporter { - return &Exporter{ - resume: false, - progress: make(map[string]struct{}), - - stderrLogger: log.New(os.Stderr, "export: ", log.LstdFlags), - stdoutLogger: log.New(os.Stdout, "export: ", log.LstdFlags), - - manifest: make(map[string]struct{}), - rpNameToMeasurementTsspFilesMap: make(map[string]map[string][]string), - rpNameToIdToIndexMap: make(map[string]map[uint64]*tsi.MergeSetIndex), - rpNameToWalFilesMap: make(map[string][]string), - remoteExporter: newRemoteExporter(), - - Stdout: os.Stdout, - Stderr: os.Stderr, - } -} - -// parseActualDir transforms user puts in datadir and waldir to actual dirs -func (e *Exporter) parseActualDir(clc *ExportConfig) error { - actualDataDir := filepath.Join(clc.DataDir, config.DataDirectory) - if _, err := os.Stat(actualDataDir); err != nil { - return err - } else { - e.actualDataPath = actualDataDir - } - - actualWalDir := filepath.Join(clc.WalDir, config.WalDirectory) - if _, err := os.Stat(actualWalDir); err != nil { - return err - } else { - e.actualWalPath = actualWalDir - } - - return nil -} - -// parseDatabaseInfos get all path infos 
for export. -func (e *Exporter) parseDatabaseInfos() error { - dbName := e.filter.database - - dbDiskInfo := newDatabaseDiskInfo() - err := dbDiskInfo.init(e.actualDataPath, e.actualWalPath, dbName, e.filter.retention) - if err != nil { - return fmt.Errorf("can't find database files for %s : %s", dbName, err) - } - e.databaseDiskInfos = append(e.databaseDiskInfos, dbDiskInfo) - return nil + return 0, false } -// Init inits the Exporter instance ues CommandLineConfig specific by user +// Init inits the Exporter instance for offline mode func (e *Exporter) Init(clc *ExportConfig, progressedFiles map[string]struct{}) error { if clc.Format == "" { return fmt.Errorf("export flag format is required") @@ -489,6 +118,9 @@ func (e *Exporter) Init(clc *ExportConfig, progressedFiles map[string]struct{}) if clc.DataDir == "" { return fmt.Errorf("export flag data is required") } + if clc.WalDir == "" { + return fmt.Errorf("export flag wal is required") + } if clc.DBFilter == "" { return fmt.Errorf("export flag dbfilter is required") } @@ -505,9 +137,9 @@ func (e *Exporter) Init(clc *ExportConfig, progressedFiles map[string]struct{}) } e.exportFormat = clc.Format if e.exportFormat == txtFormatExporter || e.exportFormat == remoteFormatExporter { - e.parser = newTxtParser() + e.parser = newOfflineTxtParser() } else if e.exportFormat == csvFormatExporter { - e.parser = newCsvParser() + e.parser = newOfflineCsvParser() } e.outPutPath = clc.Out e.compress = clc.Compress @@ -545,7 +177,7 @@ func (e *Exporter) Init(clc *ExportConfig, progressedFiles map[string]struct{}) return nil } -// Export exports all data user want. +// Export exports all data user want in offline mode. func (e *Exporter) Export(clc *ExportConfig, progressedFiles map[string]struct{}) error { err := e.Init(clc, progressedFiles) if err != nil { @@ -564,111 +196,103 @@ func (e *Exporter) Export(clc *ExportConfig, progressedFiles map[string]struct{} return e.write() } -// walkDatabase gets all db's tssp filepath, wal filepath, and index filepath. -func (e *Exporter) walkDatabase(dbDiskInfo *DatabaseDiskInfo) error { - if err := e.walkTsspFile(dbDiskInfo); err != nil { +// Run executes the export command in offline mode +func (c *ExportCommand) runOfflineExport(config *ExportConfig) error { + if err := flag.CommandLine.Parse([]string{"-loggerLevel=ERROR"}); err != nil { return err } - if err := e.walkIndexFiles(dbDiskInfo); err != nil { + c.cfg = config + c.exportCmd = NewExporter() + + return c.processOffline() +} + +// process handles the export process in offline mode +func (c *ExportCommand) processOffline() error { + useOffline := (c.cfg.DataDir != "" || c.cfg.WalDir != "") + + if c.cfg.Resume { + if err := ReadLatestProgressFile(); err != nil { + return err + } + oldConfig, err := getResumeConfig(c.cfg) + if err != nil { + return err + } + progressedFiles, err := getProgressedFiles() + if err != nil { + return err + } + + isOnlineResume := false + for path := range progressedFiles { + if strings.HasPrefix(path, "online://") { + isOnlineResume = true + break + } + } + + if isOnlineResume { + return fmt.Errorf("cannot resume online export in offline mode. 
Please use online mode to resume") + } else { + return c.exportCmd.Export(oldConfig, progressedFiles) + } + } else { + if err := CreateNewProgressFolder(); err != nil { + return err + } + + if useOffline { + return c.exportCmd.Export(c.cfg, nil) + } else { + return fmt.Errorf("offline mode requires --data and --wal flags") + } + } +} + +// parseActualDir transforms user puts in datadir and waldir to actual dirs +func (e *Exporter) parseActualDir(clc *ExportConfig) error { + actualDataDir := filepath.Join(clc.DataDir, config.DataDirectory) + if _, err := os.Stat(actualDataDir); err != nil { return err + } else { + e.actualDataPath = actualDataDir } - if err := e.walkWalFile(dbDiskInfo); err != nil { + + actualWalDir := filepath.Join(clc.WalDir, config.WalDirectory) + if _, err := os.Stat(actualWalDir); err != nil { return err + } else { + e.actualWalPath = actualWalDir } + return nil } -func (e *Exporter) newBar() (*mpb.Bar, error) { - for _, measurementToTsspFileMap := range e.rpNameToMeasurementTsspFilesMap { - for _, tsspFiles := range measurementToTsspFileMap { - e.filesTotalCount += len(tsspFiles) - } - } - for _, walFiles := range e.rpNameToWalFilesMap { - e.filesTotalCount += len(walFiles) - } - if e.filesTotalCount == 0 { - return nil, fmt.Errorf("no files to export.check your filter or datapath") - } - bar := MpbProgress.New(int64(e.filesTotalCount), - mpb.BarStyle().Lbound("[").Filler("=").Tip(">").Padding("-").Rbound("]"), - mpb.PrependDecorators( - decor.Name("Exporting Data:", decor.WC{W: 20, C: decor.DidentRight}), - decor.CountersNoUnit("%d/%d", decor.WC{W: 15, C: decor.DidentRight}), - decor.OnComplete( - decor.AverageETA(decor.ET_STYLE_GO, decor.WC{W: 6}), - "complete", - ), - ), - mpb.AppendDecorators( - decor.Percentage(), - ), - ) - return bar, nil -} - -// write writes data to output fd user specifics. -func (e *Exporter) write() error { - var outputWriter, metaWriter io.Writer - var err error - if e.remoteExporter.isExist { - outputWriter = io.Discard - } else { - err = os.MkdirAll(filepath.Dir(e.outPutPath), 0755) - if err != nil { - return err - } - var outputFile *os.File // open file descriptor. - if e.resume { - exportDir := filepath.Dir(e.outPutPath) - exportFilePath := filepath.Join(exportDir, resumeFilePrefix+time.Now().Format("2006-01-02_15-04-05.000000000")+filepath.Ext(e.outPutPath)) - outputFile, err = os.OpenFile(exportFilePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) - if err != nil { - return err - } - } else { - outputFile, err = os.OpenFile(e.outPutPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) - if err != nil { - return err - } - } - defer outputFile.Close() - - outputWriter = outputFile - } - - if e.compress { - if e.remoteExporter.isExist { - return fmt.Errorf("remote format can't compress") - } - gzipWriter := gzip.NewWriter(outputWriter) - defer gzipWriter.Close() - outputWriter = gzipWriter - } +// parseDatabaseInfos get all path infos for export. 
+func (e *Exporter) parseDatabaseInfos() error { + dbName := e.filter.database - // metaWriter to write information that are not line-protocols - if e.remoteExporter.isExist { - metaWriter = io.Discard - } else { - metaWriter = outputWriter + dbDiskInfo := newDatabaseDiskInfo() + err := dbDiskInfo.init(e.actualDataPath, e.actualWalPath, dbName, e.filter.retention) + if err != nil { + return fmt.Errorf("can't find database files for %s : %s", dbName, err) } - - return e.writeFull(metaWriter, outputWriter) + e.databaseDiskInfos = append(e.databaseDiskInfos, dbDiskInfo) + return nil } -// writeFull writes all DDL and DML -func (e *Exporter) writeFull(metaWriter io.Writer, outputWriter io.Writer) error { - start, end := time.Unix(0, e.filter.startTime).UTC().Format(time.RFC3339), time.Unix(0, e.filter.endTime).UTC().Format(time.RFC3339) - e.parser.writeMetaInfo(metaWriter, 0, fmt.Sprintf("# openGemini EXPORT: %s - %s", start, end)) - e.defaultLogger.Printf("Exporting data total %d files\n", e.filesTotalCount) - if err := e.writeDDL(metaWriter, outputWriter); err != nil { +// walkDatabase gets all db's tssp filepath, wal filepath, and index filepath. +func (e *Exporter) walkDatabase(dbDiskInfo *DatabaseDiskInfo) error { + if err := e.walkTsspFile(dbDiskInfo); err != nil { return err } - - if err := e.writeDML(metaWriter, outputWriter); err != nil { + if err := e.walkIndexFiles(dbDiskInfo); err != nil { + return err + } + if err := e.walkWalFile(dbDiskInfo); err != nil { return err } - e.defaultLogger.Printf("Summarize %d line protocol\n", e.lineCount) return nil } @@ -752,165 +376,15 @@ func (e *Exporter) walkIndexFiles(dbDiskInfo *DatabaseDiskInfo) error { opt := &tsi.Options{} opt.Path(filepath.Join(indexPath, file.Name())).IndexType(index.MergeSet).Lock(&lockPath) if _, ok := e.rpNameToIdToIndexMap[key]; !ok { // db:rp - e.rpNameToIdToIndexMap[key] = make(map[uint64]*tsi.MergeSetIndex) + e.rpNameToIdToIndexMap[key] = make(map[uint64]interface{}) } e.manifest[key] = struct{}{} - if e.rpNameToIdToIndexMap[key][indexId], err = tsi.NewMergeSetIndex(opt); err != nil { - return err - } - } - } - return nil -} - -// writeDDL write every "database:retention policy" DDL -func (e *Exporter) writeDDL(metaWriter io.Writer, outputWriter io.Writer) error { - e.parser.writeMetaInfo(metaWriter, 0, "# DDL") - for _, dbDiskInfo := range e.databaseDiskInfos { - avoidRepetition := map[string]struct{}{} - databaseName := dbDiskInfo.dbName - e.parser.writeOutputInfo(outputWriter, fmt.Sprintf("CREATE DATABASE %s\n", databaseName)) - if e.remoteExporter.isExist { - // write DDL to remote - if err := e.remoteExporter.createDatabase(databaseName); err != nil { - return err - } - } - for ptWithRp := range dbDiskInfo.rps { - rpName := strings.Split(ptWithRp, ":")[1] - if _, ok := avoidRepetition[rpName]; !ok { - if e.remoteExporter.isExist { - // write DDL to remote - if err := e.remoteExporter.createRetentionPolicy(databaseName, rpName); err != nil { - return err - } - } - e.parser.writeOutputInfo(outputWriter, fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 0s REPLICATION 1\n", rpName, databaseName)) - avoidRepetition[rpName] = struct{}{} - } - } - e.parser.writeMetaInfo(metaWriter, 0, "") - } - return nil -} - -// writeDML write every "database:retention policy" DML -func (e *Exporter) writeDML(metaWriter io.Writer, outputWriter io.Writer) error { - e.parser.writeMetaInfo(metaWriter, 0, "# DML") - var curDatabaseName string - // write DML for every item which key = "database:retention policy" - for key := 
range e.manifest { - keySplits := strings.Split(key, ":") - - if keySplits[0] != curDatabaseName { - e.parser.writeMetaInfo(metaWriter, InfoTypeDatabase, keySplits[0]) - curDatabaseName = keySplits[0] - } - e.remoteExporter.database = curDatabaseName - - // shardKeyToIndexMap stores all indexes for this "database:retention policy" - shardKeyToIndexMap, ok := e.rpNameToIdToIndexMap[key] - if !ok { - return fmt.Errorf("cant find rpNameToIdToIndexMap for %q", key) - } - e.remoteExporter.retentionPolicy = keySplits[1] - - e.parser.writeMetaInfo(metaWriter, InfoTypeRetentionPolicy, keySplits[1]) - // Write all tssp files from this "database:retention policy" - if measurementToTsspFileMap, ok := e.rpNameToMeasurementTsspFilesMap[key]; ok { - if err := e.writeAllTsspFilesInRp(metaWriter, outputWriter, measurementToTsspFileMap, shardKeyToIndexMap); err != nil { - return err - } - } - // Write all wal files from this "database:retention policy" - if files, ok := e.rpNameToWalFilesMap[key]; ok { - if err := e.writeAllWalFilesInRp(metaWriter, outputWriter, files, curDatabaseName); err != nil { - return err - } - } - } - MpbProgress.Wait() - return nil -} - -// writeProgressJson writes progress to json file -func (e *Exporter) writeProgressJson(clc *ExportConfig) error { - output, err := json.MarshalIndent(clc, "", "\t") - if err != nil { - return err - } - err = os.WriteFile(ResumeJsonPath, output, 0644) - if err != nil { - return err - } - return nil -} - -// writeProgressedFiles writes progressed file name -func (e *Exporter) writeProgressedFiles(filename string) error { - file, err := os.OpenFile(ProgressedFilesPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return err - } - defer file.Close() - - _, err = file.WriteString(filename + "\n") - if err != nil { - return err - } - return nil -} - -// writeAllTsspFilesInRp writes all tssp files in a "database:retention policy" -func (e *Exporter) writeAllTsspFilesInRp(metaWriter io.Writer, outputWriter io.Writer, measurementFilesMap map[string][]string, indexesMap map[uint64]*tsi.MergeSetIndex) error { - e.parser.writeMetaInfo(metaWriter, 0, "# FROM TSSP FILE") - var isOrder bool - hasWrittenMstInfo := make(map[string]bool) - for measurementName, files := range measurementFilesMap { - e.parser.writeMetaInfo(metaWriter, InfoTypeMeasurement, measurementName) - hasWrittenMstInfo[measurementName] = false - for _, file := range files { - if _, ok := e.progress[file]; ok { - e.bar.Increment() - continue - } - splits := strings.Split(file, string(os.PathSeparator)) - var shardDir string - if strings.Contains(file, "out-of-order") { - isOrder = false - // ie./tmp/openGemini/data/data/db1/0/autogen/1_1567382400000000000_1567987200000000000_1/tssp/average_temperature_0000/out-of-order/00000002-0000-00000000.tssp - shardDir = splits[len(splits)-5] - } else { - isOrder = true - // ie./tmp/openGemini/data/data/db1/0/autogen/1_1567382400000000000_1567987200000000000_1/tssp/average_temperature_0000/00000002-0000-00000000.tssp - shardDir = splits[len(splits)-4] - } - _, dirStartTime, dirEndTime, indexId, err := parseShardDir(shardDir) + index, err := tsi.NewMergeSetIndex(opt) if err != nil { return err } - if err = indexesMap[indexId].Open(); err != nil { - return err - } - if !hasWrittenMstInfo[measurementName] { - if err := e.parser.writeMstInfoFromTssp(metaWriter, outputWriter, file, isOrder, indexesMap[indexId]); err != nil { - return err - } - hasWrittenMstInfo[measurementName] = true - } - if e.filter.isBelowMinTimeFilter(dirEndTime) || 
e.filter.isAboveMaxTimeFilter(dirStartTime) { - e.bar.Increment() - continue - } - if err := e.writeSingleTsspFile(file, outputWriter, indexesMap[indexId], isOrder); err != nil { - return err - } - if err = indexesMap[indexId].Close(); err != nil { - return err - } - e.bar.Increment() + e.rpNameToIdToIndexMap[key][indexId] = index } - fmt.Fprintf(outputWriter, "\n") } return nil } @@ -1049,27 +523,6 @@ func (e *Exporter) writeSingleRecord(outputWriter io.Writer, seriesKey [][]byte, return buf, nil } -// writeAllWalFilesInRp writes all wal files in a "database:retention policy" -func (e *Exporter) writeAllWalFilesInRp(metaWriter io.Writer, outputWriter io.Writer, files []string, currentDatabase string) error { - e.parser.writeMetaInfo(metaWriter, 0, "# FROM WAL FILE") - var currentMeasurement string - for _, file := range files { - if _, ok := e.progress[file]; ok { - e.bar.Increment() - continue - } - if err := e.writeSingleWalFile(file, metaWriter, outputWriter, currentDatabase, &currentMeasurement); err != nil { - return err - } - e.bar.Increment() - if err := e.writeProgressedFiles(file); err != nil { - return err - } - } - fmt.Fprintf(outputWriter, "\n") - return nil -} - // writeSingleWalFile writes a single wal file's all rows. func (e *Exporter) writeSingleWalFile(file string, metaWriter io.Writer, outputWriter io.Writer, currentDatabase string, currentMeasurement *string) error { lockPath := fileops.FileLockOption("") @@ -1185,64 +638,301 @@ func (e *Exporter) writeRows(rows []influx.Row, metaWriter io.Writer, outputWrit return nil } -// writeSingleRow parse a single row to lint protocol, and writes it.
+func (e *Exporter) writeSingleRow(row influx.Row, metaWriter io.Writer, outputWriter io.Writer, buf []byte, + point *opengemini.Point, currentDatabase string, mstName *string) ([]byte, error) { + measurementWithVersion := row.Name + measurementName := influx.GetOriginMstName(measurementWithVersion) + measurementName = EscapeMstName(measurementName) + tm := row.Timestamp + // filter measurement + if len(e.filter.measurement) != 0 && e.filter.measurement != measurementName { + return buf, nil + } + if !e.filter.timeFilter(tm) { + return buf, nil + } + + if measurementName != *mstName { + e.parser.writeMetaInfo(metaWriter, InfoTypeMeasurement, measurementName) + if err := e.parser.writeMstInfoFromWal(metaWriter, outputWriter, row, currentDatabase); err != nil { + return buf, err + } + *mstName = measurementName + } + buf, err := e.parser.getRowBuf(buf, measurementName, row, point) + if err != nil { + return nil, err + } + if e.remoteExporter.isExist { + e.remoteExporter.points = append(e.remoteExporter.points, point) + } else { + if _, err := outputWriter.Write(buf); err != nil { + return buf, err + } + } + e.lineCount++ + buf = buf[:0] + return buf, nil +} + +func (e *Exporter) newBar() (*mpb.Bar, error) { + for _, measurementToTsspFileMap := range e.rpNameToMeasurementTsspFilesMap { + for _, tsspFiles := range measurementToTsspFileMap { + e.filesTotalCount += len(tsspFiles) + } + } + for _, walFiles := range e.rpNameToWalFilesMap { + e.filesTotalCount += len(walFiles) + } + if e.filesTotalCount == 0 { + return nil, fmt.Errorf("no files to export.check your filter or datapath") + } + bar := MpbProgress.New(int64(e.filesTotalCount), + mpb.BarStyle().Lbound("[").Filler("=").Tip(">").Padding("-").Rbound("]"), + mpb.PrependDecorators( + decor.Name("Exporting Data:", decor.WC{W: 20, C: decor.DidentRight}), + decor.CountersNoUnit("%d/%d", decor.WC{W: 15, C: decor.DidentRight}), + decor.OnComplete( + decor.AverageETA(decor.ET_STYLE_GO, decor.WC{W: 6}), + "complete", + ), + ), + mpb.AppendDecorators( + decor.Percentage(), + ), + ) + return bar, nil +} + +// write writes data to output fd user specifics. +func (e *Exporter) write() error { + var outputWriter, metaWriter io.Writer + var err error + if e.remoteExporter.isExist { + outputWriter = io.Discard + } else { + err = os.MkdirAll(filepath.Dir(e.outPutPath), 0755) + if err != nil { + return err + } + var outputFile *os.File // open file descriptor. 
+ if e.resume { + exportDir := filepath.Dir(e.outPutPath) + exportFilePath := filepath.Join(exportDir, resumeFilePrefix+time.Now().Format("2006-01-02_15-04-05.000000000")+filepath.Ext(e.outPutPath)) + outputFile, err = os.OpenFile(exportFilePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + return err + } + } else { + outputFile, err = os.OpenFile(e.outPutPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + return err + } + } + defer outputFile.Close() + + outputWriter = outputFile + } + + if e.compress { + if e.remoteExporter.isExist { + return fmt.Errorf("remote format can't compress") + } + gzipWriter := gzip.NewWriter(outputWriter) + defer gzipWriter.Close() + outputWriter = gzipWriter + } + + // metaWriter to write information that are not line-protocols + if e.remoteExporter.isExist { + metaWriter = io.Discard + } else { + metaWriter = outputWriter + } + + return e.writeFull(metaWriter, outputWriter) +} + +// writeFull writes all DDL and DML +func (e *Exporter) writeFull(metaWriter io.Writer, outputWriter io.Writer) error { + start, end := time.Unix(0, e.filter.startTime).UTC().Format(time.RFC3339), time.Unix(0, e.filter.endTime).UTC().Format(time.RFC3339) + e.parser.writeMetaInfo(metaWriter, 0, fmt.Sprintf("# openGemini EXPORT: %s - %s", start, end)) + e.defaultLogger.Printf("Exporting data total %d files\n", e.filesTotalCount) + if err := e.writeDDL(metaWriter, outputWriter); err != nil { + return err + } + + if err := e.writeDML(metaWriter, outputWriter); err != nil { + return err + } + e.defaultLogger.Printf("Summarize %d line protocol\n", e.lineCount) + return nil +} + +// writeDDL write every "database:retention policy" DDL +func (e *Exporter) writeDDL(metaWriter io.Writer, outputWriter io.Writer) error { + e.parser.writeMetaInfo(metaWriter, 0, "# DDL") + for _, dbDiskInfo := range e.databaseDiskInfos { + avoidRepetition := map[string]struct{}{} + databaseName := dbDiskInfo.dbName + e.parser.writeOutputInfo(outputWriter, fmt.Sprintf("CREATE DATABASE %s\n", databaseName)) + if e.remoteExporter.isExist { + // write DDL to remote + if err := e.remoteExporter.createDatabase(databaseName); err != nil { + return err + } + } + for ptWithRp := range dbDiskInfo.rps { + rpName := strings.Split(ptWithRp, ":")[1] + if _, ok := avoidRepetition[rpName]; !ok { + if e.remoteExporter.isExist { + // write DDL to remote + if err := e.remoteExporter.createRetentionPolicy(databaseName, rpName); err != nil { + return err + } + } + e.parser.writeOutputInfo(outputWriter, fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 0s REPLICATION 1\n", rpName, databaseName)) + avoidRepetition[rpName] = struct{}{} + } + } + e.parser.writeMetaInfo(metaWriter, 0, "") + } + return nil +} + +// writeDML write every "database:retention policy" DML +func (e *Exporter) writeDML(metaWriter io.Writer, outputWriter io.Writer) error { + e.parser.writeMetaInfo(metaWriter, 0, "# DML") + var curDatabaseName string + // write DML for every item which key = "database:retention policy" + for key := range e.manifest { + keySplits := strings.Split(key, ":") + + if keySplits[0] != curDatabaseName { + e.parser.writeMetaInfo(metaWriter, InfoTypeDatabase, keySplits[0]) + curDatabaseName = keySplits[0] + } + e.remoteExporter.database = curDatabaseName + + // shardKeyToIndexMap stores all indexes for this "database:retention policy" + shardKeyToIndexMapInterface, ok := e.rpNameToIdToIndexMap[key] + if !ok { + return fmt.Errorf("cant find rpNameToIdToIndexMap for %q", key) + } + 
e.remoteExporter.retentionPolicy = keySplits[1] + + // Convert interface{} map to *tsi.MergeSetIndex map + shardKeyToIndexMap := make(map[uint64]*tsi.MergeSetIndex) + for k, v := range shardKeyToIndexMapInterface { + if idx, ok := v.(*tsi.MergeSetIndex); ok { + shardKeyToIndexMap[k] = idx + } + } + + e.parser.writeMetaInfo(metaWriter, InfoTypeRetentionPolicy, keySplits[1]) + // Write all tssp files from this "database:retention policy" + if measurementToTsspFileMap, ok := e.rpNameToMeasurementTsspFilesMap[key]; ok { + if err := e.writeAllTsspFilesInRp(metaWriter, outputWriter, measurementToTsspFileMap, shardKeyToIndexMap); err != nil { + return err + } + } + // Write all wal files from this "database:retention policy" + if files, ok := e.rpNameToWalFilesMap[key]; ok { + if err := e.writeAllWalFilesInRp(metaWriter, outputWriter, files, curDatabaseName); err != nil { + return err + } + } + } + MpbProgress.Wait() + return nil +} + +// writeAllTsspFilesInRp writes all tssp files in a "database:retention policy" +func (e *Exporter) writeAllTsspFilesInRp(metaWriter io.Writer, outputWriter io.Writer, measurementFilesMap map[string][]string, indexesMap map[uint64]*tsi.MergeSetIndex) error { + e.parser.writeMetaInfo(metaWriter, 0, "# FROM TSSP FILE") + var isOrder bool + hasWrittenMstInfo := make(map[string]bool) + for measurementName, files := range measurementFilesMap { + e.parser.writeMetaInfo(metaWriter, InfoTypeMeasurement, measurementName) + hasWrittenMstInfo[measurementName] = false + for _, file := range files { + if _, ok := e.progress[file]; ok { + e.bar.Increment() + continue + } + splits := strings.Split(file, string(os.PathSeparator)) + var shardDir string + if strings.Contains(file, "out-of-order") { + isOrder = false + // ie./tmp/openGemini/data/data/db1/0/autogen/1_1567382400000000000_1567987200000000000_1/tssp/average_temperature_0000/out-of-order/00000002-0000-00000000.tssp + shardDir = splits[len(splits)-5] + } else { + isOrder = true + // ie./tmp/openGemini/data/data/db1/0/autogen/1_1567382400000000000_1567987200000000000_1/tssp/average_temperature_0000/00000002-0000-00000000.tssp + shardDir = splits[len(splits)-4] + } + _, dirStartTime, dirEndTime, indexId, err := parseShardDir(shardDir) + if err != nil { + return err + } + if err = indexesMap[indexId].Open(); err != nil { + return err + } + if !hasWrittenMstInfo[measurementName] { + if err := e.parser.writeMstInfoFromTssp(metaWriter, outputWriter, file, isOrder, indexesMap[indexId]); err != nil { + return err + } + hasWrittenMstInfo[measurementName] = true + } + if e.filter.isBelowMinTimeFilter(dirEndTime) || e.filter.isAboveMaxTimeFilter(dirStartTime) { + e.bar.Increment() + continue + } + if err := e.writeSingleTsspFile(file, outputWriter, indexesMap[indexId], isOrder); err != nil { + return err + } + if err = indexesMap[indexId].Close(); err != nil { + return err + } + e.bar.Increment() + } + fmt.Fprintf(outputWriter, "\n") + } + return nil +} + +// writeAllWalFilesInRp writes all wal files in a "database:retention policy" +func (e *Exporter) writeAllWalFilesInRp(metaWriter io.Writer, outputWriter io.Writer, files []string, currentDatabase string) error { + e.parser.writeMetaInfo(metaWriter, 0, "# FROM WAL FILE") + var currentMeasurement string + for _, file := range files { + if _, ok := e.progress[file]; ok { + e.bar.Increment() + continue } - *mstName = measurementName - } - buf, err := e.parser.getRowBuf(buf, measurementName, row, point) - if err != nil { - return nil, err - } - if e.remoteExporter.isExist { - 
e.remoteExporter.points = append(e.remoteExporter.points, point) - } else { - if _, err := outputWriter.Write(buf); err != nil { - return buf, err + if err := e.writeSingleWalFile(file, metaWriter, outputWriter, currentDatabase, &currentMeasurement); err != nil { + return err + } + e.bar.Increment() + if err := e.writeProgressedFiles(file); err != nil { + return err + } } - e.lineCount++ - buf = buf[:0] - return buf, nil -} - -type parser interface { - parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitWithNull bool, point *opengemini.Point) ([]byte, error) - appendFields(rec record.Record, buf []byte, point *opengemini.Point) ([]byte, error) - writeMstInfoFromTssp(metaWriter io.Writer, outputWriter io.Writer, filePath string, isOrder bool, index *tsi.MergeSetIndex) error - writeMstInfoFromWal(metaWriter io.Writer, outputWriter io.Writer, row influx.Row, curDatabase string) error - writeMetaInfo(metaWriter io.Writer, infoType InfoType, info string) - writeOutputInfo(outputWriter io.Writer, info string) - getRowBuf(buf []byte, measurementName string, row influx.Row, point *opengemini.Point) ([]byte, error) + fmt.Fprintf(outputWriter, "\n") + return nil } -type txtParser struct{} +type offlineTxtParser struct{} -func newTxtParser() *txtParser { - return &txtParser{ +func newOfflineTxtParser() *offlineTxtParser { + return &offlineTxtParser{} } // parse2SeriesKeyWithoutVersion parse encoded index key to line protocol series key,without version and escape special characters // encoded index key format: [total len][ms len][ms][tagkey1 len][tagkey1 val]...] // parse to line protocol format: mst,tagkey1=tagval1,tagkey2=tagval2... -func (t *txtParser) parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitWithNull bool, point *opengemini.Point) ([]byte, error) { +func (t *offlineTxtParser) parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitWithNull bool, point *opengemini.Point) ([]byte, error) { msName, src, err := influx.MeasurementName(key) originMstName := influx.GetOriginMstName(string(msName)) originMstName = EscapeMstName(originMstName) @@ -1281,7 +971,11 @@ func (t *txtParser) parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitW return dst[:len(dst)-1], nil } -func (t *txtParser) appendFields(rec record.Record, buf []byte, point *opengemini.Point) ([]byte, error) { +func (t *offlineTxtParser) appendFields(recInterface interface{}, buf []byte, point *opengemini.Point) ([]byte, error) { + rec, ok := recInterface.(record.Record) + if !ok { + return nil, fmt.Errorf("invalid record type for offline mode") + } buf = append(buf, ' ') for i, field := range rec.Schema { if field.Name == "time" { @@ -1318,23 +1012,99 @@ func (t *txtParser) appendFields(rec record.Record, buf []byte, point *opengemin } buf = strconv.AppendInt(buf, rec.Times()[0], 10) buf = append(buf, '\n') - point.Timestamp = rec.Times()[0] // point.Time = time.Unix(0, rec.Times()[0]) + point.Timestamp = rec.Times()[0] return buf, nil } -func (t *txtParser) writeMstInfoFromTssp(_ io.Writer, _ io.Writer, _ string, _ bool, _ *tsi.MergeSetIndex) error { +func (t *offlineTxtParser) writeMstInfoFromTssp(metaWriter io.Writer, outputWriter io.Writer, filePath string, isOrder bool, index interface{}) error { + idx, ok := index.(*tsi.MergeSetIndex) + if !ok { + return fmt.Errorf("invalid index type") + } + lockPath := "" + tsspFile, err := immutable.OpenTSSPFile(filePath, &lockPath, isOrder) + defer util.MustClose(tsspFile) + if err != nil { + return err + } + // search tags + fiTag := immutable.NewFileIterator(tsspFile,
immutable.CLog) + itrTag := immutable.NewChunkIterator(fiTag) + itrTag.Next() + sid := itrTag.GetSeriesID() + if sid == 0 { + return fmt.Errorf("series ID is zero") + } + var combineKey []byte + var seriesKeys [][]byte + var isExpectSeries []bool + // Use sid get series key's []byte + if seriesKeys, _, _, err = idx.SearchSeriesWithTagArray(sid, seriesKeys, nil, combineKey, isExpectSeries, nil); err != nil { + return err + } + _, src, err := influx.MeasurementName(seriesKeys[0]) + if err != nil { + return err + } + tagsN := encoding.UnmarshalUint16(src) + src = src[2:] + var i uint16 + var tags, fields, tagsType, fieldsType []string + for i = 0; i < tagsN; i++ { + keyLen := encoding.UnmarshalUint16(src) + src = src[2:] + tagKey := EscapeTagKey(string(src[:keyLen])) + tags = append(tags, tagKey) + src = src[keyLen:] + + valLen := encoding.UnmarshalUint16(src) + src = src[2:] + src = src[valLen:] + } + for i := 0; i < len(tags); i++ { + tagsType = append(tagsType, "tag") + } + // search fields + fiField := immutable.NewFileIterator(tsspFile, immutable.CLog) + itrField := immutable.NewChunkIterator(fiField) + itrField.NextChunkMeta() + for _, colMeta := range fiField.GetCurtChunkMeta().GetColMeta() { + fields = append(fields, colMeta.Name()) + if colMeta.Name() == "time" { + fieldsType = append(fieldsType, "dateTime:timeStamp") + } else { + fieldsType = append(fieldsType, influx.FieldTypeString(int32(colMeta.Type()))) + } + } + // write datatype + fmt.Fprintf(metaWriter, "#datatype %s,%s\n", strings.Join(tagsType, ","), strings.Join(fieldsType, ",")) + // write tags and fields name + buf := influx.GetBytesBuffer() + defer influx.PutBytesBuffer(buf) + buf = append(buf, strings.Join(tags, ",")...) + buf = append(buf, ',') + buf = append(buf, strings.Join(fields, ",")...) + buf = append(buf, '\n') + _, err = outputWriter.Write(buf) + if err != nil { + return err + } return nil } -func (t *txtParser) writeMstInfoFromWal(_ io.Writer, _ io.Writer, _ influx.Row, _ string) error { +func (t *offlineTxtParser) writeMstInfoFromWal(_ io.Writer, _ io.Writer, _ interface{}, _ string) error { return nil } -func (t *txtParser) getRowBuf(buf []byte, measurementName string, row influx.Row, point *opengemini.Point) ([]byte, error) { +func (t *offlineTxtParser) getRowBuf(buf []byte, measurementName string, row interface{}, point *opengemini.Point) ([]byte, error) { + rowData, ok := row.(influx.Row) + if !ok { + return nil, fmt.Errorf("invalid row type") + } point.Measurement = measurementName - tags := row.Tags - fields := row.Fields - tm := row.Timestamp + tags := rowData.Tags + fields := rowData.Fields + tm := rowData.Timestamp buf = append(buf, measurementName...) 
buf = append(buf, ',') @@ -1378,19 +1148,11 @@ func (t *txtParser) getRowBuf(buf []byte, measurementName string, row influx.Row } buf = strconv.AppendInt(buf, tm, 10) buf = append(buf, '\n') - point.Timestamp = tm // point.Time = time.Unix(0, tm) + point.Timestamp = tm return buf, nil } -type InfoType int - -const ( - InfoTypeDatabase InfoType = 1 + iota - InfoTypeRetentionPolicy - InfoTypeMeasurement -) - -func (t *txtParser) writeMetaInfo(metaWriter io.Writer, infoType InfoType, info string) { +func (t *offlineTxtParser) writeMetaInfo(metaWriter io.Writer, infoType InfoType, info string) { switch infoType { case InfoTypeDatabase: fmt.Fprintf(metaWriter, "# CONTEXT-DATABASE: %s\n", info) @@ -1403,18 +1165,18 @@ func (t *txtParser) writeMetaInfo(metaWriter io.Writer, infoType InfoType, info } } -func (t *txtParser) writeOutputInfo(outputWriter io.Writer, info string) { +func (t *offlineTxtParser) writeOutputInfo(outputWriter io.Writer, info string) { fmt.Fprint(outputWriter, info) } -type csvParser struct { +type offlineCsvParser struct { fieldsName map[string]map[string][]string // database -> measurement -> []field curDatabase string curMeasurement string } -func newCsvParser() *csvParser { - return &csvParser{ +func newOfflineCsvParser() *offlineCsvParser { + return &offlineCsvParser{ fieldsName: make(map[string]map[string][]string), } } @@ -1422,7 +1184,7 @@ func newCsvParser() *csvParser { // parse2SeriesKeyWithoutVersion parse encoded index key to csv series key,without version and escape special characters // encoded index key format: [total len][ms len][ms][tagkey1 len][tagkey1 val]...] // parse to csv format: mst,tagval1,tagval2... -func (c *csvParser) parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitWithNull bool, _ *opengemini.Point) ([]byte, error) { +func (c *offlineCsvParser) parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitWithNull bool, _ *opengemini.Point) ([]byte, error) { _, src, err := influx.MeasurementName(key) if err != nil { return []byte{}, err @@ -1450,10 +1212,13 @@ func (c *csvParser) parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitW src = src[valLen:] } return dst, nil - } -func (c *csvParser) appendFields(rec record.Record, buf []byte, _ *opengemini.Point) ([]byte, error) { +func (c *offlineCsvParser) appendFields(recInterface interface{}, buf []byte, _ *opengemini.Point) ([]byte, error) { + rec, ok := recInterface.(record.Record) + if !ok { + return nil, fmt.Errorf("invalid record type for offline mode") + } curFieldsName := c.fieldsName[c.curDatabase][c.curMeasurement] for _, fieldName := range curFieldsName { if fieldName == "time" { @@ -1490,7 +1255,11 @@ func (c *csvParser) appendFields(rec record.Record, buf []byte, _ *opengemini.Po return buf, nil } -func (c *csvParser) writeMstInfoFromTssp(metaWriter io.Writer, outputWriter io.Writer, filePath string, isOrder bool, index *tsi.MergeSetIndex) error { +func (c *offlineCsvParser) writeMstInfoFromTssp(metaWriter io.Writer, outputWriter io.Writer, filePath string, isOrder bool, index interface{}) error { + idx, ok := index.(*tsi.MergeSetIndex) + if !ok { + return fmt.Errorf("invalid index type") + } tsspPathSplits := strings.Split(filePath, string(byte(os.PathSeparator))) measurementDirWithVersion := tsspPathSplits[len(tsspPathSplits)-2] measurementName := influx.GetOriginMstName(measurementDirWithVersion) @@ -1513,7 +1282,7 @@ func (c *csvParser) writeMstInfoFromTssp(metaWriter io.Writer, outputWriter io.W var seriesKeys [][]byte var isExpectSeries []bool // Use sid get series 
key's []byte - if seriesKeys, _, _, err = index.SearchSeriesWithTagArray(sid, seriesKeys, nil, combineKey, isExpectSeries, nil); err != nil { + if seriesKeys, _, _, err = idx.SearchSeriesWithTagArray(sid, seriesKeys, nil, combineKey, isExpectSeries, nil); err != nil { return err } _, src, err := influx.MeasurementName(seriesKeys[0]) @@ -1570,9 +1339,13 @@ func (c *csvParser) writeMstInfoFromTssp(metaWriter io.Writer, outputWriter io.W return nil } -func (c *csvParser) writeMstInfoFromWal(metaWriter io.Writer, outputWriter io.Writer, row influx.Row, currentDatabase string) error { - tagsN := row.Tags - fieldsN := row.Fields +func (c *offlineCsvParser) writeMstInfoFromWal(metaWriter io.Writer, outputWriter io.Writer, row interface{}, currentDatabase string) error { + rowData, ok := row.(influx.Row) + if !ok { + return fmt.Errorf("invalid row type") + } + tagsN := rowData.Tags + fieldsN := rowData.Fields var tags, fields, tagsType, fieldsType []string for _, tag := range tagsN { tags = append(tags, tag.Key) @@ -1583,7 +1356,7 @@ func (c *csvParser) writeMstInfoFromWal(metaWriter io.Writer, outputWriter io.Wr fieldsType = append(fieldsType, influx.FieldTypeString(field.Type)) } fieldsType = append(fieldsType, "dateTime:timeStamp") - measurementWithVersion := row.Name + measurementWithVersion := rowData.Name measurementName := influx.GetOriginMstName(measurementWithVersion) measurementName = EscapeMstName(measurementName) c.fieldsName[currentDatabase] = make(map[string][]string) @@ -1608,10 +1381,14 @@ func (c *csvParser) writeMstInfoFromWal(metaWriter io.Writer, outputWriter io.Wr return nil } -func (c *csvParser) getRowBuf(buf []byte, measurementName string, row influx.Row, _ *opengemini.Point) ([]byte, error) { - tags := row.Tags - fields := row.Fields - tm := row.Timestamp +func (c *offlineCsvParser) getRowBuf(buf []byte, measurementName string, row interface{}, _ *opengemini.Point) ([]byte, error) { + rowData, ok := row.(influx.Row) + if !ok { + return nil, fmt.Errorf("invalid row type") + } + tags := rowData.Tags + fields := rowData.Fields + tm := rowData.Timestamp for _, tag := range tags { buf = append(buf, EscapeTagValue(tag.Value)...) 
@@ -1649,7 +1426,7 @@ func (c *csvParser) getRowBuf(buf []byte, measurementName string, row influx.Row return buf, nil } -func (c *csvParser) writeMetaInfo(metaWriter io.Writer, infoType InfoType, info string) { +func (c *offlineCsvParser) writeMetaInfo(metaWriter io.Writer, infoType InfoType, info string) { switch infoType { case InfoTypeDatabase: fmt.Fprintf(metaWriter, "#constant database,%s\n", info) @@ -1662,213 +1439,5 @@ func (c *csvParser) writeMetaInfo(metaWriter io.Writer, infoType InfoType, info } } -func (c *csvParser) writeOutputInfo(_ io.Writer, _ string) { -} - -type remoteExporter struct { - isExist bool - client opengemini.Client - database string - retentionPolicy string - points []*opengemini.Point -} - -func newRemoteExporter() *remoteExporter { - return &remoteExporter{ - isExist: false, - } -} - -func (re *remoteExporter) Init(clc *ExportConfig) error { - if len(clc.Remote) == 0 { - return fmt.Errorf("execute -export cmd, using remote format, --remote is required") - } - h, p, err := net.SplitHostPort(clc.Remote) - if err != nil { - return err - } - port, err := strconv.Atoi(p) - if err != nil { - return fmt.Errorf("invalid port number :%s", err) - } - var authConfig *opengemini.AuthConfig - if clc.RemoteUsername != "" { - authConfig = &opengemini.AuthConfig{ - AuthType: 0, - Username: clc.RemoteUsername, - Password: clc.RemotePassword, - } - } else { - authConfig = nil - } - var remoteConfig *opengemini.Config - if clc.RemoteSsl { - remoteConfig = &opengemini.Config{ - Addresses: []opengemini.Address{ - { - Host: h, - Port: port, - }, - }, - AuthConfig: authConfig, - TlsConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - } - } else { - remoteConfig = &opengemini.Config{ - Addresses: []opengemini.Address{ - { - Host: h, - Port: port, - }, - }, - AuthConfig: authConfig, - } - } - - cli, err := opengemini.NewClient(remoteConfig) - if err != nil { - return err - } - re.isExist = true - re.client = cli - if err = re.client.Ping(0); err != nil { - return err - } - return nil -} - -func (re *remoteExporter) createDatabase(dbName string) error { - err := re.client.CreateDatabase(dbName) - if err != nil { - return fmt.Errorf("error writing command: %s", err) - } - return nil -} - -func (re *remoteExporter) createRetentionPolicy(dbName string, rpName string) error { - err := re.client.CreateRetentionPolicy(dbName, opengemini.RpConfig{ - Name: rpName, - Duration: "0s", - }, false) - if err != nil { - return fmt.Errorf("error writing command: %s", err) - } - return nil -} - -func (re *remoteExporter) writeAllPoints() error { - err := re.client.WriteBatchPointsWithRp(context.Background(), re.database, re.retentionPolicy, re.points) - if err != nil { - return err - } - re.points = re.points[:0] - return nil -} - -func parseShardDir(shardDirName string) (uint64, int64, int64, uint64, error) { - shardDir := strings.Split(shardDirName, dirNameSeparator) - if len(shardDir) != 4 { - return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) - } - shardID, err := strconv.ParseUint(shardDir[0], 10, 64) - if err != nil { - return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) - } - dirStartTime, err := strconv.ParseInt(shardDir[1], 10, 64) - if err != nil { - return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) - } - dirEndTime, err := strconv.ParseInt(shardDir[2], 10, 64) - if err != nil { - return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) - } - indexID, err := strconv.ParseUint(shardDir[3], 10, 64) - if err != nil { - return 0, 0, 0, 0, errno.NewError(errno.InvalidDataDir) 
- } - return shardID, dirStartTime, dirEndTime, indexID, nil -} - -func parseIndexDir(indexDirName string) (uint64, error) { - indexDir := strings.Split(indexDirName, dirNameSeparator) - if len(indexDir) != 3 { - return 0, errno.NewError(errno.InvalidDataDir) - } - - indexID, err := strconv.ParseUint(indexDir[0], 10, 64) - if err != nil { - return 0, errno.NewError(errno.InvalidDataDir) - } - return indexID, nil -} - -var escapeFieldKeyReplacer = strings.NewReplacer(`,`, `\,`, `=`, `\=`, ` `, `\ `) -var escapeTagKeyReplacer = strings.NewReplacer(`,`, `\,`, `=`, `\=`, ` `, `\ `) -var escapeTagValueReplacer = strings.NewReplacer(`,`, `\,`, `=`, `\=`, ` `, `\ `) -var escapeMstNameReplacer = strings.NewReplacer(`=`, `\=`, ` `, `\ `) -var escapeStringFieldReplacer = strings.NewReplacer(`"`, `\"`, `\`, `\\`) - -// EscapeFieldKey returns a copy of in with any comma or equal sign or space -// with escaped values. -func EscapeFieldKey(in string) string { - return escapeFieldKeyReplacer.Replace(in) -} - -// EscapeStringFieldValue returns a copy of in with any double quotes or -// backslashes with escaped values. -func EscapeStringFieldValue(in string) string { - return escapeStringFieldReplacer.Replace(in) -} - -// EscapeTagKey returns a copy of in with any "comma" or "equal sign" or "space" -// with escaped values. -func EscapeTagKey(in string) string { - return escapeTagKeyReplacer.Replace(in) -} - -// EscapeTagValue returns a copy of in with any "comma" or "equal sign" or "space" -// with escaped values -func EscapeTagValue(in string) string { - return escapeTagValueReplacer.Replace(in) -} - -// EscapeMstName returns a copy of in with any "equal sign" or "space" -// with escaped values. -func EscapeMstName(in string) string { - return escapeMstNameReplacer.Replace(in) -} - -// getFieldNameIndexFromRecord returns the index of a field in a slice -func getFieldNameIndexFromRecord(slice []record.Field, str string) (int, bool) { - for i, v := range slice { - if v.Name == str { - return i, true - } - } - return 0, false -} - -func getFieldNameIndexFromRow(slice []influx.Field, str string) (int, bool) { - for i, v := range slice { - if v.Key == str { - return i, true - } - } - return 0, false -} - -func convertTime(input string) (int64, error) { - t, err := time.Parse(time.RFC3339, input) - if err == nil { - return t.UnixNano(), nil - } - - timestamp, err := strconv.ParseInt(input, 10, 64) - if err == nil { - return timestamp, nil - } - - return 0, err +func (c *offlineCsvParser) writeOutputInfo(_ io.Writer, _ string) { } diff --git a/cmd/subcmd/export_online.go b/cmd/subcmd/export_online.go new file mode 100644 index 0000000..f00b361 --- /dev/null +++ b/cmd/subcmd/export_online.go @@ -0,0 +1,930 @@ +// Copyright 2025 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
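+//
+// export_online.go implements the online export path: the requested time range
+// is split into windows, each window is queried through the openGemini HTTP
+// API, and the result series are re-encoded as line protocol or CSV. Finished
+// windows are recorded under a virtual "online://" path so that a later
+// --resume run can skip them.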
+
+package subcmd
+
+import (
+ "compress/gzip"
+ "context"
+ "fmt"
+ "io"
+ "net"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+ "github.com/openGemini/openGemini-cli/core"
+ influx "github.com/openGemini/openGemini-cli/lib/influxparser"
+ "github.com/openGemini/opengemini-client-go/opengemini"
+ "github.com/vbauerster/mpb/v7"
+ "github.com/vbauerster/mpb/v7/decor"
+)
+
+// Helper functions for online mode
+
+func getFieldNameIndexFromRow(slice []influx.Field, str string) (int, bool) {
+ for i, v := range slice {
+ if v.Key == str {
+ return i, true
+ }
+ }
+ return 0, false
+}
+
+// runOnlineExport executes the export command in online mode
+func (c *ExportCommand) runOnlineExport(config *ExportConfig) error {
+ c.cfg = config
+ c.exportCmd = NewExporter()
+
+ return c.process()
+}
+
+// process handles the export process in online mode
+func (c *ExportCommand) process() error {
+ useOffline := c.cfg.DataDir != "" || c.cfg.WalDir != ""
+
+ if c.cfg.Resume {
+ if err := ReadLatestProgressFile(); err != nil {
+ return err
+ }
+ oldConfig, err := getResumeConfig(c.cfg)
+ if err != nil {
+ return err
+ }
+ progressedFiles, err := getProgressedFiles()
+ if err != nil {
+ return err
+ }
+
+ // windows finished in online mode are recorded with a virtual "online://" path prefix
+ isOnlineResume := false
+ for path := range progressedFiles {
+ if strings.HasPrefix(path, "online://") {
+ isOnlineResume = true
+ break
+ }
+ }
+
+ if isOnlineResume && !useOffline {
+ return c.runOnlineModeWithResume(oldConfig, progressedFiles)
+ } else {
+ return fmt.Errorf("offline mode is not available in this build. Please compile with -tags full to enable offline export")
+ }
+ } else {
+ if err := CreateNewProgressFolder(); err != nil {
+ return err
+ }
+
+ if useOffline {
+ return fmt.Errorf("offline mode is not available in this build. Please compile with -tags full to enable offline export")
+ } else {
+ return c.runOnlineMode(c.cfg)
+ }
+ }
+}
+
+func (c *ExportCommand) runOnlineMode(config *ExportConfig) error {
+ maxRetries := 3
+ retryDelay := 5 * time.Second
+
+ for i := 0; i < maxRetries; i++ {
+ err := c.runOnlineModeInternal(config)
+ if err == nil {
+ return nil
+ }
+
+ if isConnectionError(err) {
+ if i < maxRetries-1 {
+ c.exportCmd.defaultLogger.Printf("Connection failed, retrying in %v... (attempt %d/%d)",
+ retryDelay, i+1, maxRetries)
+ time.Sleep(retryDelay)
+ continue
+ }
+
+ if config.DataDir != "" || config.WalDir != "" {
+ return fmt.Errorf("online mode failed after %d retries. Offline mode is not available in this build. Please compile with -tags full to enable offline export: %w", maxRetries, err)
+ }
+
+ return fmt.Errorf("online mode failed after %d retries: %w", maxRetries, err)
+ }
+
+ return err
+ }
+
+ return fmt.Errorf("failed after %d retries", maxRetries)
+}
+
+func (c *ExportCommand) runOnlineModeInternal(config *ExportConfig) error {
+ if config.DBFilter == "" {
+ return fmt.Errorf("export flag dbfilter is required")
+ }
+ if config.Format != remoteFormatExporter && config.Out == "" {
+ return fmt.Errorf("export flag out is required")
+ }
+
+ onlineExporter := NewOnlineExporter(c.exportCmd)
+ ctx := context.Background()
+ return onlineExporter.Export(ctx, config, nil)
+}
+
+func (c *ExportCommand) runOnlineModeWithResume(config *ExportConfig, progressedFiles map[string]struct{}) error {
+ if config.DBFilter == "" {
+ return fmt.Errorf("export flag dbfilter is required")
+ }
+ if config.Format != remoteFormatExporter && config.Out == "" {
+ return fmt.Errorf("export flag out is required")
+ }
+
+ onlineExporter := NewOnlineExporter(c.exportCmd)
+ ctx := context.Background()
+ return onlineExporter.Export(ctx, config, progressedFiles)
+}
+
+func isConnectionError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ errStr := err.Error()
+ if strings.Contains(errStr, "connection") ||
+ strings.Contains(errStr, "timeout") ||
+ strings.Contains(errStr, "network") ||
+ strings.Contains(errStr, "dial") {
+ return true
+ }
+
+ if _, ok := err.(net.Error); ok {
+ return true
+ }
+
+ return false
+}
+
+// Parser implementations for online mode
+
+type txtParser struct{}
+
+func newTxtParser() *txtParser {
+ return &txtParser{}
+}
+
+// parse2SeriesKeyWithoutVersion parses an encoded index key into a line protocol series key, without the version suffix and with special characters escaped
+// encoded index key format: [total len][ms len][ms][tagkey1 len][tagkey1 val]...
+// parsed line protocol format: mst,tagkey1=tagval1,tagkey2=tagval2...
+func (t *txtParser) parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitWithNull bool, point *opengemini.Point) ([]byte, error) {
+ msName, src, err := influx.MeasurementName(key)
+ if err != nil {
+ return []byte{}, err
+ }
+ originMstName := influx.GetOriginMstName(string(msName))
+ originMstName = EscapeMstName(originMstName)
+ var split [2]byte
+ if splitWithNull {
+ split[0], split[1] = influx.ByteSplit, influx.ByteSplit
+ } else {
+ split[0], split[1] = '=', ','
+ }
+ point.Measurement = originMstName
+ dst = append(dst, originMstName...)
+ dst = append(dst, ',')
+ tagsN := encoding.UnmarshalUint16(src)
+ src = src[2:]
+ var i uint16
+ for i = 0; i < tagsN; i++ {
+ keyLen := encoding.UnmarshalUint16(src)
+ src = src[2:]
+ tagKey := EscapeTagKey(string(src[:keyLen]))
+ dst = append(dst, tagKey...)
+ dst = append(dst, split[0])
+ src = src[keyLen:]
+
+ valLen := encoding.UnmarshalUint16(src)
+ src = src[2:]
+ tagVal := EscapeTagValue(string(src[:valLen]))
+ dst = append(dst, tagVal...)
+ dst = append(dst, split[1]) + src = src[valLen:] + + point.AddTag(tagKey, tagVal) + } + return dst[:len(dst)-1], nil +} + +func (t *txtParser) appendFields(rec interface{}, buf []byte, point *opengemini.Point) ([]byte, error) { + // This method is not used in online mode + return nil, fmt.Errorf("appendFields not implemented for online mode") +} + +func (t *txtParser) writeMstInfoFromTssp(_ io.Writer, _ io.Writer, _ string, _ bool, _ interface{}) error { + // This function is only used in offline mode + return nil +} + +func (t *txtParser) writeMstInfoFromWal(_ io.Writer, _ io.Writer, _ interface{}, _ string) error { + return nil +} + +func (t *txtParser) getRowBuf(buf []byte, measurementName string, row interface{}, point *opengemini.Point) ([]byte, error) { + rowData, ok := row.(influx.Row) + if !ok { + return nil, fmt.Errorf("invalid row type") + } + point.Measurement = measurementName + tags := rowData.Tags + fields := rowData.Fields + tm := rowData.Timestamp + + buf = append(buf, measurementName...) + buf = append(buf, ',') + for i, tag := range tags { + buf = append(buf, EscapeTagKey(tag.Key)+"="...) + buf = append(buf, EscapeTagValue(tag.Value)...) + if i != len(tags)-1 { + buf = append(buf, ',') + } else { + buf = append(buf, ' ') + } + point.AddTag(EscapeTagKey(tag.Key), EscapeTagValue(tag.Value)) + } + for i, field := range fields { + buf = append(buf, EscapeFieldKey(field.Key)+"="...) + switch field.Type { + case influx.Field_Type_Float: + buf = strconv.AppendFloat(buf, field.NumValue, 'g', -1, 64) + point.AddField(EscapeFieldKey(field.Key), strconv.FormatFloat(field.NumValue, 'g', -1, 64)) + case influx.Field_Type_Int: + buf = strconv.AppendInt(buf, int64(field.NumValue), 10) + point.AddField(EscapeFieldKey(field.Key), strconv.FormatInt(int64(field.NumValue), 10)) + case influx.Field_Type_Boolean: + buf = strconv.AppendBool(buf, field.NumValue == 1) + point.AddField(EscapeFieldKey(field.Key), strconv.FormatBool(field.NumValue == 1)) + case influx.Field_Type_String: + buf = append(buf, '"') + buf = append(buf, EscapeStringFieldValue(field.StrValue)...) + buf = append(buf, '"') + point.AddField(EscapeFieldKey(field.Key), field.StrValue) + default: + // This shouldn't be possible, but we'll format it anyway. + buf = append(buf, fmt.Sprintf("%v", field)...) 
+ point.AddField(EscapeFieldKey(field.Key), fmt.Sprintf("%v", field)) + } + if i != len(fields)-1 { + buf = append(buf, ',') + } else { + buf = append(buf, ' ') + } + } + buf = strconv.AppendInt(buf, tm, 10) + buf = append(buf, '\n') + point.Timestamp = tm + return buf, nil +} + +func (t *txtParser) writeMetaInfo(metaWriter io.Writer, infoType InfoType, info string) { + switch infoType { + case InfoTypeDatabase: + fmt.Fprintf(metaWriter, "# CONTEXT-DATABASE: %s\n", info) + case InfoTypeRetentionPolicy: + fmt.Fprintf(metaWriter, "# CONTEXT-RETENTION-POLICY: %s\n", info) + case InfoTypeMeasurement: + fmt.Fprintf(metaWriter, "# CONTEXT-MEASUREMENT: %s\n", info) + default: + fmt.Fprintf(metaWriter, "%s\n", info) + } +} + +func (t *txtParser) writeOutputInfo(outputWriter io.Writer, info string) { + fmt.Fprint(outputWriter, info) +} + +type csvParser struct { + fieldsName map[string]map[string][]string // database -> measurement -> []field + curDatabase string + curMeasurement string +} + +func newCsvParser() *csvParser { + return &csvParser{ + fieldsName: make(map[string]map[string][]string), + } +} + +// parse2SeriesKeyWithoutVersion parse encoded index key to csv series key,without version and escape special characters +// encoded index key format: [total len][ms len][ms][tagkey1 len][tagkey1 val]...] +// parse to csv format: mst,tagval1,tagval2... +func (c *csvParser) parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitWithNull bool, _ *opengemini.Point) ([]byte, error) { + _, src, err := influx.MeasurementName(key) + if err != nil { + return []byte{}, err + } + var split [2]byte + if splitWithNull { + split[0], split[1] = influx.ByteSplit, influx.ByteSplit + } else { + split[0], split[1] = '=', ',' + } + + tagsN := encoding.UnmarshalUint16(src) + src = src[2:] + var i uint16 + for i = 0; i < tagsN; i++ { + keyLen := encoding.UnmarshalUint16(src) + src = src[2:] + src = src[keyLen:] + + valLen := encoding.UnmarshalUint16(src) + src = src[2:] + tagVal := EscapeTagValue(string(src[:valLen])) + dst = append(dst, tagVal...) 
+ dst = append(dst, split[1]) + src = src[valLen:] + } + return dst, nil +} + +func (c *csvParser) appendFields(rec interface{}, buf []byte, _ *opengemini.Point) ([]byte, error) { + // This method is not used in online mode + return nil, fmt.Errorf("appendFields not implemented for online mode") +} + +func (c *csvParser) writeMstInfoFromTssp(_ io.Writer, _ io.Writer, _ string, _ bool, _ interface{}) error { + // This function is only used in offline mode + return nil +} + +func (c *csvParser) writeMstInfoFromWal(metaWriter io.Writer, outputWriter io.Writer, row interface{}, currentDatabase string) error { + rowData, ok := row.(influx.Row) + if !ok { + return fmt.Errorf("invalid row type") + } + tagsN := rowData.Tags + fieldsN := rowData.Fields + var tags, fields, tagsType, fieldsType []string + for _, tag := range tagsN { + tags = append(tags, tag.Key) + tagsType = append(tagsType, "tag") + } + for _, field := range fieldsN { + fields = append(fields, field.Key) + fieldsType = append(fieldsType, influx.FieldTypeString(field.Type)) + } + fieldsType = append(fieldsType, "dateTime:timeStamp") + measurementWithVersion := rowData.Name + measurementName := influx.GetOriginMstName(measurementWithVersion) + measurementName = EscapeMstName(measurementName) + c.fieldsName[currentDatabase] = make(map[string][]string) + c.fieldsName[currentDatabase][measurementName] = fields + c.curDatabase = currentDatabase + c.curMeasurement = measurementName + // write datatype + fmt.Fprintf(metaWriter, "#datatype %s,%s\n", strings.Join(tagsType, ","), strings.Join(fieldsType, ",")) + // write tags and fields name + buf := influx.GetBytesBuffer() + defer influx.PutBytesBuffer(buf) + buf = append(buf, strings.Join(tags, ",")...) + buf = append(buf, ',') + buf = append(buf, strings.Join(fields, ",")...) + buf = append(buf, ',') + buf = append(buf, "time"...) + buf = append(buf, '\n') + _, err := outputWriter.Write(buf) + if err != nil { + return err + } + return nil +} + +func (c *csvParser) getRowBuf(buf []byte, measurementName string, row interface{}, _ *opengemini.Point) ([]byte, error) { + rowData, ok := row.(influx.Row) + if !ok { + return nil, fmt.Errorf("invalid row type") + } + tags := rowData.Tags + fields := rowData.Fields + tm := rowData.Timestamp + + for _, tag := range tags { + buf = append(buf, EscapeTagValue(tag.Value)...) + buf = append(buf, ',') + } + curFieldsName := c.fieldsName[c.curDatabase][c.curMeasurement] + for _, fieldName := range curFieldsName { + if fieldName == "time" { + continue + } + k, ok := getFieldNameIndexFromRow(fields, fieldName) + if !ok { + buf = append(buf, ',') + } else { + switch fields[k].Type { + case influx.Field_Type_Float: + buf = strconv.AppendFloat(buf, fields[k].NumValue, 'g', -1, 64) + case influx.Field_Type_Int: + buf = strconv.AppendInt(buf, int64(fields[k].NumValue), 10) + case influx.Field_Type_Boolean: + buf = strconv.AppendBool(buf, fields[k].NumValue == 1) + case influx.Field_Type_String: + buf = append(buf, '"') + buf = append(buf, EscapeStringFieldValue(fields[k].StrValue)...) + buf = append(buf, '"') + default: + // This shouldn't be possible, but we'll format it anyway. + buf = append(buf, fmt.Sprintf("%v", fields[k])...) 
+ } + buf = append(buf, ',') + } + } + buf = strconv.AppendInt(buf, tm, 10) + buf = append(buf, '\n') + return buf, nil +} + +func (c *csvParser) writeMetaInfo(metaWriter io.Writer, infoType InfoType, info string) { + switch infoType { + case InfoTypeDatabase: + fmt.Fprintf(metaWriter, "#constant database,%s\n", info) + case InfoTypeRetentionPolicy: + fmt.Fprintf(metaWriter, "#constant retention_policy,%s\n", info) + case InfoTypeMeasurement: + fmt.Fprintf(metaWriter, "#constant measurement,%s\n", info) + default: + return + } +} + +func (c *csvParser) writeOutputInfo(_ io.Writer, _ string) { +} + +// OnlineExporter implementation + +type TimeWindow struct { + Start time.Time + End time.Time + FilePath string +} + +type OnlineExporter struct { + *Exporter + httpClient core.HttpClient + windows []TimeWindow +} + +func NewOnlineExporter(baseExporter *Exporter) *OnlineExporter { + return &OnlineExporter{ + Exporter: baseExporter, + } +} + +func (oe *OnlineExporter) initHttpClient(config *ExportConfig) error { + httpClient, err := core.NewHttpClient(config.CommandLineConfig) + if err != nil { + return fmt.Errorf("failed to create http client: %w", err) + } + oe.httpClient = httpClient + return nil +} + +func calculateWindowSize(start, end time.Time) time.Duration { + duration := end.Sub(start) + + switch { + case duration <= 1*time.Hour: + return 5 * time.Minute + case duration <= 6*time.Hour: + return 30 * time.Minute + case duration <= 24*time.Hour: + return 1 * time.Hour + case duration <= 7*24*time.Hour: + return 6 * time.Hour + case duration <= 30*24*time.Hour: + return 24 * time.Hour + default: + return 7 * 24 * time.Hour + } +} + +func parseTimeRange(timeFilter string) (time.Time, time.Time, error) { + // If no time filter is provided, use a default large range (from Unix epoch to now) + if timeFilter == "" { + // Default: from Unix epoch (1970-01-01) to current time + start := time.Unix(0, 0) + end := time.Now() + return start, end, nil + } + + timeSlot := strings.Split(timeFilter, "~") + if len(timeSlot) != 2 { + return time.Time{}, time.Time{}, fmt.Errorf("invalid time filter format, expected 'start~end'") + } + + start, err := parseTimeString(timeSlot[0]) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("invalid start time: %w", err) + } + + end, err := parseTimeString(timeSlot[1]) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("invalid end time: %w", err) + } + + if start.After(end) { + return time.Time{}, time.Time{}, fmt.Errorf("start time must be before end time") + } + + return start, end, nil +} + +func parseTimeString(input string) (time.Time, error) { + if t, err := time.Parse(time.RFC3339, input); err == nil { + return t, nil + } + + if timestamp, err := strconv.ParseInt(input, 10, 64); err == nil { + return time.Unix(0, timestamp), nil + } + + return time.Time{}, fmt.Errorf("unable to parse time: %s", input) +} + +func (oe *OnlineExporter) prepareWindows(config *ExportConfig) error { + startTime, endTime, err := parseTimeRange(config.TimeFilter) + if err != nil { + return err + } + + windowSize := calculateWindowSize(startTime, endTime) + currentTime := startTime + + for currentTime.Before(endTime) { + windowEnd := currentTime.Add(windowSize) + if windowEnd.After(endTime) { + windowEnd = endTime + } + + virtualPath := fmt.Sprintf("online://%s/%s/%s/%d-%d", + config.DBFilter, + config.RetentionFilter, + config.MeasurementFilter, + currentTime.UnixNano(), + windowEnd.UnixNano()) + + oe.windows = append(oe.windows, TimeWindow{ + Start: 
currentTime, + End: windowEnd, + FilePath: virtualPath, + }) + + currentTime = windowEnd + } + + oe.filesTotalCount = len(oe.windows) + return nil +} + +func (oe *OnlineExporter) buildQuery(config *ExportConfig, window TimeWindow) string { + var query strings.Builder + + query.WriteString("SELECT * FROM ") + + if config.MeasurementFilter != "" { + query.WriteString(fmt.Sprintf(`"%s"`, config.MeasurementFilter)) + } else { + query.WriteString(`/.*/`) + } + + query.WriteString(fmt.Sprintf(" WHERE time >= '%s' AND time <= '%s'", + window.Start.Format(time.RFC3339), + window.End.Format(time.RFC3339))) + + return query.String() +} + +func (oe *OnlineExporter) queryAndExportWindow(ctx context.Context, config *ExportConfig, window TimeWindow, outputWriter io.Writer, currentMeasurement *string) error { + queryStr := oe.buildQuery(config, window) + + query := &opengemini.Query{ + Command: queryStr, + Database: config.DBFilter, + RetentionPolicy: config.RetentionFilter, + } + + result, err := oe.httpClient.Query(ctx, query) + if err != nil { + return fmt.Errorf("query failed: %w", err) + } + + if result.Error != "" { + return fmt.Errorf("query error: %s", result.Error) + } + + if len(result.Results) == 0 { + return nil + } + + for _, res := range result.Results { + if len(res.Series) == 0 { + continue + } + + for _, series := range res.Series { + if err := oe.exportSeries(series, outputWriter, config, currentMeasurement); err != nil { + return err + } + } + } + + return nil +} + +func (oe *OnlineExporter) exportSeries(series *opengemini.Series, outputWriter io.Writer, config *ExportConfig, currentMeasurement *string) error { + measurementName := series.Name + if config.MeasurementFilter != "" && measurementName != config.MeasurementFilter { + return nil + } + + // Write CONTEXT-MEASUREMENT when measurement changes + if measurementName != *currentMeasurement { + oe.parser.writeMetaInfo(outputWriter, InfoTypeMeasurement, measurementName) + *currentMeasurement = measurementName + } + + if len(series.Values) == 0 { + return nil + } + + columns := series.Columns + timeIndex := -1 + for i, col := range columns { + if col == "time" { + timeIndex = i + break + } + } + + if timeIndex == -1 { + return fmt.Errorf("time column not found in query result") + } + + for _, values := range series.Values { + if len(values) <= timeIndex { + continue + } + + timestamp, ok := values[timeIndex].(float64) + if !ok { + if tsStr, ok := values[timeIndex].(string); ok { + if t, err := time.Parse(time.RFC3339, tsStr); err == nil { + timestamp = float64(t.UnixNano()) + } else { + continue + } + } else { + continue + } + } + + line, err := oe.formatLineProtocol(measurementName, series.Tags, columns, values, int64(timestamp)) + if err != nil { + continue + } + + if _, err := outputWriter.Write(line); err != nil { + return err + } + + oe.lineCount++ + } + + return nil +} + +func (oe *OnlineExporter) formatLineProtocol(measurement string, tags map[string]string, columns []string, values []interface{}, timestamp int64) ([]byte, error) { + var buf strings.Builder + + buf.WriteString(measurement) + + for k, v := range tags { + buf.WriteString(fmt.Sprintf(",%s=%s", EscapeTagKey(k), EscapeTagValue(v))) + } + + buf.WriteString(" ") + + firstField := true + for i, col := range columns { + if col == "time" { + continue + } + + if i >= len(values) { + continue + } + + if !firstField { + buf.WriteString(",") + } + + fieldName := EscapeFieldKey(col) + fieldValue := values[i] + + switch v := fieldValue.(type) { + case float64: + 
buf.WriteString(fmt.Sprintf("%s=%g", fieldName, v)) + case int64: + buf.WriteString(fmt.Sprintf("%s=%di", fieldName, v)) + case int: + buf.WriteString(fmt.Sprintf("%s=%di", fieldName, v)) + case bool: + buf.WriteString(fmt.Sprintf("%s=%t", fieldName, v)) + case string: + buf.WriteString(fmt.Sprintf(`%s="%s"`, fieldName, EscapeStringFieldValue(v))) + default: + buf.WriteString(fmt.Sprintf(`%s="%v"`, fieldName, v)) + } + + firstField = false + } + + buf.WriteString(fmt.Sprintf(" %d\n", timestamp)) + + return []byte(buf.String()), nil +} + +func (oe *OnlineExporter) createProgressBar() (*mpb.Bar, error) { + if oe.filesTotalCount == 0 { + return nil, fmt.Errorf("no windows to export") + } + + bar := MpbProgress.New(int64(oe.filesTotalCount), + mpb.BarStyle().Lbound("[").Filler("=").Tip(">").Padding("-").Rbound("]"), + mpb.PrependDecorators( + decor.Name("Exporting Data:", decor.WC{W: 20, C: decor.DidentRight}), + decor.CountersNoUnit("%d/%d", decor.WC{W: 15, C: decor.DidentRight}), + decor.OnComplete( + decor.AverageETA(decor.ET_STYLE_GO, decor.WC{W: 6}), + "complete", + ), + ), + mpb.AppendDecorators( + decor.Percentage(), + ), + ) + + return bar, nil +} + +// writeDDL writes DDL statements for online mode +func (oe *OnlineExporter) writeDDL(outputWriter io.Writer, config *ExportConfig) error { + oe.parser.writeMetaInfo(outputWriter, 0, "# DDL") + + if config.DBFilter != "" { + oe.parser.writeOutputInfo(outputWriter, fmt.Sprintf("CREATE DATABASE %s\n", config.DBFilter)) + } + + if config.RetentionFilter != "" { + oe.parser.writeOutputInfo(outputWriter, fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 0s REPLICATION 1\n", config.RetentionFilter, config.DBFilter)) + } else if config.DBFilter != "" { + // Default retention policy + oe.parser.writeOutputInfo(outputWriter, fmt.Sprintf("CREATE RETENTION POLICY autogen ON %s DURATION 0s REPLICATION 1\n", config.DBFilter)) + } + + oe.parser.writeMetaInfo(outputWriter, 0, "") + return nil +} + +func (oe *OnlineExporter) Export(ctx context.Context, config *ExportConfig, progressedFiles map[string]struct{}) error { + // Initialize defaultLogger if not set + if oe.defaultLogger == nil { + oe.defaultLogger = oe.stdoutLogger + } + + // Initialize parser based on format + if oe.parser == nil { + if config.Format == txtFormatExporter || config.Format == remoteFormatExporter { + oe.parser = newTxtParser() + } else if config.Format == csvFormatExporter { + oe.parser = newCsvParser() + } + } + + if err := oe.initHttpClient(config); err != nil { + return err + } + + if err := oe.prepareWindows(config); err != nil { + return err + } + + if config.Resume { + oe.resume = true + oe.progress = progressedFiles + oe.defaultLogger.Printf("starting resume export, you have exported %d windows\n", len(oe.progress)) + } + + if err := oe.writeProgressJson(config); err != nil { + return err + } + + bar, err := oe.createProgressBar() + if err != nil { + return err + } + oe.bar = bar + + var outputWriter io.Writer + if config.Format == remoteFormatExporter { + outputWriter = io.Discard + } else { + if err := os.MkdirAll(filepath.Dir(config.Out), 0755); err != nil { + return err + } + + var outputFile *os.File + if oe.resume { + exportDir := filepath.Dir(config.Out) + exportFilePath := filepath.Join(exportDir, resumeFilePrefix+time.Now().Format("2006-01-02_15-04-05.000000000")+filepath.Ext(config.Out)) + outputFile, err = os.OpenFile(exportFilePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + } else { + outputFile, err = os.OpenFile(config.Out, 
os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
+ }
+ if err != nil {
+ return err
+ }
+ defer outputFile.Close()
+
+ outputWriter = outputFile
+
+ if config.Compress {
+ gzipWriter := gzip.NewWriter(outputWriter)
+ defer gzipWriter.Close()
+ outputWriter = gzipWriter
+ }
+ }
+
+ var start, end time.Time
+ if config.TimeFilter != "" {
+ start, end, _ = parseTimeRange(config.TimeFilter)
+ } else {
+ start, end, _ = parseTimeRange("")
+ }
+ startStr := start.UTC().Format(time.RFC3339)
+ endStr := end.UTC().Format(time.RFC3339)
+ oe.parser.writeMetaInfo(outputWriter, 0, fmt.Sprintf("# openGemini EXPORT: %s - %s", startStr, endStr))
+
+ // Write DDL section
+ if err := oe.writeDDL(outputWriter, config); err != nil {
+ return err
+ }
+
+ // Write DML section header
+ oe.parser.writeMetaInfo(outputWriter, 0, "# DML")
+ oe.parser.writeMetaInfo(outputWriter, 0, "# FROM HTTP API")
+
+ // Write context information
+ if config.DBFilter != "" {
+ oe.parser.writeMetaInfo(outputWriter, InfoTypeDatabase, config.DBFilter)
+ }
+ if config.RetentionFilter != "" {
+ oe.parser.writeMetaInfo(outputWriter, InfoTypeRetentionPolicy, config.RetentionFilter)
+ }
+
+ oe.defaultLogger.Printf("Exporting data total %d windows\n", oe.filesTotalCount)
+
+ var currentMeasurement string
+ for _, window := range oe.windows {
+ if _, ok := oe.progress[window.FilePath]; ok {
+ oe.bar.Increment()
+ continue
+ }
+
+ if err := oe.queryAndExportWindow(ctx, config, window, outputWriter, &currentMeasurement); err != nil {
+ if writeErr := oe.writeProgressedFiles(window.FilePath); writeErr != nil {
+ oe.stderrLogger.Printf("failed to write progress: %v", writeErr)
+ }
+ return err
+ }
+
+ if err := oe.writeProgressedFiles(window.FilePath); err != nil {
+ oe.stderrLogger.Printf("failed to write progress: %v", err)
+ }
+
+ oe.bar.Increment()
+ }
+
+ MpbProgress.Wait()
+ oe.defaultLogger.Printf("Summarize %d line protocol\n", oe.lineCount)
+
+ return nil
+}
diff --git a/cmd/subcmd/export_types.go b/cmd/subcmd/export_types.go
new file mode 100644
index 0000000..7aebbf2
--- /dev/null
+++ b/cmd/subcmd/export_types.go
@@ -0,0 +1,606 @@
+// Copyright 2025 openGemini Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
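+//
+// export_types.go collects the declarations shared by the online and offline
+// export paths: ExportConfig and ExportCommand, the Exporter state, the parser
+// interface, the dataFilter and remoteExporter types, progress/resume
+// bookkeeping and the escaping helpers.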
+
+package subcmd
+
+import (
+ "bufio"
+ "context"
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log"
+ "math"
+ "net"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/openGemini/openGemini-cli/core"
+ "github.com/openGemini/opengemini-client-go/opengemini"
+ "github.com/vbauerster/mpb/v7"
+)
+
+type ExportConfig struct {
+ *core.CommandLineConfig
+ Export bool
+ Format string `json:"format"`
+ Out string `json:"out"`
+ DataDir string `json:"data"`
+ WalDir string `json:"wal"`
+ Remote string `json:"remote"`
+ RemoteUsername string `json:"-"`
+ RemotePassword string `json:"-"`
+ RemoteSsl bool `json:"remotessl"`
+ DBFilter string `json:"dbfilter"`
+ RetentionFilter string `json:"retentionfilter"`
+ MeasurementFilter string `json:"mstfilter"`
+ TimeFilter string `json:"timefilter"`
+ Compress bool `json:"compress"`
+ Resume bool
+}
+
+type ExportCommand struct {
+ cfg *ExportConfig
+ exportCmd *Exporter
+}
+
+type Exporter struct {
+ exportFormat string //nolint:unused,structcheck // used in offline mode
+ databaseDiskInfos []*DatabaseDiskInfo //nolint:unused,structcheck // used in offline mode
+ filesTotalCount int
+ actualDataPath string //nolint:unused,structcheck // used in offline mode
+ actualWalPath string //nolint:unused,structcheck // used in offline mode
+ outPutPath string //nolint:unused,structcheck // used in offline mode
+ filter *dataFilter //nolint:unused,structcheck // used in offline mode
+ compress bool //nolint:unused,structcheck // used in offline mode
+ lineCount uint64
+ resume bool
+ progress map[string]struct{}
+ remote string //nolint:unused,structcheck // used in offline mode
+ remoteExporter *remoteExporter //nolint:unused,structcheck // used in offline mode
+ parser
+
+ stderrLogger *log.Logger
+ stdoutLogger *log.Logger
+ defaultLogger *log.Logger
+
+ manifest map[string]struct{}
+ rpNameToMeasurementTsspFilesMap map[string]map[string][]string
+ rpNameToIdToIndexMap map[string]map[uint64]interface{} // *tsi.MergeSetIndex in offline mode, nil in online mode
+ rpNameToWalFilesMap map[string][]string
+
+ Stderr io.Writer
+ Stdout io.Writer
+ bar *mpb.Bar
+}
+
+type DatabaseDiskInfo struct { //nolint:unused // used in offline mode
+ dbName string
+ rps map[string]struct{}
+ dataDir string
+ walDir string
+ rpToTsspDirMap map[string]string
+ rpToWalDirMap map[string]string
+ rpToIndexDirMap map[string]string
+}
+
+func (d *DatabaseDiskInfo) init(actualDataDir string, actualWalDir string, databaseName string, retentionPolicy string) error {
+ d.dbName = databaseName
+
+ // check whether the database is in actualDataPath
+ dataDir := filepath.Join(actualDataDir, databaseName)
+ if _, err := os.Stat(dataDir); err != nil {
+ return err
+ }
+ // check whether the database is in actualWalPath
+ walDir := filepath.Join(actualWalDir, databaseName)
+ if _, err := os.Stat(walDir); err != nil {
+ return err
+ }
+
+ // ie. /tmp/openGemini/data/data/my_db /tmp/openGemini/data/wal/my_db
+ d.dataDir, d.walDir = dataDir, walDir
+
+ ptDirs, err := os.ReadDir(d.dataDir)
+ if err != nil {
+ return err
+ }
+ for _, ptDir := range ptDirs {
+ // ie. /tmp/openGemini/data/data/my_db/0
+ ptTsspPath := filepath.Join(d.dataDir, ptDir.Name())
+ // ie. /tmp/openGemini/data/wal/my_db/0
+ ptWalPath := filepath.Join(d.walDir, ptDir.Name())
+
+ if retentionPolicy != "" {
+ ptWithRp := ptDir.Name() + ":" + retentionPolicy
+ // ie. 
/tmp/openGemini/data/data/my_db/0/autogen + rpTsspPath := filepath.Join(ptTsspPath, retentionPolicy) + if _, err := os.Stat(rpTsspPath); err != nil { + return fmt.Errorf("retention policy %q invalid : %s", retentionPolicy, err) + } else { + d.rps[ptWithRp] = struct{}{} + d.rpToTsspDirMap[ptWithRp] = rpTsspPath + d.rpToIndexDirMap[ptWithRp] = filepath.Join(rpTsspPath, "index") + } + // ie. /tmp/openGemini/data/wal/my_db/0/autogen + rpWalPath := filepath.Join(ptWalPath, retentionPolicy) + if _, err := os.Stat(rpWalPath); err != nil { + return fmt.Errorf("retention policy %q invalid : %s", retentionPolicy, err) + } else { + d.rpToWalDirMap[ptWithRp] = rpWalPath + } + continue + } + + rpTsspDirs, err1 := os.ReadDir(ptTsspPath) + if err1 != nil { + return err1 + } + for _, rpDir := range rpTsspDirs { + if !rpDir.IsDir() { + continue + } + ptWithRp := ptDir.Name() + ":" + rpDir.Name() + rpPath := filepath.Join(ptTsspPath, rpDir.Name()) + d.rps[ptWithRp] = struct{}{} + d.rpToTsspDirMap[ptWithRp] = rpPath + d.rpToIndexDirMap[ptWithRp] = filepath.Join(rpPath, "index") + } + + rpWalDirs, err2 := os.ReadDir(ptWalPath) + if err2 != nil { + return err2 + } + for _, rpDir := range rpWalDirs { + ptWithRp := ptDir.Name() + ":" + rpDir.Name() + if !rpDir.IsDir() { + continue + } + rpPath := filepath.Join(ptWalPath, rpDir.Name()) + d.rpToWalDirMap[ptWithRp] = rpPath + } + } + return nil +} + +type dataFilter struct { //nolint:unused // used in offline mode + database string //nolint:unused // used in offline mode + retention string //nolint:unused // used in offline mode + measurement string + startTime int64 + endTime int64 +} + +func (d *dataFilter) isBelowMinTimeFilter(t int64) bool { //nolint:unused // used in offline mode + return t < d.startTime +} + +func (d *dataFilter) isAboveMaxTimeFilter(t int64) bool { //nolint:unused // used in offline mode + return t > d.endTime +} + +func newDataFilter() *dataFilter { //nolint:unused // used in offline mode + return &dataFilter{ + database: "", + measurement: "", + startTime: math.MinInt64, + endTime: math.MaxInt64, + } +} + +func newDatabaseDiskInfo() *DatabaseDiskInfo { //nolint:unused // used in offline mode + return &DatabaseDiskInfo{ + rps: make(map[string]struct{}), + rpToTsspDirMap: make(map[string]string), + rpToWalDirMap: make(map[string]string), + rpToIndexDirMap: make(map[string]string), + } +} + +func newRemoteExporter() *remoteExporter { + return &remoteExporter{ + isExist: false, + } +} + +func NewExporter() *Exporter { + return &Exporter{ + resume: false, + progress: make(map[string]struct{}), + + stderrLogger: log.New(os.Stderr, "export: ", log.LstdFlags), + stdoutLogger: log.New(os.Stdout, "export: ", log.LstdFlags), + + manifest: make(map[string]struct{}), + rpNameToMeasurementTsspFilesMap: make(map[string]map[string][]string), + rpNameToIdToIndexMap: make(map[string]map[uint64]interface{}), + rpNameToWalFilesMap: make(map[string][]string), + remoteExporter: newRemoteExporter(), + + Stdout: os.Stdout, + Stderr: os.Stderr, + } +} + +type remoteExporter struct { + isExist bool + client opengemini.Client + database string //nolint:unused // used in offline mode + retentionPolicy string //nolint:unused // used in offline mode + points []*opengemini.Point //nolint:unused // used in offline mode +} + +type parser interface { + parse2SeriesKeyWithoutVersion(key []byte, dst []byte, splitWithNull bool, point *opengemini.Point) ([]byte, error) + appendFields(rec interface{}, buf []byte, point *opengemini.Point) ([]byte, error) + 
writeMstInfoFromTssp(metaWriter io.Writer, outputWriter io.Writer, filePath string, isOrder bool, index interface{}) error + writeMstInfoFromWal(metaWriter io.Writer, outputWriter io.Writer, row interface{}, curDatabase string) error + writeMetaInfo(metaWriter io.Writer, infoType InfoType, info string) + writeOutputInfo(outputWriter io.Writer, info string) + getRowBuf(buf []byte, measurementName string, row interface{}, point *opengemini.Point) ([]byte, error) +} + +type InfoType int + +const ( + InfoTypeDatabase InfoType = 1 + iota + InfoTypeRetentionPolicy + InfoTypeMeasurement +) + +const ( + tsspFileExtension = "tssp" + walFileExtension = "wal" + csvFormatExporter = "csv" + txtFormatExporter = "txt" + remoteFormatExporter = "remote" + resumeFilePrefix = "resume_" +) + +// Global variables for progress tracking +var ( + MpbProgress = mpb.New(mpb.WithWidth(100)) + ResumeJsonPath string + ProgressedFilesPath string +) + +// Progress management functions + +func getResumeConfig(options *ExportConfig) (*ExportConfig, error) { + jsonData, err := os.ReadFile(ResumeJsonPath) + if err != nil { + return nil, err + } + var config ExportConfig + err = json.Unmarshal(jsonData, &config) + if err != nil { + return nil, err + } + config.Resume = true + config.RemoteUsername = options.RemoteUsername + config.RemotePassword = options.RemotePassword + return &config, nil +} + +func getProgressedFiles() (map[string]struct{}, error) { + file, err := os.Open(ProgressedFilesPath) + if err != nil { + return nil, err + } + defer file.Close() + + scanner := bufio.NewScanner(file) + lineSet := make(map[string]struct{}) + + for scanner.Scan() { + line := scanner.Text() + lineSet[line] = struct{}{} + } + + if err := scanner.Err(); err != nil { + return nil, err + } + return lineSet, nil +} + +// CreateNewProgressFolder init ResumeJsonPath and ProgressedFilesPath +func CreateNewProgressFolder() error { + home, err := os.UserHomeDir() + if err != nil { + return err + } + targetPath := filepath.Join(home, ".ts-cli", time.Now().Format("2006-01-02_15-04-05.000000000")) + err = os.MkdirAll(targetPath, os.ModePerm) + if err != nil { + return err + } + // create progress.json + progressJson := filepath.Join(targetPath, "progress.json") + ResumeJsonPath = progressJson + // create progressedFiles + progressedFiles := filepath.Join(targetPath, "progressedFiles") + ProgressedFilesPath = progressedFiles + return nil +} + +// ReadLatestProgressFile reads and processes the latest folder +func ReadLatestProgressFile() error { + home, err := os.UserHomeDir() + if err != nil { + return err + } + baseDir := filepath.Join(home, ".ts-cli") + var dirs []string + err = filepath.Walk(baseDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() || path == baseDir { + return nil + } + dirs = append(dirs, path) + return nil + }) + if err != nil { + return err + } + sort.Strings(dirs) + latestDir := dirs[len(dirs)-1] + // read progress.json + ResumeJsonPath = filepath.Join(latestDir, "progress.json") + // read progressedFiles + ProgressedFilesPath = filepath.Join(latestDir, "progressedFiles") + return nil +} + +// writeProgressJson writes progress to json file +func (e *Exporter) writeProgressJson(clc *ExportConfig) error { + output, err := json.MarshalIndent(clc, "", "\t") + if err != nil { + return err + } + err = os.WriteFile(ResumeJsonPath, output, 0644) + if err != nil { + return err + } + return nil +} + +// writeProgressedFiles writes progressed file name +func (e *Exporter) 
writeProgressedFiles(filename string) error { + file, err := os.OpenFile(ProgressedFilesPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer file.Close() + + _, err = file.WriteString(filename + "\n") + if err != nil { + return err + } + return nil +} + +// Escape utility functions and variables +var escapeFieldKeyReplacer = strings.NewReplacer(`,`, `\,`, `=`, `\=`, ` `, `\ `) +var escapeTagKeyReplacer = strings.NewReplacer(`,`, `\,`, `=`, `\=`, ` `, `\ `) +var escapeTagValueReplacer = strings.NewReplacer(`,`, `\,`, `=`, `\=`, ` `, `\ `) +var escapeMstNameReplacer = strings.NewReplacer(`=`, `\=`, ` `, `\ `) +var escapeStringFieldReplacer = strings.NewReplacer(`"`, `\"`, `\`, `\\`) + +// EscapeFieldKey returns a copy of in with any comma or equal sign or space +// with escaped values. +func EscapeFieldKey(in string) string { + return escapeFieldKeyReplacer.Replace(in) +} + +// EscapeStringFieldValue returns a copy of in with any double quotes or +// backslashes with escaped values. +func EscapeStringFieldValue(in string) string { + return escapeStringFieldReplacer.Replace(in) +} + +// EscapeTagKey returns a copy of in with any "comma" or "equal sign" or "space" +// with escaped values. +func EscapeTagKey(in string) string { + return escapeTagKeyReplacer.Replace(in) +} + +// EscapeTagValue returns a copy of in with any "comma" or "equal sign" or "space" +// with escaped values +func EscapeTagValue(in string) string { + return escapeTagValueReplacer.Replace(in) +} + +// EscapeMstName returns a copy of in with any "equal sign" or "space" +// with escaped values. +func EscapeMstName(in string) string { + return escapeMstNameReplacer.Replace(in) +} + +// Helper utility functions + +func convertTime(input string) (int64, error) { //nolint:unused // used in offline mode + t, err := time.Parse(time.RFC3339, input) + if err == nil { + return t.UnixNano(), nil + } + + timestamp, err := strconv.ParseInt(input, 10, 64) + if err == nil { + return timestamp, nil + } + + return 0, err +} + +// dataFilter methods + +func (d *dataFilter) parseTime(clc *ExportConfig) error { //nolint:unused // used in offline mode + var start, end string + timeSlot := strings.Split(clc.TimeFilter, "~") + if len(timeSlot) == 2 { + start = timeSlot[0] + end = timeSlot[1] + } else if clc.TimeFilter != "" { + return fmt.Errorf("invalid time filter %q", clc.TimeFilter) + } + + if start != "" { + st, err := convertTime(start) + if err != nil { + return err + } + d.startTime = st + } + + if end != "" { + ed, err := convertTime(end) + if err != nil { + return err + } + d.endTime = ed + } + + if d.startTime > d.endTime { + return fmt.Errorf("start time `%q` > end time `%q`", start, end) + } + + return nil +} + +func (d *dataFilter) parseDatabase(dbFilter string) { //nolint:unused // used in offline mode + if dbFilter == "" { + return + } + d.database = dbFilter +} + +func (d *dataFilter) parseRetention(retentionFilter string) { //nolint:unused // used in offline mode + if retentionFilter == "" { + return + } + d.retention = retentionFilter +} + +func (d *dataFilter) parseMeasurement(mstFilter string) error { //nolint:unused // used in offline mode + if mstFilter == "" { + return nil + } + if mstFilter != "" && d.database == "" { + return fmt.Errorf("measurement filter %q requires database filter", mstFilter) + } + d.measurement = mstFilter + return nil +} + +// timeFilter [startTime, endTime] +func (d *dataFilter) timeFilter(t int64) bool { //nolint:unused // used in offline mode + return t >= 
d.startTime && t <= d.endTime +} + +// remoteExporter methods + +func (re *remoteExporter) Init(clc *ExportConfig) error { + if len(clc.Remote) == 0 { + return fmt.Errorf("execute -export cmd, using remote format, --remote is required") + } + h, p, err := net.SplitHostPort(clc.Remote) + if err != nil { + return err + } + port, err := strconv.Atoi(p) + if err != nil { + return fmt.Errorf("invalid port number :%s", err) + } + var authConfig *opengemini.AuthConfig + if clc.RemoteUsername != "" { + authConfig = &opengemini.AuthConfig{ + AuthType: 0, + Username: clc.RemoteUsername, + Password: clc.RemotePassword, + } + } else { + authConfig = nil + } + var remoteConfig *opengemini.Config + if clc.RemoteSsl { + remoteConfig = &opengemini.Config{ + Addresses: []opengemini.Address{ + { + Host: h, + Port: port, + }, + }, + AuthConfig: authConfig, + TlsConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + } else { + remoteConfig = &opengemini.Config{ + Addresses: []opengemini.Address{ + { + Host: h, + Port: port, + }, + }, + AuthConfig: authConfig, + } + } + + cli, err := opengemini.NewClient(remoteConfig) + if err != nil { + return err + } + re.isExist = true + re.client = cli + if err = re.client.Ping(0); err != nil { + return err + } + return nil +} + +func (re *remoteExporter) createDatabase(dbName string) error { //nolint:unused // used in offline mode + err := re.client.CreateDatabase(dbName) + if err != nil { + return fmt.Errorf("error writing command: %s", err) + } + return nil +} + +func (re *remoteExporter) createRetentionPolicy(dbName string, rpName string) error { //nolint:unused // used in offline mode + err := re.client.CreateRetentionPolicy(dbName, opengemini.RpConfig{ + Name: rpName, + Duration: "0s", + }, false) + if err != nil { + return fmt.Errorf("error writing command: %s", err) + } + return nil +} + +func (re *remoteExporter) writeAllPoints() error { //nolint:unused // used in offline mode + err := re.client.WriteBatchPointsWithRp(context.Background(), re.database, re.retentionPolicy, re.points) + if err != nil { + return err + } + re.points = re.points[:0] + return nil +} diff --git a/cmd/ts-cli/cli.go b/cmd/ts-cli/cli.go index a9464ff..775ea07 100644 --- a/cmd/ts-cli/cli.go +++ b/cmd/ts-cli/cli.go @@ -150,6 +150,20 @@ func (m *Command) exportCommand() { }, } + // Connection flags for online mode + cmd.Flags().StringVarP(&config.Host, "host", "H", common.DefaultHost, "ts-sql host to connect to.") + cmd.Flags().IntVarP(&config.Port, "port", "p", common.DefaultHttpPort, "ts-sql tcp port to connect to.") + cmd.Flags().IntVarP(&config.Timeout, "timeout", "", common.DefaultRequestTimeout, "request-timeout in mill-seconds.") + cmd.Flags().StringVarP(&config.Username, "username", "u", "", "username to connect to openGemini.") + cmd.Flags().StringVarP(&config.Password, "password", "P", "", "password to connect to openGemini.") + cmd.Flags().BoolVarP(&config.EnableTls, "ssl", "s", false, "use https for connecting to openGemini.") + cmd.Flags().BoolVarP(&config.InsecureTls, "insecure-tls", "i", false, "ignore ssl verification when connecting openGemini by https.") + cmd.Flags().StringVarP(&config.CACert, "cacert", "c", "", "CA certificate to verify peer against when connecting openGemini by https.") + cmd.Flags().StringVarP(&config.Cert, "cert", "C", "", "client certificate file when connecting openGemini by https.") + cmd.Flags().StringVarP(&config.CertKey, "cert-key", "k", "", "client certificate password.") + cmd.Flags().BoolVarP(&config.InsecureHostname, "insecure-hostname", 
"I", false, "ignore server certificate hostname verification when connecting openGemini by https.") + + // Export-specific flags cmd.Flags().StringVar(&config.Format, "format", "txt", "Export data format, support csv, txt, remote.") cmd.Flags().StringVar(&config.Out, "out", "", "Destination file to export to.") cmd.Flags().StringVar(&config.DataDir, "data", "", "Data storage path to export.") @@ -160,11 +174,13 @@ func (m *Command) exportCommand() { cmd.Flags().StringVar(&config.MeasurementFilter, "mstfilter", "", "Optional.Measurement to export.") cmd.Flags().StringVar(&config.TimeFilter, "timefilter", "", "Optional.Export time range, support 'start~end'") cmd.Flags().BoolVar(&config.Compress, "compress", false, "Optional. Compress the export output.") - cmd.Flags().StringVarP(&config.RemoteUsername, "remoteusername", "u", "", "Remote export Optional.Username to connect to remote openGemini.") - cmd.Flags().StringVarP(&config.RemotePassword, "remotepassword", "p", "", "Remote export Optional.Password to connect to remote openGemini.") + cmd.Flags().StringVar(&config.RemoteUsername, "remoteusername", "", "Remote export Optional.Username to connect to remote openGemini.") + cmd.Flags().StringVar(&config.RemotePassword, "remotepassword", "", "Remote export Optional.Password to connect to remote openGemini.") cmd.Flags().BoolVar(&config.RemoteSsl, "remotessl", false, "Remote export Optional.Use https for connecting to remote openGemini.") cmd.Flags().BoolVar(&config.Resume, "resume", false, "Resume the export progress from the last point.") + cmd.MarkFlagsRequiredTogether("username", "password") + cmd.MarkFlagsRequiredTogether("cert", "cert-key") m.cmd.AddCommand(cmd) } diff --git a/go.mod b/go.mod index 5b655fc..63440ae 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24 require ( github.com/VictoriaMetrics/VictoriaMetrics v1.102.1 - github.com/golang/snappy v1.0.0 + github.com/klauspost/compress v1.18.0 github.com/mattn/go-runewidth v0.0.16 github.com/olekukonko/tablewriter v1.0.9 github.com/openGemini/go-prompt v0.0.0-20250603013942-a2bf30109e15 @@ -14,6 +14,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/valyala/fastjson v1.6.4 github.com/vbauerster/mpb/v7 v7.3.2 + go.uber.org/zap v1.27.0 golang.org/x/term v0.34.0 google.golang.org/grpc v1.74.2 ) @@ -57,6 +58,7 @@ require ( github.com/goccy/go-json v0.10.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/google/btree v1.0.1 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/google/uuid v1.6.0 // indirect @@ -80,7 +82,6 @@ require ( github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 // indirect github.com/jsternberg/zap-logfmt v1.2.0 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect @@ -140,7 +141,6 @@ require ( go.opentelemetry.io/otel/trace v1.36.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect golang.org/x/crypto v0.38.0 // indirect golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect diff --git a/lib/VictoriaMetrics/go.sum b/lib/VictoriaMetrics/go.sum new file mode 100644 index 0000000..6a4e549 --- /dev/null +++ 
b/lib/VictoriaMetrics/go.sum @@ -0,0 +1,18 @@ +github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= +github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= +github.com/valyala/gozstd v1.20.1/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= +github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/lib/influxdb/go.sum b/lib/influxdb/go.sum new file mode 100644 index 0000000..eae6213 --- /dev/null +++ b/lib/influxdb/go.sum @@ -0,0 +1,66 @@ +github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/jsternberg/zap-logfmt v1.2.0/go.mod h1:kz+1CUmCutPWABnNkOu9hOHKdT2q3TDYCcsFy9hpqb0= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/prometheus v0.50.1/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= 
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= 
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/lib/influxparser/buffer.go b/lib/influxparser/buffer.go new file mode 100644 index 0000000..bd84355 --- /dev/null +++ b/lib/influxparser/buffer.go @@ -0,0 +1,32 @@ +// Copyright 2025 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxparser + +import "sync" + +var bPool = &sync.Pool{} + +func GetBytesBuffer() []byte { + v := bPool.Get() + if v != nil { + return v.([]byte) + } + return []byte{} +} + +func PutBytesBuffer(b []byte) { + b = b[:0] + bPool.Put(b) //nolint:staticcheck // SA6002: []byte is already a reference type +} diff --git a/lib/influxparser/common.go b/lib/influxparser/common.go new file mode 100644 index 0000000..65c7c67 --- /dev/null +++ b/lib/influxparser/common.go @@ -0,0 +1,79 @@ +// Copyright 2025 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package influxparser + +import ( + "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" +) + +// MeasurementName extract measurement from series key, +// return measurement_name_with_version, tail, error +func MeasurementName(src []byte) ([]byte, []byte, error) { + if len(src) < 4 { + return nil, nil, fmt.Errorf("too small data for tags") + } + + kl := int(encoding.UnmarshalUint32(src)) + if len(src) < kl { + return nil, nil, fmt.Errorf("too small indexKey") + } + src = src[4:] + + mnl := int(encoding.UnmarshalUint16(src)) + src = src[2:] + if mnl+2 > len(src) { + return nil, nil, fmt.Errorf("too small data for measurement(%d: %d > %d)", kl, mnl, len(src)) + } + mn := src[:mnl] + src = src[mnl:] + + return mn, src, nil +} + +func GetOriginMstName(nameWithVer string) string { + if len(nameWithVer) < 5 { + // test case tolerate + return nameWithVer + } + if nameWithVer[len(nameWithVer)-5] == '_' && + nameWithVer[len(nameWithVer)-4] >= '0' && nameWithVer[len(nameWithVer)-4] <= '9' && + nameWithVer[len(nameWithVer)-3] >= '0' && nameWithVer[len(nameWithVer)-3] <= '9' && + nameWithVer[len(nameWithVer)-2] >= '0' && nameWithVer[len(nameWithVer)-2] <= '9' && + nameWithVer[len(nameWithVer)-1] >= '0' && nameWithVer[len(nameWithVer)-1] <= '9' { + return nameWithVer[:len(nameWithVer)-5] + } + return nameWithVer +} + +func FieldTypeString(fieldType int32) string { + switch fieldType { + case Field_Type_Int: + return "integer" + case Field_Type_UInt: + return "unsigned" + case Field_Type_Float: + return "float" + case Field_Type_String: + return "string" + case Field_Type_Boolean: + return "boolean" + case Field_Type_Tag: + return "tag" + default: + return "unknown" + } +} diff --git a/lib/influxparser/parser_full.go b/lib/influxparser/parser_full.go new file mode 100644 index 0000000..6298120 --- /dev/null +++ b/lib/influxparser/parser_full.go @@ -0,0 +1,302 @@ +// Copyright 2025 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build full +// +build full + +package influxparser + +import ( + "errors" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/openGemini/openGemini/lib/logger" + "github.com/openGemini/openGemini/lib/numberenc" + "go.uber.org/zap" +) + +const ( + INDEXCOUNT = 1 +) + +var hasIndexOption byte = 'y' +var hasNoIndexOption byte = 'n' + +func FastUnmarshalMultiRows(src []byte, rows []Row, tagPool []Tag, fieldPool []Field, indexOptionPool []IndexOption, + indexKeyPool []byte) ([]Row, []Tag, []Field, []IndexOption, []byte, error) { + pointsN := int(encoding.UnmarshalUint32(src)) + src = src[4:] + src = src[1:] // skip version + + if pointsN > cap(rows) { + rows = make([]Row, pointsN) + } + rows = rows[:pointsN] + + var err error + var decodeN int + for len(src) > 0 { + if decodeN >= pointsN { + logger.GetLogger().Error("FastUnmarshalMultiRows over", zap.Int("decodeN", decodeN), zap.Int("pointsN", pointsN)) + break + } + row := &rows[decodeN] + decodeN++ + + row.StreamOnly = false + src, tagPool, fieldPool, indexOptionPool, indexKeyPool, err = + row.FastUnmarshalBinary(src, tagPool, fieldPool, indexOptionPool, indexKeyPool) + if err != nil { + return rows[:0], tagPool, fieldPool, indexOptionPool, indexKeyPool, err + } + } + + if decodeN != pointsN { + return rows[:0], tagPool, fieldPool, indexOptionPool, indexKeyPool, errors.New("unmarshal error len(rows) != pointsN") + } + return rows, tagPool, fieldPool, indexOptionPool, indexKeyPool, nil +} + +func (r *Row) FastUnmarshalBinary(src []byte, tagpool []Tag, fieldpool []Field, indexOptionPool []IndexOption, indexKeypool []byte) ([]byte, []Tag, []Field, []IndexOption, []byte, error) { + if len(src) < 1 { + return nil, tagpool, fieldpool, indexOptionPool, indexKeypool, errors.New("too small bytes for row binary") + } + var err error + + mLen := int(src[0]) + src = src[1:] + if len(src) < mLen+4 { + return nil, tagpool, fieldpool, indexOptionPool, indexKeypool, errors.New("too small bytes for row measurement") + } + r.Name = bytesutil.ToUnsafeString(src[:mLen]) + src = src[mLen:] + + skLen := encoding.UnmarshalUint32(src) + src = src[4:] + if len(src) < int(skLen+4) { + return nil, tagpool, fieldpool, indexOptionPool, indexKeypool, errors.New("too small bytes for row shardKey") + } + r.ShardKey = append(r.ShardKey[:0], src[:skLen]...) 
+ src = src[skLen:] + + src, tagpool, err = r.unmarshalTags(src, tagpool) + if err != nil { + return nil, tagpool, fieldpool, indexOptionPool, indexKeypool, err + } + if len(src) < 4 { + return nil, tagpool, fieldpool, indexOptionPool, indexKeypool, errors.New("too small bytes for row field count") + } + + src, fieldpool, err = r.unmarshalFields(src, fieldpool) + if err != nil { + return nil, tagpool, fieldpool, indexOptionPool, indexKeypool, err + } + + src, indexOptionPool, err = r.unmarshalIndexOptions(src, indexOptionPool) + if err != nil { + return nil, tagpool, fieldpool, indexOptionPool, indexKeypool, err + } + + r.Timestamp = encoding.UnmarshalInt64(src[:8]) + if len(src) < 8 { + return nil, tagpool, fieldpool, indexOptionPool, indexKeypool, errors.New("too small bytes for row timestamp") + } + + indexKeypool = r.UnmarshalIndexKeys(indexKeypool) + + return src[8:], tagpool, fieldpool, indexOptionPool, indexKeypool, nil +} + +func (r *Row) unmarshalTags(src []byte, tagpool []Tag) ([]byte, []Tag, error) { + tagN := int(encoding.UnmarshalUint32(src[:4])) + src = src[4:] + start := len(tagpool) + + if len(tagpool)+tagN > cap(tagpool) { + tagpool = append(tagpool[:cap(tagpool)], make([]Tag, start+tagN-cap(tagpool))...) + } + tagpool = tagpool[:start+tagN] + + for i := 0; i < tagN; i++ { + if len(src) < 1 { + return nil, tagpool, errors.New("too small bytes for row tag key len") + } + tl := int(encoding.UnmarshalUint16(src[:2])) //int(src[0]) + src = src[2:] + if len(src) < tl+1 { + return nil, tagpool, errors.New("too small bytes for row tag key") + } + + tg := &tagpool[start+i] + + tg.Key = bytesutil.ToUnsafeString(src[:tl]) + src = src[tl:] + vl := int(encoding.UnmarshalUint16(src[:2])) //int(src[0]) + if len(src) < vl { + tagpool = tagpool[:len(tagpool)-1] + return nil, tagpool, errors.New("too small bytes for row tag value") + } + src = src[2:] + tg.Value = bytesutil.ToUnsafeString(src[:vl]) + tg.IsArray = false + src = src[vl:] + } + r.Tags = tagpool[start:] + return src, tagpool, nil +} + +func (r *Row) unmarshalFields(src []byte, fieldpool []Field) ([]byte, []Field, error) { + fieldN := int(encoding.UnmarshalUint32(src[:4])) + src = src[4:] + start := len(fieldpool) + + if len(fieldpool)+fieldN > cap(fieldpool) { + fieldpool = append(fieldpool[:cap(fieldpool)], make([]Field, start+fieldN-cap(fieldpool))...) 
+ } + fieldpool = fieldpool[:start+fieldN] + + for i := 0; i < fieldN; i++ { + if len(src) < 2 { + return nil, fieldpool, errors.New("too small for field key length") + } + l := int(encoding.UnmarshalUint16(src[:2])) //int(src[0]) + src = src[2:] + if len(src) < l+1 { + return nil, fieldpool, errors.New("too small for field key") + } + + fd := &fieldpool[start+i] + + fd.Key = bytesutil.ToUnsafeString(src[:l]) + src = src[l:] + + fd.Type = int32(src[0]) + if fd.Type <= Field_Type_Unknown || fd.Type >= Field_Type_Last { + fieldpool = fieldpool[:len(fieldpool)-1] + return nil, fieldpool, errors.New("error field type") + } + src = src[1:] + + if fd.Type == Field_Type_String { + if len(src) < 8 { + fieldpool = fieldpool[:len(fieldpool)-1] + return nil, fieldpool, errors.New("too small for string field length") + } + l = int(encoding.UnmarshalUint64(src[:8])) + src = src[8:] + if len(src) < l { + fieldpool = fieldpool[:len(fieldpool)-1] + return nil, fieldpool, errors.New("too small for string field value") + } + fd.StrValue = bytesutil.ToUnsafeString(src[:l]) + src = src[l:] + } else { + if len(src) < 8 { + fieldpool = fieldpool[:len(fieldpool)-1] + return nil, fieldpool, errors.New("too small for field") + } + fd.NumValue = numberenc.UnmarshalFloat64(src[:8]) + src = src[8:] + } + } + r.Fields = fieldpool[start:] + return src, fieldpool, nil +} + +func (r *Row) unmarshalIndexOptions(src []byte, indexOptionPool []IndexOption) ([]byte, []IndexOption, error) { + isIndexOpt := src[:INDEXCOUNT] + r.IndexOptions = nil + if isIndexOpt[0] == hasNoIndexOption { + src = src[INDEXCOUNT:] + return src, indexOptionPool, nil + } + src = src[INDEXCOUNT:] + indexN := int(encoding.UnmarshalUint32(src[:4])) + src = src[4:] + start := len(indexOptionPool) + + if len(indexOptionPool)+indexN > cap(indexOptionPool) { + indexOptionPool = append(indexOptionPool[:cap(indexOptionPool)], make([]IndexOption, start+indexN-cap(indexOptionPool))...) + } + indexOptionPool = indexOptionPool[:start+indexN] + + for i := 0; i < indexN; i++ { + if len(src) < 1 { + return nil, indexOptionPool, errors.New("too small for indexOption key length") + } + + indexOpt := &indexOptionPool[start+i] + + indexOpt.Oid = encoding.UnmarshalUint32(src[:4]) + src = src[4:] + indexListLen := encoding.UnmarshalUint16(src[:2]) + if int(indexListLen) < cap(indexOpt.IndexList) { + indexOpt.IndexList = indexOpt.IndexList[:indexListLen] + } else { + indexOpt.IndexList = append(indexOpt.IndexList, make([]uint16, int(indexListLen)-cap(indexOpt.IndexList))...) + } + src = src[2:] + for j := 0; j < int(indexListLen); j++ { + indexOpt.IndexList[j] = encoding.UnmarshalUint16(src[:2]) + src = src[2:] + } + } + r.IndexOptions = indexOptionPool[start:] + return src, indexOptionPool, nil +} + +func (r *Row) UnmarshalIndexKeys(indexkeypool []byte) []byte { + indexKl := 4 + // total length of indexkey + 2 + // measurment name length + len(r.Name) + // measurment name with version + 2 + // tag count + 4*len(r.Tags) + // length of each tag key and value + r.Tags.TagsSize() // size of tag keys/values + start := len(indexkeypool) + if start+indexKl > cap(indexkeypool) { + indexkeypool = append(indexkeypool[:cap(indexkeypool)], make([]byte, start+indexKl-cap(indexkeypool))...) 
+ } + indexkeypool = indexkeypool[:start+indexKl] + MakeIndexKey(r.Name, r.Tags, indexkeypool[start:start]) + r.IndexKey = indexkeypool[start:] + return indexkeypool +} + +func MakeIndexKey(name string, tags PointTags, dst []byte) []byte { + indexKl := 4 + // total length of indexkey + 2 + // measurment name length + len(name) + // measurment name with version + 2 + // tag count + 4*len(tags) + // length of each tag key and value + tags.TagsSize() // size of tag keys/values + start := len(dst) + + // marshal total len + dst = encoding.MarshalUint32(dst, uint32(indexKl)) + // marshal measurement + dst = encoding.MarshalUint16(dst, uint16(len(name))) + dst = append(dst, name...) + // marshal tags + dst = encoding.MarshalUint16(dst, uint16(len(tags))) + for i := range tags { + kl := len(tags[i].Key) + dst = encoding.MarshalUint16(dst, uint16(kl)) + dst = append(dst, tags[i].Key...) + vl := len(tags[i].Value) + dst = encoding.MarshalUint16(dst, uint16(vl)) + dst = append(dst, tags[i].Value...) + } + return dst[start:] +} diff --git a/lib/influxparser/types.go b/lib/influxparser/types.go new file mode 100644 index 0000000..c673e8f --- /dev/null +++ b/lib/influxparser/types.go @@ -0,0 +1,183 @@ +// Copyright 2025 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxparser + +import ( + "strings" + + "github.com/openGemini/openGemini/lib/errno" +) + +var ( + // ErrPointMustHaveAField is returned when operating on a point that does not have any fields. + ErrPointMustHaveAField = errno.NewError(errno.WritePointMustHaveAField) + // ErrInvalidPoint is returned when a point cannot be parsed correctly. + ErrInvalidPoint = errno.NewError(errno.WriteInvalidPoint) +) + +const ( + Field_Type_Unknown = 0 + Field_Type_Int = 1 + Field_Type_UInt = 2 + Field_Type_Float = 3 + Field_Type_String = 4 + Field_Type_Boolean = 5 + Field_Type_Tag = 6 + Field_Type_Last = 7 +) + +const ByteSplit = 0x00 + +// Row is a single influx row. 
+type Row struct { + // if streamOnly is false, it means that the source table data of the stream will also be written, + // otherwise the source table data of the stream will not be written + StreamOnly bool + Timestamp int64 + SeriesId uint64 + PrimaryId uint64 + Name string // measurement name with version + Tags PointTags + Fields Fields + IndexKey []byte + ShardKey []byte + StreamId []uint64 // it used to indicate that the data is shared by multiple streams + IndexOptions IndexOptions + ColumnToIndex map[string]int // it indicates the sorted tagKey, fieldKey and index mapping relationship + ReadyBuildColumnToIndex bool + + tagArrayInitialized bool + hasTagArray bool + skipMarshalShardKey bool +} + +func (r *Row) Reset() { + r.Name = "" + r.Tags = nil + r.Fields = nil + r.IndexKey = nil + r.ShardKey = nil + r.IndexOptions = nil + r.StreamId = nil + r.StreamOnly = false + r.Timestamp = 0 + r.SeriesId = 0 + r.PrimaryId = 0 + r.ColumnToIndex = nil + r.ReadyBuildColumnToIndex = false + r.tagArrayInitialized = false + r.hasTagArray = false + r.skipMarshalShardKey = false +} + +// Tag PointTag represents influx tag. +type Tag struct { + Key string + Value string + IsArray bool +} + +func (tag *Tag) Reset() { + tag.Key = "" + tag.Value = "" + tag.IsArray = false +} + +func (tag *Tag) Size() int { + return len(tag.Key) + len(tag.Value) +} + +type PointTags []Tag + +func (pts *PointTags) Less(i, j int) bool { + x := *pts + return x[i].Key < x[j].Key +} +func (pts *PointTags) Len() int { return len(*pts) } +func (pts *PointTags) Swap(i, j int) { + x := *pts + x[i], x[j] = x[j], x[i] +} + +func (pts *PointTags) TagsSize() int { + var total int + for i := range *pts { + total += (*pts)[i].Size() + } + return total +} + +func (pts *PointTags) Reset() { + for i := range *pts { + (*pts)[i].Reset() + } +} + +func (pts *PointTags) HasTagArray() bool { + has := false + for i := 0; i < len(*pts); i++ { + val := (*pts)[i].Value + if strings.HasPrefix(val, "[") && strings.HasSuffix(val, "]") { + (*pts)[i].IsArray = true + has = true + } + } + return has +} + +// Field represents influx field. +type Field struct { + Key string + NumValue float64 + StrValue string + Type int32 +} + +func (f *Field) Reset() { + f.Key = "" + f.NumValue = 0 + f.StrValue = "" + f.Type = Field_Type_Unknown +} + +type Fields []Field + +func (fs *Fields) Less(i, j int) bool { + return (*fs)[i].Key < (*fs)[j].Key +} + +func (fs *Fields) Len() int { + return len(*fs) +} + +func (fs *Fields) Swap(i, j int) { + (*fs)[i], (*fs)[j] = (*fs)[j], (*fs)[i] +} + +func (fs *Fields) Reset() { + for i := range *fs { + (*fs)[i].Reset() + } +} + +// IndexOption represents index option. +type IndexOption struct { + IndexList []uint16 + Oids []uint32 + IndexType uint32 + Oid uint32 +} + +type IndexOptions []IndexOption
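
Note (illustrative only, not part of the patch): the helpers above define the index-key layout used by the offline exporter — a uint32 total length, a uint16 measurement-name length plus the name (including its "_0000"-style version suffix), a uint16 tag count, and then a length-prefixed key/value pair per tag. The sketch below shows a round trip through those helpers; it assumes the module path github.com/openGemini/openGemini-cli/lib/influxparser and a build with -tags full, since MakeIndexKey is only compiled behind the "full" build tag.

//go:build full

package main

import (
	"fmt"

	influx "github.com/openGemini/openGemini-cli/lib/influxparser"
)

func main() {
	tags := influx.PointTags{
		{Key: "host", Value: "server01"},
		{Key: "region", Value: "cn-east"},
	}

	// Encode: u32 total length | u16 measurement length | measurement |
	// u16 tag count | (u16 key len, key, u16 value len, value) per tag.
	key := influx.MakeIndexKey("cpu_0001", tags, nil)

	// Decode the prefix back: the measurement name (still carrying the
	// version suffix) plus the remaining tag bytes.
	mstWithVer, _, err := influx.MeasurementName(key)
	if err != nil {
		panic(err)
	}

	fmt.Println(influx.GetOriginMstName(string(mstWithVer)))     // cpu
	fmt.Println(influx.FieldTypeString(influx.Field_Type_Float)) // float
}

Run it with "go run -tags full ."; a default build excludes parser_full.go, so only the tag-free helpers in common.go, buffer.go, and types.go are available without the tag.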