@@ -127,23 +127,18 @@ func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir strin
127127	if  err  !=  nil  {
128128		return  fmt .Errorf ("failed to start external snapshot stream for group %d: %w" , groupId , err )
129129	}
130- 
131130	defer  func () {
132- 		if  _ , err  :=  out .CloseAndRecv (); err  !=  nil  {
133- 			glog .Errorf ("failed to close the stream for group [%v]: %v" , groupId , err )
134- 		}
135- 
136- 		glog .Infof ("[import] Group [%v]: Received ACK " , groupId )
131+ 		_  =  out .CloseSend ()
137132	}()
138133
139134	// Open the BadgerDB instance at the specified directory 
140135	opt  :=  badger .DefaultOptions (pdir )
136+ 	opt .ReadOnly  =  true 
141137	ps , err  :=  badger .OpenManaged (opt )
142138	if  err  !=  nil  {
143139		glog .Errorf ("failed to open BadgerDB at [%s]: %v" , pdir , err )
144140		return  fmt .Errorf ("failed to open BadgerDB at [%v]: %v" , pdir , err )
145141	}
146- 
147142	defer  func () {
148143		if  err  :=  ps .Close (); err  !=  nil  {
149144			glog .Warningf ("[import] Error closing BadgerDB: %v" , err )
@@ -154,17 +149,19 @@ func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir strin
154149	glog .Infof ("[import] Sending request for streaming external snapshot for group ID [%v]" , groupId )
155150	groupReq  :=  & api.StreamExtSnapshotRequest {GroupId : groupId }
156151	if  err  :=  out .Send (groupReq ); err  !=  nil  {
157- 		return  fmt .Errorf ("failed to send request for streaming external snapshot for group ID [%v] to the server: %w" ,
158- 			groupId , err )
152+ 		return  fmt .Errorf ("failed to send request for group ID [%v] to the server: %w" , groupId , err )
153+ 	}
154+ 	if  _ , err  :=  out .Recv (); err  !=  nil  {
155+ 		return  fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
159156	}
160157
158+ 	glog .Infof ("[import] Group [%v]: Received ACK for sending group request" , groupId )
159+ 
161160	// Configure and start the BadgerDB stream 
162161	glog .Infof ("[import] Starting BadgerDB stream for group [%v]" , groupId )
163- 
164162	if  err  :=  streamBadger (ctx , ps , out , groupId ); err  !=  nil  {
165163		return  fmt .Errorf ("badger streaming failed for group [%v]: %v" , groupId , err )
166164	}
167- 
168165	return  nil 
169166}
170167
@@ -180,6 +177,11 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn
180177		if  err  :=  out .Send (& api.StreamExtSnapshotRequest {Pkt : p }); err  !=  nil  &&  ! errors .Is (err , io .EOF ) {
181178			return  fmt .Errorf ("failed to send data chunk: %w" , err )
182179		}
180+ 		if  _ , err  :=  out .Recv (); err  !=  nil  {
181+ 			return  fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
182+ 		}
183+ 		glog .Infof ("[import] Group [%v]: Received ACK for sending data chunk" , groupId )
184+ 
183185		return  nil 
184186	}
185187
@@ -196,5 +198,25 @@ func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSn
196198		return  fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
197199	}
198200
201+ 	for  {
202+ 		if  ctx .Err () !=  nil  {
203+ 			return  ctx .Err ()
204+ 		}
205+ 		resp , err  :=  out .Recv ()
206+ 		if  errors .Is (err , io .EOF ) {
207+ 			return  fmt .Errorf ("server closed stream before Finish=true for group [%d]" , groupId )
208+ 		}
209+ 		if  err  !=  nil  {
210+ 			return  fmt .Errorf ("failed to receive final response for group ID [%v] from the server: %w" , groupId , err )
211+ 		}
212+ 		if  resp .Finish  {
213+ 			glog .Infof ("[import] Group [%v]: Received final Finish=true" , groupId )
214+ 			break 
215+ 		}
216+ 		glog .Infof ("[import] Group [%v]: Waiting for Finish=true, got interim ACK" , groupId )
217+ 	}
218+ 
219+ 	glog .Infof ("[import] Group [%v]: Received ACK for sending completion signal" , groupId )
220+ 
199221	return  nil 
200222}
0 commit comments