Skip to content

Commit 578ddbd

Browse files
committed
added input output stream support
1 parent d0ba935 commit 578ddbd

File tree

14 files changed

+275
-87
lines changed

14 files changed

+275
-87
lines changed

nbcode/notebooks/src/org/netbeans/modules/nbcode/java/notebook/CodeEval.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.netbeans.modules.java.lsp.server.notebook.CellExecutionResult;
3333
import org.netbeans.modules.java.lsp.server.notebook.NotebookCellExecutionProgressResultParams;
3434
import org.netbeans.modules.java.lsp.server.notebook.NotebookCellExecutionProgressResultParams.EXECUTION_STATUS;
35+
import org.netbeans.modules.java.lsp.server.protocol.NbCodeLanguageClient;
3536
import org.openide.util.RequestProcessor;
3637

3738
/**
@@ -86,10 +87,10 @@ private String interruptCodeExecution(String notebookId) {
8687
try {
8788
JShell jshell = NotebookSessionManager.getInstance().getSession(notebookId);
8889
String cellId = activeCellExecutionMapping.get(notebookId);
89-
if(cellId != null){
90+
if (cellId != null) {
9091
sendNotification(notebookId, cellId, EXECUTION_STATUS.INTERRUPTED);
9192
}
92-
93+
flushStreams(notebookId);
9394
List<CompletableFuture<Boolean>> tasks = pendingTasks.get(notebookId);
9495
if (tasks != null) {
9596
tasks.forEach(task -> {
@@ -158,7 +159,7 @@ private void codeEvalTaskRunnable(CompletableFuture<Boolean> future, JShell jshe
158159
}
159160

160161
runCode(jshell, sourceCode, notebookId);
161-
162+
flushStreams(notebookId);
162163
sendNotification(notebookId, EXECUTION_STATUS.SUCCESS);
163164

164165
future.complete(true);
@@ -240,6 +241,13 @@ private List<String> getSnippetValue(SnippetEvent event) {
240241
return snippetValues;
241242
}
242243

244+
private void flushStreams(String notebookId) {
245+
JshellStreamsHandler streamHandler = NotebookSessionManager.getInstance().getJshellStreamsHandler(notebookId);
246+
if (streamHandler != null) {
247+
streamHandler.flushOutputStreams();
248+
}
249+
}
250+
243251
// This method is directly taken from JShell tool implementation in jdk with some minor modifications
244252
private List<String> displayableDiagnostic(String source, Diag diag) {
245253
List<String> toDisplay = new ArrayList<>();
@@ -363,10 +371,12 @@ private void sendNotification(String notebookId, String cellId, byte[] msg, List
363371
}
364372
}
365373

366-
LanguageClientInstance.getInstance().getClient().notifyNotebookCellExecutionProgress(params);
374+
NbCodeLanguageClient client = LanguageClientInstance.getInstance().getClient();
375+
if (client != null) {
376+
client.notifyNotebookCellExecutionProgress(params);
377+
}
367378
} catch (Exception ex) {
368379
LOG.log(Level.SEVERE, "Some error ocurred while sending code eval notification to the client {0}", ex.getMessage());
369380
}
370381
}
371-
372382
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) 2025, Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.netbeans.modules.nbcode.java.notebook;
17+
18+
import java.io.ByteArrayInputStream;
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.lang.ref.WeakReference;
22+
import java.nio.charset.StandardCharsets;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.logging.Level;
26+
import java.util.logging.Logger;
27+
import org.netbeans.modules.java.lsp.server.input.ShowInputBoxParams;
28+
import org.netbeans.modules.java.lsp.server.protocol.NbCodeLanguageClient;
29+
30+
/**
31+
*
32+
* @author atalati
33+
*/
34+
public class CustomInputStream extends InputStream {
35+
36+
private static final Logger LOG = Logger.getLogger(CustomInputStream.class.getName());
37+
private ByteArrayInputStream currentStream;
38+
WeakReference<NbCodeLanguageClient> client;
39+
private static final String USER_PROMPT_REQUEST = "Please provide scanner input here";
40+
41+
public CustomInputStream(NbCodeLanguageClient client) {
42+
this.client = new WeakReference<>(client);
43+
}
44+
45+
@Override
46+
public synchronized int read(byte[] b, int off, int len) throws IOException {
47+
try {
48+
if (client == null || client.get() == null) {
49+
LOG.log(Level.WARNING, "client is null");
50+
return -1;
51+
}
52+
53+
if (currentStream == null || currentStream.available() == 0) {
54+
CompletableFuture<String> future = client.get().showInputBox(new ShowInputBoxParams(USER_PROMPT_REQUEST, "", true));
55+
String userInput = future.get();
56+
57+
if (userInput == null) {
58+
LOG.log(Level.WARNING, "User input is null");
59+
return -1;
60+
}
61+
62+
byte[] inputBytes = (userInput + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
63+
currentStream = new ByteArrayInputStream(inputBytes);
64+
}
65+
66+
return currentStream.read(b, off, len);
67+
} catch (InterruptedException ex) {
68+
Thread.currentThread().interrupt();
69+
throw new IOException("Interrupted while waiting for user input", ex);
70+
} catch (ExecutionException ex) {
71+
throw new IOException("Failed to get user input", ex.getCause());
72+
}
73+
}
74+
75+
@Override
76+
public int read() throws IOException {
77+
byte[] oneByte = new byte[1];
78+
int n = read(oneByte, 0, 1);
79+
return (n == -1) ? -1 : oneByte[0] & 0xFF;
80+
}
81+
}

nbcode/notebooks/src/org/netbeans/modules/nbcode/java/notebook/JshellStreamsHandler.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.io.ByteArrayOutputStream;
1919
import java.io.IOException;
20+
import java.io.InputStream;
2021
import java.io.PrintStream;
2122
import java.util.function.BiConsumer;
2223
import java.util.function.Consumer;
@@ -35,6 +36,7 @@ public class JshellStreamsHandler implements AutoCloseable {
3536
private final StreamingOutputStream errStream;
3637
private final PrintStream printOutStream;
3738
private final PrintStream printErrStream;
39+
private final InputStream inputStream;
3840

3941
public JshellStreamsHandler(String notebookId, BiConsumer<String, byte[]> streamCallback) {
4042
this(notebookId, streamCallback, streamCallback);
@@ -52,6 +54,7 @@ public JshellStreamsHandler(String notebookId,
5254
this.errStream = new StreamingOutputStream(createCallback(errStreamCallback));
5355
this.printOutStream = new PrintStream(outStream);
5456
this.printErrStream = new PrintStream(errStream);
57+
this.inputStream = new CustomInputStream(LanguageClientInstance.getInstance().getClient());
5558
}
5659

5760
private Consumer<byte[]> createCallback(BiConsumer<String, byte[]> callback) {
@@ -66,14 +69,6 @@ public void setErrStreamCallback(BiConsumer<String, byte[]> callback) {
6669
errStream.setCallback(createCallback(callback));
6770
}
6871

69-
public ByteArrayOutputStream getOutStream() {
70-
return outStream.getOutputStream();
71-
}
72-
73-
public ByteArrayOutputStream getErrStream() {
74-
return errStream.getOutputStream();
75-
}
76-
7772
public PrintStream getPrintOutStream() {
7873
return printOutStream;
7974
}
@@ -82,17 +77,31 @@ public PrintStream getPrintErrStream() {
8277
return printErrStream;
8378
}
8479

80+
public InputStream getInputStream() {
81+
return inputStream;
82+
}
83+
8584
public String getNotebookId() {
8685
return notebookId;
8786
}
8887

88+
public void flushOutputStreams() {
89+
try {
90+
outStream.flush();
91+
errStream.flush();
92+
} catch (IOException ignored) {
93+
// nothing can be done
94+
}
95+
}
96+
8997
@Override
9098
public void close() {
9199
try {
92100
printOutStream.close();
93101
printErrStream.close();
94102
outStream.close();
95103
errStream.close();
104+
inputStream.close();
96105
} catch (IOException ex) {
97106
LOG.log(Level.WARNING, "IOException occurred while closing the streams {0}", ex.getMessage());
98107
}

nbcode/notebooks/src/org/netbeans/modules/nbcode/java/notebook/LanguageClientInstance.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
*/
1616
package org.netbeans.modules.nbcode.java.notebook;
1717

18+
import java.lang.ref.WeakReference;
1819
import org.netbeans.modules.java.lsp.server.protocol.NbCodeLanguageClient;
1920

2021
/**
2122
*
2223
* @author atalati
2324
*/
2425
public class LanguageClientInstance {
25-
private NbCodeLanguageClient client = null;
26+
private WeakReference<NbCodeLanguageClient> client = null;
2627

2728
private LanguageClientInstance() {
2829
}
@@ -37,10 +38,10 @@ private static class Singleton {
3738
}
3839

3940
public NbCodeLanguageClient getClient(){
40-
return this.client;
41+
return this.client == null ? null: this.client.get();
4142
}
4243

4344
public void setClient(NbCodeLanguageClient client){
44-
this.client = client;
45+
this.client = new WeakReference<>(client);
4546
}
4647
}

nbcode/notebooks/src/org/netbeans/modules/nbcode/java/notebook/NotebookDocumentServiceHandlerImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public class NotebookDocumentServiceHandlerImpl implements NotebookDocumentServi
8585
public void didOpen(DidOpenNotebookDocumentParams params) {
8686
try {
8787
NbCodeLanguageClient client = LanguageClientInstance.getInstance().getClient();
88+
if(client == null){
89+
return;
90+
}
8891
client.showStatusBarMessage(new ShowStatusMessageParams(MessageType.Info,"Intializing Java kernel for notebook."));
8992
NotebookSessionManager.getInstance().createSession(params.getNotebookDocument()).whenComplete((JShell jshell,Throwable t) -> {
9093
if (t == null) {

nbcode/notebooks/src/org/netbeans/modules/nbcode/java/notebook/NotebookSessionManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,15 @@ private CompletableFuture<JShell> jshellBuilder(JshellStreamsHandler streamsHand
7171
return JShell.builder()
7272
.out(streamsHandler.getPrintOutStream())
7373
.err(streamsHandler.getPrintErrStream())
74+
.in(streamsHandler.getInputStream())
7475
.compilerOptions()
7576
.remoteVMOptions()
7677
.build();
7778
} else {
7879
return JShell.builder()
7980
.out(streamsHandler.getPrintOutStream())
8081
.err(streamsHandler.getPrintErrStream())
82+
.in(streamsHandler.getInputStream())
8183
.compilerOptions(compilerOptions.toArray(new String[0]))
8284
.remoteVMOptions(remoteOptions.toArray(new String[0]))
8385
.build();

nbcode/notebooks/src/org/netbeans/modules/nbcode/java/notebook/StreamingOutputStream.java

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,83 +18,97 @@
1818
import java.io.ByteArrayOutputStream;
1919
import java.io.IOException;
2020
import java.io.OutputStream;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.function.Consumer;
23+
import org.openide.util.RequestProcessor;
24+
import org.openide.util.RequestProcessor.Task;
2225

2326
/**
2427
*
2528
* @author atalati
2629
*/
2730
public class StreamingOutputStream extends OutputStream {
31+
2832
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
2933
private Consumer<byte[]> callback;
30-
// What if line endings are not there then how to flush Systen.out.print()?
31-
private static final char NEW_LINE_ENDING = '\n';
32-
private static final char NEW_LINE_ENDING_DOS = '\r';
34+
private static final int MAX_BUFFER_SIZE = 1024;
35+
private final AtomicBoolean isPeriodicFlushOutputStream;
36+
37+
static RequestProcessor getRequestProcessor() {
38+
return RPSingleton.instance;
39+
}
40+
41+
private static int getRequestPeriodicTime() {
42+
return RPSingleton.PERIODIC_TIME;
43+
}
44+
45+
private static final class RPSingleton {
46+
47+
private static final RequestProcessor instance = new RequestProcessor(StreamingOutputStream.class.getName(), 1, true, false);
48+
private static final int PERIODIC_TIME = 100;
49+
}
3350

3451
public StreamingOutputStream(Consumer<byte[]> callback) {
3552
this.callback = callback;
53+
this.isPeriodicFlushOutputStream = new AtomicBoolean(true);
54+
createAndScheduleTask();
3655
}
3756

3857
@Override
39-
public void write(int b) throws IOException {
58+
public synchronized void write(int b) throws IOException {
4059
buffer.write(b);
41-
checkNewlineEndings((byte) b);
60+
ifBufferOverflowFlush();
4261
}
4362

4463
@Override
45-
public void write(byte[] b, int off, int len) throws IOException {
64+
public synchronized void write(byte[] b, int off, int len) throws IOException {
4665
buffer.write(b, off, len);
47-
checkForNewlineAndFlush(b, off, len);
66+
ifBufferOverflowFlush();
4867
}
4968

5069
@Override
51-
public void flush() throws IOException {
52-
super.flush();
70+
public synchronized void flush() throws IOException {
5371
flushToCallback();
5472
}
5573

5674
@Override
57-
public void write(byte[] b) throws IOException {
75+
public synchronized void write(byte[] b) throws IOException {
5876
buffer.write(b);
59-
checkForNewlineAndFlush(b, 0, b.length);
77+
ifBufferOverflowFlush();
6078
}
61-
79+
6280
@Override
63-
public void close() throws IOException {
81+
public synchronized void close() throws IOException {
6482
flushToCallback();
83+
isPeriodicFlushOutputStream.set(false);
6584
super.close();
6685
}
67-
68-
public void setCallback(Consumer<byte[]> cb){
69-
this.callback = cb;
70-
}
71-
72-
public ByteArrayOutputStream getOutputStream(){
73-
return buffer;
74-
}
7586

76-
private void checkForNewlineAndFlush(byte[] b, int offset, int length) {
77-
for (int i = offset; i < offset + length; i++) {
78-
boolean isNew = checkNewlineEndings(b[i]);
79-
if (isNew) {
80-
break;
81-
}
82-
}
87+
public void setCallback(Consumer<byte[]> cb) {
88+
this.callback = cb;
8389
}
8490

85-
private boolean checkNewlineEndings(byte b) {
86-
if (b == NEW_LINE_ENDING || b == NEW_LINE_ENDING_DOS) {
91+
private void ifBufferOverflowFlush() {
92+
if (buffer.size() > MAX_BUFFER_SIZE) {
8793
flushToCallback();
88-
return true;
8994
}
90-
return false;
9195
}
9296

93-
private void flushToCallback() {
97+
private synchronized void flushToCallback() {
9498
if (buffer.size() > 0) {
9599
byte[] output = buffer.toByteArray();
96100
buffer.reset();
97101
callback.accept(output);
98102
}
99103
}
104+
105+
private void createAndScheduleTask() {
106+
if (isPeriodicFlushOutputStream.get()) {
107+
Task task = getRequestProcessor().create(() -> {
108+
flushToCallback();
109+
createAndScheduleTask();
110+
});
111+
task.schedule(getRequestPeriodicTime());
112+
}
113+
}
100114
}

0 commit comments

Comments
 (0)