Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:

steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v1
- uses: actions/setup-go@v6
with:
go-version: 1.24.11

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:

steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v1
- uses: actions/setup-go@v6
with:
go-version: 1.24.11

Expand Down
142 changes: 95 additions & 47 deletions pkg/common/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,92 +2,140 @@ package common

import (
"encoding/csv"
"fmt"
"io"
"os"
"sync"
"sync/atomic"
)

type (
CSVReader struct {
Path string
Delimiter string
WithHeader bool
limit int
fileReader struct {
data []Data
index atomic.Int64
}

CSVWriter struct {
Path string
Header []string
Delimiter string
DataCh <-chan []string
csvReaderImpl struct {
defaultSource string
config CsvReaderConfig
readers map[string]*fileReader
mutex sync.Mutex
}
)

func NewCsvReader(path, delimiter string, withHeader bool, limit int) *CSVReader {
return &CSVReader{
Path: path,
Delimiter: delimiter,
WithHeader: withHeader,
limit: limit,
// NewCsvReader builds an ICsvReader that lazily loads and caches CSV files,
// using defaultSource when a caller does not name a file explicitly.
func NewCsvReader(defaultSource string, config CsvReaderConfig) ICsvReader {
	impl := csvReaderImpl{
		defaultSource: defaultSource,
		config:        config,
		readers:       map[string]*fileReader{},
	}
	return &impl
}

func NewCsvWriter(path, delimiter string, header []string, dataCh <-chan []string) *CSVWriter {
return &CSVWriter{
Path: path,
Delimiter: delimiter,
Header: header,
DataCh: dataCh,
// GetData returns the next row from the CSV file identified by sourceFile,
// wrapping back to the first row after the last one. An empty sourceFile
// falls back to the reader's default source. The row is shared, not copied;
// callers must not mutate it.
func (r *csvReaderImpl) GetData(sourceFile string) (Data, error) {
	key := sourceFile
	if key == "" {
		key = r.defaultSource
	}

	reader := r.getOrCreateReader(key)
	if reader == nil {
		// NOTE(review): getOrCreateReader discards the underlying open/parse
		// error, so only the file name can be reported here.
		return nil, fmt.Errorf("failed to load csv file: %s", key)
	}

	data := reader.data
	if len(data) == 0 {
		return nil, fmt.Errorf("no data in csv file: %s", key)
	}
	// Reduce modulo len(data) while still in int64: narrowing the raw counter
	// to int first can overflow on 32-bit platforms after ~2^31 reads,
	// yielding a negative index and a panic on the slice access.
	idx := int((reader.index.Add(1) - 1) % int64(len(data)))
	return data[idx], nil
}

// ReadForever read the csv in slice first, and send to the data channel forever.
func (c *CSVReader) ReadForever(dataCh chan<- Data) error {
lines := make([]Data, 0, c.limit)
file, err := os.Open(c.Path)
func (r *csvReaderImpl) getOrCreateReader(key string) *fileReader {
r.mutex.Lock()
defer r.mutex.Unlock()

if reader, ok := r.readers[key]; ok {
return reader
}

data, err := r.readCsvFile(key)
if err != nil {
return err
return nil
}
defer func() {
_ = file.Close()
}()

reader := &fileReader{
data: data,
index: atomic.Int64{},
}
reader.index.Store(0)
r.readers[key] = reader
return reader
}

func (r *csvReaderImpl) readCsvFile(sourceFile string) ([]Data, error) {
file, err := os.Open(sourceFile)
if err != nil {
return nil, err
}
defer file.Close()

reader := csv.NewReader(file)
comma := []rune(c.Delimiter)
comma := []rune(r.config.Delimiter)
if len(comma) > 0 {
reader.Comma = comma[0]
}
if c.WithHeader {

if r.config.WithHeader {
_, err := reader.Read()
if err != nil {
return err
if err != nil && err != io.EOF {
return nil, err
}
}

var lines []Data
for {
row, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return err
return nil, err
}

lines = append(lines, row)
if len(lines) == c.limit {
if r.config.Limit > 0 && len(lines) >= r.config.Limit {
break
}
}
return lines, nil
}

go func() {
index := 0
for {
if index == len(lines) {
index = 0
}
dataCh <- lines[index]
index++
}
}()
// Close drops all cached file readers so their row data can be reclaimed.
// The reader must not be used after Close: a subsequent GetData would write
// to the nil map inside getOrCreateReader and panic.
func (r *csvReaderImpl) Close() error {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	// Nil-ing the map alone releases every cached reader; the previous
	// clear(r.readers) immediately before it was redundant work.
	r.readers = nil
	return nil
}

// CSVWriter drains rows from DataCh and writes them to the CSV file at Path,
// optionally preceded by Header and separated by Delimiter.
type CSVWriter struct {
	Path      string          // destination file path
	Header    []string        // optional header row written first
	Delimiter string          // field separator; first rune is used
	DataCh    <-chan []string // source of rows to write
}

// NewCsvWriter builds a CSVWriter that writes rows received on dataCh to the
// file at path, using delimiter as the field separator and header as an
// optional leading row.
func NewCsvWriter(path, delimiter string, header []string, dataCh <-chan []string) *CSVWriter {
	w := CSVWriter{
		Path:      path,
		Header:    header,
		Delimiter: delimiter,
		DataCh:    dataCh,
	}
	return &w
}

func (c *CSVWriter) WriteForever() error {
Expand Down
10 changes: 9 additions & 1 deletion pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type (
IGraphClient interface {
IClient
GetData() (Data, error)
GetFileData(sourceFile string) (Data, error)
Execute(stmt string) (IGraphResponse, error)
}

Expand All @@ -46,8 +47,15 @@ type (
SetOption(*GraphOption) error
}

CsvReaderConfig struct {
Delimiter string
WithHeader bool
Limit int
}

ICsvReader interface {
ReadForever(dataCh chan<- Data) error
GetData(sourceFile string) (Data, error)
Close() error
}

GraphOption struct {
Expand Down
51 changes: 20 additions & 31 deletions pkg/nebulagraph/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const EnvRetryTimeoutUs = "NEBULA_RETRY_TIMEOUT_US"
type (
// GraphPool nebula connection pool
GraphPool struct {
DataCh chan common.Data
OutputCh chan []string
initialized bool
closed bool
Expand All @@ -37,7 +36,6 @@ type (
GraphClient struct {
Client *graph.Session
Pool *GraphPool
DataCh chan common.Data
logger logger
}

Expand All @@ -47,8 +45,6 @@ type (
ResponseTime int32
}

csvReaderStrategy int

output struct {
timeStamp int64
nGQL string
Expand All @@ -72,13 +68,6 @@ type (
var _ common.IGraphClient = &GraphClient{}
var _ common.IGraphClientPool = &GraphPool{}

const (
// AllInOne read csv sequentially
AllInOne csvReaderStrategy = iota
// Separate read csv concurrently
Separate
)

func formatOutput(o *output) []string {
return []string{
strconv.FormatInt(o.timeStamp, 10),
Expand Down Expand Up @@ -146,14 +135,12 @@ func (gp *GraphPool) Init() (common.IGraphClientPool, error) {
if gp.graphOption.CsvPath != "" {
gp.csvReader = common.NewCsvReader(
gp.graphOption.CsvPath,
gp.graphOption.CsvDelimiter,
gp.graphOption.CsvWithHeader,
gp.graphOption.CsvDataLimit,
common.CsvReaderConfig{
Delimiter: gp.graphOption.CsvDelimiter,
WithHeader: gp.graphOption.CsvWithHeader,
Limit: gp.graphOption.CsvDataLimit,
},
)
gp.DataCh = make(chan common.Data, gp.graphOption.CsvChannelSize)
if err := gp.csvReader.ReadForever(gp.DataCh); err != nil {
return nil, err
}
}
return gp, nil
}
Expand Down Expand Up @@ -258,11 +245,6 @@ func (gp *GraphPool) validate(address string) ([]graph.HostAddress, error) {
return hosts, nil
}

// Deprecated ConfigCsvStrategy sets csv reader strategy
func (gp *GraphPool) ConfigCsvStrategy(strategy int) {
return
}

// Close closes the nebula pool
func (gp *GraphPool) Close() error {
gp.mutex.Lock()
Expand All @@ -281,6 +263,9 @@ func (gp *GraphPool) Close() error {
if gp.sessPool != nil {
gp.sessPool.Close()
}
if gp.csvReader != nil {
gp.csvReader.Close()
}
gp.closed = true

return nil
Expand All @@ -302,11 +287,11 @@ func (gp *GraphPool) GetSession() (common.IGraphClient, error) {
if err != nil {
return nil, err
}
s := &GraphClient{Client: c, Pool: gp, DataCh: gp.DataCh, logger: gp.logger}
s := &GraphClient{Client: c, Pool: gp, logger: gp.logger}
gp.clients = append(gp.clients, s)
return s, nil
} else {
s := &GraphClient{Client: nil, Pool: gp, DataCh: gp.DataCh, logger: gp.logger}
s := &GraphClient{Client: nil, Pool: gp, logger: gp.logger}
return s, nil
}

Expand Down Expand Up @@ -335,14 +320,18 @@ func (gc *GraphClient) Close() error {
return nil
}

// GetData get data from csv reader
func (gc *GraphClient) GetData() (common.Data, error) {
if gc.DataCh != nil && len(gc.DataCh) != 0 {
if d, ok := <-gc.DataCh; ok {
return d, nil
}
if gc.Pool.csvReader != nil {
return gc.Pool.csvReader.GetData("")
}
return nil, fmt.Errorf("csv reader not initialized")
}

// GetFileData returns the next row from the named CSV source file; an empty
// sourceFile selects the pool's default source. The span previously carried a
// stale duplicate error return ("no Data at all") from the pre-refactor code.
func (gc *GraphClient) GetFileData(sourceFile string) (common.Data, error) {
	if gc.Pool.csvReader == nil {
		return nil, fmt.Errorf("csv reader not initialized")
	}
	return gc.Pool.csvReader.GetData(sourceFile)
}

func (gc *GraphClient) executeRetry(stmt string) (*graph.ResultSet, error) {
Expand Down
Loading
Loading