Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions velox/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ add_subdirectory(hyperloglog)
add_subdirectory(io)
add_subdirectory(memory)
add_subdirectory(process)
add_subdirectory(rpc)
add_subdirectory(serialization)
add_subdirectory(time)
add_subdirectory(testutil)
Expand Down
31 changes: 31 additions & 0 deletions velox/common/rpc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

velox_install_library_headers()

velox_add_library(velox_rpc_types RPCTypes.h)

velox_add_library(velox_rpc_client IRPCClient.h)

velox_link_libraries(velox_rpc_client velox_rpc_types Folly::folly)

velox_add_library(velox_mock_rpc_client clients/MockRPCClient.cpp)

velox_link_libraries(
velox_mock_rpc_client
velox_rpc_client
velox_common_base
velox_exception
Folly::folly
)
105 changes: 105 additions & 0 deletions velox/common/rpc/IRPCClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <string>
#include <vector>

#include <folly/futures/Future.h>

#include "velox/common/rpc/RPCTypes.h"

namespace facebook::velox::core {
class QueryConfig;
} // namespace facebook::velox::core

namespace facebook::velox::rpc {

/// Interface for RPC clients (transport layer).
///
/// IRPCClient is concerned with how to send requests and receive responses
/// over the network. It is decoupled from the business logic — domain-specific
/// request/response formatting is handled by AsyncRPCFunction (in
/// velox/expression/rpc/).
///
/// Implementations provide the actual transport (e.g., Thrift, gRPC, mock).
///
/// Thread safety: Implementations MUST be thread-safe for concurrent calls.
/// The operator may dispatch multiple RPCs concurrently from a single thread,
/// and completion callbacks run on the client's executor threads.
class IRPCClient {
public:
virtual ~IRPCClient() = default;

/// Execute a single RPC call asynchronously.
/// @param request The request to send.
/// @return A SemiFuture that will contain the response when complete.
virtual folly::SemiFuture<RPCResponse> call(const RPCRequest& request) = 0;

/// Execute a batch of RPC calls as a single request.
/// Default implementation fans out to individual call()s.
/// Override for backends that support native batching (e.g., batch
/// inference).
/// @param requests The batch of requests to send.
/// @return A SemiFuture that will contain all responses when complete.
virtual folly::SemiFuture<std::vector<RPCResponse>> callBatch(
const std::vector<RPCRequest>& requests) {
std::vector<folly::SemiFuture<RPCResponse>> futures;
futures.reserve(requests.size());
for (const auto& request : requests) {
futures.push_back(call(request));
}
// Capture rowIds to preserve them in error responses.
std::vector<int64_t> rowIds;
rowIds.reserve(requests.size());
for (const auto& request : requests) {
rowIds.push_back(request.rowId);
}
return folly::collectAll(std::move(futures))
.deferValue([rowIds = std::move(rowIds)](
std::vector<folly::Try<RPCResponse>> tries) {
std::vector<RPCResponse> responses;
responses.reserve(tries.size());
for (size_t i = 0; i < tries.size(); ++i) {
if (tries[i].hasValue()) {
responses.push_back(std::move(tries[i].value()));
} else {
RPCResponse errorResp;
errorResp.rowId = rowIds[i];
errorResp.error = tries[i].exception().what().toStdString();
responses.push_back(std::move(errorResp));
}
}
return responses;
});
}

/// Returns the service tier key for rate limiting (e.g.,
/// "service.backend.prod"). Requests from clients sharing the same tier key
/// share a concurrency budget in RPCRateLimiter.
/// Empty string means "no tier configured" — uses the global default limit.
virtual std::string tierKey() const {
return "";
}

/// Set the query config for session-level parameters.
/// Called by the operator before dispatching RPCs.
virtual void setQueryConfig(const core::QueryConfig* /*config*/) {}
};

} // namespace facebook::velox::rpc
94 changes: 94 additions & 0 deletions velox/common/rpc/RPCTypes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <map>
#include <optional>
#include <string>

#include "velox/vector/TypeAliases.h"

