Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion elastic-otel-php.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ test_all_php_versions_with_package_type=deb
test_app_code_host_kinds_short_names=(cli http)
test_groups_short_names=(no_ext_svc with_ext_svc)
php_headers_version=2.0
logger_features_enum_values=ALL=0,MODULE=1,REQUEST=2,TRANSPORT=3,BOOTSTRAP=4,HOOKS=5,INSTRUMENTATION=6,OTEL=7,DEPGUARD=8,OTLPEXPORT=9,OPAMP=10,CONFIG=11
logger_features_enum_values=ALL=0,MODULE=1,REQUEST=2,TRANSPORT=3,BOOTSTRAP=4,HOOKS=5,INSTRUMENTATION=6,OTEL=7,DEPGUARD=8,OTLPEXPORT=9,OPAMP=10,CONFIG=11,COORDINATOR=12
otel_proto_version=v1.5.0
# after changing version of semantic convention you MUST call script ./tools/build/generate_semconv.sh and commit related changes to prod/native/libsemconv
otel_semconv_version=1.32.0
Expand Down
6 changes: 3 additions & 3 deletions prod/native/extension/code/ModuleFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "ModuleGlobals.h"
#include "ModuleFunctionsImpl.h"
#include "InternalFunctionInstrumentation.h"
#include "transport/HttpTransportAsync.h"
#include "coordinator/CoordinatorProcess.h"
#undef snprintf
#include "transport/OpAmp.h"
#include "PhpBridge.h"
Expand Down Expand Up @@ -212,7 +212,7 @@ PHP_FUNCTION(initialize) {
}
ZEND_HASH_FOREACH_END();

EAPM_GL(httpTransportAsync_)->initializeConnection(std::string(ZSTR_VAL(endpoint), ZSTR_LEN(endpoint)), ZSTR_HASH(endpoint), std::string(ZSTR_VAL(contentType), ZSTR_LEN(contentType)), endpointHeaders, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration<double>(timeout)), static_cast<std::size_t>(maxRetries), std::chrono::milliseconds(retryDelay));
EAPM_GL(coordinatorProcess_)->getCoordinatorSender().initializeConnection(std::string(ZSTR_VAL(endpoint), ZSTR_LEN(endpoint)), ZSTR_HASH(endpoint), std::string(ZSTR_VAL(contentType), ZSTR_LEN(contentType)), endpointHeaders, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration<double>(timeout)), static_cast<std::size_t>(maxRetries), std::chrono::milliseconds(retryDelay));
}

ZEND_BEGIN_ARG_INFO_EX(ArgInfoSend, 0, 0, 2)
Expand All @@ -228,7 +228,7 @@ PHP_FUNCTION(enqueue) {
Z_PARAM_STR(payload)
ZEND_PARSE_PARAMETERS_END();

EAPM_GL(httpTransportAsync_)->enqueue(ZSTR_HASH(endpoint), std::span<std::byte>(reinterpret_cast<std::byte *>(ZSTR_VAL(payload)), ZSTR_LEN(payload)));
EAPM_GL(coordinatorProcess_)->getCoordinatorSender().enqueue(ZSTR_HASH(endpoint), std::span<std::byte>(reinterpret_cast<std::byte *>(ZSTR_VAL(payload)), ZSTR_LEN(payload)));
}

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(elastic_otel_force_set_object_property_value_arginfo, 0, 3, _IS_BOOL, 0)
Expand Down
6 changes: 6 additions & 0 deletions prod/native/extension/code/ModuleInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "SigSegvHandler.h"
#include "os/OsUtils.h"
#include "transport/OpAmp.h"
#include "coordinator/CoordinatorProcess.h"

#include <curl/curl.h>
#include <inttypes.h> // PRIu64
Expand Down Expand Up @@ -89,6 +90,11 @@ void elasticApmModuleInit(int moduleType, int moduleNumber) {
return;
}

if (globals->coordinatorProcess_->start()) {
delete globals;
std::exit(0);
}

