2222import java .io .InputStream ;
2323import java .io .InterruptedIOException ;
2424import java .io .OutputStream ;
25+ import java .lang .ref .Cleaner ;
2526import java .nio .file .FileAlreadyExistsException ;
2627import java .nio .file .OpenOption ;
2728import java .util .Calendar ;
3031import org .apache .commons .net .ProtocolCommandListener ;
3132import org .apache .commons .net .ftp .FTPClient ;
3233import org .apache .commons .net .ftp .FTPFile ;
34+ import com .github .robtimus .filesystems .CleanerSupport ;
3335import com .github .robtimus .pool .LogLevel ;
3436import com .github .robtimus .pool .Pool ;
3537import com .github .robtimus .pool .PoolConfig ;
4345 */
4446final class FTPClientPool {
4547
48+ private static final Cleaner CLEANER = Cleaner .create ();
49+
4650 private final String hostname ;
4751 private final int port ;
4852
@@ -232,30 +236,46 @@ private void applyTransferOptions(TransferOptions options) throws IOException {
232236 InputStream newInputStream (FTPPath path , OpenOptions options ) throws IOException {
233237 assert options .read ;
234238
239+ boolean deleteOnClose = options .deleteOnClose ;
240+
235241 applyTransferOptions (options );
236242
237243 InputStream in = ftpClient .retrieveFileStream (path .path ());
238244 if (in == null ) {
239245 throw exceptionFactory .createNewInputStreamException (path .path (), ftpClient .getReplyCode (), ftpClient .getReplyString ());
240246 }
241- in = new FTPInputStream (path , in , options .deleteOnClose );
242- addReference (in );
243- return in ;
247+
248+ // The reference will be closed when the cleanable is invoked
249+ Reference <IOException > reference = addReference ();
250+ CleanerSupport .CleanAction cleanAction = () -> close (in , reference , path , deleteOnClose );
251+
252+ InputStream result = new FTPInputStream (in , cleanAction );
253+
254+ logEvent (() -> FTPMessages .log .createdInputStream (path .path ()));
255+
256+ return result ;
257+ }
258+
259+ private void close (InputStream in , Reference <IOException > reference , FTPPath path , boolean deleteOnClose ) throws IOException { // NOSONAR
260+ // Always finalize the stream, to prevent pool starvation
261+ Closeable finalizer = this ::finalizeStream ;
262+ try (reference ; finalizer ; in ) {
263+ // This block will close in first, finalize the stream second, close reference third, and always perform all three actions
264+ }
265+ if (deleteOnClose ) {
266+ delete (path , false );
267+ }
268+ logEvent (() -> FTPMessages .log .closedInputStream (path .path ()));
244269 }
245270
246271 private final class FTPInputStream extends InputStream {
247272
248- private final FTPPath path ;
249273 private final InputStream in ;
250- private final boolean deleteOnClose ;
274+ private final Cleaner . Cleanable cleanable ;
251275
252- private boolean open = true ;
253-
254- private FTPInputStream (FTPPath path , InputStream in , boolean deleteOnClose ) {
255- this .path = path ;
276+ private FTPInputStream (InputStream in , CleanerSupport .CleanAction cleanAction ) {
256277 this .in = in ;
257- this .deleteOnClose = deleteOnClose ;
258- logEvent (() -> FTPMessages .log .createdInputStream (path .path ()));
278+ this .cleanable = CleanerSupport .register (CLEANER , this , cleanAction );
259279 }
260280
261281 @ Override
@@ -285,20 +305,7 @@ public int available() throws IOException {
285305
286306 @ Override
287307 public void close () throws IOException {
288- if (open ) {
289- try {
290- in .close ();
291- } finally {
292- // always finalize the stream, to prevent pool starvation
293- // set open to false as well, to prevent finalizing the stream twice
294- open = false ;
295- finalizeStream (this );
296- }
297- if (deleteOnClose ) {
298- delete (path , false );
299- }
300- logEvent (() -> FTPMessages .log .closedInputStream (path .path ()));
301- }
308+ CleanerSupport .clean (cleanable );
302309 }
303310
304311 @ Override
@@ -321,6 +328,8 @@ public boolean markSupported() {
321328 OutputStream newOutputStream (FTPPath path , OpenOptions options ) throws IOException {
322329 assert options .write ;
323330
331+ boolean deleteOnClose = options .deleteOnClose ;
332+
324333 applyTransferOptions (options );
325334
326335 OutputStream out = options .append
@@ -330,24 +339,38 @@ OutputStream newOutputStream(FTPPath path, OpenOptions options) throws IOExcepti
330339 throw exceptionFactory .createNewOutputStreamException (path .path (), ftpClient .getReplyCode (), ftpClient .getReplyString (),
331340 options .options );
332341 }
333- out = new FTPOutputStream (path , out , options .deleteOnClose );
334- addReference (out );
335- return out ;
342+
343+ // The reference will be closed when the cleanable is invoked
344+ Reference <IOException > reference = addReference ();
345+ CleanerSupport .CleanAction cleanAction = () -> close (out , reference , path , deleteOnClose );
346+
347+ OutputStream result = new FTPOutputStream (out , cleanAction );
348+
349+ logEvent (() -> FTPMessages .log .createdInputStream (path .path ()));
350+
351+ return result ;
352+ }
353+
354+ private void close (OutputStream out , Reference <IOException > reference , FTPPath path , boolean deleteOnClose ) throws IOException { // NOSONAR
355+ // Always finalize the stream, to prevent pool starvation
356+ Closeable finalizer = this ::finalizeStream ;
357+ try (reference ; finalizer ; out ) {
358+ // This block will close in first, finalize the stream second, close reference third, and always perform all three actions
359+ }
360+ if (deleteOnClose ) {
361+ delete (path , false );
362+ }
363+ logEvent (() -> FTPMessages .log .closedOutputStream (path .path ()));
336364 }
337365
338366 private final class FTPOutputStream extends OutputStream {
339367
340- private final FTPPath path ;
341368 private final OutputStream out ;
342- private final boolean deleteOnClose ;
343-
344- private boolean open = true ;
369+ private final Cleaner .Cleanable cleanable ;
345370
346- private FTPOutputStream (FTPPath path , OutputStream out , boolean deleteOnClose ) {
347- this .path = path ;
371+ private FTPOutputStream (OutputStream out , CleanerSupport .CleanAction cleanAction ) {
348372 this .out = out ;
349- this .deleteOnClose = deleteOnClose ;
350- logEvent (() -> FTPMessages .log .createdOutputStream (path .path ()));
373+ this .cleanable = CleanerSupport .register (CLEANER , this , cleanAction );
351374 }
352375
353376 @ Override
@@ -372,30 +395,13 @@ public void flush() throws IOException {
372395
373396 @ Override
374397 public void close () throws IOException {
375- if (open ) {
376- try {
377- out .close ();
378- } finally {
379- // always finalize the stream, to prevent pool starvation
380- // set open to false as well, to prevent finalizing the stream twice
381- open = false ;
382- finalizeStream (this );
383- }
384- if (deleteOnClose ) {
385- delete (path , false );
386- }
387- logEvent (() -> FTPMessages .log .closedOutputStream (path .path ()));
388- }
398+ CleanerSupport .clean (cleanable );
389399 }
390400 }
391401
392- private void finalizeStream (Object stream ) throws IOException {
393- try {
394- if (!ftpClient .completePendingCommand ()) {
395- throw new FTPFileSystemException (ftpClient .getReplyCode (), ftpClient .getReplyString ());
396- }
397- } finally {
398- removeReference (stream );
402+ private void finalizeStream () throws IOException {
403+ if (!ftpClient .completePendingCommand ()) {
404+ throw new FTPFileSystemException (ftpClient .getReplyCode (), ftpClient .getReplyString ());
399405 }
400406 }
401407
0 commit comments