Skip to content

Commit 9610b7c

Browse files
committed
Fixed buffering issue when container-logging to S3
1 parent 86979b2 commit 9610b7c

File tree

3 files changed

+36
-4
lines changed

3 files changed

+36
-4
lines changed

src/main/java/eu/openanalytics/containerproxy/log/S3LogStorage.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,17 @@ public void initialize() throws IOException {
6666
String endpoint = environment.getProperty("proxy.container-log-s3-endpoint", "https://s3-eu-west-1.amazonaws.com");
6767
enableSSE = Boolean.valueOf(environment.getProperty("proxy.container-log-s3-sse", "false"));
6868

69-
String subPath = containerLogPath.substring("s3://".length());
69+
String subPath = containerLogPath.substring("s3://".length()).trim();
70+
if (subPath.endsWith("/")) subPath = subPath.substring(0, subPath.length() - 1);
71+
7072
int bucketPathIndex = subPath.indexOf("/");
71-
bucketName = subPath.substring(0, bucketPathIndex);
72-
bucketPath = subPath.substring(bucketPathIndex + 1);
73+
if (bucketPathIndex == -1) {
74+
bucketName = subPath;
75+
bucketPath = "";
76+
} else {
77+
bucketName = subPath.substring(0, bucketPathIndex);
78+
bucketPath = subPath.substring(bucketPathIndex + 1) + "/";
79+
}
7380

7481
s3 = AmazonS3ClientBuilder.standard()
7582
.withEndpointConfiguration(new EndpointConfiguration(endpoint, null))
@@ -87,7 +94,8 @@ public OutputStream[] createOutputStreams(Proxy proxy) throws IOException {
8794
OutputStream[] streams = new OutputStream[2];
8895
for (int i = 0; i < streams.length; i++) {
8996
String fileName = paths[i].substring(paths[i].lastIndexOf("/") + 1);
90-
streams[i] = new BufferedOutputStream(new S3OutputStream(bucketPath + "/" + fileName), 1024*1024);
97+
// TODO kubernetes never flushes. So perform timed flushes, and also flush upon container shutdown
98+
streams[i] = new BufferedOutputStream(new S3OutputStream(bucketPath + fileName), 1024*1024);
9199
}
92100
return streams;
93101
}

src/main/java/eu/openanalytics/containerproxy/service/LogService.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class LogService {
4646
private boolean loggingEnabled;
4747
private Logger log = LogManager.getLogger(LogService.class);
4848

49+
private static final String PARAM_STREAMS = "streams";
50+
4951
@Inject
5052
Environment environment;
5153

@@ -85,15 +87,36 @@ public void attachToOutput(Proxy proxy, BiConsumer<OutputStream, OutputStream> o
8587
if (streams == null || streams.length < 2) {
8688
log.error("Failed to attach logging of proxy " + proxy.getId() + ": no output streams defined");
8789
} else {
90+
proxy.getContainers().get(0).getParameters().put(PARAM_STREAMS, streams);
91+
if (log.isDebugEnabled()) log.debug("Container logging started for proxy " + proxy.getId());
8892
// Note that this call will block until the container is stopped.
8993
outputAttacher.accept(streams[0], streams[1]);
9094
}
9195
} catch (Exception e) {
9296
log.error("Failed to attach logging of proxy " + proxy.getId(), e);
9397
}
98+
if (log.isDebugEnabled()) log.debug("Container logging ended for proxy " + proxy.getId());
9499
});
95100
}
96101

102+
public void detach(Proxy proxy) {
103+
if (!isLoggingEnabled()) return;
104+
105+
OutputStream[] streams = (OutputStream[]) proxy.getContainers().get(0).getParameters().get(PARAM_STREAMS);
106+
if (streams == null || streams.length < 2) {
107+
log.warn("Cannot detach container logging: streams not found");
108+
return;
109+
}
110+
for (int i = 0; i < streams.length; i++) {
111+
try {
112+
streams[i].flush();
113+
streams[i].close();
114+
} catch (IOException e) {
115+
log.error("Failed to close container logging streams", e);
116+
}
117+
}
118+
}
119+
97120
public String[] getLogs(Proxy proxy) {
98121
if (!isLoggingEnabled()) return null;
99122

src/main/java/eu/openanalytics/containerproxy/service/ProxyService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ public void stopProxy(Proxy proxy, boolean async, boolean ignoreAccessControl) {
251251
Runnable releaser = () -> {
252252
try {
253253
backend.stopProxy(proxy);
254+
logService.detach(proxy);
254255
log.info(String.format("Proxy released [user: %s] [spec: %s] [id: %s]", proxy.getUserId(), proxy.getSpec().getId(), proxy.getId()));
255256
eventService.post(EventType.ProxyStop.toString(), proxy.getUserId(), proxy.getSpec().getId());
256257
} catch (Exception e){

0 commit comments

Comments
 (0)