Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
******************************************************************************/
package org.eclipse.lsp4j.jsonrpc.json;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
Expand All @@ -28,8 +29,7 @@ public class StreamMessageConsumer implements MessageConsumer, MessageConstants
private final MessageJsonHandler jsonHandler;

private final Object outputLock = new Object();

private OutputStream output;
private BufferedOutputStream output;

public StreamMessageConsumer(MessageJsonHandler jsonHandler) {
this(null, StandardCharsets.UTF_8.name(), jsonHandler);
Expand All @@ -40,7 +40,7 @@ public StreamMessageConsumer(OutputStream output, MessageJsonHandler jsonHandler
}

public StreamMessageConsumer(OutputStream output, String encoding, MessageJsonHandler jsonHandler) {
this.output = output;
setOutput(output);
this.encoding = encoding;
this.jsonHandler = jsonHandler;
}
Expand All @@ -50,7 +50,13 @@ public OutputStream getOutput() {
}

public void setOutput(OutputStream output) {
this.output = output;
synchronized (outputLock) {
this.output = output == null //
? null
: output instanceof BufferedOutputStream //
? (BufferedOutputStream) output
: new BufferedOutputStream(output);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.eclipse.lsp4j.jsonrpc.Launcher;
Expand Down Expand Up @@ -54,18 +55,18 @@ public void testClientRequest() throws Exception {
@Test
public void testNotifications() throws Exception {
server.client.notify("12");
await(() -> client.result.length() == 2);
await(() -> client.result.get().length() == 2);
client.server.notify("foo");
await(() -> server.result.length() == 3);
await(() -> server.result.get().length() == 3);
server.client.notify("34");
await(() -> client.result.length() == 4);
await(() -> client.result.get().length() == 4);
client.server.notify("bar");
await(() -> server.result.length() == 6);
await(() -> server.result.get().length() == 6);
server.client.notify("56");
await(() -> client.result.length() == 6);
Assert.assertEquals("foobar", server.result);
Assert.assertEquals("123456", client.result);
await(() -> client.result.get().length() == 6);

Assert.assertEquals("foobar", server.result.get());
Assert.assertEquals("123456", client.result.get());
}

@Test
Expand All @@ -78,11 +79,11 @@ public void testManyConcurrentNotifications() throws Exception {
}
int expectedResultLenght = expectedResult.length();
try {
await(() -> server.result.length() == expectedResultLenght);
await(() -> server.result.get().length() == expectedResultLenght);
} catch (Error e) {
// discard this error so that the nice error displays in the assertEquals
}
Assert.assertEquals(expectedResult, server.result);
Assert.assertEquals(expectedResult, server.result.get());
}

@Test
Expand All @@ -95,9 +96,9 @@ public void testChunkedNotification() throws Exception {
String message = messageBuilder.toString();

server.client.notify(message);
await(() -> client.result.length() == message.length());
Assert.assertEquals(message, client.result);
await(() -> client.result.get().length() == message.length());

Assert.assertEquals(message, client.result.get());
}

private void await(Supplier<Boolean> condition) throws InterruptedException {
Expand All @@ -119,11 +120,11 @@ private interface ClientInterface {

private static class Client implements ClientInterface {
ServerInterface server;
String result = "";
AtomicReference<String> result = new AtomicReference<>("");

@Override
public void notify(String arg) {
this.result += arg;
this.result.getAndUpdate(s -> s + arg);
}
}

Expand All @@ -139,16 +140,16 @@ private interface ServerInterface {

private static class Server implements ServerInterface {
ClientInterface client;
String result = "";
AtomicReference<String> result = new AtomicReference<>("");

@Override
public CompletableFuture<String> request(String arg) {
return CompletableFuture.supplyAsync(() -> arg + "bar");
}

@Override
public void notify(String arg) {
this.result += arg;
this.result.getAndUpdate(s -> s + arg);
}
}

Expand Down
Loading