Skip to content

Commit a5fb13c

Browse files
denniskleincuveland
authored andcommitted
adopt namespaced fairmq api consistently
see FairRootGroup/FairMQ#423
1 parent d0aada8 commit a5fb13c

20 files changed

+63
-64
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ message(STATUS "CMAKE_CXX_STANDARD = ${CMAKE_CXX_STANDARD}")
9292
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
9393

9494
find_package(Threads REQUIRED)
95-
find_package(FairMQ 1.4 REQUIRED)
95+
find_package(FairMQ 1.4.41 REQUIRED)
9696
find_package(FairLogger ${FairMQ_FairLogger_VERSION} REQUIRED)
9797
message(STATUS "Looking for FairLogger dependencies: ${FairLogger_PACKAGE_DEPENDENCIES}" )
9898
foreach(dep IN LISTS FairLogger_PACKAGE_DEPENDENCIES)

src/ReadoutEmulator/CruMemoryHandler.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
#include <ConcurrentQueue.h>
1919

20-
#include <fairmq/FairMQUnmanagedRegion.h>
21-
#include <fairmq/FairMQDevice.h> /* NewUnmanagedRegionFor */
20+
#include <fairmq/UnmanagedRegion.h>
21+
#include <fairmq/Device.h> /* NewUnmanagedRegionFor */
2222
#include <fairmq/ProgOptions.h>
2323

2424
#include <chrono>

src/ReadoutEmulator/ReadoutDevice.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class ReadoutDevice : public DataDistDevice
6363
void SendingThread();
6464

6565
// data and Descriptor regions
66-
// must be here because NewUnmanagedRegionFor() is a method of FairMQDevice...
66+
// must be here because NewUnmanagedRegionFor() is a method of fair::mq::Device...
6767
std::unique_ptr<DataRegionAllocatorResource> mDataRegion;
6868

6969
std::string mOutChannelName;

src/ReadoutEmulator/runReadoutEmulatorDevice.cxx

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414
#include "ReadoutDevice.h"
1515

1616
#include <fairmq/ProgOptions.h>
17-
18-
#include "runFairMQDevice.h"
17+
#include <fairmq/runDevice.h>
1918

2019
namespace bpo = boost::program_options;
2120
template class std::basic_string<char, std::char_traits<char>, std::allocator<char> >; // Workaround for bug in CC7 devtoolset7
@@ -46,7 +45,7 @@ void addCustomOptions(bpo::options_description& options)
4645
"Input throughput per link (bits per second).");
4746
}
4847

49-
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
48+
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
5049
{
51-
return new o2::DataDistribution::ReadoutDevice();
50+
return std::make_unique<o2::DataDistribution::ReadoutDevice>();
5251
}

src/StfBuilder/StfBuilderDevice.cxx

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ void StfBuilderDevice::InitTask()
110110
}
111111
if (I().mPartitionId.empty()) {
112112
EDDLOG("Partition id is not provided during InitTask(). Check command line or ECS parameters. Exiting.");
113-
ChangeState(fair::mq::Transition::ErrorFound);
113+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
114114
return;
115115
}
116116

@@ -143,7 +143,7 @@ void StfBuilderDevice::InitTask()
143143
// check run type
144144
if (ReadoutDataUtils::sRunType == ReadoutDataUtils::RunType::eInvalid) {
145145
EDDLOG("Run type paramter must be correctly set.");
146-
ChangeState(fair::mq::Transition::ErrorFound);
146+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
147147
return;
148148
}
149149

@@ -153,7 +153,7 @@ void StfBuilderDevice::InitTask()
153153
(ReadoutDataUtils::sSpecifiedDataOrigin == o2::header::gDataOriginMFT))) {
154154
EDDLOG("Run type paramter 'topology' is supported only for ITS and MFT. Please specify the detector option. detector={}",
155155
ReadoutDataUtils::sSpecifiedDataOrigin.as<std::string>());
156-
ChangeState(fair::mq::Transition::ErrorFound);
156+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
157157
return;
158158
}
159159
}
@@ -180,13 +180,13 @@ void StfBuilderDevice::InitTask()
180180

181181
// File sink
182182
if (!I().mFileSink->loadVerifyConfig(*(this->GetConfig()))) {
183-
ChangeState(fair::mq::Transition::ErrorFound);
183+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
184184
return;
185185
}
186186

