Skip to content

Commit 8df04d0

Browse files
authored
Add dgraph import command that imports data using bulk loader (#9443)
1 parent 104cbae commit 8df04d0

File tree

9 files changed

+222
-47
lines changed

9 files changed

+222
-47
lines changed

.trunk/trunk.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,20 @@ lint:
2929
3030
3131
32-
32+
3333
3434
- git-diff-check
3535
3636
3737
3838
3939
40-
- prettier@3.5.3
41-
- renovate@40.57.1
40+
- prettier@3.6.0
41+
- renovate@41.2.0
4242
4343
4444
45-
45+
4646
4747
actions:
4848
enabled:

dgraph/cmd/bulk/loader.go

Lines changed: 84 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/dgraph-io/badger/v4"
3030
"github.com/dgraph-io/badger/v4/y"
31+
"github.com/dgraph-io/dgo/v250"
3132
"github.com/hypermodeinc/dgraph/v25/chunker"
3233
"github.com/hypermodeinc/dgraph/v25/enc"
3334
"github.com/hypermodeinc/dgraph/v25/filestore"
@@ -38,7 +39,7 @@ import (
3839
"github.com/hypermodeinc/dgraph/v25/xidmap"
3940
)
4041

41-
type options struct {
42+
type BulkOptions struct {
4243
DataFiles string
4344
DataFormat string
4445
SchemaFile string
@@ -55,6 +56,7 @@ type options struct {
5556
Version bool
5657
StoreXids bool
5758
ZeroAddr string
59+
ConnStr string
5860
HttpAddr string
5961
IgnoreErrors bool
6062
CustomTokenizers string
@@ -78,7 +80,7 @@ type options struct {
7880
}
7981

8082
type state struct {
81-
opt *options
83+
opt *BulkOptions
8284
prog *progress
8385
xids *xidmap.XidMap
8486
schema *schemaStore
@@ -95,39 +97,54 @@ type loader struct {
9597
*state
9698
mappers []*mapper
9799
zero *grpc.ClientConn
100+
dg *dgo.Dgraph
98101
}
99102

100-
func newLoader(opt *options) *loader {
103+
func newLoader(opt *BulkOptions) *loader {
101104
if opt == nil {
102105
log.Fatalf("Cannot create loader with nil options.")
103106
}
104107

105-
fmt.Printf("Connecting to zero at %s\n", opt.ZeroAddr)
108+
var zero *grpc.ClientConn
109+
if opt.ZeroAddr != "" {
110+
fmt.Printf("Connecting to zero at %s\n", opt.ZeroAddr)
111+
112+
tlsConf, err := x.LoadClientTLSConfigForInternalPort(Bulk.Conf)
113+
x.Check(err)
114+
dialOpts := []grpc.DialOption{}
115+
if tlsConf != nil {
116+
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConf)))
117+
} else {
118+
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
119+
}
120+
zero, err = grpc.NewClient(opt.ZeroAddr, dialOpts...)
121+
x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.ZeroAddr)
122+
}
123+
124+
var dg *dgo.Dgraph
125+
if opt.ConnStr != "" {
126+
fmt.Printf("Connecting to alpha at %s\n", opt.ConnStr)
127+
128+
var err error
129+
dg, err = dgo.Open(opt.ConnStr)
130+
x.Checkf(err, "Unable to connect to alpha, Is it running at %s?", opt.ConnStr)
131+
}
106132

107-
tlsConf, err := x.LoadClientTLSConfigForInternalPort(Bulk.Conf)
108-
x.Check(err)
109-
dialOpts := []grpc.DialOption{}
110-
if tlsConf != nil {
111-
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConf)))
112-
} else {
113-
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
114-
}
115-
zero, err := grpc.NewClient(opt.ZeroAddr, dialOpts...)
116-
x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.ZeroAddr)
117133
st := &state{
118134
opt: opt,
119135
prog: newProgress(),
120136
shards: newShardMap(opt.MapShards),
121137
// Lots of gz readers, so not much channel buffer needed.
122138
readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
123-
writeTs: getWriteTimestamp(zero),
139+
writeTs: getWriteTimestamp(zero, dg),
124140
namespaces: &sync.Map{},
125141
}
126142
st.schema = newSchemaStore(readSchema(opt), opt, st)
127143
ld := &loader{
128144
state: st,
129145
mappers: make([]*mapper, opt.NumGoroutines),
130146
zero: zero,
147+
dg: dg,
131148
}
132149
for i := range opt.NumGoroutines {
133150
ld.mappers[i] = newMapper(st)
@@ -136,16 +153,29 @@ func newLoader(opt *options) *loader {
136153
return ld
137154
}
138155

139-
func getWriteTimestamp(zero *grpc.ClientConn) uint64 {
140-
client := pb.NewZeroClient(zero)
156+
func getWriteTimestamp(zero *grpc.ClientConn, dg *dgo.Dgraph) uint64 {
157+
if zero != nil {
158+
client := pb.NewZeroClient(zero)
159+
for {
160+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
161+
ts, err := client.Timestamps(ctx, &pb.Num{Val: 1})
162+
cancel()
163+
if err == nil {
164+
return ts.GetStartId()
165+
}
166+
fmt.Printf("Error communicating with dgraph zero, retrying: %v", err)
167+
time.Sleep(time.Second)
168+
}
169+
}
170+
141171
for {
142172
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
143-
ts, err := client.Timestamps(ctx, &pb.Num{Val: 1})
173+
_, ts, err := dg.AllocateTimestamps(ctx, 1)
144174
cancel()
145175
if err == nil {
146-
return ts.GetStartId()
176+
return ts
147177
}
148-
fmt.Printf("Error communicating with dgraph zero, retrying: %v", err)
178+
fmt.Printf("Error communicating with dgraph alpha, retrying: %v", err)
149179
time.Sleep(time.Second)
150180
}
151181
}
@@ -166,21 +196,37 @@ func (ld *loader) leaseNamespaces() {
166196
return
167197
}
168198

169-
client := pb.NewZeroClient(ld.zero)
170-
for {
171-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
172-
ns, err := client.AssignIds(ctx, &pb.Num{Val: maxNs, Type: pb.Num_NS_ID})
173-
cancel()
174-
if err == nil {
175-
fmt.Printf("Assigned namespaces till %d\n", ns.GetEndId())
176-
return
199+
if ld.zero != nil {
200+
client := pb.NewZeroClient(ld.zero)
201+
for {
202+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
203+
ns, err := client.AssignIds(ctx, &pb.Num{Val: maxNs, Type: pb.Num_NS_ID})
204+
cancel()
205+
if err == nil {
206+
fmt.Printf("Assigned namespaces till %d\n", ns.GetEndId())
207+
return
208+
}
209+
fmt.Printf("Error communicating with dgraph zero, retrying: %v", err)
210+
time.Sleep(time.Second)
211+
}
212+
}
213+
214+
if ld.dg != nil {
215+
for {
216+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
217+
_, end, err := ld.dg.AllocateTimestamps(ctx, maxNs)
218+
cancel()
219+
if err == nil {
220+
fmt.Printf("Assigned namespaces till %d\n", end)
221+
return
222+
}
223+
fmt.Printf("Error communicating with dgraph alpha, retrying: %v", err)
224+
time.Sleep(time.Second)
177225
}
178-
fmt.Printf("Error communicating with dgraph zero, retrying: %v", err)
179-
time.Sleep(time.Second)
180226
}
181227
}
182228

183-
func readSchema(opt *options) *schema.ParsedSchema {
229+
func readSchema(opt *BulkOptions) *schema.ParsedSchema {
184230
if opt.SchemaFile == "" {
185231
return genDQLSchema(opt)
186232
}
@@ -212,7 +258,7 @@ func readSchema(opt *options) *schema.ParsedSchema {
212258
return result
213259
}
214260

215-
func genDQLSchema(opt *options) *schema.ParsedSchema {
261+
func genDQLSchema(opt *BulkOptions) *schema.ParsedSchema {
216262
gqlSchBytes := readGqlSchema(opt)
217263
nsToSchemas := parseGqlSchema(string(gqlSchBytes))
218264

@@ -251,6 +297,7 @@ func (ld *loader) mapStage() {
251297
ld.xids = xidmap.New(xidmap.XidMapOptions{
252298
UidAssigner: ld.zero,
253299
DB: db,
300+
DgClient: ld.dg,
254301
Dir: filepath.Join(ld.opt.TmpDir, bufferDir),
255302
})
256303

@@ -348,7 +395,7 @@ func parseGqlSchema(s string) map[uint64]string {
348395
return schemaMap
349396
}
350397

351-
func readGqlSchema(opt *options) []byte {
398+
func readGqlSchema(opt *BulkOptions) []byte {
352399
f, err := filestore.Open(opt.GqlSchemaFile)
353400
x.Check(err)
354401
defer func() {
@@ -400,9 +447,11 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
400447
schema = strconv.Quote(schema)
401448
switch loadType {
402449
case chunker.RdfFormat:
403-
x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(rdfSchema, ns, ns, schema, ns))))
450+
_, err := fmt.Fprintf(gqlBuf, rdfSchema, ns, ns, schema, ns)
451+
x.Check(err)
404452
case chunker.JsonFormat:
405-
x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(jsonSchema, ns, schema))))
453+
_, err := fmt.Fprintf(gqlBuf, jsonSchema, ns, schema)
454+
x.Check(err)
406455
}
407456
ld.readerChunkCh <- gqlBuf
408457
}

dgraph/cmd/bulk/mapper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type shardState struct {
5252
mu sync.Mutex // Allow only 1 write per shard at a time.
5353
}
5454

55-
func newMapperBuffer(opt *options) *z.Buffer {
55+
func newMapperBuffer(opt *BulkOptions) *z.Buffer {
5656
sz := float64(opt.MapBufSize) * 1.1
5757
tmpDir := filepath.Join(opt.TmpDir, bufferDir)
5858
buf, err := z.NewBufferTmp(tmpDir, int(sz))

dgraph/cmd/bulk/merge_shards.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ const (
2121
bufferDir = "buffer"
2222
)
2323

24-
func mergeMapShardsIntoReduceShards(opt *options) {
24+
func mergeMapShardsIntoReduceShards(opt *BulkOptions) {
2525
if opt == nil {
2626
fmt.Printf("Nil options passed to merge shards phase.\n")
2727
os.Exit(1)

dgraph/cmd/bulk/run.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func run() {
133133
keys, err := x.GetEncAclKeys(Bulk.Conf)
134134
x.Check(err)
135135

136-
opt := options{
136+
opt := BulkOptions{
137137
DataFiles: Bulk.Conf.GetString("files"),
138138
DataFormat: Bulk.Conf.GetString("format"),
139139
EncryptionKey: keys.EncKey,
@@ -169,6 +169,10 @@ func run() {
169169
os.Exit(0)
170170
}
171171

172+
RunBulkLoader(opt)
173+
}
174+
175+
func RunBulkLoader(opt BulkOptions) {
172176
if len(opt.EncryptionKey) == 0 {
173177
if opt.Encrypted || opt.EncryptedOut {
174178
fmt.Fprint(os.Stderr, "Must use --encryption or vault option(s).\n")
@@ -213,8 +217,8 @@ func run() {
213217
fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n")
214218
os.Exit(1)
215219
} else {
216-
fileList := strings.Split(opt.DataFiles, ",")
217-
for _, file := range fileList {
220+
fileList := strings.SplitSeq(opt.DataFiles, ",")
221+
for file := range fileList {
218222
if !filestore.Exists(file) {
219223
fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", file)
220224
os.Exit(1)

dgraph/cmd/bulk/schema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type schemaStore struct {
2828
*state
2929
}
3030

31-
func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *schemaStore {
31+
func newSchemaStore(initial *schema.ParsedSchema, opt *BulkOptions, state *state) *schemaStore {
3232
if opt == nil {
3333
log.Fatalf("Cannot create schema store with nil options.")
3434
}

0 commit comments

Comments
 (0)