16
16
17
17
package org .springframework .boot .buildpack .platform .socket ;
18
18
19
- import java .io .FileNotFoundException ;
20
19
import java .io .IOException ;
21
20
import java .io .InputStream ;
22
21
import java .io .OutputStream ;
23
- import java .io .RandomAccessFile ;
24
22
import java .net .Socket ;
23
+ import java .nio .ByteBuffer ;
24
+ import java .nio .channels .AsynchronousByteChannel ;
25
+ import java .nio .channels .AsynchronousCloseException ;
26
+ import java .nio .channels .AsynchronousFileChannel ;
27
+ import java .nio .channels .Channels ;
28
+ import java .nio .channels .CompletionHandler ;
29
+ import java .nio .file .NoSuchFileException ;
30
+ import java .nio .file .Paths ;
31
+ import java .nio .file .StandardOpenOption ;
32
+ import java .util .concurrent .CompletableFuture ;
33
+ import java .util .concurrent .Future ;
25
34
import java .util .concurrent .TimeUnit ;
26
35
import java .util .function .Consumer ;
27
36
32
41
* A {@link Socket} implementation for named pipes.
33
42
*
34
43
* @author Phillip Webb
44
+ * @author Scott Frederick
35
45
* @since 2.3.0
36
46
*/
37
47
public class NamedPipeSocket extends Socket {
@@ -40,27 +50,22 @@ public class NamedPipeSocket extends Socket {
40
50
41
51
private static final long TIMEOUT = TimeUnit .MILLISECONDS .toNanos (1000 );
42
52
43
- private final RandomAccessFile file ;
44
-
45
- private final InputStream inputStream ;
46
-
47
- private final OutputStream outputStream ;
53
+ private final AsynchronousFileByteChannel channel ;
48
54
49
55
NamedPipeSocket (String path ) throws IOException {
50
- this .file = open (path );
51
- this .inputStream = new NamedPipeInputStream ();
52
- this .outputStream = new NamedPipeOutputStream ();
56
+ this .channel = open (path );
53
57
}
54
58
55
- private static RandomAccessFile open (String path ) throws IOException {
59
+ private AsynchronousFileByteChannel open (String path ) throws IOException {
56
60
Consumer <String > awaiter = Platform .isWindows () ? new WindowsAwaiter () : new SleepAwaiter ();
57
61
long startTime = System .nanoTime ();
58
62
while (true ) {
59
63
try {
60
- return new RandomAccessFile (path , "rw" );
64
+ return new AsynchronousFileByteChannel (AsynchronousFileChannel .open (Paths .get (path ),
65
+ StandardOpenOption .READ , StandardOpenOption .WRITE ));
61
66
}
62
- catch (FileNotFoundException ex ) {
63
- if (System .nanoTime () - startTime > TIMEOUT ) {
67
+ catch (NoSuchFileException ex ) {
68
+ if (System .nanoTime () - startTime >= TIMEOUT ) {
64
69
throw ex ;
65
70
}
66
71
awaiter .accept (path );
@@ -70,21 +75,19 @@ private static RandomAccessFile open(String path) throws IOException {
70
75
71
76
@ Override
72
77
public InputStream getInputStream () {
73
- return this .inputStream ;
78
+ return Channels . newInputStream ( this .channel ) ;
74
79
}
75
80
76
81
@ Override
77
82
public OutputStream getOutputStream () {
78
- return this .outputStream ;
83
+ return Channels . newOutputStream ( this .channel ) ;
79
84
}
80
85
81
86
@ Override
82
87
public void close () throws IOException {
83
- this .file .close ();
84
- }
85
-
86
- protected final RandomAccessFile getFile () {
87
- return this .file ;
88
+ if (this .channel != null ) {
89
+ this .channel .close ();
90
+ }
88
91
}
89
92
90
93
/**
@@ -98,35 +101,81 @@ public static NamedPipeSocket get(String path) throws IOException {
98
101
}
99
102
100
103
/**
101
- * {@link InputStream} returned from the {@link NamedPipeSocket }.
104
+ * Adapt an {@code AsynchronousByteChannel} to an {@code AsynchronousFileChannel }.
102
105
*/
103
- private class NamedPipeInputStream extends InputStream {
106
+ private static class AsynchronousFileByteChannel implements AsynchronousByteChannel {
107
+
108
+ private final AsynchronousFileChannel fileChannel ;
109
+
110
+ AsynchronousFileByteChannel (AsynchronousFileChannel fileChannel ) {
111
+ this .fileChannel = fileChannel ;
112
+ }
104
113
105
114
@ Override
106
- public int read () throws IOException {
107
- return getFile ().read ();
115
+ public <A > void read (ByteBuffer dst , A attachment , CompletionHandler <Integer , ? super A > handler ) {
116
+ this .fileChannel .read (dst , 0 , attachment , new CompletionHandler <Integer , A >() {
117
+
118
+ @ Override
119
+ public void completed (Integer read , A attachment ) {
120
+ handler .completed ((read > 0 ) ? read : -1 , attachment );
121
+ }
122
+
123
+ @ Override
124
+ public void failed (Throwable exc , A attachment ) {
125
+ if (exc instanceof AsynchronousCloseException ) {
126
+ handler .completed (-1 , attachment );
127
+ return ;
128
+ }
129
+ handler .failed (exc , attachment );
130
+ }
131
+ });
132
+
108
133
}
109
134
110
135
@ Override
111
- public int read (byte [] bytes , int off , int len ) throws IOException {
112
- return getFile ().read (bytes , off , len );
136
+ public Future <Integer > read (ByteBuffer dst ) {
137
+ CompletableFutureHandler future = new CompletableFutureHandler ();
138
+ this .fileChannel .read (dst , 0 , null , future );
139
+ return future ;
113
140
}
114
141
115
- }
142
+ @ Override
143
+ public <A > void write (ByteBuffer src , A attachment , CompletionHandler <Integer , ? super A > handler ) {
144
+ this .fileChannel .write (src , 0 , attachment , handler );
145
+ }
116
146
117
- /**
118
- * {@link InputStream} returned from the {@link NamedPipeSocket}.
119
- */
120
- private class NamedPipeOutputStream extends OutputStream {
147
+ @ Override
148
+ public Future < Integer > write ( ByteBuffer src ) {
149
+ return this . fileChannel . write ( src , 0 );
150
+ }
121
151
122
152
@ Override
123
- public void write ( int value ) throws IOException {
124
- NamedPipeSocket . this .file . write ( value );
153
+ public void close ( ) throws IOException {
154
+ this .fileChannel . close ( );
125
155
}
126
156
127
157
@ Override
128
- public void write (byte [] bytes , int off , int len ) throws IOException {
129
- NamedPipeSocket .this .file .write (bytes , off , len );
158
+ public boolean isOpen () {
159
+ return this .fileChannel .isOpen ();
160
+ }
161
+
162
+ private static class CompletableFutureHandler extends CompletableFuture <Integer >
163
+ implements CompletionHandler <Integer , Object > {
164
+
165
+ @ Override
166
+ public void completed (Integer read , Object attachment ) {
167
+ complete ((read > 0 ) ? read : -1 );
168
+ }
169
+
170
+ @ Override
171
+ public void failed (Throwable exc , Object attachment ) {
172
+ if (exc instanceof AsynchronousCloseException ) {
173
+ complete (-1 );
174
+ return ;
175
+ }
176
+ completeExceptionally (exc );
177
+ }
178
+
130
179
}
131
180
132
181
}
0 commit comments