namespace facebook::velox::rpc {

/// Streaming mode for RPC execution.
/// Controls how RPC results are emitted to downstream operators.
enum class RPCStreamingMode {
/// Emit rows as they complete individually (default).
/// Lower tail latency for high-variance workloads (e.g., LLM).
kPerRow,

/// Wait for all rows in batch before emitting.
/// Lower overhead, useful for uniform-latency workloads.
kBatch
};

/// Parse streaming mode from config string.
/// Returns kPerRow (default) unless explicitly set to "batch".
inline RPCStreamingMode parseStreamingMode(const std::string& value) {
if (value == "batch") {
return RPCStreamingMode::kBatch;
}
return RPCStreamingMode::kPerRow;
}

/// Generic request structure for RPC calls.
/// This is a minimal, domain-agnostic structure that works for any backend.
/// Domain-specific formatting (e.g., LLM prompts, embedding inputs) is handled
/// by the plan node's buildRequests() method.
struct RPCRequest {
/// Row ID for tracking which row this request belongs to.
/// This is a globally unique ID assigned by the operator.
int64_t rowId{0};

/// Original row index in the input batch.
/// This is used to slice the correct row from input columns when storing
/// passthrough data. Unlike rowId (which is globally unique across batches),
/// this is the index within the current input batch and is set by
/// prepareRequests() based on the SelectivityVector iteration.
/// CRITICAL: When prepareRequests() skips null rows, originalRowIndex
/// tracks the actual input position to avoid slicing mismatch.
vector_size_t originalRowIndex{0};

/// The request payload (opaque to the framework).
std::string payload;

/// Type-safe options for backend-specific parameters.
std::map<std::string, std::string> options;
};

/// Generic response structure from RPC calls.
/// This is a minimal, domain-agnostic structure that works for any backend.
struct RPCResponse {
/// Row ID for correlating response with the original request.
int64_t rowId{0};

/// The response result (opaque to the framework).
std::string result;

/// Type-safe metadata from the backend.
std::map<std::string, std::string> metadata;

/// Error message if the request failed.
std::optional<std::string> error;

/// Returns true if this response represents an error.
bool hasError() const {
return error.has_value();
}
};

} // namespace facebook::velox::rpc
124 changes: 124 additions & 0 deletions velox/common/rpc/clients/MockRPCClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/rpc/clients/MockRPCClient.h"

#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>

namespace facebook::velox::rpc {

namespace {
// Function-local static pattern for thread-local RNG to avoid
// NonPodStaticDeclaration lint warning.
std::mt19937& threadLocalRng() {
thread_local std::mt19937 rng{std::random_device{}()};
return rng;
}
} // namespace

MockRPCClient::MockRPCClient(
std::chrono::milliseconds latency,
double errorRate,
std::shared_ptr<folly::CPUThreadPoolExecutor> executor)
: latency_(latency), errorRate_(errorRate) {
if (executor) {
executor_ = std::move(executor);
} else {
ownedExecutor_ = std::make_shared<folly::CPUThreadPoolExecutor>(4);
executor_ = ownedExecutor_;
}
}

MockRPCClient::~MockRPCClient() = default;

RPCResponse MockRPCClient::generateResponse(
const RPCRequest& request,
bool isError) {
if (isError) {
return RPCResponse{
.rowId = request.rowId,
.result = "",
.metadata = {},
.error = "Simulated error for row " + std::to_string(request.rowId)};
}

// Generate a mock response
std::string responseText = "Response for: ";
if (request.payload.size() > 30) {
responseText += request.payload.substr(0, 30) + "...";
} else {
responseText += request.payload;
}

return RPCResponse{
.rowId = request.rowId,
.result = std::move(responseText),
.metadata = {},
.error = std::nullopt};
}

folly::SemiFuture<RPCResponse> MockRPCClient::call(const RPCRequest& request) {
callCount_.fetch_add(1);

// Determine if this request should fail
std::uniform_real_distribution<double> dist(0.0, 1.0);
bool shouldError = dist(threadLocalRng()) < errorRate_;

// Use folly::via with the thread pool executor for safe async execution
return folly::via(
executor_.get(),
[this, request = request, shouldError, latency = latency_]()
-> RPCResponse {
// Simulate network latency
/* sleep override */ std::this_thread::sleep_for(latency);
// Generate and return the response
return generateResponse(request, shouldError);
});
}

folly::SemiFuture<std::vector<RPCResponse>> MockRPCClient::callBatch(
const std::vector<RPCRequest>& requests) {
// Capture error rate for thread safety
double errorRate = errorRate_;

// Use folly::via with the thread pool executor for safe async execution
return folly::via(
executor_.get(),
[this, requests, errorRate, latency = latency_]()
-> std::vector<RPCResponse> {
// Simulate network latency (single batch = single latency)
/* sleep override */ std::this_thread::sleep_for(latency);

std::vector<RPCResponse> responses;
responses.reserve(requests.size());

// Create RNG inside lambda to avoid thread-local access issues.
// Each executor thread will have its own properly initialized RNG.
thread_local std::mt19937 localRng{std::random_device{}()};
std::uniform_real_distribution<double> dist(0.0, 1.0);

for (const auto& request : requests) {
callCount_.fetch_add(1);
bool shouldError = dist(localRng) < errorRate;
responses.push_back(generateResponse(request, shouldError));
}

return responses;
});
}

} // namespace facebook::velox::rpc
Loading
Loading