Skip to content

DataBufferUtils#join could leak buffers in case of an error from the source. How can I fix this memory leak issue? #34080

@carl-HelloWorld

Description

@carl-HelloWorld

@component
public class ResponseBodyLogGlobalFilter implements GlobalFilter, Ordered {

private static final Log log = LogFactory.getLog(ResponseBodyLogGlobalFilter.class);

@Value("${gateway.logging.response-max-payload-length:200}")
private int responseMaxPayLoadLength;

public int getResponseMaxPayLoadLength() {
    return responseMaxPayLoadLength;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    return chain.filter(exchange.mutate()
            .response(new ResponseBodyServerHttpResponse(exchange)).build());
}

@Override
public int getOrder() {
    // NettyWriteResponseFilter的顺序默认为-1,在NettyWriteResponseFilter之前即可
    return -10;
}

private class ResponseBodyServerHttpResponse extends ServerHttpResponseDecorator {

    private final ServerWebExchange exchange;

    public ResponseBodyServerHttpResponse(ServerWebExchange exchange) {
        super(exchange.getResponse());
        this.exchange = exchange;
    }

    @SuppressWarnings("unchecked")
    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {

        MediaType contentType = exchange.getResponse().getHeaders().getContentType();
        if (!shouldResponseBody(contentType)) {
            return super.writeWith(body);
        }

        ServerHttpResponse originalResponse = exchange.getResponse();
        DataBufferFactory originalBufferFactory = originalResponse.bufferFactory();

        // 参见NettyWriteResponseFilter返回的都是flux
        if (body instanceof Flux) {
            Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;
            //todo flux.buffer() Does flux.buffer() cause a memory leak when an exception occurs?
            return super.writeWith(flux.buffer().map(dataBuffers  -> {
                // 合并多个流集合,解决返回体分段传输

                DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                DataBuffer joinDataBuffer = dataBufferFactory.join(dataBuffers);
                // 1kb = 1024kb,防止响应内容太大导致内存过大和影响速度
                if (joinDataBuffer.readableByteCount() > 0 && joinDataBuffer.readableByteCount() < 1024) {
                    byte[] originContent = new byte[joinDataBuffer.readableByteCount()];
                    joinDataBuffer.read(originContent);
                    // 释放掉内存
                   // todo There could be a leak in the code here because I myself have a new DefaultDataBufferFactory instead of 
                    NettyDataBufferFactory, will it cause a memory leak?
                    DataBufferUtils.release(joinDataBuffer);

                    String responseBodyLog = getResponseBodyLog(originContent);
                    if (log.isDebugEnabled()) {
                        log.debug(exchange.getLogPrefix() + "responseBodyLog is:" + responseBodyLog);
                    }
                    if (StringUtils.isNotEmpty(responseBodyLog)) {
                        exchange.getAttributes().put(Constants.RESPONSE_BODY_LOG_ATTR, responseBodyLog);
                    }
                    return originalBufferFactory.wrap(originContent);
                } else {
                    return joinDataBuffer;
                }
            }));
        }else {
            return super.writeWith(body);
        }

    }

    @Override
    public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
        return writeWith(Flux.from(body).flatMapSequential(p -> p));
    }

    private boolean shouldResponseBody(MediaType contentType) {

        if (contentType == null) {
            return false;
        }

        if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
            return true;
        }

        return false;
    }

}

private String getResponseBodyLog(byte[] buf) {
    if (buf != null && buf.length > 0) {
        int length = Math.min(buf.length,getResponseMaxPayLoadLength());
        try {
            String responseBodyToUse = new String(buf, 0, length, StandardCharsets.UTF_8.name());
            if (buf.length > getResponseMaxPayLoadLength()) {
                responseBodyToUse = responseBodyToUse + "......";
            }
            return responseBodyToUse;
        }
        catch (UnsupportedEncodingException ex) {
            return "[unknown]";
        }
    }
    return null;
}

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    in: core - Issues in core modules (aop, beans, core, context, expression); status: invalid - An issue that we don't feel is valid

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions