Skip to content

Commit 7c4383a

Browse files
committed
Added file upload functionaloity
1 parent 00cab25 commit 7c4383a

File tree

9 files changed

+408
-87
lines changed

9 files changed

+408
-87
lines changed

services-api/src/main/java/io/scalecube/services/api/ServiceMessage.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public final class ServiceMessage {
3030
/** Request method header. */
3131
public static final String HEADER_REQUEST_METHOD = "requestMethod";
3232

33+
/** Upload filename header. */
34+
public static final String HEADER_UPLOAD_FILENAME = "uploadFilename";
35+
3336
/** Null value for error type. */
3437
public static final int NULL_ERROR_TYPE = -1;
3538

@@ -199,6 +202,15 @@ public String requestMethod() {
199202
return headers.get(HEADER_REQUEST_METHOD);
200203
}
201204

205+
/**
206+
* Returns upload filename header.
207+
*
208+
* @return upload filename, or null if such header doesn't exist.
209+
*/
210+
public String uploadFilename() {
211+
return headers.get(HEADER_UPLOAD_FILENAME);
212+
}
213+
202214
@Override
203215
public String toString() {
204216
return new StringJoiner(", ", "ServiceMessage" + "[", "]")
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.scalecube.services.files;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import java.nio.channels.FileChannel;
6+
import java.nio.file.Path;
7+
import reactor.core.publisher.Flux;
8+
9+
public class FileChannelFlux {
10+
11+
private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
12+
13+
private FileChannelFlux() {
14+
// Do not instantiate
15+
}
16+
17+
public static Flux<byte[]> createFrom(Path filePath) {
18+
return createFrom(filePath, ByteBuffer.allocate(DEFAULT_CHUNK_SIZE));
19+
}
20+
21+
public static Flux<byte[]> createFrom(Path filePath, ByteBuffer chunkBuffer) {
22+
return Flux.generate(
23+
() -> FileChannel.open(filePath),
24+
(channel, sink) -> {
25+
try {
26+
int read;
27+
chunkBuffer.clear();
28+
do {
29+
read = channel.read(chunkBuffer);
30+
} while (read == 0);
31+
32+
chunkBuffer.flip();
33+
if (chunkBuffer.remaining() > 0) {
34+
final var bytes = new byte[chunkBuffer.remaining()];
35+
chunkBuffer.get(bytes);
36+
sink.next(bytes);
37+
}
38+
39+
if (read == -1) {
40+
sink.complete();
41+
}
42+
} catch (IOException e) {
43+
sink.error(e);
44+
}
45+
return channel;
46+
},
47+
channel -> {
48+
try {
49+
channel.close();
50+
} catch (Exception ex) {
51+
// no-op
52+
}
53+
});
54+
}
55+
}

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,31 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
142142
.switchOnFirst(
143143
(first, messages) -> {
144144
final var message = first.get();
145+
final var request = toRequest(message);
145146
final var qualifier = message.qualifier();
146147
final var dataFormat = message.dataFormat();
147148

148149
return messages
149150
.map(this::toRequest)
150151
.transform(this::invokeRequest)
152+
.doOnSubscribe(
153+
s -> {
154+
if (logger != null && logger.isDebugEnabled()) {
155+
logger.debug("[{}][subscribe] request: {}", message.qualifier(), request);
156+
}
157+
})
158+
.doOnComplete(
159+
() -> {
160+
if (logger != null && logger.isDebugEnabled()) {
161+
logger.debug("[{}][complete] request: {}", message.qualifier(), request);
162+
}
163+
})
164+
.doOnError(
165+
ex -> {
166+
if (logger != null) {
167+
logger.error("[{}][error] request: {}", message.qualifier(), request, ex);
168+
}
169+
})
151170
.map(response -> toResponse(response, qualifier, dataFormat))
152171
.onErrorResume(ex -> Flux.just(errorMapper.toMessage(qualifier, ex)))
153172
.subscribeOn(methodInfo.scheduler());

services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java

Lines changed: 105 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
66
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
77
import static io.scalecube.services.api.ServiceMessage.HEADER_REQUEST_METHOD;
8+
import static io.scalecube.services.api.ServiceMessage.HEADER_UPLOAD_FILENAME;
9+
import static io.scalecube.services.gateway.ReferenceCountUtil.safestRelease;
810
import static io.scalecube.services.gateway.http.HttpGateway.SUPPORTED_METHODS;
911

1012
import io.netty.buffer.ByteBuf;
@@ -13,19 +15,24 @@
1315
import io.netty.buffer.Unpooled;
1416
import io.netty.handler.codec.http.HttpMethod;
1517
import io.netty.handler.codec.http.HttpResponseStatus;
18+
import io.netty.handler.codec.http.multipart.FileUpload;
19+
import io.netty.handler.codec.http.multipart.HttpData;
1620
import io.scalecube.services.ServiceCall;
1721
import io.scalecube.services.ServiceReference;
1822
import io.scalecube.services.api.DynamicQualifier;
1923
import io.scalecube.services.api.ErrorData;
2024
import io.scalecube.services.api.ServiceMessage;
25+
import io.scalecube.services.api.ServiceMessage.Builder;
2126
import io.scalecube.services.exceptions.ServiceException;
2227
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
23-
import io.scalecube.services.gateway.ReferenceCountUtil;
28+
import io.scalecube.services.files.FileChannelFlux;
2429
import io.scalecube.services.registry.api.ServiceRegistry;
2530
import io.scalecube.services.routing.StaticAddressRouter;
2631
import io.scalecube.services.transport.api.DataCodec;
32+
import java.io.IOException;
2733
import java.util.List;
2834
import java.util.function.BiFunction;
35+
import java.util.function.Consumer;
2936
import org.reactivestreams.Publisher;
3037
import org.slf4j.Logger;
3138
import org.slf4j.LoggerFactory;
@@ -41,6 +48,7 @@ public class HttpGatewayAcceptor
4148
private static final Logger LOGGER = LoggerFactory.getLogger(HttpGatewayAcceptor.class);
4249

4350
private static final String ERROR_NAMESPACE = "io.scalecube.services.error";
51+
private static final long MAX_SERVICE_MESSAGE_SIZE = 1024 * 1024;
4452

4553
private final ServiceCall serviceCall;
4654
private final ServiceRegistry serviceRegistry;
@@ -69,42 +77,89 @@ public Publisher<Void> apply(HttpServerRequest httpRequest, HttpServerResponse h
6977
return methodNotAllowed(httpResponse);
7078
}
7179

72-
return httpRequest
73-
.receive()
74-
.aggregate()
75-
.defaultIfEmpty(Unpooled.EMPTY_BUFFER)
76-
.map(ByteBuf::retain)
77-
.flatMap(content -> handleRequest(content, httpRequest, httpResponse))
78-
.onErrorResume(ex -> error(httpResponse, errorMapper.toMessage(ERROR_NAMESPACE, ex)));
80+
if (httpRequest.isMultipart()) {
81+
return handleFileUploadRequest(httpRequest, httpResponse);
82+
} else {
83+
return handleServiceRequest(httpRequest, httpResponse);
84+
}
7985
}
8086

81-
private Mono<Void> handleRequest(
82-
ByteBuf content, HttpServerRequest httpRequest, HttpServerResponse httpResponse) {
83-
final var message = toMessage(httpRequest, content);
87+
private Mono<Void> handleFileUploadRequest(
88+
HttpServerRequest httpRequest, HttpServerResponse httpResponse) {
89+
return httpRequest
90+
.receiveForm()
91+
.flatMap(
92+
httpData ->
93+
serviceCall
94+
.requestBidirectional(
95+
createFlux(httpData)
96+
.map(
97+
data ->
98+
toMessage(
99+
httpRequest,
100+
builder -> {
101+
final var filename =
102+
((FileUpload) httpData).getFilename();
103+
builder.header(HEADER_UPLOAD_FILENAME, filename);
104+
builder.data(data);
105+
})))
106+
.last()
107+
.flatMap(
108+
response ->
109+
response.isError() // check error
110+
? error(httpResponse, response)
111+
: response.hasData() // check data
112+
? ok(httpResponse, response)
113+
: noContent(httpResponse)))
114+
.then();
115+
}
84116

85-
// Match and handle file request
117+
private Mono<Void> handleServiceRequest(
118+
HttpServerRequest httpRequest, HttpServerResponse httpResponse) {
119+
return httpRequest
120+
.receive()
121+
.reduceWith(
122+
Unpooled::buffer,
123+
(acc, byteBuf) -> {
124+
final var readableBytes = acc.readableBytes();
125+
final var limit = MAX_SERVICE_MESSAGE_SIZE;
126+
if (readableBytes >= limit) {
127+
throw new RuntimeException(
128+
"Payload too large, size: " + readableBytes + ", limit: " + limit);
129+
}
130+
return acc.writeBytes(byteBuf);
131+
})
132+
.defaultIfEmpty(Unpooled.EMPTY_BUFFER)
133+
.flatMap(
134+
data -> {
135+
final var message = toMessage(httpRequest, builder -> builder.data(data));
86136

87-
final var serviceReference = matchFileRequest(serviceRegistry.lookupService(message));
88-
if (serviceReference != null) {
89-
return handleFileRequest(serviceReference, message, httpResponse);
90-
}
137+
// Match and handle file request
91138

92-
// Handle normal service request
139+
final var service = matchFileDownloadRequest(serviceRegistry.lookupService(message));
140+
if (service != null) {
141+
return handleFileDownloadRequest(service, message, httpResponse);
142+
}
93143

94-
return serviceCall
95-
.requestOne(message)
96-
.switchIfEmpty(Mono.defer(() -> emptyMessage(message)))
97-
.doOnError(th -> releaseRequestOnError(message))
98-
.flatMap(
99-
response ->
100-
response.isError() // check error
101-
? error(httpResponse, response)
102-
: response.hasData() // check data
103-
? ok(httpResponse, response)
104-
: noContent(httpResponse));
144+
// Handle normal service request
145+
146+
return serviceCall
147+
.requestOne(message)
148+
.switchIfEmpty(Mono.defer(() -> emptyMessage(message)))
149+
.doOnError(th -> safestRelease(message.data()))
150+
.flatMap(
151+
response ->
152+
response.isError() // check error
153+
? error(httpResponse, response)
154+
: response.hasData() // check data
155+
? ok(httpResponse, response)
156+
: noContent(httpResponse));
157+
})
158+
.onErrorResume(ex -> error(httpResponse, errorMapper.toMessage(ERROR_NAMESPACE, ex)));
105159
}
106160

107-
private static ServiceMessage toMessage(HttpServerRequest httpRequest, ByteBuf content) {
161+
private static ServiceMessage toMessage(
162+
HttpServerRequest httpRequest, Consumer<Builder> consumer) {
108163
final var builder = ServiceMessage.builder();
109164

110165
// Copy http headers to service message
@@ -115,11 +170,14 @@ private static ServiceMessage toMessage(HttpServerRequest httpRequest, ByteBuf c
115170

116171
// Add http method to service message (used by REST services)
117172

118-
return builder
173+
builder
119174
.header(HEADER_REQUEST_METHOD, httpRequest.method().name())
120-
.qualifier(httpRequest.uri().substring(1))
121-
.data(content)
122-
.build();
175+
.qualifier(httpRequest.uri().substring(1));
176+
if (consumer != null) {
177+
consumer.accept(builder);
178+
}
179+
180+
return builder.build();
123181
}
124182

125183
private static Mono<ServiceMessage> emptyMessage(ServiceMessage message) {
@@ -169,31 +227,27 @@ private static ByteBuf encodeData(Object data, String dataFormat) {
169227
try {
170228
DataCodec.getInstance(dataFormat).encode(new ByteBufOutputStream(byteBuf), data);
171229
} catch (Throwable t) {
172-
ReferenceCountUtil.safestRelease(byteBuf);
230+
safestRelease(byteBuf);
173231
LOGGER.error("Failed to encode data: {}", data, t);
174232
return Unpooled.EMPTY_BUFFER;
175233
}
176234

177235
return byteBuf;
178236
}
179237

180-
private static void releaseRequestOnError(ServiceMessage request) {
181-
ReferenceCountUtil.safestRelease(request.data());
182-
}
183-
184-
private static ServiceReference matchFileRequest(List<ServiceReference> list) {
238+
private static ServiceReference matchFileDownloadRequest(List<ServiceReference> list) {
185239
if (list.size() != 1) {
186240
return null;
187241
}
188-
final var sr = list.get(0);
189-
if ("application/file".equals(sr.tags().get("Content-Type"))) {
190-
return sr;
242+
final var service = list.get(0);
243+
if ("application/file".equals(service.tags().get("Content-Type"))) {
244+
return service;
191245
} else {
192246
return null;
193247
}
194248
}
195249

196-
private Mono<Void> handleFileRequest(
250+
private Mono<Void> handleFileDownloadRequest(
197251
ServiceReference service, ServiceMessage message, HttpServerResponse response) {
198252
return serviceCall
199253
.router(StaticAddressRouter.forService(service.address(), service.endpointName()).build())
@@ -275,4 +329,12 @@ private static String errorMessage(int statusCode, String fileName) {
275329
return HttpResponseStatus.valueOf(statusCode).reasonPhrase();
276330
}
277331
}
332+
333+
private static Flux<byte[]> createFlux(HttpData httpData) {
334+
try {
335+
return FileChannelFlux.createFrom(httpData.getFile().toPath());
336+
} catch (IOException e) {
337+
throw new RuntimeException(e);
338+
}
339+
}
278340
}

0 commit comments

Comments
 (0)