187187
// File source
188188
if (!I().mFileSource->loadVerifyConfig(*(this->GetConfig()))) {
189-
ChangeState(fair::mq::Transition::ErrorFound);
189+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
190190
return;
191191
}
192192

@@ -214,15 +214,15 @@ void StfBuilderDevice::InitTask()
214214
(ReadoutDataUtils::sSpecifiedDataOrigin == o2::header::gDataOriginAny)) {
215215
EDDLOG("Detector string parameter must be specified when receiving the data from the "
216216
"readout and not using RDHv6 or greater.");
217-
ChangeState(fair::mq::Transition::ErrorFound);
217+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
218218
return;
219219
} else {
220220
IDDLOG("READOUT INTERFACE: Configured detector: {}", ReadoutDataUtils::sSpecifiedDataOrigin.as<std::string>());
221221
}
222222

223223
if (ReadoutDataUtils::sRdhVersion == ReadoutDataUtils::RdhVersion::eRdhInvalid) {
224224
EDDLOG("The RDH version must be specified when receiving data from readout.");
225-
ChangeState(fair::mq::Transition::ErrorFound);
225+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
226226
return;
227227
} else {
228228
IDDLOG("READOUT INTERFACE: Configured RDHv{}", ReadoutDataUtils::sRdhVersion);
@@ -261,7 +261,7 @@ void StfBuilderDevice::InitTask()
261261
GetChannel(I().mInputChannelName);
262262
} catch(std::exception &) {
263263
EDDLOG("Input channel not configured (from o2-readout-exe) and not running with file source enabled.");
264-
ChangeState(fair::mq::Transition::ErrorFound);
264+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
265265
return;
266266
}
267267
}
@@ -272,7 +272,7 @@ void StfBuilderDevice::InitTask()
272272
}
273273
} catch(std::exception &e) {
274274
EDDLOG("Output channel (to DPL or StfSender) must be configured if not running in stand-alone mode.");
275-
ChangeState(fair::mq::Transition::ErrorFound);
275+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
276276
return;
277277
}
278278
}

src/StfBuilder/StfBuilderInput.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include <DataDistLogger.h>
2222
#include <DataDistributionOptions.h>
2323

24-
#include <FairMQDevice.h>
24+
#include <fairmq/Device.h>
2525

2626
#include <vector>
2727
#include <queue>

src/StfSender/StfSenderDevice.cxx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void StfSenderDevice::Init()
9292
I().mPartitionId = Config::getPartitionOption(*GetConfig()).value_or("");
9393
if (I().mPartitionId.empty()) {
9494
WDDLOG("StfSender 'discovery-partition' parameter not set during Init(). Exiting.");
95-
ChangeState(fair::mq::Transition::ErrorFound);
95+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
9696
return;
9797
}
9898

@@ -130,7 +130,7 @@ void StfSenderDevice::Init()
130130
lBuffersAllocatedFuture.wait();
131131
if (!lBuffersAllocatedFuture.get()) {
132132
EDDLOG("Init::MemorySegment allocation failed. Exiting...");
133-
ChangeState(fair::mq::Transition::ErrorFound);
133+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
134134
return;
135135
}
136136

@@ -185,7 +185,7 @@ void StfSenderDevice::InitTask()
185185
// Not available in Init()
186186
if (fair::mq::Transport::SHM != Transport()->GetType()) {
187187
EDDLOG("Default transport parameter must be set to shm.");
188-
ChangeState(fair::mq::Transition::ErrorFound);
188+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
189189
return;
190190
}
191191

@@ -195,13 +195,13 @@ void StfSenderDevice::InitTask()
195195
GetChannel(I().mInputChannelName, 0);
196196
} catch (...) {
197197
EDDLOG("Requested input channel is not configured. input_chan={}", I().mInputChannelName);
198-
ChangeState(fair::mq::Transition::ErrorFound);
198+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
199199
return;
200200
}
201201

202202
// File sink
203203
if (!I().mFileSink->loadVerifyConfig(*GetConfig())) {
204-
ChangeState(fair::mq::Transition::ErrorFound);
204+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
205205
return;
206206
}
207207

