Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions SilKit/IntegrationTests/ITest_MessageAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ TEST_F(ITest_MessageAggregation, timeout_in_case_of_deadlock_when_using_async_si
SilKit::Services::PubSub::PubSubSpec dataSpecPing{"ping", {}};
SilKit::Services::PubSub::PubSubSpec dataSpecPong{"pong", {}};

bool msgReceived{false};
std::atomic_bool msgReceived{false};

// participant with async simulation step handler & enabled message aggregation
{
Expand All @@ -105,8 +105,8 @@ TEST_F(ITest_MessageAggregation, timeout_in_case_of_deadlock_when_using_async_si
});

timeSyncService->SetSimulationStepHandlerAsync(
[dataPublisher, lifecycleService, &msgReceived](std::chrono::nanoseconds /*now*/,
std::chrono::nanoseconds /*duration*/) {
[dataPublisher, lifecycleService, &msgReceived](std::chrono::nanoseconds,
std::chrono::nanoseconds) {
// send ping
std::vector<uint8_t> ping(1, '?');
dataPublisher->Publish(std::move(ping));
Expand Down Expand Up @@ -135,7 +135,7 @@ TEST_F(ITest_MessageAggregation, timeout_in_case_of_deadlock_when_using_async_si
});

timeSyncService->SetSimulationStepHandlerAsync(
[timeSyncService](std::chrono::nanoseconds /*now*/, std::chrono::nanoseconds /*duration*/) {
[timeSyncService](std::chrono::nanoseconds, std::chrono::nanoseconds) {
timeSyncService->CompleteSimulationStep();
}, 1s);
}
Expand Down
15 changes: 10 additions & 5 deletions SilKit/IntegrationTests/ITest_NetSimFlexRay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: MIT

#include "ITest_NetSim.hpp"
#include "ITestThreadSafeLogger.hpp"
#include "silkit/services/flexray/all.hpp"

namespace {
Expand All @@ -23,8 +24,9 @@ struct ITest_NetSimFlexray : ITest_NetSim
CallCountsSilKitHandlersFlexray& callCountsSilKitHandlersFlexray)
{
controller->AddCycleStartHandler(
[&callCountsSilKitHandlersFlexray](IFlexrayController*, const FlexrayCycleStartEvent& /*msg*/) {
[&callCountsSilKitHandlersFlexray](IFlexrayController*, const FlexrayCycleStartEvent& msg) {
callCountsSilKitHandlersFlexray.CycleStartHandler++;
Log() << "Cycle Start: " << (int)msg.cycleCounter;
});
controller->AddFrameHandler(
[&callCountsSilKitHandlersFlexray](IFlexrayController*, const FlexrayFrameEvent& /*msg*/) {
Expand Down Expand Up @@ -273,13 +275,14 @@ void MySimulatedFlexrayController::OnTxBufferUpdate(

TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray)
{
const auto configSynchronizationPoints = "EnableSynchronizationPoints: true";
{
// ----------------------------
// NetworkSimulator
// ----------------------------

//auto configWithLogging = MakeParticipantConfigurationStringWithLogging(SilKit::Services::Logging::Level::Info);
auto&& simParticipant = _simTestHarness->GetParticipant(_participantNameNetSim);
auto&& simParticipant = _simTestHarness->GetParticipant(_participantNameNetSim, configSynchronizationPoints);
auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService();
auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService();
auto&& networkSimulator = simParticipant->GetOrCreateNetworkSimulator();
Expand Down Expand Up @@ -307,7 +310,8 @@ TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray)

timeSyncService->SetSimulationStepHandler(
[this, simulatedNetworkPtr, lifecycleService, flexrayController](
auto now, const std::chrono::nanoseconds /*duration*/) {
auto now, const std::chrono::nanoseconds duration) {
(void)duration;
if (now == _stopAtMs)
{
lifecycleService->Stop("stopping the simulation");
Expand All @@ -328,7 +332,7 @@ TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray)

for (const auto& participantName : _participantNamesSimulated)
{
auto&& simParticipant = _simTestHarness->GetParticipant(participantName);
auto&& simParticipant = _simTestHarness->GetParticipant(participantName, configSynchronizationPoints);
auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService();
auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService();

Expand All @@ -337,8 +341,9 @@ TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray)
SetupFlexrayController(lifecycleService, flexrayController, callCounts.silKitHandlersFlexray);

timeSyncService->SetSimulationStepHandler(
[this, flexrayController](auto now, const std::chrono::nanoseconds /*duration*/) {
[this, flexrayController](auto now, const std::chrono::nanoseconds duration) {
OnetimeActions(now, flexrayController);
Log() << "Simulation step: " << now.count() << " : " << duration.count();
}, _stepSize);
}
}
Expand Down
2 changes: 2 additions & 0 deletions SilKit/source/config/ParticipantConfiguration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ struct ParticipantConfiguration : public IParticipantConfiguration
Includes includes;
Middleware middleware;
Experimental experimental;
// experimental synchronization points
bool enableSynchronizationPoints{false};
};

bool operator==(const CanController& lhs, const CanController& rhs);
Expand Down
5 changes: 5 additions & 0 deletions SilKit/source/config/ParticipantConfigurationFromXImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ struct ConfigIncludeData
std::map<std::string, SilKit::Config::RpcClient> rpcClientCache;
std::map<std::string, SilKit::Config::TraceSink> traceSinkCache;
std::map<std::string, SilKit::Config::TraceSource> traceSourceCache;
bool enableSynchronizationPoints;
};


Expand Down Expand Up @@ -550,6 +551,8 @@ void MergeExperimentalCache(const ExperimentalCache& cache, Experimental& experi
auto MergeConfigs(ConfigIncludeData& configIncludeData) -> SilKit::Config::ParticipantConfiguration
{
SilKit::Config::ParticipantConfiguration config;
config.enableSynchronizationPoints = configIncludeData.enableSynchronizationPoints;

for (const auto& include : configIncludeData.configBuffer)
{
// Merge all vectors first!
Expand Down Expand Up @@ -666,6 +669,8 @@ auto PaticipantConfigurationWithIncludes(const std::string& text, struct ConfigI
throw SilKit::ConfigurationError{fmt::format("Unknown schema version '{}' found in participant configuration!",
configuration.schemaVersion)};
}
configData.enableSynchronizationPoints = configuration.enableSynchronizationPoints;

configData.configBuffer.push_back(ConfigInclude("root", configuration));

AppendToSearchPaths(configuration, configData);
Expand Down
3 changes: 3 additions & 0 deletions SilKit/source/config/YamlReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,9 @@ void YamlReader::Read(SilKit::Config::ParticipantConfiguration& obj)
OptionalRead(obj.middleware, "Middleware");
OptionalRead(obj.includes, "Includes");
OptionalRead(obj.experimental, "Experimental");

// design proposal
OptionalRead(obj.enableSynchronizationPoints, "EnableSynchronizationPoints");
}

void YamlReader::Read(SilKit::Config::HealthCheck& obj)
Expand Down
3 changes: 3 additions & 0 deletions SilKit/source/config/YamlWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ void YamlWriter::Write(const SilKit::Config::ParticipantConfiguration& obj)
NonDefaultWrite(obj.middleware, "Middleware", defaultObj.middleware);
NonDefaultWrite(obj.includes, "Includes", defaultObj.includes);
NonDefaultWrite(obj.experimental, "Experimental", defaultObj.experimental);

//design proposal
NonDefaultWrite(obj.enableSynchronizationPoints, "EnableSynchronizationPoints", defaultObj.enableSynchronizationPoints);
}

void YamlWriter::Write(const SilKit::Config::HealthCheck& obj)
Expand Down
2 changes: 2 additions & 0 deletions SilKit/source/core/internal/IParticipantInternal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ class IParticipantInternal : public IParticipant

virtual auto GetMetricsProcessor() -> IMetricsProcessor* = 0;
virtual auto GetMetricsSender() -> IMetricsSender* = 0;

virtual auto GetConfiguration() -> const Config::ParticipantConfiguration& = 0;
};

} // namespace Core
Expand Down
6 changes: 6 additions & 0 deletions SilKit/source/core/internal/OrchestrationDatatypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ struct NextSimTask
std::chrono::nanoseconds duration{0};
};

static constexpr NextSimTask ZeroSimTask{std::chrono::nanoseconds{0}, std::chrono::nanoseconds{0}};
inline auto operator==(const NextSimTask& lhs, const NextSimTask& rhs)
{
return lhs.duration == rhs.duration && lhs.timePoint == rhs.timePoint;
}

//! System-wide command for the simulation flow.
struct SystemCommand
{
Expand Down
26 changes: 26 additions & 0 deletions SilKit/source/core/internal/traits/SilKitMsgTraits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace Core {
// ==================================================================
// Trait which checks that '.timestamp' works
// ==================================================================
template<class T>
using RemoveCvRef = std::remove_cv_t<std::remove_reference_t<T>>;

template <typename T, typename = void>
struct HasTimestamp : std::false_type
Expand Down Expand Up @@ -58,6 +60,15 @@ struct SilKitMsgTraitForbidSelfDelivery
}
};

template <class MsgT>
struct SilKitMsgTraitIsSynchronizationPoint
{
static constexpr bool IsSynchronizationPoint()
{
return false;
}
};

// The final message traits
template <class MsgT>
struct SilKitMsgTraits
Expand All @@ -67,6 +78,7 @@ struct SilKitMsgTraits
, SilKitMsgTraitVersion<MsgT>
, SilKitMsgTraitSerdesName<MsgT>
, SilKitMsgTraitForbidSelfDelivery<MsgT>
, SilKitMsgTraitIsSynchronizationPoint<MsgT>
{
};

Expand Down Expand Up @@ -110,6 +122,16 @@ struct SilKitMsgTraits
} \
}

#define DefineSilKitMsgTrait_IsSynchronizationPoint(Namespace, MsgName) \
template <> \
struct SilKitMsgTraitIsSynchronizationPoint<Namespace::MsgName> \
{ \
static constexpr bool IsSynchronizationPoint() \
{ \
return true; \
} \
}

DefineSilKitMsgTrait_TypeName(SilKit::Services::Logging, LogMsg);
DefineSilKitMsgTrait_TypeName(VSilKit, MetricsUpdate);
DefineSilKitMsgTrait_TypeName(SilKit::Services::Orchestration, SystemCommand);
Expand Down Expand Up @@ -164,5 +186,9 @@ DefineSilKitMsgTrait_EnforceSelfDelivery(SilKit::Services::Lin, LinSendFrameHead
// Messages with forbidden self delivery
DefineSilKitMsgTrait_ForbidSelfDelivery(SilKit::Services::Orchestration, SystemCommand);

// Messages which are Synchronization Points
DefineSilKitMsgTrait_IsSynchronizationPoint(SilKit::Services::Flexray, FlexrayCycleStartEvent);
DefineSilKitMsgTrait_IsSynchronizationPoint(SilKit::Services::PubSub, WireDataMessageEvent); //for testing

} // namespace Core
} // namespace SilKit
7 changes: 6 additions & 1 deletion SilKit/source/core/mock/participant/MockParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,15 @@ class DummyMetricsManager : public IMetricsManager
std::unordered_map<std::string, DummyAttributeMetric> _attributes;
};

class DummyParticipant : public IParticipantInternal
class DummyParticipant: public IParticipantInternal
{
public:
DummyParticipant()
{
ON_CALL(mockLifecycleService, GetTimeSyncService).WillByDefault(testing::Return(&mockTimeSyncService));
ON_CALL(mockLifecycleService, CreateTimeSyncService).WillByDefault(testing::Return(&mockTimeSyncService));
ON_CALL(logger, GetLogLevel()).WillByDefault(testing::Return(Services::Logging::Level::Debug));
ON_CALL(*this, GetConfiguration()).WillByDefault(testing::ReturnRef(_participantConfiguration));
}

auto CreateCanController(const std::string& /*canonicalName*/,
Expand Down Expand Up @@ -720,6 +721,8 @@ class DummyParticipant : public IParticipantInternal
return nullptr;
}

MOCK_METHOD(const Config::ParticipantConfiguration&, GetConfiguration, (), (override));

const std::string _name = "MockParticipant";
const std::string _registryUri = "silkit://mock.participant.silkit:0";
testing::NiceMock<MockLogger> logger;
Expand All @@ -733,6 +736,8 @@ class DummyParticipant : public IParticipantInternal
MockParticipantReplies mockParticipantReplies;
DummyNetworkSimulator mockNetworkSimulator;
DummyMetricsManager mockMetricsManager;
Config::ParticipantConfiguration _participantConfiguration;

};

// ================================================================================
Expand Down
7 changes: 6 additions & 1 deletion SilKit/source/core/participant/Participant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ namespace SilKit {
namespace Core {

template <class SilKitConnectionT>
class Participant final : public IParticipantInternal
class Participant final: public IParticipantInternal
{
public:
// ----------------------------------------
Expand Down Expand Up @@ -440,6 +440,11 @@ class Participant final : public IParticipantInternal

auto MakeTimerThread() -> std::unique_ptr<IMetricsTimerThread>;

auto GetConfiguration() -> const Config::ParticipantConfiguration& override;

template<typename MessageT>
void HandleSynchronizationPoint();

private:
// ----------------------------------------
// private members
Expand Down
30 changes: 29 additions & 1 deletion SilKit/source/core/participant/Participant_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "Uuid.hpp"
#include "Assert.hpp"
#include "ExecutionEnvironment.hpp"
#include "traits/SilKitMsgTraits.hpp"

#include "fmt/ranges.h"

Expand Down Expand Up @@ -1323,14 +1324,35 @@ void Participant<SilKitConnectionT>::SendMsg(const IServiceEndpoint* from,
SendMsgImpl(from, std::move(msg));
}

template <typename SilKitConnectionT>
template <typename MessageT>
void Participant<SilKitConnectionT>::HandleSynchronizationPoint()
{
if constexpr (SilKitMsgTraits<RemoveCvRef<MessageT>>::IsSynchronizationPoint())
{
if (auto* lifecycle = static_cast<Orchestration::LifecycleService*>(GetLifecycleService()); lifecycle)
{
if (auto* timesync = static_cast<Orchestration::TimeSyncService*>(lifecycle->GetTimeSyncService());
timesync)
{
if(_participantConfig.enableSynchronizationPoints)
{
timesync->TriggerSynchronization();
}
}
}
}

}

template <class SilKitConnectionT>
template <typename SilKitMessageT>
void Participant<SilKitConnectionT>::SendMsgImpl(const IServiceEndpoint* from, SilKitMessageT&& msg)
{
TraceTx(GetLoggerInternal(), from, msg);
_connection.SendMsg(from, std::forward<SilKitMessageT>(msg));
HandleSynchronizationPoint<SilKitMessageT>();
}

// Targeted messaging
template <class SilKitConnectionT>
void Participant<SilKitConnectionT>::SendMsg(const IServiceEndpoint* from, const std::string& targetParticipantName,
Expand Down Expand Up @@ -1633,6 +1655,7 @@ void Participant<SilKitConnectionT>::SendMsgImpl(const IServiceEndpoint* from, c
{
TraceTx(GetLoggerInternal(), from, targetParticipantName, msg);
_connection.SendMsg(from, targetParticipantName, std::forward<SilKitMessageT>(msg));
HandleSynchronizationPoint<SilKitMessageT>();
}


Expand Down Expand Up @@ -2024,6 +2047,11 @@ auto Participant<SilKitConnectionT>::MakeTimerThread() -> std::unique_ptr<IMetri
_participantConfig.experimental.metrics.updateInterval,
[this] { ExecuteDeferred([this] { GetMetricsManager()->SubmitUpdates(); }); });
}
template <class SilKitConnectionT>
auto Participant<SilKitConnectionT>::GetConfiguration() -> const Config::ParticipantConfiguration&
{
return _participantConfig;
}


} // namespace Core
Expand Down
21 changes: 21 additions & 0 deletions SilKit/source/core/participant/Test_Participant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

#include "WireDataMessages.hpp"
#include "WireCanMessages.hpp"
#include "WireEthernetMessages.hpp"
#include "WireLinMessages.hpp"
#include "WireFlexrayMessages.hpp"
#include "WireRpcMessages.hpp"
#include "LoggingDatatypesInternal.hpp"
#include "OrchestrationDatatypes.hpp"
#include "ServiceDatatypes.hpp"
#include "RequestReplyDatatypes.hpp"
#include "MetricsDatatypes.hpp"

#include "traits/SilKitMsgTraits.hpp"

#include "NullConnectionParticipant.hpp"
#include "CanController.hpp"
#include "ConfigurationTestUtils.hpp"
Expand All @@ -24,6 +38,13 @@ class Test_Participant : public testing::Test
Test_Participant() {}
};

TEST(Test_Traits, ensure_traits)
{
EXPECT_EQ(SilKitMsgTraits<SilKit::Services::PubSub::WireDataMessageEvent>::TypeName(),
std::string{"SilKit::Services::PubSub::WireDataMessageEvent"});
EXPECT_TRUE(SilKitMsgTraits<SilKit::Services::PubSub::WireDataMessageEvent>::IsSynchronizationPoint());
}

TEST_F(Test_Participant, throw_on_empty_participant_name)
{
EXPECT_THROW(CreateNullConnectionParticipantImpl(SilKit::Config::MakeEmptyParticipantConfigurationImpl(), ""),
Expand Down
Loading
Loading