diff --git a/elastic-otel-php.properties b/elastic-otel-php.properties index f77e152d..4d494add 100644 --- a/elastic-otel-php.properties +++ b/elastic-otel-php.properties @@ -5,7 +5,7 @@ test_all_php_versions_with_package_type=deb test_app_code_host_kinds_short_names=(cli http) test_groups_short_names=(no_ext_svc with_ext_svc) php_headers_version=2.0 -logger_features_enum_values=ALL=0,MODULE=1,REQUEST=2,TRANSPORT=3,BOOTSTRAP=4,HOOKS=5,INSTRUMENTATION=6,OTEL=7,DEPGUARD=8,OTLPEXPORT=9,OPAMP=10,CONFIG=11 +logger_features_enum_values=ALL=0,MODULE=1,REQUEST=2,TRANSPORT=3,BOOTSTRAP=4,HOOKS=5,INSTRUMENTATION=6,OTEL=7,DEPGUARD=8,OTLPEXPORT=9,OPAMP=10,CONFIG=11,COORDINATOR=12 otel_proto_version=v1.5.0 # after changing version of semantic convention you MUST call script ./tools/build/generate_semconv.sh and commit related changes to prod/native/libsemconv otel_semconv_version=1.32.0 diff --git a/prod/native/extension/code/ModuleFunctions.cpp b/prod/native/extension/code/ModuleFunctions.cpp index 7feeaaed..bbdfb1ec 100644 --- a/prod/native/extension/code/ModuleFunctions.cpp +++ b/prod/native/extension/code/ModuleFunctions.cpp @@ -26,7 +26,7 @@ #include "ModuleGlobals.h" #include "ModuleFunctionsImpl.h" #include "InternalFunctionInstrumentation.h" -#include "transport/HttpTransportAsync.h" +#include "coordinator/CoordinatorProcess.h" #undef snprintf #include "transport/OpAmp.h" #include "PhpBridge.h" @@ -212,7 +212,7 @@ PHP_FUNCTION(initialize) { } ZEND_HASH_FOREACH_END(); - EAPM_GL(httpTransportAsync_)->initializeConnection(std::string(ZSTR_VAL(endpoint), ZSTR_LEN(endpoint)), ZSTR_HASH(endpoint), std::string(ZSTR_VAL(contentType), ZSTR_LEN(contentType)), endpointHeaders, std::chrono::duration_cast(std::chrono::duration(timeout)), static_cast(maxRetries), std::chrono::milliseconds(retryDelay)); + EAPM_GL(coordinatorProcess_)->getCoordinatorSender().initializeConnection(std::string(ZSTR_VAL(endpoint), ZSTR_LEN(endpoint)), ZSTR_HASH(endpoint), std::string(ZSTR_VAL(contentType), ZSTR_LEN(contentType)), endpointHeaders, std::chrono::duration_cast(std::chrono::duration(timeout)), static_cast(maxRetries), std::chrono::milliseconds(retryDelay)); } ZEND_BEGIN_ARG_INFO_EX(ArgInfoSend, 0, 0, 2) @@ -228,7 +228,7 @@ PHP_FUNCTION(enqueue) { Z_PARAM_STR(payload) ZEND_PARSE_PARAMETERS_END(); - EAPM_GL(httpTransportAsync_)->enqueue(ZSTR_HASH(endpoint), std::span(reinterpret_cast(ZSTR_VAL(payload)), ZSTR_LEN(payload))); + EAPM_GL(coordinatorProcess_)->getCoordinatorSender().enqueue(ZSTR_HASH(endpoint), std::span(reinterpret_cast(ZSTR_VAL(payload)), ZSTR_LEN(payload))); } ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(elastic_otel_force_set_object_property_value_arginfo, 0, 3, _IS_BOOL, 0) diff --git a/prod/native/extension/code/ModuleInit.cpp b/prod/native/extension/code/ModuleInit.cpp index cbff246b..f40d7a66 100644 --- a/prod/native/extension/code/ModuleInit.cpp +++ b/prod/native/extension/code/ModuleInit.cpp @@ -36,6 +36,7 @@ #include "SigSegvHandler.h" #include "os/OsUtils.h" #include "transport/OpAmp.h" +#include "coordinator/CoordinatorProcess.h" #include #include // PRIu64 @@ -89,6 +90,11 @@ void elasticApmModuleInit(int moduleType, int moduleNumber) { return; } + if (globals->coordinatorProcess_->start()) { + delete globals; + std::exit(0); + } + ELOGF_DEBUG(globals->logger_, MODULE, "MINIT Replacing hooks"); elasticapm::php::Hooking::getInstance().fetchOriginalHooks(); elasticapm::php::Hooking::getInstance().replaceHooks(globals->config_->get().inferred_spans_enabled, globals->config_->get().dependency_autoloader_guard_enabled); diff --git a/prod/native/libcommon/code/AgentGlobals.cpp b/prod/native/libcommon/code/AgentGlobals.cpp index a83fb245..24b28f03 100644 --- a/prod/native/libcommon/code/AgentGlobals.cpp +++ b/prod/native/libcommon/code/AgentGlobals.cpp @@ -31,6 +31,8 @@ #include "InstrumentedFunctionHooksStorage.h" #include "CommonUtils.h" #include "ResourceDetector.h" +#include "coordinator/CoordinatorProcess.h" +#include "coordinator/CoordinatorMessagesDispatcher.h" #include "transport/HttpTransportAsync.h" #include "transport/OpAmp.h" #include "DependencyAutoLoaderGuard.h" @@ -65,7 +67,9 @@ AgentGlobals::AgentGlobals(std::shared_ptr logger, elasticDynamicConfig_(std::make_shared()), opAmp_(std::make_shared(logger_, config_, httpTransportAsync_, resourceDetector_)), sharedMemory_(std::make_shared()), - requestScope_(std::make_shared(logger_, bridge_, sapi_, sharedMemory_, dependencyAutoLoaderGuard_, inferredSpans_, config_, [hs = hooksStorage_]() { hs->clear(); }, [this]() { return getPeriodicTaskExecutor();})) + requestScope_(std::make_shared(logger_, bridge_, sapi_, sharedMemory_, dependencyAutoLoaderGuard_, inferredSpans_, config_, [hs = hooksStorage_]() { hs->clear(); }, [this]() { return getPeriodicTaskExecutor();})), + messagesDispatcher_(std::make_shared(logger_, httpTransportAsync_)), + coordinatorProcess_(std::make_shared(logger_, messagesDispatcher_)) { config_->addConfigUpdateWatcher([logger = logger_, stderrsink = logSinkStdErr_, syslogsink = logSinkSysLog_, filesink = logSinkFile_](ConfigurationSnapshot const &cfg) { stderrsink->setLevel(cfg.log_level_stderr); diff --git a/prod/native/libcommon/code/AgentGlobals.h b/prod/native/libcommon/code/AgentGlobals.h index f4cfdd05..44da31de 100644 --- a/prod/native/libcommon/code/AgentGlobals.h +++ b/prod/native/libcommon/code/AgentGlobals.h @@ -51,6 +51,10 @@ class LoggerSinkInterface; class LogSinkFile; class InstrumentedFunctionHooksStorageInterface; class DependencyAutoLoaderGuard; +namespace coordinator { +class CoordinatorMessagesDispatcher; +class CoordinatorProcess; +} // namespace coordinator namespace transport { class CurlSender; class HttpEndpoints; @@ -92,7 +96,8 @@ class AgentGlobals { std::shared_ptr opAmp_; std::shared_ptr sharedMemory_; std::shared_ptr requestScope_; - + std::shared_ptr messagesDispatcher_; + std::shared_ptr coordinatorProcess_; }; } // namespace elasticapm::php diff --git a/prod/native/libcommon/code/CMakeLists.txt b/prod/native/libcommon/code/CMakeLists.txt index 7437ac02..9621710a 100644 --- a/prod/native/libcommon/code/CMakeLists.txt +++ b/prod/native/libcommon/code/CMakeLists.txt @@ -2,13 +2,31 @@ file(GLOB_RECURSE SrcFiles "./*.cpp" ) +set(CoordinatorProtoDir "${CMAKE_CURRENT_SOURCE_DIR}/coordinator/proto") + +file(GLOB_RECURSE CoordinatorProtoFiles + "./coordinator/proto/*.proto" +) + +message("Generating protobuf sources from: ${CoordinatorProtoFiles}") +file(MAKE_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/coordinator/proto") + +protobuf_generate( + LANGUAGE cpp + OUT_VAR COORDINATOR_PROTO_SRCS + IMPORT_DIRS "${CoordinatorProtoDir}" + PROTOS ${CoordinatorProtoFiles} + PROTOC_OUT_DIR "${CMAKE_CURRENT_BINARY_DIR}/coordinator/proto" +) + set (_Target common) add_library (${_Target} - STATIC ${SrcFiles} + STATIC ${SrcFiles} ${COORDINATOR_PROTO_SRCS} ${COORDINATOR_PROTO_HDRS} ) target_link_libraries(${_Target} + PUBLIC protobuf::libprotobuf PRIVATE libunwind::libunwind PRIVATE CURL::libcurl PRIVATE opamp diff --git a/prod/native/libcommon/code/coordinator/ChunkedMessageProcessor.cpp b/prod/native/libcommon/code/coordinator/ChunkedMessageProcessor.cpp new file mode 100644 index 00000000..b5a51eb9 --- /dev/null +++ b/prod/native/libcommon/code/coordinator/ChunkedMessageProcessor.cpp @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "ChunkedMessageProcessor.h" + +namespace elasticapm::php::coordinator { + +bool ChunkedMessageProcessor::sendPayload(const std::string &payload) { + msgId_++; + std::size_t dataPayloadSize = sizeof(CoordinatorPayload::payload); + + CoordinatorPayload chunk; + chunk.senderProcessId = senderProcessId_; + chunk.msgId = msgId_; + chunk.payloadTotalSize = payload.size(); + chunk.payloadOffset = 0; + + while (chunk.payloadOffset < payload.size()) { + size_t chunkSize = std::min(dataPayloadSize, payload.size() - chunk.payloadOffset); + + ELOG_TRACE(logger_, COORDINATOR, "ChunkedMessageProcessor: sending chunked message. msgId: {}, offset: {}, size: {}, totalSize: {}, data size in chunk: {}", msgId_, chunk.payloadOffset, chunkSize, payload.size(), chunkSize + offsetof(CoordinatorPayload, payload)); + + std::memcpy(chunk.payload.data(), payload.data() + chunk.payloadOffset, chunkSize); + + if (!sendBuffer_(&chunk, chunkSize + offsetof(CoordinatorPayload, payload))) { + ELOG_WARNING(logger_, COORDINATOR, "ChunkedMessageProcessor: failed to send chunked message. msgId: {}, offset: {}", msgId_, chunk.payloadOffset); + return false; + } + + chunk.payloadOffset += chunkSize; + } + return true; +} + +void ChunkedMessageProcessor::processReceivedChunk(const CoordinatorPayload *chunk, size_t chunkSize) { + ELOG_TRACE(logger_, COORDINATOR, "ChunkedMessageProcessor: received chunked message. msgId: {}, offset: {}, chunkSize: {}, totalSize: {}", chunk->msgId, chunk->payloadOffset, chunkSize, chunk->payloadTotalSize); + std::unique_lock lock(mutex_); + + auto &messagesForSender = recievedMessages_[chunk->senderProcessId]; + auto it = messagesForSender.find(chunk->msgId); + if (it == messagesForSender.end()) { + it = messagesForSender.emplace(chunk->msgId, ChunkedMessage(chunk->payloadTotalSize)).first; + } + + ChunkedMessage &message = it->second; + + std::size_t payloadSize = chunkSize - offsetof(CoordinatorPayload, payload); // actual payload size in this chunk + std::span chunkData(chunk->payload.data(), payloadSize); + + if (message.addNextChunk(chunkData)) { + ELOG_TRACE(logger_, COORDINATOR, "ChunkedMessageProcessor: received chunked message. msgId: {}, offset: {}, receivedSize: {}, totalSize: {}. Message complete, processing.", chunk->msgId, chunk->payloadOffset, message.getData().size(), chunk->payloadTotalSize); + + std::vector data; + message.swapData(data); + + messagesForSender.erase(it); + if (messagesForSender.empty()) { + recievedMessages_.erase(chunk->senderProcessId); + } + + lock.unlock(); + processMessage_(data); + + } else { + ELOG_TRACE(logger_, COORDINATOR, "ChunkedMessageProcessor: received chunked message. msgId: {}, offset: {}, receivedSize: {}, totalSize: {}", chunk->msgId, chunk->payloadOffset, message.getData().size(), chunk->payloadTotalSize); + } +} + +void ChunkedMessageProcessor::cleanupAbandonedMessages(std::chrono::steady_clock::time_point now, std::chrono::seconds maxAge) { + std::lock_guard lock(mutex_); + for (auto senderIt = recievedMessages_.begin(); senderIt != recievedMessages_.end();) { + auto &messagesForSender = senderIt->second; + for (auto msgIt = messagesForSender.begin(); msgIt != messagesForSender.end();) { + if (now - msgIt->second.getLastUpdated() > maxAge) { + ELOG_DEBUG(logger_, COORDINATOR, "ChunkedMessageProcessor: cleaning up old message from sender pid {} msgId {}", senderIt->first, msgIt->first); + msgIt = messagesForSender.erase(msgIt); + } else { + ++msgIt; + } + } + + if (messagesForSender.empty()) { + senderIt = recievedMessages_.erase(senderIt); + } else { + ++senderIt; + } + } +} + +} // namespace elasticapm::php::coordinator diff --git a/prod/native/libcommon/code/coordinator/ChunkedMessageProcessor.h b/prod/native/libcommon/code/coordinator/ChunkedMessageProcessor.h new file mode 100644 index 00000000..92eb53d0 --- /dev/null +++ b/prod/native/libcommon/code/coordinator/ChunkedMessageProcessor.h @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "LoggerInterface.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "CoordinatorMessagesDispatcher.h" + +namespace elasticapm::php::coordinator { + +class ChunkedMessage { +public: + ChunkedMessage(std::size_t totalSize) : totalSize_(totalSize) { + data_.reserve(totalSize_); + } + + // return true if message is complete + bool addNextChunk(const std::span chunkData) { + if (data_.size() + chunkData.size_bytes() > totalSize_) { + throw std::runtime_error("ChunkedMessage: chunk exceeds total size"); + } + + data_.insert(data_.end(), chunkData.begin(), chunkData.end()); + lastUpdated_ = std::chrono::steady_clock::now(); + return data_.size() == totalSize_; + } + + const std::vector &getData() const { + return data_; + } + + void swapData(std::vector &second) { + data_.swap(second); + } + + const std::chrono::steady_clock::time_point &getLastUpdated() const { + return lastUpdated_; + } + +private: + std::size_t totalSize_; + std::vector data_; + std::chrono::steady_clock::time_point lastUpdated_; +}; + +struct CoordinatorPayload { + pid_t senderProcessId; + uint64_t msgId; + std::size_t payloadTotalSize; + std::size_t payloadOffset; + std::array payload; // it must be last field in the struct. sizeof(CoordinatorPayload) = 4096 bytes with current payload size +}; + +class ChunkedMessageProcessor { +public: + using sendBuffer_t = std::function; + using processMessage_t = std::function)>; + + using msgId_t = uint64_t; + + ChunkedMessageProcessor(std::shared_ptr logger, std::size_t maxChunkSize, sendBuffer_t sendBuffer, processMessage_t processMessage) : logger_(logger), maxChunkSize_(maxChunkSize), sendBuffer_(std::move(sendBuffer)), processMessage_(std::move(processMessage)) { + } + + bool sendPayload(const std::string &payload); + void processReceivedChunk(const CoordinatorPayload *chunk, size_t chunkSize); + void cleanupAbandonedMessages(std::chrono::steady_clock::time_point now, std::chrono::seconds maxAge); + +private: + std::mutex mutex_; + std::shared_ptr logger_; + pid_t senderProcessId_ = getpid(); + std::size_t maxChunkSize_; + sendBuffer_t sendBuffer_; + processMessage_t processMessage_; + std::unordered_map> recievedMessages_; + msgId_t msgId_ = 0; +}; + +} // namespace elasticapm::php::coordinator \ No newline at end of file diff --git a/prod/native/libcommon/code/coordinator/CoordinatorMessagesDispatcher.cpp b/prod/native/libcommon/code/coordinator/CoordinatorMessagesDispatcher.cpp new file mode 100644 index 00000000..5f19d589 --- /dev/null +++ b/prod/native/libcommon/code/coordinator/CoordinatorMessagesDispatcher.cpp @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "CoordinatorMessagesDispatcher.h" +#include "coordinator/proto/CoordinatorCommands.pb.h" + +namespace elasticapm::php::coordinator { + +void CoordinatorMessagesDispatcher::processRecievedMessage(const std::span data) { + + coordinator::CoordinatorCommand command; + if (!command.ParseFromArray(data.data(), data.size())) { + ELOG_ERROR(logger_, COORDINATOR, "CoordinatorMessagesDispatcher: Failed to parse CoordinatorCommand"); + return; + } + + switch (command.type()) { + case coordinator::CoordinatorCommand::ESTABLISH_CONNECTION: + { + if (!command.has_establish_connection()) { + ELOG_ERROR(logger_, COORDINATOR, "CoordinatorMessagesDispatcher: Missing establish_connection payload"); + return; + } + const auto &c = command.establish_connection(); + ELOG_DEBUG(logger_, COORDINATOR, + "CoordinatorMessagesDispatcher: EstablishConnection: url='{}' hash={} content_type='{}' headers={} timeout_ms={} max_retries={} retry_delay_ms={}", + c.endpoint_url(), + c.endpoint_hash(), + c.content_type(), + c.endpoint_headers_size(), + c.timeout_ms(), + c.max_retries(), + c.retry_delay_ms()); + + std::vector> headers; + for (const auto &h : c.endpoint_headers()) { + headers.emplace_back(h.first, h.second); + } + + httpTransport_->initializeConnection( + c.endpoint_url(), + c.endpoint_hash(), + c.content_type(), + headers, + std::chrono::milliseconds(c.timeout_ms()), + c.max_retries(), + std::chrono::milliseconds(c.retry_delay_ms()) + ); + + break; + } + case coordinator::CoordinatorCommand::SEND_ENDPOINT_PAYLOAD: + { + if (!command.has_send_endpoint_payload()) { + ELOG_ERROR(logger_, COORDINATOR, "CoordinatorMessagesDispatcher: Missing send_endpoint_payload"); + return; + } + const auto &p = command.send_endpoint_payload(); + ELOG_DEBUG(logger_, COORDINATOR, + "CoordinatorMessagesDispatcher: SendEndpointPayload: hash={} payload_size={}", + p.endpoint_hash(), + p.payload().size()); + + + const std::string &raw = p.payload(); + std::span buf(reinterpret_cast(const_cast(raw.data())), raw.size()); + + httpTransport_->enqueue(p.endpoint_hash(), buf); + break; + } + default: + ELOG_WARNING(logger_, COORDINATOR, "CoordinatorMessagesDispatcher: Unknown CoordinatorCommand type={}", static_cast(command.type())); + break; + } + +} + +} // namespace elasticapm::php \ No newline at end of file diff --git a/prod/native/libcommon/code/coordinator/CoordinatorMessagesDispatcher.h b/prod/native/libcommon/code/coordinator/CoordinatorMessagesDispatcher.h new file mode 100644 index 00000000..80167acd --- /dev/null +++ b/prod/native/libcommon/code/coordinator/CoordinatorMessagesDispatcher.h @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + #pragma once + + + +#include "LoggerInterface.h" +#include "transport/HttpTransportAsyncInterface.h" + +#include +#include + + +namespace elasticapm::php::coordinator { + + +class CoordinatorMessagesDispatcher { +public: + CoordinatorMessagesDispatcher(std::shared_ptr logger, std::shared_ptr httpTransport) : logger_(std::move(logger)), httpTransport_(std::move(httpTransport)) { + } + + ~CoordinatorMessagesDispatcher() = default; + + void processRecievedMessage(const std::span data); + +private: + std::shared_ptr logger_; + std::shared_ptr httpTransport_; +}; + +} \ No newline at end of file diff --git a/prod/native/libcommon/code/coordinator/CoordinatorProcess.cpp b/prod/native/libcommon/code/coordinator/CoordinatorProcess.cpp new file mode 100644 index 00000000..d631c5b4 --- /dev/null +++ b/prod/native/libcommon/code/coordinator/CoordinatorProcess.cpp @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "CoordinatorProcess.h" + +namespace elasticapm::php::coordinator { + +void CoordinatorProcess::coordinatorLoop() { + setupPeriodicTasks(); + periodicTaskExecutor_->resumePeriodicTasks(); + + char buffer[maxMqPayloadSize]; + while (working_.load()) { + size_t receivedSize = 0; + unsigned int priority = 0; + + try { + if (commandQueue_->timed_receive(buffer, maxMqPayloadSize, receivedSize, priority, std::chrono::steady_clock::now() + std::chrono::milliseconds(10))) { + processor_.processReceivedChunk(reinterpret_cast(buffer), receivedSize); + } + } catch (boost::interprocess::interprocess_exception &ex) { + if (logger_) { + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess: message_queue receive failed: {}", ex.what()); + } + continue; + } + } +} + +void CoordinatorProcess::setupPeriodicTasks() { + periodicTaskExecutor_ = std::make_unique(std::vector{[this](PeriodicTaskExecutor::time_point_t now) { + // Check parent process is alive + if (getppid() != parentProcessId_) { + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess: parent process has exited, shutting down coordinator process"); + working_ = false; + } + + static auto lastCleanupTime = std::chrono::steady_clock::now(); + if (now - lastCleanupTime >= cleanUpLostMessagesInterval) { + processor_.cleanupAbandonedMessages(now, std::chrono::seconds(10)); + lastCleanupTime = now; + } + }}); + periodicTaskExecutor_->setInterval(std::chrono::milliseconds(100)); +} + +bool CoordinatorProcess::enqueueMessage(const void *data, size_t size) { + try { + commandQueue_->try_send(data, size, 0); + return true; + } catch (boost::interprocess::interprocess_exception &ex) { + if (logger_) { + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess: message_queue send failed: {}", ex.what()); + } + return false; + } +} + +} // namespace elasticapm::php::coordinator diff --git a/prod/native/libcommon/code/coordinator/CoordinatorProcess.h b/prod/native/libcommon/code/coordinator/CoordinatorProcess.h new file mode 100644 index 00000000..606d1904 --- /dev/null +++ b/prod/native/libcommon/code/coordinator/CoordinatorProcess.h @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "LoggerInterface.h" +#include "ForkableInterface.h" +#include "PeriodicTaskExecutor.h" +#include "ChunkedMessageProcessor.h" +#include "CoordinatorTelemetrySignalsSender.h" +#include "CoordinatorMessagesDispatcher.h" + +#include +#include + +#include +#include +#include +#include +#include + +namespace elasticapm::php::coordinator { + +namespace { +constexpr static size_t maxMqPayloadSize = sizeof(CoordinatorPayload); +constexpr static size_t maxQueueSize = 100; +constexpr static std::chrono::minutes cleanUpLostMessagesInterval(1); +} // namespace + +class CoordinatorProcess : public boost::noncopyable, public ForkableInterface { + +public: + CoordinatorProcess(std::shared_ptr logger, std::shared_ptr messagesDispatcher) + : logger_(std::move(logger)), messagesDispatcher_(std::move(messagesDispatcher)) { + } + ~CoordinatorProcess() { + } + + void prefork() final { + periodicTaskExecutor_->prefork(); + } + + void postfork([[maybe_unused]] bool child) final { + periodicTaskExecutor_->postfork(child); + } + + // returns true in scope of forked CoordinatorProcess + bool start() { + parentProcessId_ = getpid(); + processId_ = fork(); + if (processId_ < 0) { + if (logger_) { + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess: fork() failed: {} ({})", strerror(errno), errno); + } + } else if (processId_ == 0) { + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess starting collector process"); + coordinatorLoop(); + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess: collector process is going to finish"); + return true; + } else { + if (logger_) { + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess parent process continues initialization"); + } + } + return false; + } + + CoordinatorTelemetrySignalsSender &getCoordinatorSender() { + return coordinatorSender_; + } + +private: + void coordinatorLoop(); + void setupPeriodicTasks(); + bool enqueueMessage(const void *data, size_t size); + +private: + std::atomic_bool working_ = true; + std::shared_ptr logger_; + std::unique_ptr periodicTaskExecutor_; + + std::shared_ptr commandQueue_{std::make_shared(maxQueueSize, maxMqPayloadSize)}; + ChunkedMessageProcessor processor_{logger_, maxMqPayloadSize, [this](const void *data, size_t size) { return enqueueMessage(data, size); }, [this](const std::span data) { messagesDispatcher_->processRecievedMessage(data); }}; + + CoordinatorTelemetrySignalsSender coordinatorSender_{logger_, [this](const std::string &payload) { return processor_.sendPayload(payload); }}; + std::shared_ptr messagesDispatcher_; + + int processId_ = 0; + int parentProcessId_ = 0; +}; + +} diff --git a/prod/native/libcommon/code/coordinator/CoordinatorTelemetrySignalsSender.cpp b/prod/native/libcommon/code/coordinator/CoordinatorTelemetrySignalsSender.cpp new file mode 100644 index 00000000..5565637d --- /dev/null +++ b/prod/native/libcommon/code/coordinator/CoordinatorTelemetrySignalsSender.cpp @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "CoordinatorTelemetrySignalsSender.h" + +#include +#include +#include + +#include "coordinator/proto/CoordinatorCommands.pb.h" + +namespace elasticapm::php::coordinator { + +void CoordinatorTelemetrySignalsSender::initializeConnection(std::string endpointUrl, std::size_t endpointHash, std::string contentType, enpointHeaders_t const &endpointHeaders, std::chrono::milliseconds timeout, std::size_t maxRetries, std::chrono::milliseconds retryDelay) { + + coordinator::EstablishConnectionCommand command; + command.set_endpoint_url(std::move(endpointUrl)); + command.set_endpoint_hash(endpointHash); + command.set_content_type(contentType); + for (const auto &[key, value] : endpointHeaders) { + (*command.mutable_endpoint_headers())[key] = value; + } + command.set_timeout_ms(timeout.count()); + command.set_max_retries(maxRetries); + command.set_retry_delay_ms(retryDelay.count()); + + coordinator::CoordinatorCommand coordCommand; + coordCommand.set_type(coordinator::CoordinatorCommand::ESTABLISH_CONNECTION); + *coordCommand.mutable_establish_connection() = command; + + std::string serializedCommand; + if (!coordCommand.SerializeToString(&serializedCommand)) { + if (logger_) { + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorTelemetrySignalsSender: failed to serialize EstablishConnectionCommand"); + } + return; + } + + if (!sendPayload_(serializedCommand)) { + ELOG_WARNING(logger_, COORDINATOR, "CoordinatorTelemetrySignalsSender: failed to send EstablishConnectionCommand, endpoint hash: {}", endpointHash); + } +} + +void CoordinatorTelemetrySignalsSender::enqueue(uint64_t endpointHash, std::span payload, responseCallback_t callback) { + coordinator::SendEndpointPayloadCommand command; + command.set_endpoint_hash(endpointHash); + command.set_payload(payload.data(), payload.size()); + + coordinator::CoordinatorCommand coordCommand; + coordCommand.set_type(coordinator::CoordinatorCommand::SEND_ENDPOINT_PAYLOAD); + *coordCommand.mutable_send_endpoint_payload() = command; + + std::string serializedCommand; + if (!coordCommand.SerializeToString(&serializedCommand)) { + if (logger_) { + ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorSender: failed to serialize SendEndpointDataCommand"); + } + return; + } + + if (!sendPayload_(serializedCommand)) { + ELOG_WARNING(logger_, COORDINATOR, "CoordinatorTelemetrySignalsSender: Dropping payload. Endpoint hash: %zu, payload size: {}", endpointHash, payload.size()); + } +} + +} // namespace elasticapm::php diff --git a/prod/native/libcommon/code/coordinator/CoordinatorTelemetrySignalsSender.h b/prod/native/libcommon/code/coordinator/CoordinatorTelemetrySignalsSender.h new file mode 100644 index 00000000..8adce6ca --- /dev/null +++ b/prod/native/libcommon/code/coordinator/CoordinatorTelemetrySignalsSender.h @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +#pragma once + +#include "LoggerInterface.h" +#include "transport/HttpTransportAsyncInterface.h" + +#include +#include +#include + +namespace elasticapm::php::coordinator { + +class CoordinatorTelemetrySignalsSender : public transport::HttpTransportAsyncInterface { +public: + using sendPayload_t = std::function; + + CoordinatorTelemetrySignalsSender(std::shared_ptr logger, sendPayload_t sendPayload) + : logger_(std::move(logger)), sendPayload_(std::move(sendPayload)) { + } + + ~CoordinatorTelemetrySignalsSender() = default; + + void initializeConnection(std::string endpointUrl, std::size_t endpointHash, std::string contentType, enpointHeaders_t const &endpointHeaders, std::chrono::milliseconds timeout, std::size_t maxRetries, std::chrono::milliseconds retryDelay); + void enqueue(std::size_t endpointHash, std::span payload, responseCallback_t callback = {}); + void updateRetryDelay(size_t endpointHash, std::chrono::milliseconds retryDelay) { + } + +private: + std::shared_ptr logger_; + sendPayload_t sendPayload_; +}; +} diff --git a/prod/native/libcommon/code/coordinator/proto/CoordinatorCommands.proto b/prod/native/libcommon/code/coordinator/proto/CoordinatorCommands.proto new file mode 100644 index 00000000..bb1b0c52 --- /dev/null +++ b/prod/native/libcommon/code/coordinator/proto/CoordinatorCommands.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package elasticapm.php.coordinator; + +message EstablishConnectionCommand { + string endpoint_url = 1; + uint64 endpoint_hash = 2; + string content_type = 3; + map endpoint_headers = 4; + uint64 timeout_ms = 5; + uint32 max_retries = 6; + uint64 retry_delay_ms = 7; +} + +message SendEndpointPayloadCommand { + uint64 endpoint_hash = 1; + bytes payload = 2; +} + +message WorkerStartedCommand { + uint32 process_id = 1; + uint32 parent_process_id = 2; +} + +message WorkerIsGoingToShutdownCommand { + uint32 process_id = 1; + uint32 parent_process_id = 2; +} + +message CoordinatorCommand { + enum CommandType { + UNKNOWN = 0; + ESTABLISH_CONNECTION = 1; + SEND_ENDPOINT_PAYLOAD = 2; + WORKER_STARTED = 3; + WORKER_IS_GOING_TO_SHUTDOWN = 4; + } + + CommandType type = 1; + + EstablishConnectionCommand establish_connection = 2; + SendEndpointPayloadCommand send_endpoint_payload = 3; + WorkerStartedCommand worker_started = 4; + WorkerIsGoingToShutdownCommand worker_is_going_to_shutdown = 5; +} \ No newline at end of file