Skip to content

Commit 0a79802

Browse files
authored
Implement WebSocketConnection for ReplayServer (#3338)
* Implement WebSocketConnection for ReplayServer * Proxy live replays * Fix test * Fix error * Fix issues not allowing saving of local replay and remote replay * Finish live replay server and don't stop it prematurely
1 parent be9c7eb commit 0a79802

File tree

16 files changed

+212
-201
lines changed

16 files changed

+212
-201
lines changed

src/main/java/com/faforever/client/chat/kitteh/network/WebSocketConnection.java

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,28 +89,20 @@ public WebSocketConnection(final Client.@NonNull WithManagement client) {
8989
})
9090
.websocket()
9191
.uri(URI.create("wss://%s:%d".formatted(serverAddress.getHost(), serverAddress.getPort())))
92-
.connect()
93-
.doOnNext(connection -> {
94-
Mono<Void> inbound = connection.inbound()
95-
.receive()
96-
.asString(StandardCharsets.UTF_8)
97-
.doOnNext(message -> {
98-
this.client.getInputListener().queue(message);
99-
this.client.processLine(message);
100-
this.lastMessage = message;
101-
})
102-
.doOnError(this::handleException)
103-
.then();
104-
105-
Mono<Void> outbound = connection.outbound()
106-
.sendString(outboundMessages.doOnNext(
107-
message -> this.client.getOutputListener().queue(message))
108-
.doOnError(this::handleException))
109-
.neverComplete();
110-
111-
Mono.firstWithSignal(inbound, outbound).subscribeOn(Schedulers.single()).subscribe();
92+
.handle((inbound, outbound) -> {
93+
outbound.sendString(outboundMessages.doOnNext(message -> this.client.getOutputListener().queue(message))
94+
.doOnError(this::handleException))
95+
.then()
96+
.subscribeOn(Schedulers.single())
97+
.subscribe();
98+
99+
return inbound.receive().asString(StandardCharsets.UTF_8).doOnError(this::handleException);
112100
})
113-
.subscribe();
101+
.subscribe(message -> {
102+
this.client.getInputListener().queue(message);
103+
this.client.processLine(message);
104+
this.lastMessage = message;
105+
});
114106
}
115107

116108
private void scheduleReconnect(int delay) {

src/main/java/com/faforever/client/config/ClientProperties.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ public static class Vault {
9999

100100
@Data
101101
public static class Replay {
102-
private String remoteHost;
103102
private int remotePort;
104103
private String replayFileFormat = "%d-%s.fafreplay";
105104
private String replayFileGlob = "*.fafreplay";
@@ -159,12 +158,6 @@ public void updateFromEndpoint(ServerEndpoints serverEndpoints) {
159158
this.user.setBaseUrl(user.getUrl());
160159
}
161160

162-
SocketEndpoint liveReplay = serverEndpoints.getLiveReplay();
163-
if (liveReplay != null) {
164-
replay.setRemoteHost(liveReplay.getHost());
165-
replay.setRemotePort(liveReplay.getPort());
166-
}
167-
168161
SocketEndpoint chat = serverEndpoints.getChat();
169162
if (chat != null) {
170163
this.irc.setHost(chat.getHost());

src/main/java/com/faforever/client/login/LoginController.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.faforever.client.config.ClientProperties;
44
import com.faforever.client.config.ClientProperties.Irc;
5-
import com.faforever.client.config.ClientProperties.Replay;
65
import com.faforever.client.config.ClientProperties.User;
76
import com.faforever.client.fx.FxApplicationThreadExecutor;
87
import com.faforever.client.fx.JavaFxUtil;
@@ -210,9 +209,6 @@ private void populateEndpointFields() {
210209
fxApplicationThreadExecutor.execute(() -> {
211210
User user = clientProperties.getUser();
212211
userUrlField.setText(user.getBaseUrl());
213-
Replay replay = clientProperties.getReplay();
214-
replayServerHostField.setText(replay.getRemoteHost());
215-
replayServerPortField.setText(String.valueOf(replay.getRemotePort()));
216212
Irc irc = clientProperties.getIrc();
217213
ircServerHostField.setText(irc.getHost());
218214
ircServerPortField.setText(String.valueOf(irc.getPort()));
@@ -249,10 +245,6 @@ public void onLoginButtonClicked() {
249245
clientProperties.getUser()
250246
.setBaseUrl(userUrlField.getText());
251247

252-
clientProperties.getReplay()
253-
.setRemoteHost(replayServerHostField.getText())
254-
.setRemotePort(Integer.parseInt(replayServerPortField.getText()));
255-
256248
clientProperties.getIrc()
257249
.setHost(ircServerHostField.getText())
258250
.setPort(Integer.parseInt(ircServerPortField.getText()));

src/main/java/com/faforever/client/remote/FafServerAccessor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,7 @@ public Mono<Player> connectAndLogIn() {
157157
return userWebClientFactory.getObject()
158158
.get()
159159
.uri("/lobby/access")
160-
.retrieve()
161-
.bodyToMono(LobbyAccess.class)
162-
.map(LobbyAccess::accessUrl)
160+
.retrieve().bodyToMono(HmacAccess.class).map(HmacAccess::accessUrl)
163161
.zipWith(tokenRetriever.getRefreshedTokenValue())
164162
.map(TupleUtils.function(
165163
(lobbyUrl, token) -> new Config(token, Version.getCurrentVersion(),
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.faforever.client.remote;
2+
3+
public record HmacAccess(String accessUrl) {}

src/main/java/com/faforever/client/remote/LobbyAccess.java

Lines changed: 0 additions & 4 deletions
This file was deleted.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.faforever.client.replay;
2+
3+
import com.faforever.client.remote.HmacAccess;
4+
import io.netty.resolver.DefaultAddressResolverGroup;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.beans.factory.ObjectFactory;
8+
import org.springframework.beans.factory.annotation.Qualifier;
9+
import org.springframework.context.annotation.Lazy;
10+
import org.springframework.stereotype.Component;
11+
import org.springframework.web.reactive.function.client.WebClient;
12+
import reactor.core.publisher.Flux;
13+
import reactor.netty.DisposableServer;
14+
import reactor.netty.http.client.HttpClient;
15+
import reactor.netty.tcp.TcpServer;
16+
17+
@Lazy
18+
@Component
19+
@Slf4j
20+
@RequiredArgsConstructor
21+
public class LiveReplayProxyServer {
22+
23+
@Qualifier("userWebClient")
24+
private final ObjectFactory<WebClient> userWebClientFactory;
25+
26+
private DisposableServer tcpServer;
27+
28+
public void stop() {
29+
if (tcpServer != null) {
30+
tcpServer.dispose();
31+
}
32+
}
33+
34+
public int start() {
35+
tcpServer = TcpServer.create()
36+
.doOnBound(server -> log.debug("Opening local live replay server on port {}", server.port()))
37+
.doOnUnbound(server -> log.debug("Closing local live replay server on port {}", server.port()))
38+
.handle((inbound, outbound) -> userWebClientFactory.getObject()
39+
.get()
40+
.uri("/replay/access")
41+
.retrieve()
42+
.bodyToMono(HmacAccess.class)
43+
.map(HmacAccess::accessUrl)
44+
.flatMapMany(
45+
url -> HttpClient.newConnection()
46+
.doOnConnect(
47+
config -> log.info(
48+
"Connecting to replay server at `{}`",
49+
config.uri()))
50+
.resolver(
51+
DefaultAddressResolverGroup.INSTANCE)
52+
.websocket()
53+
.uri(url)
54+
.handle(
55+
((websocketInbound, websocketOutbound) -> Flux.merge(
56+
outbound.sendByteArray(
57+
websocketInbound.receive()
58+
.asByteArray()),
59+
websocketOutbound.sendByteArray(
60+
inbound.receive()
61+
.asByteArray()))))
62+
.doOnError(
63+
throwable -> log.warn(
64+
"Error sending data to local replay server",
65+
throwable))))
66+
.bindNow();
67+
68+
return tcpServer.port();
69+
}
70+
}

src/main/java/com/faforever/client/replay/ReplayRunner.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.faforever.client.replay;
22

3-
import com.faforever.client.config.ClientProperties;
43
import com.faforever.client.domain.server.GameInfo;
54
import com.faforever.client.exception.NotifiableException;
65
import com.faforever.client.fa.ForgedAllianceLaunchService;
@@ -30,6 +29,7 @@
3029
import org.springframework.web.util.UriComponentsBuilder;
3130
import reactor.core.publisher.Mono;
3231

32+
import java.net.InetAddress;
3333
import java.net.URI;
3434
import java.nio.file.Path;
3535
import java.util.List;
@@ -60,7 +60,7 @@ public class ReplayRunner implements InitializingBean {
6060
private final PlayerService playerService;
6161
private final GamePathHandler gamePathHandler;
6262
private final FxApplicationThreadExecutor fxApplicationThreadExecutor;
63-
private final ClientProperties clientProperties;
63+
private final LiveReplayProxyServer liveReplayProxyServer;
6464

6565
private final ReadOnlyObjectWrapper<Process> process = new ReadOnlyObjectWrapper<>();
6666
private final ReadOnlyBooleanWrapper running = new ReadOnlyBooleanWrapper();
@@ -91,9 +91,9 @@ public void runWithReplay(Path path, @Nullable Integer replayId, String featured
9191
}
9292

9393
if (!preferencesService.hasValidGamePath()) {
94-
gamePathHandler.chooseAndValidateGameDirectory().thenAccept(
95-
pathSet -> runWithReplay(path, replayId, featuredModName, baseFafVersion, featuredModFileVersions, simMods,
96-
mapFolderName));
94+
gamePathHandler.chooseAndValidateGameDirectory()
95+
.thenAccept(pathSet -> runWithReplay(path, replayId, featuredModName, baseFafVersion,
96+
featuredModFileVersions, simMods, mapFolderName));
9797
return;
9898
}
9999

@@ -133,17 +133,18 @@ public void runWithLiveReplay(GameInfo game) {
133133
return;
134134
}
135135

136+
int port = liveReplayProxyServer.start();
137+
136138
/* A courtesy towards the replay server so we can see in logs who we're dealing with. */
137139
String playerName = playerService.getCurrentPlayer().getUsername();
138-
139140
String featuredModName = game.getFeaturedMod();
140141
String mapName = game.getMapFolderName();
141142
URI replayUrl = UriComponentsBuilder.newInstance()
142143
.scheme(GPGNET_SCHEME)
143-
.host(clientProperties.getReplay().getRemoteHost())
144-
.port(clientProperties.getReplay().getRemotePort())
145-
.path(
146-
"/" + game.getId() + "/" + playerName + ReplayService.SUP_COM_REPLAY_FILE_ENDING)
144+
.host(InetAddress.getLoopbackAddress().getHostAddress())
145+
.port(port)
146+
.pathSegment(String.valueOf(game.getId()),
147+
playerName + ReplayService.SUP_COM_REPLAY_FILE_ENDING)
147148
.build()
148149
.toUri();
149150

@@ -156,15 +157,20 @@ public void runWithLiveReplay(GameInfo game) {
156157
CompletableFuture<Void> downloadMapFuture = downloadMapAskIfError(mapName).toFuture();
157158
CompletableFuture.allOf(updateFeaturedModFuture, installAndActivateSimModsFuture, downloadMapFuture)
158159
.thenApply(ignored -> forgedAllianceLaunchService.startReplay(replayUrl, game.getId()))
159-
.thenAcceptAsync(process::set, fxApplicationThreadExecutor)
160+
.thenApplyAsync(process -> {
161+
this.process.set(process);
162+
return process;
163+
}, fxApplicationThreadExecutor)
160164
.exceptionally(throwable -> {
161165
if (throwable instanceof NotifiableException notifiableException) {
162166
notificationService.addErrorNotification(notifiableException);
163167
} else {
164168
notificationService.addImmediateErrorNotification(throwable, "liveReplayCouldNotBeStarted");
165169
}
166170
return null;
167-
});
171+
})
172+
.thenCompose(Process::onExit)
173+
.whenComplete((ignored, throwable) -> liveReplayProxyServer.stop());
168174
}
169175

170176
public boolean isRunning() {

0 commit comments

Comments
 (0)