Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,47 @@ func addHeadersAndSendString(c *fiber.Ctx, metaData []pairingtypes.Metadata, dat
return c.SendString(data)
}

// addHeadersAndSendBytes copies every metadata entry onto the response as an
// HTTP header, then writes the payload bytes unmodified via fiber's Send.
// Compared to addHeadersAndSendString this skips the []byte-to-string
// conversion, saving one full-payload allocation on large responses.
func addHeadersAndSendBytes(c *fiber.Ctx, metaData []pairingtypes.Metadata, data []byte) error {
	for _, header := range metaData {
		c.Set(header.Name, header.Value)
	}
	return c.Send(data)
}

// jsonRPCMethodRequest captures only the "method" member of a JSON-RPC request
// object; every other field in the body is ignored during unmarshaling.
type jsonRPCMethodRequest struct {
	Method string `json:"method"`
}

// extractJSONRPCMethodFromRequest returns the JSON-RPC method name contained in
// requestBody, or the empty string when the body is not a parseable JSON-RPC
// object (non-JSON input, batch arrays, etc.).
func extractJSONRPCMethodFromRequest(requestBody []byte) string {
	parsed := jsonRPCMethodRequest{}
	if unmarshalErr := json.Unmarshal(requestBody, &parsed); unmarshalErr != nil {
		return ""
	}
	return parsed.Method
}

// isPassthroughMethod returns true if the given method should use passthrough mode.
// Passthrough mode skips response body logging and uses direct byte sending
// to reduce memory allocations for large responses.
//
// This is a package-private shim that delegates to the exported
// IsPassthroughMethod, keeping a single source of truth for the
// passthrough decision while letting callers inside this package
// use the unexported name.
func isPassthroughMethod(method string) bool {
	return IsPassthroughMethod(method)
}

// IsPassthroughMethod reports whether the given JSON-RPC method is served in
// passthrough mode. Passthrough mode skips expensive operations like
// decompression, response body logging, and string conversions to reduce
// memory allocations for large responses.
// Exported so other packages (e.g. rpcsmartrouter) can share the same decision.
func IsPassthroughMethod(method string) bool {
	// debug_traceTransaction is currently the only passthrough method;
	// add further cases here as more large-response methods are identified.
	switch method {
	case "debug_traceTransaction":
		return true
	default:
		return false
	}
}

