@@ -359,6 +359,7 @@ protected ISynchronizationPoint<IOException> canStartWriting() {
359
359
protected int writeSync (long pos , ByteBuffer buffer ) throws IOException {
360
360
Iterator <RangeLong > it = fragments .iterator ();
361
361
long p = 0 ;
362
+ int total = 0 ;
362
363
while (it .hasNext ()) {
363
364
RangeLong r = it .next ();
364
365
long s = r .max - r .min + 1 ;
@@ -376,9 +377,13 @@ protected int writeSync(long pos, ByteBuffer buffer) throws IOException {
376
377
} else {
377
378
len = ((IO .Writable .Seekable )io ).writeSync (r .min + start , buffer );
378
379
}
379
- return len ;
380
+ total += len ;
381
+ pos += len ;
382
+ p += s ;
383
+ if (!buffer .hasRemaining ())
384
+ return total ;
380
385
}
381
- return 0 ;
386
+ return total ;
382
387
}
383
388
384
389
protected AsyncWork <Integer , IOException > writeAsync (long pos , ByteBuffer buffer , RunnableWithParameter <Pair <Integer , IOException >> ondone ) {
@@ -391,26 +396,40 @@ protected AsyncWork<Integer, IOException> writeAsync(long pos, ByteBuffer buffer
391
396
p += s ;
392
397
continue ;
393
398
}
394
- long start = pos - p ;
395
- int len = buffer .remaining ();
396
- if (start + len > s ) {
397
- int prevLimit = buffer .limit ();
398
- buffer .limit ((int )(prevLimit - ((start + len ) - s )));
399
- return ((IO .Writable .Seekable )io ).writeAsync (r .min + start , buffer ,
400
- new RunnableWithParameter <Pair <Integer ,IOException >>() {
401
- @ Override
402
- public void run (Pair <Integer , IOException > param ) {
403
- buffer .limit (prevLimit );
404
- if (ondone != null ) ondone .run (param );
405
- }
406
- });
407
- }
408
- return operation (((IO .Writable .Seekable )io ).writeAsync (r .min + start , buffer , ondone ));
399
+ AsyncWork <Integer ,IOException > sp = new AsyncWork <>();
400
+ writeAsync (it , r , p , 0 , pos , buffer , ondone , sp );
401
+ return operation (sp );
409
402
}
410
403
AsyncWork <Integer ,IOException > sp = new AsyncWork <>();
411
404
if (ondone != null ) ondone .run (new Pair <>(Integer .valueOf (0 ), null ));
412
405
sp .unblockSuccess (Integer .valueOf (0 ));
413
406
return sp ;
414
407
}
415
408
409
+ protected void writeAsync (
410
+ Iterator <RangeLong > it , RangeLong r , long p , int done , long pos ,
411
+ ByteBuffer buffer , RunnableWithParameter <Pair <Integer , IOException >> ondone , AsyncWork <Integer ,IOException > sp
412
+ ) {
413
+ long start = pos - p ;
414
+ int len = buffer .remaining ();
415
+ long s = r .max - r .min + 1 ;
416
+ if (start + len > s ) {
417
+ int prevLimit = buffer .limit ();
418
+ buffer .limit ((int )(prevLimit - ((start + len ) - s )));
419
+ IOUtil .listenOnDone (((IO .Writable .Seekable )io ).writeAsync (r .min + start , buffer ), (nb ) -> {
420
+ buffer .limit (prevLimit );
421
+ int i = nb .intValue ();
422
+ if (!buffer .hasRemaining () || !it .hasNext ()) {
423
+ IOUtil .success (Integer .valueOf (i ), sp , ondone );
424
+ return ;
425
+ }
426
+ writeAsync (it , it .next (), p + s , done + i , pos + i , buffer , ondone , sp );
427
+ }, sp , ondone );
428
+ return ;
429
+ }
430
+ IOUtil .listenOnDone (((IO .Writable .Seekable )io ).writeAsync (r .min + start , buffer ), (nb ) -> {
431
+ IOUtil .success (Integer .valueOf (nb .intValue () + done ), sp , ondone );
432
+ }, sp , ondone );
433
+ }
434
+
416
435
}
0 commit comments