|
| 1 | +/// |
| 2 | +/// \file 16-AliECS.cxx |
| 3 | +/// \author Adam Wegrzynek <[email protected]> |
| 4 | +/// |
| 5 | + |
| 6 | +#include <iostream> |
| 7 | +#include <grpc++/grpc++.h> |
| 8 | +#include "o2control.grpc.pb.h" |
| 9 | +#include "helpers/HttpConnection.h" |
| 10 | +#include <boost/program_options.hpp> |
| 11 | +#include <thread> |
| 12 | +#include <regex> |
| 13 | +#include "../src/Backends/InfluxDB.h" |
| 14 | +#include "../src/Transports/HTTP.h" |
| 15 | +#include "../src/MonLogger.h" |
| 16 | + |
| 17 | +using o2::monitoring::MonLogger; |
| 18 | +using grpc::Channel; |
| 19 | +using grpc::ClientContext; |
| 20 | +using grpc::Status; |
| 21 | + |
| 22 | +using o2control::EnvironmentInfo; |
| 23 | +using namespace o2::monitoring; |
| 24 | + |
| 25 | +class AliEcsClient { |
| 26 | + public: |
| 27 | + AliEcsClient(std::shared_ptr<Channel> channel) : mStub(o2control::Control::NewStub(channel)) {} |
| 28 | + void sendRunDetails(const auto& influxBackend) { |
| 29 | + o2control::GetEnvironmentsRequest request; |
| 30 | + request.set_showall(false); |
| 31 | + request.set_showtaskinfos(false); |
| 32 | + o2control::GetEnvironmentsReply reply; |
| 33 | + ClientContext context; |
| 34 | + Status status = mStub->GetEnvironments(&context, request, &reply); |
| 35 | + if (status.ok()) { |
| 36 | + MonLogger::Get() << "Status call OK" << MonLogger::End(); |
| 37 | + for (int i = 0; i < reply.environments_size(); i++) { |
| 38 | + if (reply.environments(i).currentrunnumber() > 1) { |
| 39 | + MonLogger::Get() << "Env ID" << reply.environments(i).id() << MonLogger::End(); |
| 40 | + auto metric = Metric{"tasks"}; |
| 41 | + metric.addValue(reply.environments(i).numberofactivetasks(), "active").addValue(reply.environments(i).numberofinactivetasks(), "inactive"); |
| 42 | + influxBackend->sendWithRun(metric, reply.environments(i).id(), std::to_string(reply.environments(i).currentrunnumber())); |
| 43 | + } |
| 44 | + } |
| 45 | + } else { |
| 46 | + std::cout << status.error_code() << ": " << status.error_message() << std::endl; |
| 47 | + } |
| 48 | + } |
| 49 | + private: |
| 50 | + std::unique_ptr<o2control::Control::Stub> mStub; |
| 51 | +}; |
| 52 | + |
| 53 | + |
| 54 | +int main(int argc, char* argv[]) { |
| 55 | + boost::program_options::options_description desc("Program options"); |
| 56 | + desc.add_options() |
| 57 | + ("aliecs-host", boost::program_options::value<std::string>()->required(), "AliECS hostname") |
| 58 | + ("aliecs-port", boost::program_options::value<unsigned short>()->required(), "AliECS port") |
| 59 | + ("influxdb-url", boost::program_options::value<std::string>()->required(), "InfluxDB hostname") |
| 60 | + ("influxdb-token", boost::program_options::value<std::string>()->required(), "InfluxDB token") |
| 61 | + ("influxdb-org", boost::program_options::value<std::string>()->default_value("cern"), "InfluxDB organisation") |
| 62 | + ("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket"); |
| 63 | + boost::program_options::variables_map vm; |
| 64 | + boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); |
| 65 | + boost::program_options::notify(vm); |
| 66 | + MonLogger::mLoggerSeverity = o2::monitoring::Severity::Debug; |
| 67 | + MonLogger::Get() << "Connected to AliECS server: " << vm["aliecs-host"].as<std::string>() << ":" << vm["aliecs-port"].as<unsigned short>() << MonLogger::End(); |
| 68 | + grpc::ChannelArguments args; |
| 69 | + args.SetMaxReceiveMessageSize(20*1024*1024); |
| 70 | + AliEcsClient client(grpc::CreateCustomChannel( |
| 71 | + vm["aliecs-host"].as<std::string>() + ":" + std::to_string(vm["aliecs-port"].as<unsigned short>()), |
| 72 | + grpc::InsecureChannelCredentials(), |
| 73 | + args |
| 74 | + )); |
| 75 | + auto httpTransport = std::make_unique<transports::HTTP>( |
| 76 | + vm["influxdb-url"].as<std::string>() + "/api/v2/write?" + |
| 77 | + "org=" + vm["influxdb-org"].as<std::string>() + "&" + |
| 78 | + "bucket=" + vm["influxdb-bucket"].as<std::string>() |
| 79 | + ); |
| 80 | + httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>()); |
| 81 | + auto influxdbBackend = std::make_unique<backends::InfluxDB>(std::move(httpTransport)); |
| 82 | + for (;;) { |
| 83 | + client.sendRunDetails(influxdbBackend); |
| 84 | + std::this_thread::sleep_for(std::chrono::seconds(15)); |
| 85 | + } |
| 86 | +} |
0 commit comments