ELOGF_DEBUG(globals->logger_, MODULE, "MINIT Replacing hooks");
elasticapm::php::Hooking::getInstance().fetchOriginalHooks();
elasticapm::php::Hooking::getInstance().replaceHooks(globals->config_->get().inferred_spans_enabled, globals->config_->get().dependency_autoloader_guard_enabled);
Expand Down
6 changes: 5 additions & 1 deletion prod/native/libcommon/code/AgentGlobals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "InstrumentedFunctionHooksStorage.h"
#include "CommonUtils.h"
#include "ResourceDetector.h"
#include "coordinator/CoordinatorProcess.h"
#include "coordinator/CoordinatorMessagesDispatcher.h"
#include "transport/HttpTransportAsync.h"
#include "transport/OpAmp.h"
#include "DependencyAutoLoaderGuard.h"
Expand Down Expand Up @@ -65,7 +67,9 @@ AgentGlobals::AgentGlobals(std::shared_ptr<LoggerInterface> logger,
elasticDynamicConfig_(std::make_shared<opentelemetry::php::config::ElasticDynamicConfigurationAdapter>()),
opAmp_(std::make_shared<opentelemetry::php::transport::OpAmp>(logger_, config_, httpTransportAsync_, resourceDetector_)),
sharedMemory_(std::make_shared<elasticapm::php::SharedMemoryState>()),
requestScope_(std::make_shared<elasticapm::php::RequestScope>(logger_, bridge_, sapi_, sharedMemory_, dependencyAutoLoaderGuard_, inferredSpans_, config_, [hs = hooksStorage_]() { hs->clear(); }, [this]() { return getPeriodicTaskExecutor();}))
requestScope_(std::make_shared<elasticapm::php::RequestScope>(logger_, bridge_, sapi_, sharedMemory_, dependencyAutoLoaderGuard_, inferredSpans_, config_, [hs = hooksStorage_]() { hs->clear(); }, [this]() { return getPeriodicTaskExecutor();})),
messagesDispatcher_(std::make_shared<elasticapm::php::coordinator::CoordinatorMessagesDispatcher>(logger_, httpTransportAsync_)),
coordinatorProcess_(std::make_shared<elasticapm::php::coordinator::CoordinatorProcess>(logger_, messagesDispatcher_))
{
config_->addConfigUpdateWatcher([logger = logger_, stderrsink = logSinkStdErr_, syslogsink = logSinkSysLog_, filesink = logSinkFile_](ConfigurationSnapshot const &cfg) {
stderrsink->setLevel(cfg.log_level_stderr);
Expand Down
7 changes: 6 additions & 1 deletion prod/native/libcommon/code/AgentGlobals.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class LoggerSinkInterface;
class LogSinkFile;
class InstrumentedFunctionHooksStorageInterface;
class DependencyAutoLoaderGuard;
namespace coordinator {
class CoordinatorMessagesDispatcher;
class CoordinatorProcess;
} // namespace coordinator
namespace transport {
class CurlSender;
class HttpEndpoints;
Expand Down Expand Up @@ -92,7 +96,8 @@ class AgentGlobals {
std::shared_ptr<opentelemetry::php::transport::OpAmp> opAmp_;
std::shared_ptr<SharedMemoryState> sharedMemory_;
std::shared_ptr<RequestScope> requestScope_;

std::shared_ptr<coordinator::CoordinatorMessagesDispatcher> messagesDispatcher_;
std::shared_ptr<coordinator::CoordinatorProcess> coordinatorProcess_;
};

} // namespace elasticapm::php
20 changes: 19 additions & 1 deletion prod/native/libcommon/code/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,31 @@ file(GLOB_RECURSE SrcFiles
"./*.cpp"
)

set(CoordinatorProtoDir "${CMAKE_CURRENT_SOURCE_DIR}/coordinator/proto")

file(GLOB_RECURSE CoordinatorProtoFiles
"./coordinator/proto/*.proto"
)

message("Generating protobuf sources from: ${CoordinatorProtoFiles}")
file(MAKE_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/coordinator/proto")

protobuf_generate(
LANGUAGE cpp
OUT_VAR COORDINATOR_PROTO_SRCS
IMPORT_DIRS "${CoordinatorProtoDir}"
PROTOS ${CoordinatorProtoFiles}
PROTOC_OUT_DIR "${CMAKE_CURRENT_BINARY_DIR}/coordinator/proto"
)

set (_Target common)

add_library (${_Target}
STATIC ${SrcFiles}
STATIC ${SrcFiles} ${COORDINATOR_PROTO_SRCS} ${COORDINATOR_PROTO_HDRS}
)

target_link_libraries(${_Target}
PUBLIC protobuf::libprotobuf
PRIVATE libunwind::libunwind
PRIVATE CURL::libcurl
PRIVATE opamp
Expand Down
106 changes: 106 additions & 0 deletions prod/native/libcommon/code/coordinator/ChunkedMessageProcessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "ChunkedMessageProcessor.h"

namespace elasticapm::php::coordinator {

bool ChunkedMessageProcessor::sendPayload(const std::string &payload) {
msgId_++;
std::size_t dataPayloadSize = sizeof(CoordinatorPayload::payload);

CoordinatorPayload chunk;
chunk.senderProcessId = senderProcessId_;
chunk.msgId = msgId_;
chunk.payloadTotalSize = payload.size();
chunk.payloadOffset = 0;

while (chunk.payloadOffset < payload.size()) {
size_t chunkSize = std::min(dataPayloadSize, payload.size() - chunk.payloadOffset);

ELOG_TRACE(logger_, COORDINATOR, "ChunkedMessageProcessor: sending chunked message. msgId: {}, offset: {}, size: {}, totalSize: {}, data size in chunk: {}", msgId_, chunk.payloadOffset, chunkSize, payload.size(), chunkSize + offsetof(CoordinatorPayload, payload));

std::memcpy(chunk.payload.data(), payload.data() + chunk.payloadOffset, chunkSize);

if (!sendBuffer_(&chunk, chunkSize + offsetof(CoordinatorPayload, payload))) {
ELOG_WARNING(logger_, COORDINATOR, "ChunkedMessageProcessor: failed to send chunked message. msgId: {}, offset: {}", msgId_, chunk.payloadOffset);
return false;
}

chunk.payloadOffset += chunkSize;
}
return true;
}

void ChunkedMessageProcessor::processReceivedChunk(const CoordinatorPayload *chunk, size_t chunkSize) {
ELOG_TRACE(logger_, COORDINATOR, "ChunkedMessageProcessor: received chunked message. msgId: {}, offset: {}, chunkSize: {}, totalSize: {}", chunk->msgId, chunk->payloadOffset, chunkSize, chunk->payloadTotalSize);
std::unique_lock<std::mutex> lock(mutex_);

auto &messagesForSender = recievedMessages_[chunk->senderProcessId];
auto it = messagesForSender.find(chunk->msgId);
if (it == messagesForSender.end()) {
it = messagesForSender.emplace(chunk->msgId, ChunkedMessage(chunk->payloadTotalSize)).first;
}

ChunkedMessage &message = it->second;

std::size_t payloadSize = chunkSize - offsetof(CoordinatorPayload, payload); // actual payload size in this chunk
std::span<const std::byte> chunkData(chunk->payload.data(), payloadSize);

if (message.addNextChunk(chunkData)) {
ELOG_TRACE(logger_, COORDINATOR, "ChunkedMessageProcessor: received chunked message. msgId: {}, offset: {}, receivedSize: {}, totalSize: {}. Message complete, processing.", chunk->msgId, chunk->payloadOffset, message.getData().size(), chunk->payloadTotalSize);

std::vector<std::byte> data;
message.swapData(data);

messagesForSender.erase(it);
if (messagesForSender.empty()) {
recievedMessages_.erase(chunk->senderProcessId);
}

lock.unlock();
processMessage_(data);

} else {
ELOG_TRACE(logger_, COORDINATOR, "ChunkedMessageProcessor: received chunked message. msgId: {}, offset: {}, receivedSize: {}, totalSize: {}", chunk->msgId, chunk->payloadOffset, message.getData().size(), chunk->payloadTotalSize);
}
}

void ChunkedMessageProcessor::cleanupAbandonedMessages(std::chrono::steady_clock::time_point now, std::chrono::seconds maxAge) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto senderIt = recievedMessages_.begin(); senderIt != recievedMessages_.end();) {
auto &messagesForSender = senderIt->second;
for (auto msgIt = messagesForSender.begin(); msgIt != messagesForSender.end();) {
if (now - msgIt->second.getLastUpdated() > maxAge) {
ELOG_DEBUG(logger_, COORDINATOR, "ChunkedMessageProcessor: cleaning up old message from sender pid {} msgId {}", senderIt->first, msgIt->first);
msgIt = messagesForSender.erase(msgIt);
} else {
++msgIt;
}
}

if (messagesForSender.empty()) {
senderIt = recievedMessages_.erase(senderIt);
} else {
++senderIt;
}
}
}

} // namespace elasticapm::php::coordinator
106 changes: 106 additions & 0 deletions prod/native/libcommon/code/coordinator/ChunkedMessageProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "LoggerInterface.h"

#include <array>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <stdexcept>
#include <vector>

#include "CoordinatorMessagesDispatcher.h"

namespace elasticapm::php::coordinator {

class ChunkedMessage {
public:
ChunkedMessage(std::size_t totalSize) : totalSize_(totalSize) {
data_.reserve(totalSize_);
}

// return true if message is complete
bool addNextChunk(const std::span<const std::byte> chunkData) {
if (data_.size() + chunkData.size_bytes() > totalSize_) {
throw std::runtime_error("ChunkedMessage: chunk exceeds total size");
}

data_.insert(data_.end(), chunkData.begin(), chunkData.end());
lastUpdated_ = std::chrono::steady_clock::now();
return data_.size() == totalSize_;
}

const std::vector<std::byte> &getData() const {
return data_;
}

void swapData(std::vector<std::byte> &second) {
data_.swap(second);
}

const std::chrono::steady_clock::time_point &getLastUpdated() const {
return lastUpdated_;
}

private:
std::size_t totalSize_;
std::vector<std::byte> data_;
std::chrono::steady_clock::time_point lastUpdated_;
};

struct CoordinatorPayload {
pid_t senderProcessId;
uint64_t msgId;
std::size_t payloadTotalSize;
std::size_t payloadOffset;
std::array<std::byte, 4064> payload; // it must be last field in the struct. sizeof(CoordinatorPayload) = 4096 bytes with current payload size
};

class ChunkedMessageProcessor {
public:
using sendBuffer_t = std::function<bool(const void *, size_t)>;
using processMessage_t = std::function<void(const std::span<const std::byte>)>;

using msgId_t = uint64_t;

ChunkedMessageProcessor(std::shared_ptr<LoggerInterface> logger, std::size_t maxChunkSize, sendBuffer_t sendBuffer, processMessage_t processMessage) : logger_(logger), maxChunkSize_(maxChunkSize), sendBuffer_(std::move(sendBuffer)), processMessage_(std::move(processMessage)) {
}

bool sendPayload(const std::string &payload);
void processReceivedChunk(const CoordinatorPayload *chunk, size_t chunkSize);
void cleanupAbandonedMessages(std::chrono::steady_clock::time_point now, std::chrono::seconds maxAge);

private:
std::mutex mutex_;
std::shared_ptr<LoggerInterface> logger_;
pid_t senderProcessId_ = getpid();
std::size_t maxChunkSize_;
sendBuffer_t sendBuffer_;
processMessage_t processMessage_;
std::unordered_map<pid_t, std::unordered_map<msgId_t, ChunkedMessage>> recievedMessages_;
msgId_t msgId_ = 0;
};

} // namespace elasticapm::php::coordinator
Loading