func convertToJsonError(errorMsg string) string {
jsonResponse, err := json.Marshal(fiber.Map{
"error": errorMsg,
Expand Down
46 changes: 46 additions & 0 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,52 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
return addHeadersAndSendString(fiberCtx, reply.GetMetadata(), response)
}

// Check if this is a passthrough method (e.g., debug_traceTransaction)
// Passthrough mode skips string conversion and response body logging to reduce memory allocations
requestMethod := extractJSONRPCMethodFromRequest(fiberCtx.Body())
if isPassthroughMethod(requestMethod) {
// Passthrough mode: skip string conversion and response body logging
utils.LavaFormatDebug("Passthrough mode: skipping response body logging for large response method",
utils.LogAttr("GUID", ctx),
utils.LogAttr("method", requestMethod),
utils.LogAttr("response_size_bytes", len(reply.Data)),
)
if relayResult.GetStatusCode() != 0 {
fiberCtx.Status(relayResult.StatusCode)
}
// Ensure Content-Encoding header is set if data is compressed
// Check if it's already in metadata (set by relayInner when skipping decompression)
hasContentEncoding := false
for _, meta := range reply.GetMetadata() {
if strings.EqualFold(meta.Name, "Content-Encoding") {
hasContentEncoding = true
utils.LavaFormatDebug("Found Content-Encoding in reply metadata",
utils.LogAttr("GUID", ctx),
utils.LogAttr("value", meta.Value),
)
break
}
}
// Set Content-Encoding header explicitly BEFORE setting other headers
// This ensures Fiber's compression middleware sees it and skips compression
// For passthrough methods, the data is compressed (we skipped decompression)
// So we always set the header, even if it's not in metadata (defensive)
fiberCtx.Set("Content-Encoding", "gzip")
if !hasContentEncoding {
utils.LavaFormatWarning("Content-Encoding not found in metadata for passthrough method, setting it explicitly", nil,
utils.LogAttr("GUID", ctx),
utils.LogAttr("method", requestMethod),
)
}
// Send bytes directly without string conversion
// addHeadersAndSendBytes will set all metadata headers
err = addHeadersAndSendBytes(fiberCtx, reply.GetMetadata(), reply.Data)
apil.logger.AddMetricForProcessingLatencyAfterProvider(metricsData, chainID, apiInterface)
apil.logger.SetEndToEndLatency(chainID, apiInterface, time.Since(startTime))
return err
}

// Normal mode: process response with string conversion and logging
response := checkBTCResponseAndFixReply(chainID, reply.Data)
// Log request and response
apil.logger.LogRequestAndResponse("jsonrpc http",
Expand Down
15 changes: 15 additions & 0 deletions protocol/common/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package common
import (
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"io"

"github.com/lavanet/lava/v5/utils"
Expand Down Expand Up @@ -79,3 +81,16 @@ func DecompressData(compressedData []byte) ([]byte, error) {

return decompressedData, nil
}

// GetUncompressedSizeFromGzip reads the ISIZE field from gzip footer
// Returns uncompressed size (accurate for files < 4GB due to modulo 2^32)
// The ISIZE field is located in the last 4 bytes of the gzip file (little-endian)
func GetUncompressedSizeFromGzip(compressedData []byte) (uint32, error) {
if len(compressedData) < 4 {
return 0, fmt.Errorf("gzip data too short to contain ISIZE field (need at least 4 bytes, got %d)", len(compressedData))
}

// ISIZE is in the last 4 bytes (little-endian)
isize := binary.LittleEndian.Uint32(compressedData[len(compressedData)-4:])
return isize, nil
}
60 changes: 60 additions & 0 deletions protocol/common/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,66 @@ func TestCompressData_ExactThreshold(t *testing.T) {
require.Equal(t, exactData, result, "Uncompressed data should be unchanged")
}

// TestGetUncompressedSizeFromGzip_ValidGzip checks that the ISIZE footer of a
// freshly compressed payload reports the original uncompressed length.
func TestGetUncompressedSizeFromGzip_ValidGzip(t *testing.T) {
	// Threshold 0 forces gzip compression regardless of payload size.
	payload := []byte(strings.Repeat("This is test data. ", 100))
	gz, _, err := CompressData(payload, 0)
	require.NoError(t, err, "Compression should succeed")

	// The gzip trailer must record the pre-compression byte count.
	got, err := GetUncompressedSizeFromGzip(gz)
	require.NoError(t, err, "GetUncompressedSizeFromGzip should not error on valid gzip")
	require.Equal(t, uint32(len(payload)), got, "ISIZE should match original data size")
}

// TestGetUncompressedSizeFromGzip_LargePayload checks ISIZE accuracy on a
// multi-megabyte payload, similar in size to debug_traceTransaction responses.
func TestGetUncompressedSizeFromGzip_LargePayload(t *testing.T) {
	// ~4MB of repeating hex-prefix text.
	payload := []byte(strings.Repeat("0x", 2000000))
	gz, _, err := CompressData(payload, CompressionThreshold)
	require.NoError(t, err, "Compression should succeed")

	got, err := GetUncompressedSizeFromGzip(gz)
	require.NoError(t, err, "GetUncompressedSizeFromGzip should not error on large gzip")
	require.Equal(t, uint32(len(payload)), got, "ISIZE should match original large data size")
}

// TestGetUncompressedSizeFromGzip_TooShort checks that inputs shorter than the
// 4-byte ISIZE footer are rejected with a descriptive error.
func TestGetUncompressedSizeFromGzip_TooShort(t *testing.T) {
	// Two bytes (just the gzip magic number) cannot hold a 4-byte footer.
	magicOnly := []byte{0x1f, 0x8b}

	got, err := GetUncompressedSizeFromGzip(magicOnly)

	require.Error(t, err, "GetUncompressedSizeFromGzip should error on data too short")
	require.Contains(t, err.Error(), "too short", "Error message should mention data is too short")
	require.Equal(t, uint32(0), got, "Uncompressed size should be 0 on error")
}

// TestGetUncompressedSizeFromGzip_EmptyData checks the zero-length edge case:
// an empty slice must produce an error and a zero size.
func TestGetUncompressedSizeFromGzip_EmptyData(t *testing.T) {
	got, err := GetUncompressedSizeFromGzip([]byte{})

	require.Error(t, err, "GetUncompressedSizeFromGzip should error on empty data")
	require.Equal(t, uint32(0), got, "Uncompressed size should be 0 on error")
}

// TestGetUncompressedSizeFromGzip_InvalidGzip verifies that the function reads
// the ISIZE footer blindly: it does not validate the gzip stream, so invalid
// gzip data that is at least 4 bytes long still yields the little-endian value
// encoded in its last 4 bytes.
func TestGetUncompressedSizeFromGzip_InvalidGzip(t *testing.T) {
	// Gzip header bytes followed by a zeroed tail; not a decodable gzip stream.
	invalidData := []byte{0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00}

	// This should still read the ISIZE field (last 4 bytes) even if gzip is invalid
	// The function doesn't validate gzip format, just reads the footer
	uncompressedSize, err := GetUncompressedSizeFromGzip(invalidData)

	require.NoError(t, err, "GetUncompressedSizeFromGzip should not error on invalid gzip (it just reads footer)")
	// The last 4 bytes of the fixture are all zero, so the reported ISIZE must
	// be exactly 0. The previous GreaterOrEqual(..., uint32(0)) assertion was
	// vacuous: a uint32 can never be negative, so it could never fail.
	require.Equal(t, uint32(0), uncompressedSize, "ISIZE should equal the little-endian value of the last 4 bytes")
}

func TestCompressData_OneByteAboveThreshold(t *testing.T) {
// Test data one byte above threshold
data := []byte(strings.Repeat("a", CompressionThreshold+1))
Expand Down
61 changes: 55 additions & 6 deletions protocol/rpcsmartrouter/rpcsmartrouter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,15 +1447,64 @@ func (rpcss *RPCSmartRouterServer) relayInner(ctx context.Context, singleConsume
reply, err = endpointClient.Relay(connectCtx, relayRequest, grpc.Header(&responseHeader), grpc.Trailer(&relayResult.ProviderTrailer))
relayLatency = time.Since(relaySentTime)

// Decompress response if compressed
// Decompress response if compressed (skip for passthrough methods to save memory)
if reply != nil && reply.Data != nil {
if lavaCompressionValues := responseHeader.Get(common.LavaCompressionHeader); len(lavaCompressionValues) > 0 && lavaCompressionValues[0] == common.LavaCompressionGzip {
decompressedData, decompressErr := common.DecompressData(reply.Data)
if decompressErr != nil {
utils.LavaFormatError("Failed to decompress response", decompressErr, utils.LogAttr("GUID", ctx))
return nil, 0, decompressErr, false
// Check if this is a passthrough method - if so, verify payload size before skipping decompression
methodName := chainMessage.GetApi().Name
if chainlib.IsPassthroughMethod(methodName) {
// Verify payload is actually large using ISIZE field from gzip footer
uncompressedSize, err := common.GetUncompressedSizeFromGzip(reply.Data)
if err == nil && uncompressedSize >= uint32(common.CompressionThreshold) {
// Large payload confirmed - use passthrough mode
// Passthrough mode: keep compressed data to avoid memory allocation from decompression
// Add Content-Encoding header so HTTP client knows to decompress the response
reply.Metadata = append(reply.Metadata, pairingtypes.Metadata{
Name: "Content-Encoding",
Value: "gzip",
})
utils.LavaFormatDebug("Passthrough mode: skipping decompression for large response method",
utils.LogAttr("GUID", ctx),
utils.LogAttr("method", methodName),
utils.LogAttr("compressed_size_bytes", len(reply.Data)),
utils.LogAttr("uncompressed_size_bytes", uncompressedSize),
)
} else {
// Small payload or error reading ISIZE - decompress normally to detect errors
// This handles edge cases like small error responses that got compressed
if err != nil {
utils.LavaFormatWarning("Failed to read ISIZE from gzip, falling back to normal decompression",
err,
utils.LogAttr("GUID", ctx),
utils.LogAttr("method", methodName),
utils.LogAttr("compressed_size_bytes", len(reply.Data)),
)
} else {
utils.LavaFormatDebug("Passthrough method but payload too small, using normal decompression",
utils.LogAttr("GUID", ctx),
utils.LogAttr("method", methodName),
utils.LogAttr("compressed_size_bytes", len(reply.Data)),
utils.LogAttr("uncompressed_size_bytes", uncompressedSize),
utils.LogAttr("threshold_bytes", common.CompressionThreshold),
)
}
// Normal mode: decompress the response
decompressedData, decompressErr := common.DecompressData(reply.Data)
if decompressErr != nil {
utils.LavaFormatError("Failed to decompress response", decompressErr, utils.LogAttr("GUID", ctx))
return nil, 0, decompressErr, false
}
reply.Data = decompressedData
}
} else {
// Normal mode: decompress the response
decompressedData, decompressErr := common.DecompressData(reply.Data)
if decompressErr != nil {
utils.LavaFormatError("Failed to decompress response", decompressErr, utils.LogAttr("GUID", ctx))
return nil, 0, decompressErr, false
}
reply.Data = decompressedData
}
reply.Data = decompressedData
}
}

Expand Down
Loading