@@ -224,6 +224,80 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
224
224
return NULL ;
225
225
}
226
226
227
+ struct dlm_dir_dump {
228
+ /* init values to match if whole
229
+ * dump fits to one seq. Sanity check only.
230
+ */
231
+ uint64_t seq_init ;
232
+ uint64_t nodeid_init ;
233
+ /* compare local pointer with last lookup,
234
+ * just a sanity check.
235
+ */
236
+ struct list_head * last ;
237
+
238
+ unsigned int sent_res ; /* for log info */
239
+ unsigned int sent_msg ; /* for log info */
240
+
241
+ struct list_head list ;
242
+ };
243
+
244
+ static void drop_dir_ctx (struct dlm_ls * ls , int nodeid )
245
+ {
246
+ struct dlm_dir_dump * dd , * safe ;
247
+
248
+ write_lock (& ls -> ls_dir_dump_lock );
249
+ list_for_each_entry_safe (dd , safe , & ls -> ls_dir_dump_list , list ) {
250
+ if (dd -> nodeid_init == nodeid ) {
251
+ log_error (ls , "drop dump seq %llu" ,
252
+ (unsigned long long )dd -> seq_init );
253
+ list_del (& dd -> list );
254
+ kfree (dd );
255
+ }
256
+ }
257
+ write_unlock (& ls -> ls_dir_dump_lock );
258
+ }
259
+
260
+ static struct dlm_dir_dump * lookup_dir_dump (struct dlm_ls * ls , int nodeid )
261
+ {
262
+ struct dlm_dir_dump * iter , * dd = NULL ;
263
+
264
+ read_lock (& ls -> ls_dir_dump_lock );
265
+ list_for_each_entry (iter , & ls -> ls_dir_dump_list , list ) {
266
+ if (iter -> nodeid_init == nodeid ) {
267
+ dd = iter ;
268
+ break ;
269
+ }
270
+ }
271
+ read_unlock (& ls -> ls_dir_dump_lock );
272
+
273
+ return dd ;
274
+ }
275
+
276
+ static struct dlm_dir_dump * init_dir_dump (struct dlm_ls * ls , int nodeid )
277
+ {
278
+ struct dlm_dir_dump * dd ;
279
+
280
+ dd = lookup_dir_dump (ls , nodeid );
281
+ if (dd ) {
282
+ log_error (ls , "found ongoing dir dump for node %d, will drop it" ,
283
+ nodeid );
284
+ drop_dir_ctx (ls , nodeid );
285
+ }
286
+
287
+ dd = kzalloc (sizeof (* dd ), GFP_ATOMIC );
288
+ if (!dd )
289
+ return NULL ;
290
+
291
+ dd -> seq_init = ls -> ls_recover_seq ;
292
+ dd -> nodeid_init = nodeid ;
293
+
294
+ write_lock (& ls -> ls_dir_dump_lock );
295
+ list_add (& dd -> list , & ls -> ls_dir_dump_list );
296
+ write_unlock (& ls -> ls_dir_dump_lock );
297
+
298
+ return dd ;
299
+ }
300
+
227
301
/* Find the rsb where we left off (or start again), then send rsb names
228
302
for rsb's we're master of and whose directory node matches the requesting
229
303
node. inbuf is the rsb name last sent, inlen is the name's length */
@@ -234,20 +308,46 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
234
308
struct list_head * list ;
235
309
struct dlm_rsb * r ;
236
310
int offset = 0 , dir_nodeid ;
311
+ struct dlm_dir_dump * dd ;
237
312
__be16 be_namelen ;
238
313
239
314
read_lock (& ls -> ls_masters_lock );
240
315
241
316
if (inlen > 1 ) {
317
+ dd = lookup_dir_dump (ls , nodeid );
318
+ if (!dd ) {
319
+ log_error (ls , "failed to lookup dir dump context nodeid: %d" ,
320
+ nodeid );
321
+ goto out ;
322
+ }
323
+
324
+ /* next chunk in dump */
242
325
r = find_rsb_root (ls , inbuf , inlen );
243
326
if (!r ) {
244
327
log_error (ls , "copy_master_names from %d start %d %.*s" ,
245
328
nodeid , inlen , inlen , inbuf );
246
329
goto out ;
247
330
}
248
331
list = r -> res_masters_list .next ;
332
+
333
+ /* sanity checks */
334
+ if (dd -> last != & r -> res_masters_list ||
335
+ dd -> seq_init != ls -> ls_recover_seq ) {
336
+ log_error (ls , "failed dir dump sanity check seq_init: %llu seq: %llu" ,
337
+ (unsigned long long )dd -> seq_init ,
338
+ (unsigned long long )ls -> ls_recover_seq );
339
+ goto out ;
340
+ }
249
341
} else {
342
+ dd = init_dir_dump (ls , nodeid );
343
+ if (!dd ) {
344
+ log_error (ls , "failed to allocate dir dump context" );
345
+ goto out ;
346
+ }
347
+
348
+ /* start dump */
250
349
list = ls -> ls_masters_list .next ;
350
+ dd -> last = list ;
251
351
}
252
352
253
353
for (offset = 0 ; list != & ls -> ls_masters_list ; list = list -> next ) {
@@ -269,7 +369,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
269
369
be_namelen = cpu_to_be16 (0 );
270
370
memcpy (outbuf + offset , & be_namelen , sizeof (__be16 ));
271
371
offset += sizeof (__be16 );
272
- ls -> ls_recover_dir_sent_msg ++ ;
372
+ dd -> sent_msg ++ ;
273
373
goto out ;
274
374
}
275
375
@@ -278,7 +378,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
278
378
offset += sizeof (__be16 );
279
379
memcpy (outbuf + offset , r -> res_name , r -> res_length );
280
380
offset += r -> res_length ;
281
- ls -> ls_recover_dir_sent_res ++ ;
381
+ dd -> sent_res ++ ;
382
+ dd -> last = list ;
282
383
}
283
384
284
385
/*
@@ -288,10 +389,18 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
288
389
289
390
if ((list == & ls -> ls_masters_list ) &&
290
391
(offset + sizeof (uint16_t ) <= outlen )) {
392
+ /* end dump */
291
393
be_namelen = cpu_to_be16 (0xFFFF );
292
394
memcpy (outbuf + offset , & be_namelen , sizeof (__be16 ));
293
395
offset += sizeof (__be16 );
294
- ls -> ls_recover_dir_sent_msg ++ ;
396
+ dd -> sent_msg ++ ;
397
+ log_rinfo (ls , "dlm_recover_directory nodeid %d sent %u res out %u messages" ,
398
+ nodeid , dd -> sent_res , dd -> sent_msg );
399
+
400
+ write_lock (& ls -> ls_dir_dump_lock );
401
+ list_del_init (& dd -> list );
402
+ write_unlock (& ls -> ls_dir_dump_lock );
403
+ kfree (dd );
295
404
}
296
405
out :
297
406
read_unlock (& ls -> ls_masters_lock );
0 commit comments