Skip to content

Commit 8c242ae

Browse files
authored
Merge pull request #36 from mkipnis/misc_1025
FIX/WebSocket Proxy
2 parents 8a86fb4 + 48492ad commit 8c242ae

37 files changed

+1392
-620
lines changed

DataService/src/AuthService.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ bool AuthService::authenticate( std::shared_ptr<DistributedATS::SQLiteConnection
130130

131131
int ret = m_logon_dw->write(logon.get());
132132

133-
if (!ret) {
133+
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
134134
LOG4CXX_ERROR(logger, "Error Publishing to DDS :" << ss_logon.str());
135135
}
136136

@@ -146,17 +146,18 @@ bool AuthService::authenticate( std::shared_ptr<DistributedATS::SQLiteConnection
146146

147147
logout.DATS_Source(logon->DATS_Destination());
148148
logout.DATS_Destination(logon->DATS_Source());
149+
logout.DATS_SourceUser("AUTH");
149150
logout.DATS_DestinationUser(logon->RawData());
150151

151152
logout.Text(textOut);
152153

153154
std::stringstream ss_logout;
154155
LogoutLogger::log(ss_logout, logout);
155-
LOG4CXX_INFO(logger, "Auth Service Logout : %s\n" << ss_logout.str());
156+
LOG4CXX_INFO(logger, "Auth Service Logout : " << ss_logout.str());
156157

157158
int ret = m_logout_dw->write(&logout);
158159

159-
if (!ret) {
160+
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
160161
LOG4CXX_ERROR(logger, "Logout write returned error : " << ret );
161162
}
162163
}

DataService/src/MarketDataService.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ bool MarketDataService::processMarketDataRequest( const MarketDataRequestPtr& ma
182182
std::cout << "Publishing Full Market Data Snapshot : " << marketDataSnapshotFullRefresh.Symbol() << std::endl;
183183
int ret = _market_data_shapshot_full_refresh_dw->write(&marketDataSnapshotFullRefresh);
184184

185-
if (!ret) {
185+
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
186186
LOG4CXX_ERROR(logger, "Market Data Snapshot Data Write returned : " << ret );
187187
}
188188

DataService/src/OrderMassStatusRequestService.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ bool OrderMassStatusRequestService::processMassOrderStatusServiceRequest( OrderM
148148

149149
int ret = _execution_report_dw->write( execReport.get() );
150150

151-
if (!ret) {
151+
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
152152
LOG4CXX_ERROR(logger, "MassOrderStatusRequestDataReader/Execution Report write returned :" << ret);
153153
}
154154

DataService/src/RefDataService.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ bool RefDataService::processRefDataRequest( const SecurityListRequestPtr& securi
204204

205205
int ret = _security_list_dw->write(&securityList);
206206

207-
if (!ret) {
207+
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
208208
LOG4CXX_ERROR(logger, "Security List write returned :" << ret);
209209
}
210210

MatchingEngine/src/Market.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ void Market::publishSecurityListRequest( eprosima::fastdds::dds::DataWriter* dwr
135135

136136
auto ret = dwr->write(&securityListRequest);
137137

138-
if (!ret) {
138+
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
139139
LOG4CXX_ERROR(logger, "SecurityListRequest write returned : " << ret);
140140
}
141141
}
@@ -164,9 +164,9 @@ void Market::publishMarketDataRequest()
164164

165165
LoggerHelper::log_info<std::stringstream, MarketDataRequestLogger,DistributedATS_MarketDataRequest::MarketDataRequest>(logger,marketDataRequest, "MarketDataRequest");
166166

167-
bool ret = dataWriterContainerPtr_->market_data_request_dw->write(&marketDataRequest);
167+
auto ret = dataWriterContainerPtr_->market_data_request_dw->write(&marketDataRequest);
168168

169-
if (!ret)
169+
if ( ret != eprosima::fastdds::dds::RETCODE_OK )
170170
{
171171
LOG4CXX_ERROR(logger, "MarketDataRequest write returned : " << ret);
172172
}
@@ -292,9 +292,9 @@ void Market::publishExecutionReport(DistributedATS_ExecutionReport::ExecutionRep
292292
LoggerHelper::log_debug<std::stringstream, ExecutionReportLogger, DistributedATS_ExecutionReport::ExecutionReport>(logger,
293293
executionReport, "ExecutionReport");
294294

295-
bool ret = dataWriterContainerPtr_->execution_report_dw->write(&executionReport);
295+
auto ret = dataWriterContainerPtr_->execution_report_dw->write(&executionReport);
296296

297-
if (!ret)
297+
if ( ret != eprosima::fastdds::dds::RETCODE_OK )
298298
{
299299
LOG4CXX_ERROR(logger, "Execution Report write returned : " << ret);
300300
}
@@ -305,9 +305,9 @@ void Market::publishOrderMassCancelReport(DistributedATS_OrderMassCancelReport::
305305

306306
LoggerHelper::log_debug<std::stringstream, OrderMassCancelReportLogger, DistributedATS_OrderMassCancelReport::OrderMassCancelReport>(logger, orderMassCancelReport, "OrderMassCancelReport");
307307

308-
bool ret = dataWriterContainerPtr_->order_mass_cancel_report_dw->write(&orderMassCancelReport);
308+
auto ret = dataWriterContainerPtr_->order_mass_cancel_report_dw->write(&orderMassCancelReport);
309309

310-
if (!ret)
310+
if ( ret != eprosima::fastdds::dds::RETCODE_OK )
311311
{
312312
LOG4CXX_ERROR(logger, "Order Mass Cancel Report write returned: " << ret);
313313
}
@@ -320,9 +320,9 @@ void Market::publishOrderCancelReject(
320320
LoggerHelper::log_debug<std::stringstream, OrderCancelRejectLogger,
321321
DistributedATS_OrderCancelReject::OrderCancelReject>(logger, orderCancelReject, "OrderCancelReject");
322322

323-
bool ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
323+
auto ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
324324

325-
if (ret)
325+
if ( ret != eprosima::fastdds::dds::RETCODE_OK )
326326
{
327327
LOG4CXX_ERROR(logger, "Order Cancel Reject write returned: " << ret);
328328
}

MatchingEngine/src/Order.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ void Order::onCancelRejected(const char *reason)
131131
LoggerHelper::log_debug<std::stringstream, OrderCancelRejectLogger,
132132
DistributedATS_OrderCancelReject::OrderCancelReject>(logger,orderCancelReject, "OrderCancelReject");
133133

134-
int ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
134+
auto ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
135135

136136
if (ret != eprosima::fastdds::dds::RETCODE_OK) {
137137
LOG4CXX_ERROR(logger, "OrderCancelReject write returned :" << ret);
@@ -188,7 +188,7 @@ void Order::onReplaceRejected(const char *reason)
188188
DistributedATS_OrderCancelReject::OrderCancelReject>( logger,
189189
orderCancelReject, "OrderCancelReject");
190190

191-
int ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
191+
auto ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
192192

193193
if (ret != eprosima::fastdds::dds::RETCODE_OK) {
194194
LOG4CXX_ERROR(logger, "OrdereplaceRejected write returned :" << ret);

MatchingEngine/src/PriceDepthPublisherService.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ int PriceDepthPublisherService::service()
127127
MarketDataIncrementalRefreshLogger::log(
128128
ss, chunkedIncrementalMarketDataRefresh);
129129

130-
int ret = _market_data_incremental_refresh_dw->write(
130+
auto ret = _market_data_incremental_refresh_dw->write(
131131
&chunkedIncrementalMarketDataRefresh);
132132

133133
if (ret != eprosima::fastdds::dds::RETCODE_OK) {

MiscClients/cpp_ws_reactjs/fix_ws_proxy/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ target_link_libraries(fix_ws_proxy
3131
Boost::json
3232
Boost::chrono
3333
)
34+
35+
install(TARGETS fix_ws_proxy)

MiscClients/cpp_ws_reactjs/fix_ws_proxy/fix_application.cpp

Lines changed: 71 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,61 +6,94 @@
66
void fix_application::toAdmin(FIX::Message& message, const FIX::SessionID& sessionID)
77
{
88
const FIX::Header& header = message.getHeader();
9+
const FIX::MsgType& msgType = FIELD_GET_REF(header, MsgType);
910

10-
const FIX::MsgType& msgType = FIELD_GET_REF( header, MsgType );
11+
// Convert FIX message to JSON and add common fields
12+
auto json_obj = distributed_ats::fix_json::fix_to_json(message);
13+
json_obj["session_qualifier"] = sessionID.getSessionQualifier();
14+
json_obj["data_type"] = "FIX";
15+
json_obj["success"] = true;
16+
auto json_str = boost::json::serialize(json_obj);
1117

12-
if ( msgType == FIX::MsgType_Logon )
13-
{
18+
if (msgType == FIX::MsgType_Logon) {
1419
if (auto session = _ws_session.lock()) {
1520
auto ws_logon = session->get_pending_logon();
16-
17-
std::cout << std::endl << "Sending Logon : " << sessionID << "Pending Logon : " << ws_logon << std::endl;
18-
21+
22+
std::cout << "\nSending Logon: " << sessionID
23+
<< " | Pending Logon: " << ws_logon << std::endl;
24+
25+
// Set Username
1926
FIX::Username userName(sessionID.getSenderCompID());
2027
message.setField(userName);
21-
28+
29+
// Set Password from WebSocket logon
2230
FIX::Password password;
2331
ws_logon.getField(password);
24-
2532
message.setField(password);
26-
27-
std::cout << "Sending Logon : " << message.toString() << std::endl;
33+
34+
// Print JSON log
35+
std::cout << "Sending Logon JSON: " << json_str << std::endl;
2836
}
29-
} else if ( msgType == FIX::MsgType_Logout )
30-
{
31-
std::cout << "Logout : " << message << std::endl;
32-
}
37+
}
38+
else if (msgType == FIX::MsgType_Logout) {
39+
std::cout << "Logout JSON: " << json_str << std::endl;
40+
}
41+
42+
// Optional: log via QuickFIX
43+
if (auto fix_session = FIX::Session::lookupSession(sessionID)) {
44+
if (auto log = fix_session->getLog()) {
45+
log->onEvent(json_str);
46+
}
47+
}
3348
}
3449

50+
3551
void fix_application::fromAdmin(const FIX::Message& message, const FIX::SessionID& sessionId) throw(FIX::FieldNotFound, FIX::IncorrectDataFormat, FIX::IncorrectTagValue, FIX::RejectLogon)
3652
{
37-
if (auto session = _ws_session.lock()) {
38-
39-
auto json_obj = distributed_ats::fix_json::fix_to_json(message);
40-
41-
json_obj["session_qualifier"] = sessionId.getSessionQualifier();
42-
json_obj["data_type"] = "FIX";
43-
json_obj["success"] = true;
44-
45-
auto json_str = boost::json::serialize(json_obj);
46-
47-
session->send_string(json_str);
48-
}
53+
// Convert FIX message to JSON
54+
auto json_obj = distributed_ats::fix_json::fix_to_json(message);
55+
json_obj["session_qualifier"] = sessionId.getSessionQualifier();
56+
json_obj["data_type"] = "FIX";
57+
json_obj["success"] = true;
58+
59+
auto json_str = boost::json::serialize(json_obj);
60+
61+
// Send JSON to WebSocket session
62+
if (auto session = _ws_session.lock()) {
63+
session->send_string(json_str);
64+
}
65+
66+
// Log the JSON message using QuickFIX logger
67+
if (auto fix_session = FIX::Session::lookupSession(sessionId)) {
68+
if (auto log = fix_session->getLog()) {
69+
log->onEvent(json_str); // Log as a general event
70+
// If you prefer, you could also log as incoming FIX message:
71+
// log->onIncoming(message, sessionId);
72+
}
73+
}
4974
}
5075

5176

5277
void fix_application::fromApp(const FIX::Message& message, const FIX::SessionID& sessionId) throw(FIX::FieldNotFound, FIX::IncorrectDataFormat, FIX::IncorrectTagValue, FIX::UnsupportedMessageType)
5378
{
54-
if (auto session = _ws_session.lock()) {
55-
56-
auto json_obj = distributed_ats::fix_json::fix_to_json(message);
57-
58-
json_obj["session_qualifier"] = sessionId.getSessionQualifier();
59-
json_obj["data_type"] = "FIX";
60-
json_obj["success"] = true;
61-
62-
auto json_str = boost::json::serialize(json_obj);
63-
64-
session->send_string(json_str);
65-
}
79+
// Convert FIX message to JSON
80+
auto json_obj = distributed_ats::fix_json::fix_to_json(message);
81+
json_obj["session_qualifier"] = sessionId.getSessionQualifier();
82+
json_obj["data_type"] = "FIX";
83+
json_obj["success"] = true;
84+
85+
auto json_str = boost::json::serialize(json_obj);
86+
87+
// Send to your WebSocket session
88+
if (auto session = _ws_session.lock()) {
89+
session->send_string(json_str);
90+
}
91+
92+
// Log the JSON message using QuickFIX logger
93+
if (auto fix_session = FIX::Session::lookupSession(sessionId)) {
94+
if (auto log = fix_session->getLog()) {
95+
log->onEvent(json_str); // Log as an event
96+
// log->onIncoming(message, sessionId);
97+
}
98+
}
6699
}

MiscClients/cpp_ws_reactjs/fix_ws_proxy/fix_application.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@ class fix_application : public FIX::Application {
1212

1313
public:
1414

15-
fix_application(const std::weak_ptr<Session>& ws_session) : _ws_session(ws_session)
16-
{
17-
//std::cout << "Pending Logon : " << _ws_logon << std::endl;
18-
}
15+
fix_application(const std::weak_ptr<Session>& ws_session) : _ws_session(ws_session) {}
1916

2017
void onCreate(const FIX::SessionID& id) override {
2118
std::cout << "Created: " << id << std::endl;

0 commit comments

Comments
 (0)