11/*
2- * Copyright (C) 2022 Edgar Asatryan
2+ * Copyright (C) 2025 Edgar Asatryan
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1616
1717package io .github .nstdio .http .ext ;
1818
19+ import static io .github .nstdio .http .ext .IOUtils .closeQuietly ;
20+ import static java .nio .file .StandardOpenOption .TRUNCATE_EXISTING ;
21+ import static java .nio .file .StandardOpenOption .WRITE ;
22+
1923import java .io .IOException ;
2024import java .net .http .HttpResponse ;
2125import java .nio .ByteBuffer ;
2630import java .util .concurrent .CompletableFuture ;
2731import java .util .concurrent .CompletionStage ;
2832import java .util .concurrent .Flow ;
29-
30- import static io .github .nstdio .http .ext .IOUtils .closeQuietly ;
31- import static java .nio .file .StandardOpenOption .TRUNCATE_EXISTING ;
32- import static java .nio .file .StandardOpenOption .WRITE ;
33+ import java .util .concurrent .locks .Lock ;
34+ import java .util .concurrent .locks .ReentrantLock ;
3335
3436class PathSubscriber implements HttpResponse .BodySubscriber <Path > {
3537 private final StreamFactory streamFactory ;
3638 private final Path path ;
3739 private final CompletableFuture <Path > future = new CompletableFuture <>();
40+ private final Lock lock = new ReentrantLock ();
3841 private WritableByteChannel out ;
3942
4043 PathSubscriber (Path path ) {
@@ -56,15 +59,20 @@ public void onSubscribe(Flow.Subscription subscription) {
5659 createChannel ();
5760 }
5861
59- private synchronized void createChannel () {
60- if (out != null ) {
61- return ;
62- }
63-
62+ private void createChannel () {
63+ lock .lock ();
6464 try {
65- out = streamFactory .writable (path , WRITE , TRUNCATE_EXISTING );
66- } catch (IOException e ) {
67- future .completeExceptionally (e );
65+ if (out != null ) {
66+ return ;
67+ }
68+
69+ try {
70+ out = streamFactory .writable (path , WRITE , TRUNCATE_EXISTING );
71+ } catch (IOException e ) {
72+ future .completeExceptionally (e );
73+ }
74+ } finally {
75+ lock .unlock ();
6876 }
6977 }
7078
@@ -87,9 +95,14 @@ private void write(List<ByteBuffer> item) throws IOException {
8795 }
8896 }
8997
90- private synchronized void close () {
91- closeQuietly (out );
92- out = null ;
98+ private void close () {
99+ lock .lock ();
100+ try {
101+ closeQuietly (out );
102+ out = null ;
103+ } finally {
104+ lock .unlock ();
105+ }
93106 }
94107
95108 @ Override
0 commit comments