Skip to content

Commit cb6de90

Browse files
Merge branch 'devel-alf-publish-sca'
2 parents dd6cb2c + 6e49b26 commit cb6de90

File tree

7 files changed

+205
-35
lines changed

7 files changed

+205
-35
lines changed

src/CommandLineUtilities/AliceLowlevelFrontend/AliceLowlevelFrontend.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <boost/algorithm/string/predicate.hpp>
1717
#include <boost/format.hpp>
1818
#include "AlfException.h"
19+
#include "Sca.h"
1920

2021
namespace AliceO2 {
2122
namespace roc {
@@ -161,6 +162,31 @@ class PublishRpc : DimRpcInfoWrapper
161162
}
162163
};
163164

165+
class PublishScaRpc : DimRpcInfoWrapper
166+
{
167+
public:
168+
PublishScaRpc(const std::string& serviceName)
169+
: DimRpcInfoWrapper(serviceName)
170+
{
171+
}
172+
173+
void publish(std::string dnsName, double frequency, const std::vector<Sca::CommandData>& commandDataPairs)
174+
{
175+
std::ostringstream stream;
176+
stream << dnsName << ';';
177+
for (size_t i = 0; i < commandDataPairs.size(); ++i) {
178+
stream << commandDataPairs[i].command << ',' << commandDataPairs[i].data;
179+
if ((i + 1) < commandDataPairs.size()) {
180+
stream << '\n';
181+
}
182+
}
183+
stream << ';' << frequency;
184+
printf("Publish SCA: %s\n", stream.str().c_str());
185+
setString(stream.str());
186+
getString();
187+
}
188+
};
189+
164190
class PublishStopRpc: DimRpcInfoWrapper
165191
{
166192
public:

src/CommandLineUtilities/AliceLowlevelFrontend/ProgramAliceLowlevelFrontendClient.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,13 @@ class ProgramAliceLowlevelFrontendClient: public Program
6969
Alf::ScaGpioWriteRpc scaGpioWriteRpc(names.scaGpioWrite());
7070
Alf::ScaWriteSequence scaWriteSequence(names.scaWriteSequence());
7171
Alf::PublishRpc publishRpc(names.publishStartCommandRpc());
72+
Alf::PublishScaRpc publishScaRpc(names.publishScaStartCommandRpc());
7273

7374
publishRpc.publish("ALF/TEST/1", 1.0, {0x1fc});
7475
publishRpc.publish("ALF/TEST/2", 3.0, {0x100, 0x104, 0x108});
7576

77+
publishScaRpc.publish("ALF/TEST/SCA_1", 1.0, {{0x0, 0x1}, {0x10, 0x11}});
78+
7679
for (int i = 0; i < 10; ++i) {
7780
cout << "SCA GPIO write '" << i << "'" << endl;
7881
cout << " result: " << scaGpioWriteRpc.write(i) << endl;
@@ -135,6 +138,7 @@ class ProgramAliceLowlevelFrontendClient: public Program
135138
Alf::PublishStopRpc publishStopRpc(names.publishStopCommandRpc());
136139
publishStopRpc.stop("ALF/TEST/1");
137140
publishStopRpc.stop("ALF/TEST/2");
141+
publishStopRpc.stop("ALF/TEST/SCA_1");
138142
}
139143

140144
int mSerialNumber;

src/CommandLineUtilities/AliceLowlevelFrontend/ProgramAliceLowlevelFrontendServer.cxx

Lines changed: 138 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
/// \file ProgramAliceLowlevelFrontendServer.cxx
22
/// \brief Utility that starts the ALICE Lowlevel Frontend (ALF) DIM server
33
///
4+
/// This file contains a bunch of classes that together form the server part of ALF.
5+
/// It is single-threaded, because an earlier multi-threaded implementation ran into strange locking issues with DIM's
6+
/// thread, and it was determined that it would cost less time to rewrite than to debug.
7+
///
8+
/// The idea is that the DIM thread calls the RPC handler functions of the ProgramAliceLowlevelFrontendServer class.
9+
/// These handlers then, depending on the RPC:
10+
/// a) Execute the request immediately (such as for register reads and writes)
11+
/// b) Put a corresponding command object in a lock-free thread-safe queue (such as for publish start/stop commands),
12+
/// the mCommandQueue. The main thread of ProgramAliceLowlevelFrontendServer periodically takes commands from this
13+
/// queue and handles them.
14+
///
415
/// \author Pascal Boeschoten ([email protected])
516

617
#include "Common/Program.h"
@@ -65,8 +76,19 @@ static std::vector<T> lexicalCastVector(const std::vector<U>& items)
6576
struct ServiceDescription
6677
{
6778
std::string dnsName;
68-
std::vector<uintptr_t> addresses;
6979
std::chrono::milliseconds interval;
80+
81+
struct Register
82+
{
83+
std::vector<uintptr_t> addresses;
84+
};
85+
86+
struct ScaSequence
87+
{
88+
std::vector<Sca::CommandData> commandDataPairs;
89+
};
90+
91+
boost::variant<Register, ScaSequence> type;
7092
};
7193

7294
/// Class that handles adding/removing publishers
@@ -101,7 +123,7 @@ class PublisherRegistry
101123
for (auto& kv: mServices) {
102124
auto& service = *kv.second;
103125
if (service.nextUpdate < now) {
104-
service.updateRegisterValues(*mChannel.get());
126+
service.updateValues(*mChannel.get());
105127
service.advanceUpdateTime();
106128
if (service.nextUpdate < next) {
107129
next = service.nextUpdate;
@@ -120,31 +142,67 @@ class PublisherRegistry
120142
{
121143
mDescription = description;
122144
nextUpdate = std::chrono::steady_clock::now();
123-
registerValues.resize(mDescription.addresses.size());
145+
146+
Visitor::apply(mDescription.type,
147+
[&](const ServiceDescription::Register& type){
148+
registerValues.resize(type.addresses.size());
149+
150+
getInfoLogger() << "Starting publisher '" << mDescription.dnsName << "' with "
151+
<< type.addresses.size() << " address(es) at interval "
152+
<< mDescription.interval.count() << "ms" << endm;
153+
},
154+
[&](const ServiceDescription::ScaSequence& type){
155+
registerValues.resize(type.commandDataPairs.size() * 2); // Two result 32-bit integers per pair
156+
157+
getInfoLogger() << "Starting SCA publisher '" << mDescription.dnsName << "' with "
158+
<< type.commandDataPairs.size() << " commands(s) at interval "
159+
<< mDescription.interval.count() << "ms" << endm;
160+
}
161+
);
124162

125163
auto format = (b::format("I:%1%") % registerValues.size()).str();
126164
dimService = std::make_unique<DimService>(mDescription.dnsName.c_str(), format.c_str(), registerValues.data(),
127165
registerValues.size());
128-
129-
getInfoLogger() << "Starting publisher '" << mDescription.dnsName << "' with "
130-
<< mDescription.addresses.size() << " address(es) at interval "
131-
<< mDescription.interval.count() << "ms" << endm;
132166
}
133167

134168
~Service()
135169
{
136170
getInfoLogger() << "Stopping publisher '" << mDescription.dnsName << "'" << endm;
137171
}
138172

139-
void updateRegisterValues(RegisterReadWriteInterface& channel)
173+
void updateValues(BarInterface& channel)
140174
{
141175
getInfoLogger() << "Updating '" << mDescription.dnsName << "':" << endm;
142-
for (size_t i = 0; i < mDescription.addresses.size(); ++i) {
143-
auto index = mDescription.addresses[i] / 4;
144-
auto value = channel.readRegister(index);
145-
getInfoLogger() << " " << mDescription.addresses[i] << " = " << value << endm;
146-
registerValues.at(i) = channel.readRegister(index);
147-
}
176+
177+
Visitor::apply(mDescription.type,
178+
[&](const ServiceDescription::Register& type){
179+
for (size_t i = 0; i < type.addresses.size(); ++i) {
180+
auto index = type.addresses[i] / 4;
181+
auto value = channel.readRegister(index);
182+
getInfoLogger() << " " << type.addresses[i] << " = " << value << endm;
183+
registerValues.at(i) = channel.readRegister(index);
184+
}
185+
},
186+
[&](const ServiceDescription::ScaSequence& type){
187+
std::fill(registerValues.begin(), registerValues.end(), 0); // Reset array in case of aborts
188+
auto sca = Sca(channel, channel.getCardType());
189+
for (size_t i = 0; i < type.commandDataPairs.size(); ++i) {
190+
try {
191+
const auto& pair = type.commandDataPairs[i];
192+
sca.write(pair);
193+
auto result = sca.read();
194+
registerValues[i*2] = result.command;
195+
registerValues[i*2 + 1] = result.data;
196+
} catch (const ScaException &e) {
197+
// If an SCA error occurs, we stop executing the sequence of commands and set the error value
198+
registerValues[i*2] = 0xffffffff;
199+
registerValues[i*2 + 1] = 0xffffffff;
200+
break;
201+
}
202+
}
203+
}
204+
);
205+
148206
dimService->updateService();
149207
}
150208

@@ -243,6 +301,8 @@ class ProgramAliceLowlevelFrontendServer: public AliceO2::Common::Program
243301
[&](auto parameter){return registerWrite(parameter, bar0);});
244302
auto serverPublishStart = makeServer(names.publishStartCommandRpc(),
245303
[&](auto parameter){return publishStartCommand(parameter, mCommandQueue);});
304+
auto serverPublishScaStart = makeServer(names.publishScaStartCommandRpc(),
305+
[&](auto parameter){return publishScaStartCommand(parameter, mCommandQueue);});
246306
auto serverPublishStop = makeServer(names.publishStopCommandRpc(),
247307
[&](auto parameter){return publishStopCommand(parameter, mCommandQueue);});
248308
auto serverScaRead = makeServer(names.scaRead(),
@@ -278,6 +338,8 @@ class ProgramAliceLowlevelFrontendServer: public AliceO2::Common::Program
278338
}
279339

280340
publisherRegistry.loop();
341+
342+
// Dummy service. Temporary.
281343
mTemperature = double((std::rand() % 100) + 400) / 10.0;
282344
temperatureService.updateService(mTemperature);
283345
}
@@ -292,6 +354,16 @@ class ProgramAliceLowlevelFrontendServer: public AliceO2::Common::Program
292354
}
293355
}
294356