@@ -224,7 +224,7 @@ void StfSenderDevice::InitTask()
224224
if (NewStatePending()) {
225225
IDDLOG("InitTask: The control system requested abort.");
226226
AbortInitTask();
227-
ChangeState(fair::mq::Transition::ErrorFound);
227+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
228228
return;
229229
}
230230

@@ -234,7 +234,7 @@ void StfSenderDevice::InitTask()
234234
// Did we fail to connect to the TfScheduler?
235235
if (!I().mTfSchedulerRpcClient.started()) {
236236
EDDLOG("InitTask: Failed to connect to TfScheduler. Exiting.");
237-
ChangeState(fair::mq::Transition::ErrorFound);
237+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
238238
return;
239239
}
240240
}

src/StfSender/StfSenderOutput.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ class StfSenderOutput
147147
ConcurrentFifo<StfCopyInfo> mCopyQueue;
148148

149149
std::mutex mStfOrderingLock;
150-
std::condition_variable mStfOrderingCv;
151-
std::queue<std::uint64_t> mStfOrderingQueue;
150+
std::condition_variable mStfOrderingCv;
151+
std::queue<std::uint64_t> mStfOrderingQueue;
152152

153153
/// Scheduler threads
154154
struct StfSchedInfo {

src/StfSender/StfSenderOutputFairMQ.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,4 +255,4 @@ void StfSenderOutputFairMQ::DataHandlerThread(const std::string pTfBuilderId)
255255
}
256256

257257

258-
} /* o2::DataDistribution */
258+
} /* o2::DataDistribution */

src/TfBuilder/TfBuilderDevice.cxx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ void TfBuilderDevice::Init()
4949
mPartitionId = Config::getPartitionOption(*GetConfig()).value_or("");
5050
if (mPartitionId.empty()) {
5151
WDDLOG("TfBuilder 'discovery-partition' parameter not set during Init(). Exiting.");
52-
ChangeState(fair::mq::Transition::ErrorFound);
52+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
5353
return;
5454
}
5555

@@ -145,7 +145,7 @@ void TfBuilderDevice::InitTask()
145145
} catch (...) {
146146
lBuffersAllocatedFuture.wait();
147147
EDDLOG("Consul Initialization failed. Exiting...");
148-
ChangeState(fair::mq::Transition::ErrorFound);
148+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
149149
return;
150150
}
151151

@@ -160,7 +160,7 @@ void TfBuilderDevice::InitTask()
160160
if (!mFileSink.loadVerifyConfig(*(this->GetConfig()))) {
161161
lBuffersAllocatedFuture.wait();
162162
AbortInitTask();
163-
ChangeState(fair::mq::Transition::ErrorFound);
163+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
164164
return;
165165
}
166166

@@ -179,7 +179,7 @@ void TfBuilderDevice::InitTask()
179179
EDDLOG("Can not start TfBuilder. id={}", lStatus.info().process_id());
180180
EDDLOG("Process with the same id already running? If not, clear the key manually.");
181181
EDDLOG("Discovery database error: this TfBuilder was/is already present. Exiting.");
182-
ChangeState(fair::mq::Transition::ErrorFound);
182+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
183183
return;
184184
}
185185
}
@@ -218,7 +218,7 @@ void TfBuilderDevice::InitTask()
218218
if (!mFlpInputHandler->start()) {
219219
AbortInitTask();
220220
EDDLOG("Could not initialize input connections. Exiting.");
221-
ChangeState(fair::mq::Transition::ErrorFound);
221+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
222222
return;
223223
}
224224

@@ -227,15 +227,15 @@ void TfBuilderDevice::InitTask()
227227
if (!lBuffersAllocatedFuture.get()) {
228228
AbortInitTask();
229229
EDDLOG("InitTask::MemorySegment allocation failed. Exiting...");
230-
ChangeState(fair::mq::Transition::ErrorFound);
230+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
231231
return;
232232
}
233233

234234
// Start input handlers after the memory is finished allocating
235235
if (!mFlpInputHandler->map_data_region()) {
236236
AbortInitTask();
237237
EDDLOG("Could not initialize input connections. Exiting.");
238-
ChangeState(fair::mq::Transition::ErrorFound);
238+
ChangeStateOrThrow(fair::mq::Transition::ErrorFound);
239239
return;
240240
}
241241

0 commit comments

Comments
 (0)