@@ -16,22 +16,22 @@ import (
1616
1717 "github.com/dgraph-io/badger/v4"
1818 "github.com/dgraph-io/dgo/v250"
19- apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2 "
19+ "github.com/dgraph-io/dgo/v250/protos/api"
2020 "github.com/dgraph-io/ristretto/v2/z"
2121
2222 "github.com/golang/glog"
2323 "golang.org/x/sync/errgroup"
2424)
2525
2626// newClient creates a new import client with the specified endpoint and gRPC options.
27- func newClient (connectionString string ) (apiv2 .DgraphClient , error ) {
27+ func newClient (connectionString string ) (api .DgraphClient , error ) {
2828 dg , err := dgo .Open (connectionString )
2929 if err != nil {
3030 return nil , fmt .Errorf ("failed to connect to endpoint [%s]: %w" , connectionString , err )
3131 }
3232
3333 glog .Infof ("[import] Successfully connected to Dgraph endpoint: %s" , connectionString )
34- return dg .GetAPIv2Client ()[0 ], nil
34+ return dg .GetAPIClients ()[0 ], nil
3535}
3636
3737func Import (ctx context.Context , connectionString string , bulkOutDir string ) error {
@@ -48,9 +48,9 @@ func Import(ctx context.Context, connectionString string, bulkOutDir string) err
4848}
4949
5050// initiateSnapshotStream initiates a snapshot stream session with the Dgraph server.
51- func initiateSnapshotStream (ctx context.Context , dc apiv2 .DgraphClient ) (* apiv2 .UpdateExtSnapshotStreamingStateResponse , error ) {
51+ func initiateSnapshotStream (ctx context.Context , dc api .DgraphClient ) (* api .UpdateExtSnapshotStreamingStateResponse , error ) {
5252 glog .Info ("[import] Initiating external snapshot stream" )
53- req := & apiv2 .UpdateExtSnapshotStreamingStateRequest {
53+ req := & api .UpdateExtSnapshotStreamingStateRequest {
5454 Start : true ,
5555 }
5656 resp , err := dc .UpdateExtSnapshotStreamingState (ctx , req )
@@ -65,7 +65,7 @@ func initiateSnapshotStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.
6565// streamSnapshot takes a p directory and a set of group IDs and streams the data from the
6666// p directory to the corresponding group IDs. It first scans the provided directory for
6767// subdirectories named with numeric group IDs.
68- func streamSnapshot (ctx context.Context , dc apiv2 .DgraphClient , baseDir string , groups []uint32 ) error {
68+ func streamSnapshot (ctx context.Context , dc api .DgraphClient , baseDir string , groups []uint32 ) error {
6969 glog .Infof ("[import] Starting to stream snapshot from directory: %s" , baseDir )
7070
7171 errG , errGrpCtx := errgroup .WithContext (ctx )
@@ -90,7 +90,7 @@ func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string,
9090 // If errors occurs during streaming of the external snapshot, we drop all the data and
9191 // go back to ensure a clean slate and the cluster remains in working state.
9292 glog .Info ("[import] dropping all the data and going back to clean slate" )
93- req := & apiv2 .UpdateExtSnapshotStreamingStateRequest {
93+ req := & api .UpdateExtSnapshotStreamingStateRequest {
9494 Start : false ,
9595 Finish : true ,
9696 DropData : true ,
@@ -104,7 +104,7 @@ func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string,
104104 }
105105
106106 glog .Info ("[import] Completed streaming external snapshot" )
107- req := & apiv2 .UpdateExtSnapshotStreamingStateRequest {
107+ req := & api .UpdateExtSnapshotStreamingStateRequest {
108108 Start : false ,
109109 Finish : true ,
110110 DropData : false ,
@@ -119,7 +119,7 @@ func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string,
119119
120120// streamSnapshotForGroup handles the actual data streaming process for a single group.
121121// It opens the BadgerDB at the specified directory and streams all data to the server.
122- func streamSnapshotForGroup (ctx context.Context , dc apiv2 .DgraphClient , pdir string , groupId uint32 ) error {
122+ func streamSnapshotForGroup (ctx context.Context , dc api .DgraphClient , pdir string , groupId uint32 ) error {
123123 glog .Infof ("Opening stream for group %d from directory %s" , groupId , pdir )
124124
125125 // Initialize stream with the server
@@ -152,7 +152,7 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
152152
153153 // Send group ID as the first message in the stream
154154 glog .Infof ("[import] Sending request for streaming external snapshot for group ID [%v]" , groupId )
155- groupReq := & apiv2 .StreamExtSnapshotRequest {GroupId : groupId }
155+ groupReq := & api .StreamExtSnapshotRequest {GroupId : groupId }
156156 if err := out .Send (groupReq ); err != nil {
157157 return fmt .Errorf ("failed to send request for streaming external snapshot for group ID [%v] to the server: %w" ,
158158 groupId , err )
@@ -171,13 +171,13 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
171171// streamBadger runs a BadgerDB stream to send key-value pairs to the specified group.
172172// It creates a new stream at the maximum sequence number and sends the data to the specified group.
173173// It also sends a final 'done' signal to mark completion.
174- func streamBadger (ctx context.Context , ps * badger.DB , out apiv2 .Dgraph_StreamExtSnapshotClient , groupId uint32 ) error {
174+ func streamBadger (ctx context.Context , ps * badger.DB , out api .Dgraph_StreamExtSnapshotClient , groupId uint32 ) error {
175175 stream := ps .NewStreamAt (math .MaxUint64 )
176176 stream .LogPrefix = "[import] Sending external snapshot to group [" + fmt .Sprintf ("%d" , groupId ) + "]"
177177 stream .KeyToList = nil
178178 stream .Send = func (buf * z.Buffer ) error {
179- p := & apiv2 .StreamPacket {Data : buf .Bytes ()}
180- if err := out .Send (& apiv2 .StreamExtSnapshotRequest {Pkt : p }); err != nil && ! errors .Is (err , io .EOF ) {
179+ p := & api .StreamPacket {Data : buf .Bytes ()}
180+ if err := out .Send (& api .StreamExtSnapshotRequest {Pkt : p }); err != nil && ! errors .Is (err , io .EOF ) {
181181 return fmt .Errorf ("failed to send data chunk: %w" , err )
182182 }
183183 return nil
@@ -190,9 +190,9 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
190190
191191 // Send the final 'done' signal to mark completion
192192 glog .Infof ("[import] Sending completion signal for group [%d]" , groupId )
193- done := & apiv2 .StreamPacket {Done : true }
193+ done := & api .StreamPacket {Done : true }
194194
195- if err := out .Send (& apiv2 .StreamExtSnapshotRequest {Pkt : done }); err != nil && ! errors .Is (err , io .EOF ) {
195+ if err := out .Send (& api .StreamExtSnapshotRequest {Pkt : done }); err != nil && ! errors .Is (err , io .EOF ) {
196196 return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
197197 }
198198
0 commit comments