357+
/// Try to add a command to the queue
358+
static bool tryAddToQueue(const CommandVariant& command, CommandQueue& queue)
359+
{
360+
if (!queue.write(command)) {
361+
getInfoLogger() << " command queue was full!" << endm;
362+
return false;
363+
}
364+
return true;
365+
}
366+
295367
/// RPC handler for register reads
296368
static std::string registerRead(const std::string& parameter, ChannelSharedPtr channel)
297369
{
@@ -332,39 +404,61 @@ class ProgramAliceLowlevelFrontendServer: public AliceO2::Common::Program
332404
}
333405

334406
/// RPC handler for publish commands
335-
static std::string publishStartCommand(const std::string& parameter, CommandQueue& commandQueue)
407+
static std::string publishStartCommand(const std::string& parameter, CommandQueue& queue)
336408
{
337409
getInfoLogger() << "Received publish command: '" << parameter << "'" << endm;
338410
auto params = split(parameter, ";");
339411

340412
ServiceDescription description;
341-
description.addresses = lexicalCastVector<uintptr_t >(split(params.at(1), ","));
413+
description.type = ServiceDescription::Register{lexicalCastVector<uintptr_t >(split(params.at(1), ","))};
342414
description.dnsName = params.at(0);
343415
description.interval = std::chrono::milliseconds(int64_t(b::lexical_cast<double>(params.at(2)) * 1000.0));
344416

345-
if (!commandQueue.write(CommandPublishStart{std::move(description)})) {
346-
getInfoLogger() << " command queue was full!" << endm;
417+
tryAddToQueue(CommandPublishStart{description}, queue);
418+
return "";
419+
}
420+
421+
/// RPC handler for SCA publish commands
422+
static std::string publishScaStartCommand(const std::string& parameter, CommandQueue& queue)
423+
{
424+
getInfoLogger() << "Received SCA publish command: '" << parameter << "'" << endm;
425+
426+
auto params = split(parameter, ";");
427+
428+
// Convert command-data pair string sequence to binary format
429+
ServiceDescription::ScaSequence sca;
430+
auto commandDataPairStrings = split(params.at(1), "\n");
431+
sca.commandDataPairs.resize(commandDataPairStrings.size());
432+
for (size_t i = 0; i < commandDataPairStrings.size(); ++i) {
433+
sca.commandDataPairs[i] = stringToScaCommandDataPair(commandDataPairStrings[i]);
347434
}
435+
436+
ServiceDescription description;
437+
description.type = sca;
438+
description.dnsName = params.at(0);
439+
description.interval = std::chrono::milliseconds(int64_t(b::lexical_cast<double>(params.at(2)) * 1000.0));
440+
441+
tryAddToQueue(CommandPublishStart{description}, queue);
348442
return "";
349443
}
350444

351445
/// RPC handler for publish stop commands
352-
static std::string publishStopCommand(const std::string& parameter, CommandQueue& commandQueue)
446+
static std::string publishStopCommand(const std::string& parameter, CommandQueue& queue)
353447
{
354448
getInfoLogger() << "Received stop command: '" << parameter << "'" << endm;
355-
if (!commandQueue.write(CommandPublishStop{parameter})) {
356-
getInfoLogger() << " command queue was full!" << endm;
357-
}
449+
tryAddToQueue(CommandPublishStop{parameter}, queue);
358450
return "";
359451
}
360452

453+
/// RPC handler for SCA read commands
361454
static std::string scaRead(const std::string&, ChannelSharedPtr bar2)
362455
{
363456
getInfoLogger() << "SCA_READ" << endm;
364457
auto result = Sca(*bar2, bar2->getCardType()).read();
365458
return (b::format("0x%x,0x%x") % result.command % result.data).str();
366459
}
367460

461+
/// RPC handler for SCA write commands
368462
static std::string scaWrite(const std::string& parameter, ChannelSharedPtr bar2)
369463
{
370464
getInfoLogger() << "SCA_WRITE: '" << parameter << "'" << endm;
@@ -375,13 +469,15 @@ class ProgramAliceLowlevelFrontendServer: public AliceO2::Common::Program
375469
return "";
376470
}
377471

472+
/// RPC handler for SCA GPIO read commands
378473
static std::string scaGpioRead(const std::string&, ChannelSharedPtr bar2)
379474
{
380475
getInfoLogger() << "SCA_GPIO_READ" << endm;
381476
auto result = Sca(*bar2, bar2->getCardType()).gpioRead();
382477
return (b::format("0x%x") % result.data).str();
383478
}
384479

480+
/// RPC handler for SCA GPIO write commands
385481
static std::string scaGpioWrite(const std::string& parameter, ChannelSharedPtr bar2)
386482
{
387483
getInfoLogger() << "SCA_GPIO_WRITE: '" << parameter << "'" << endm;
@@ -390,6 +486,7 @@ class ProgramAliceLowlevelFrontendServer: public AliceO2::Common::Program
390486
return "";
391487
}
392488

489+
/// RPC handler for SCA blob write commands (sequence of commands)
393490
static std::string scaBlobWrite(const std::string& parameter, ChannelSharedPtr bar2)
394491
{
395492
getInfoLogger() << "SCA_BLOB_WRITE size=" << parameter.size() << " bytes" << endm;
@@ -401,26 +498,18 @@ class ProgramAliceLowlevelFrontendServer: public AliceO2::Common::Program
401498
std::stringstream resultBuffer;
402499
auto sca = Sca(*bar2, bar2->getCardType());
403500

404-
for (std::string token : tokenizer(parameter, sep)) {
501+
for (const std::string& token : tokenizer(parameter, sep)) {
405502
// Walk through the tokens, these should be the pairs (or comments).
406503
if (token.find('#') == 0) {
407504
// We have a comment, skip this token
408505
continue;
409506
} else {
410-
// The pairs are comma-separated, so we split them.
411-
std::vector<std::string> pair = split(token, ",");
412-
if (pair.size() != 2) {
413-
BOOST_THROW_EXCEPTION(
414-
AlfException() << ErrorInfo::Message("SCA command-data pair not formatted correctly"));
415-
}
416-
auto command = convertHexString(pair[0]);
417-
auto data = convertHexString(pair[1]);
418-
507+
auto commandData = stringToScaCommandDataPair(token);
419508
try {
420-
sca.write(command, data);
509+
sca.write(commandData);
421510
auto result = sca.read();
422-
getInfoLogger() << (b::format("cmd=0x%x data=0x%x result=0x%x") % command % data % result.data).str()
423-
<< endm;
511+
getInfoLogger() << (b::format("cmd=0x%x data=0x%x result=0x%x") % commandData.command % commandData.data %
512+
result.data).str() << endm;
424513
resultBuffer << std::hex << result.data << '\n';
425514
} catch (const ScaException &e) {
426515
// If an SCA error occurs, we stop executing the sequence of commands and return the results as far as we got
@@ -434,6 +523,20 @@ class ProgramAliceLowlevelFrontendServer: public AliceO2::Common::Program
434523
return resultBuffer.str();
435524
}
436525

526+
static Sca::CommandData stringToScaCommandDataPair(const std::string& string)
527+
{
528+
// The pairs are comma-separated, so we split them.
529+
std::vector<std::string> pair = split(string, ",");
530+
if (pair.size() != 2) {
531+
BOOST_THROW_EXCEPTION(
532+
AlfException() << ErrorInfo::Message("SCA command-data pair not formatted correctly"));
533+
}
534+
Sca::CommandData commandData;
535+
commandData.command = convertHexString(pair[0]);
536+
commandData.data = convertHexString(pair[1]);
537+
return commandData;
538+
};
539+
437540
int mSerialNumber = 0;
438541
CommandQueue mCommandQueue;
439542
double mTemperature = 40;

0 commit comments

Comments
 (0)