Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ set(
src/response_sender.h
src/pb_stub.h
src/pb_stub.cc
src/pb_stub_log.h
src/pb_stub_log.cc
src/pb_response_iterator.h
src/pb_response_iterator.cc
src/pb_cancel.cc
Expand Down
37 changes: 34 additions & 3 deletions src/message_queue.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -32,7 +32,12 @@
#include <boost/thread/thread_time.hpp>
#include <cstddef>

#include "pb_exception.h"
#include "pb_utils.h"
#include "shm_manager.h"
#ifdef TRITON_PB_STUB
#include "pb_stub_log.h"
#endif

namespace triton { namespace backend { namespace python {
namespace bi = boost::interprocess;
Expand Down Expand Up @@ -110,7 +115,20 @@ class MessageQueue {

{
bi::scoped_lock<bi::interprocess_mutex> lock{*MutexMutable()};
Buffer()[Head()] = message;
int head_idx = Head();
// Additional check to avoid out of bounds read/write. Check DLIS-8378 for
// additional details.
if (head_idx < 0 || static_cast<size_t>(head_idx) >= Size()) {
constexpr const char* error_msg =
"Message queue head index out of bounds";
#ifdef TRITON_PB_STUB
LOG_ERROR << error_msg;
#else
LOG_MESSAGE(TRITONSERVER_LOG_ERROR, error_msg);
#endif
return;
}
Buffer()[head_idx] = message;
HeadIncrement();
}
SemFullMutable()->post();
Expand Down Expand Up @@ -145,7 +163,20 @@ class MessageQueue {
}
success = true;

Buffer()[Head()] = message;
int head_idx = Head();
// Additional check to avoid out of bounds read/write. Check DLIS-8378 for
// additional details.
if (head_idx < 0 || static_cast<size_t>(head_idx) >= Size()) {
constexpr const char* error_msg =
"Message queue head index out of bounds";
#ifdef TRITON_PB_STUB
LOG_ERROR << error_msg;
#else
LOG_MESSAGE(TRITONSERVER_LOG_ERROR, error_msg);
#endif
return;
}
Buffer()[head_idx] = message;
HeadIncrement();
}
SemFullMutable()->post();
Expand Down
1 change: 1 addition & 0 deletions src/pb_bls_cancel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "pb_bls_cancel.h"

#include "pb_stub.h"
#include "pb_stub_log.h"

namespace triton { namespace backend { namespace python {

Expand Down
3 changes: 2 additions & 1 deletion src/pb_cancel.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -27,6 +27,7 @@
#include "pb_cancel.h"

#include "pb_stub.h"
#include "pb_stub_log.h"

namespace triton { namespace backend { namespace python {

Expand Down
133 changes: 1 addition & 132 deletions src/pb_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "pb_preferred_memory.h"
#include "pb_response_iterator.h"
#include "pb_string.h"
#include "pb_stub_log.h"
#include "pb_utils.h"
#include "response_sender.h"
#include "scoped_defer.h"
Expand Down Expand Up @@ -1569,138 +1570,6 @@ Stub::ProcessBLSResponseDecoupled(std::unique_ptr<IPCMessage>& ipc_message)
}
}

std::unique_ptr<Logger> Logger::log_instance_;

std::unique_ptr<Logger>&
Logger::GetOrCreateInstance()
{
if (Logger::log_instance_.get() == nullptr) {
Logger::log_instance_ = std::make_unique<Logger>();
}

return Logger::log_instance_;
}

// Bound function, called from the python client
void
Logger::Log(const std::string& message, LogLevel level)
{
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
py::object frame = py::module_::import("inspect").attr("currentframe");
py::object caller_frame = frame();
py::object info = py::module_::import("inspect").attr("getframeinfo");
py::object caller_info = info(caller_frame);
py::object filename_python = caller_info.attr("filename");
std::string filename = filename_python.cast<std::string>();
py::object lineno = caller_info.attr("lineno");
uint32_t line = lineno.cast<uint32_t>();

if (!stub->StubToParentServiceActive()) {
Logger::GetOrCreateInstance()->Log(filename, line, level, message);
} else {
std::unique_ptr<PbLog> log_msg(new PbLog(filename, line, message, level));
stub->EnqueueLogRequest(log_msg);
}
}

// Called internally (.e.g. LOG_ERROR << "Error"; )
void
Logger::Log(
const std::string& filename, uint32_t lineno, LogLevel level,
const std::string& message)
{
// If the log monitor service is not active yet, format
// and pass messages to cerr
if (!BackendLoggingActive()) {
std::string path(filename);
size_t pos = path.rfind(std::filesystem::path::preferred_separator);
if (pos != std::string::npos) {
path = path.substr(pos + 1, std::string::npos);
}
#ifdef _WIN32
std::stringstream ss;
SYSTEMTIME system_time;
GetSystemTime(&system_time);
ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2)
<< system_time.wMonth << std::setw(2) << system_time.wDay << ' '
<< std::setw(2) << system_time.wHour << ':' << std::setw(2)
<< system_time.wMinute << ':' << std::setw(2) << system_time.wSecond
<< '.' << std::setw(6) << system_time.wMilliseconds * 1000 << ' '
<< static_cast<uint32_t>(GetCurrentProcessId()) << ' ' << path << ':'
<< lineno << "] ";
#else
std::stringstream ss;
struct timeval tv;
gettimeofday(&tv, NULL);
struct tm tm_time;
gmtime_r(((time_t*)&(tv.tv_sec)), &tm_time);
ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2)
<< (tm_time.tm_mon + 1) << std::setw(2) << tm_time.tm_mday << " "
<< std::setw(2) << tm_time.tm_hour << ':' << std::setw(2)
<< tm_time.tm_min << ':' << std::setw(2) << tm_time.tm_sec << "."
<< std::setw(6) << tv.tv_usec << ' ' << static_cast<uint32_t>(getpid())
<< ' ' << path << ':' << lineno << "] ";
std::cerr << ss.str() << " " << message << std::endl;
#endif
} else {
// Ensure we do not create a stub instance before it has initialized
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
std::unique_ptr<PbLog> log_msg(new PbLog(filename, lineno, message, level));
stub->EnqueueLogRequest(log_msg);
}
}

void
Logger::LogInfo(const std::string& message)
{
Logger::Log(message, LogLevel::kInfo);
}

void
Logger::LogWarn(const std::string& message)
{
Logger::Log(message, LogLevel::kWarning);
}

void
Logger::LogError(const std::string& message)
{
Logger::Log(message, LogLevel::kError);
}

void
Logger::LogVerbose(const std::string& message)
{
Logger::Log(message, LogLevel::kVerbose);
}

const std::string
Logger::LeadingLogChar(const LogLevel& level)
{
switch (level) {
case LogLevel::kWarning:
return "W";
case LogLevel::kError:
return "E";
case LogLevel::kInfo:
case LogLevel::kVerbose:
default:
return "I";
}
}

void
Logger::SetBackendLoggingActive(bool status)
{
backend_logging_active_ = status;
}

bool
Logger::BackendLoggingActive()
{
return backend_logging_active_;
}

PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
{
py::class_<PbError, std::shared_ptr<PbError>> triton_error(
Expand Down
102 changes: 0 additions & 102 deletions src/pb_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
#include <pybind11/numpy.h>
#include <pybind11/stl.h>

#include <filesystem>

#include "infer_request.h"
#include "infer_response.h"
#include "ipc_message.h"
Expand All @@ -41,7 +39,6 @@
#include "pb_cancel.h"
#include "pb_log.h"
#include "pb_response_iterator.h"
#include "pb_utils.h"


namespace bi = boost::interprocess;
Expand All @@ -54,105 +51,6 @@ using cudaStream_t = void*;

namespace triton { namespace backend { namespace python {

#define LOG_IF_EXCEPTION(X) \
do { \
try { \
(X); \
} \
catch (const PythonBackendException& pb_exception) { \
LOG_INFO << pb_exception.what(); \
} \
} while (false)

#define LOG_EXCEPTION(E) \
do { \
LOG_INFO << E.what(); \
} while (false)

/// Macros that use current filename and line number.
#define LOG_INFO LOG_FL(__FILE__, __LINE__, LogLevel::kInfo)
#define LOG_WARN LOG_FL(__FILE__, __LINE__, LogLevel::kWarning)
#define LOG_ERROR LOG_FL(__FILE__, __LINE__, LogLevel::kError)
#define LOG_VERBOSE LOG_FL(__FILE__, __LINE__, LogLevel::kVerbose)

class Logger {
public:
Logger() { backend_logging_active_ = false; };
~Logger() { log_instance_.reset(); };
/// Python client log function
static void Log(const std::string& message, LogLevel level = LogLevel::kInfo);

/// Python client log info function
static void LogInfo(const std::string& message);

/// Python client warning function
static void LogWarn(const std::string& message);

/// Python client log error function
static void LogError(const std::string& message);

/// Python client log verbose function
static void LogVerbose(const std::string& message);

/// Internal log function
void Log(
const std::string& filename, uint32_t lineno, LogLevel level,
const std::string& message);

/// Log format helper function
const std::string LeadingLogChar(const LogLevel& level);

/// Set PYBE Logging Status
void SetBackendLoggingActive(bool status);

/// Get PYBE Logging Status
bool BackendLoggingActive();

/// Singleton Getter Function
static std::unique_ptr<Logger>& GetOrCreateInstance();

DISALLOW_COPY_AND_ASSIGN(Logger);

/// Flush the log.
void Flush() { std::cerr << std::flush; }

private:
static std::unique_ptr<Logger> log_instance_;
bool backend_logging_active_;
};

class LogMessage {
public:
/// Create a log message, stripping the path down to the filename only
LogMessage(const char* file, int line, LogLevel level) : level_(level)
{
std::string path(file);
const char os_slash = std::filesystem::path::preferred_separator;
size_t pos = path.rfind(os_slash);
if (pos != std::string::npos) {
path = path.substr(pos + 1, std::string::npos);
}
file_ = path;
line_ = static_cast<uint32_t>(line);
}
/// Log message to console or send to backend (see Logger::Log for details)
~LogMessage()
{
Logger::GetOrCreateInstance()->Log(file_, line_, level_, stream_.str());
}

std::stringstream& stream() { return stream_; }

private:
std::stringstream stream_;
std::string file_;
uint32_t line_;
LogLevel level_;
};

#define LOG_FL(FN, LN, LVL) LogMessage((char*)(FN), LN, LVL).stream()


class ModelContext {
public:
// Scans and establishes path for serving the python model.
Expand Down
Loading
Loading