Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions DataService/src/AuthService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ bool AuthService::authenticate( std::shared_ptr<DistributedATS::SQLiteConnection

int ret = m_logon_dw->write(logon.get());

if (!ret) {
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
LOG4CXX_ERROR(logger, "Error Publishing to DDS :" << ss_logon.str());
}

Expand All @@ -146,17 +146,18 @@ bool AuthService::authenticate( std::shared_ptr<DistributedATS::SQLiteConnection

logout.DATS_Source(logon->DATS_Destination());
logout.DATS_Destination(logon->DATS_Source());
logout.DATS_SourceUser("AUTH");
logout.DATS_DestinationUser(logon->RawData());

logout.Text(textOut);

std::stringstream ss_logout;
LogoutLogger::log(ss_logout, logout);
LOG4CXX_INFO(logger, "Auth Service Logout : %s\n" << ss_logout.str());
LOG4CXX_INFO(logger, "Auth Service Logout : " << ss_logout.str());

int ret = m_logout_dw->write(&logout);

if (!ret) {
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
LOG4CXX_ERROR(logger, "Logout write returned error : " << ret );
}
}
Expand Down
2 changes: 1 addition & 1 deletion DataService/src/MarketDataService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ bool MarketDataService::processMarketDataRequest( const MarketDataRequestPtr& ma
std::cout << "Publishing Full Market Data Snapshot : " << marketDataSnapshotFullRefresh.Symbol() << std::endl;
int ret = _market_data_shapshot_full_refresh_dw->write(&marketDataSnapshotFullRefresh);

if (!ret) {
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
LOG4CXX_ERROR(logger, "Market Data Snapshot Data Write returned : " << ret );
}

Expand Down
2 changes: 1 addition & 1 deletion DataService/src/OrderMassStatusRequestService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ bool OrderMassStatusRequestService::processMassOrderStatusServiceRequest( OrderM

int ret = _execution_report_dw->write( execReport.get() );

if (!ret) {
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
LOG4CXX_ERROR(logger, "MassOrderStatusRequestDataReader/Execution Report write returned :" << ret);
}

Expand Down
2 changes: 1 addition & 1 deletion DataService/src/RefDataService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ bool RefDataService::processRefDataRequest( const SecurityListRequestPtr& securi

int ret = _security_list_dw->write(&securityList);

if (!ret) {
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
LOG4CXX_ERROR(logger, "Security List write returned :" << ret);
}

Expand Down
18 changes: 9 additions & 9 deletions MatchingEngine/src/Market.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void Market::publishSecurityListRequest( eprosima::fastdds::dds::DataWriter* dwr

auto ret = dwr->write(&securityListRequest);

if (!ret) {
if ( ret != eprosima::fastdds::dds::RETCODE_OK ) {
LOG4CXX_ERROR(logger, "SecurityListRequest write returned : " << ret);
}
}
Expand Down Expand Up @@ -164,9 +164,9 @@ void Market::publishMarketDataRequest()

LoggerHelper::log_info<std::stringstream, MarketDataRequestLogger,DistributedATS_MarketDataRequest::MarketDataRequest>(logger,marketDataRequest, "MarketDataRequest");

bool ret = dataWriterContainerPtr_->market_data_request_dw->write(&marketDataRequest);
auto ret = dataWriterContainerPtr_->market_data_request_dw->write(&marketDataRequest);

if (!ret)
if ( ret != eprosima::fastdds::dds::RETCODE_OK )
{
LOG4CXX_ERROR(logger, "MarketDataRequest write returned : " << ret);
}
Expand Down Expand Up @@ -292,9 +292,9 @@ void Market::publishExecutionReport(DistributedATS_ExecutionReport::ExecutionRep
LoggerHelper::log_debug<std::stringstream, ExecutionReportLogger, DistributedATS_ExecutionReport::ExecutionReport>(logger,
executionReport, "ExecutionReport");

bool ret = dataWriterContainerPtr_->execution_report_dw->write(&executionReport);
auto ret = dataWriterContainerPtr_->execution_report_dw->write(&executionReport);

if (!ret)
if ( ret != eprosima::fastdds::dds::RETCODE_OK )
{
LOG4CXX_ERROR(logger, "Execution Report write returned : " << ret);
}
Expand All @@ -305,9 +305,9 @@ void Market::publishOrderMassCancelReport(DistributedATS_OrderMassCancelReport::

LoggerHelper::log_debug<std::stringstream, OrderMassCancelReportLogger, DistributedATS_OrderMassCancelReport::OrderMassCancelReport>(logger, orderMassCancelReport, "OrderMassCancelReport");

bool ret = dataWriterContainerPtr_->order_mass_cancel_report_dw->write(&orderMassCancelReport);
auto ret = dataWriterContainerPtr_->order_mass_cancel_report_dw->write(&orderMassCancelReport);

if (!ret)
if ( ret != eprosima::fastdds::dds::RETCODE_OK )
{
LOG4CXX_ERROR(logger, "Order Mass Cancel Report write returned: " << ret);
}
Expand All @@ -320,9 +320,9 @@ void Market::publishOrderCancelReject(
LoggerHelper::log_debug<std::stringstream, OrderCancelRejectLogger,
DistributedATS_OrderCancelReject::OrderCancelReject>(logger, orderCancelReject, "OrderCancelReject");

bool ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
auto ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);

if (ret)
if ( ret != eprosima::fastdds::dds::RETCODE_OK )
{
LOG4CXX_ERROR(logger, "Order Cancel Reject write returned: " << ret);
}
Expand Down
4 changes: 2 additions & 2 deletions MatchingEngine/src/Order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void Order::onCancelRejected(const char *reason)
LoggerHelper::log_debug<std::stringstream, OrderCancelRejectLogger,
DistributedATS_OrderCancelReject::OrderCancelReject>(logger,orderCancelReject, "OrderCancelReject");

int ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
auto ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);

if (ret != eprosima::fastdds::dds::RETCODE_OK) {
LOG4CXX_ERROR(logger, "OrderCancelReject write returned :" << ret);
Expand Down Expand Up @@ -188,7 +188,7 @@ void Order::onReplaceRejected(const char *reason)
DistributedATS_OrderCancelReject::OrderCancelReject>( logger,
orderCancelReject, "OrderCancelReject");

int ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);
auto ret = dataWriterContainerPtr_->order_cancel_reject_dw->write(&orderCancelReject);

if (ret != eprosima::fastdds::dds::RETCODE_OK) {
LOG4CXX_ERROR(logger, "OrdereplaceRejected write returned :" << ret);
Expand Down
2 changes: 1 addition & 1 deletion MatchingEngine/src/PriceDepthPublisherService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ int PriceDepthPublisherService::service()
MarketDataIncrementalRefreshLogger::log(
ss, chunkedIncrementalMarketDataRefresh);

int ret = _market_data_incremental_refresh_dw->write(
auto ret = _market_data_incremental_refresh_dw->write(
&chunkedIncrementalMarketDataRefresh);

if (ret != eprosima::fastdds::dds::RETCODE_OK) {
Expand Down
2 changes: 2 additions & 0 deletions MiscClients/cpp_ws_reactjs/fix_ws_proxy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ target_link_libraries(fix_ws_proxy
Boost::json
Boost::chrono
)

install(TARGETS fix_ws_proxy)
109 changes: 71 additions & 38 deletions MiscClients/cpp_ws_reactjs/fix_ws_proxy/fix_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,94 @@
void fix_application::toAdmin(FIX::Message& message, const FIX::SessionID& sessionID)
{
const FIX::Header& header = message.getHeader();
const FIX::MsgType& msgType = FIELD_GET_REF(header, MsgType);

const FIX::MsgType& msgType = FIELD_GET_REF( header, MsgType );
// Convert FIX message to JSON and add common fields
auto json_obj = distributed_ats::fix_json::fix_to_json(message);
json_obj["session_qualifier"] = sessionID.getSessionQualifier();
json_obj["data_type"] = "FIX";
json_obj["success"] = true;
auto json_str = boost::json::serialize(json_obj);

if ( msgType == FIX::MsgType_Logon )
{
if (msgType == FIX::MsgType_Logon) {
if (auto session = _ws_session.lock()) {
auto ws_logon = session->get_pending_logon();

std::cout << std::endl << "Sending Logon : " << sessionID << "Pending Logon : " << ws_logon << std::endl;


std::cout << "\nSending Logon: " << sessionID
<< " | Pending Logon: " << ws_logon << std::endl;

// Set Username
FIX::Username userName(sessionID.getSenderCompID());
message.setField(userName);


// Set Password from WebSocket logon
FIX::Password password;
ws_logon.getField(password);

message.setField(password);

std::cout << "Sending Logon : " << message.toString() << std::endl;

// Print JSON log
std::cout << "Sending Logon JSON: " << json_str << std::endl;
}
} else if ( msgType == FIX::MsgType_Logout )
{
std::cout << "Logout : " << message << std::endl;
}
}
else if (msgType == FIX::MsgType_Logout) {
std::cout << "Logout JSON: " << json_str << std::endl;
}

// Optional: log via QuickFIX
if (auto fix_session = FIX::Session::lookupSession(sessionID)) {
if (auto log = fix_session->getLog()) {
log->onEvent(json_str);
}
}
}


void fix_application::fromAdmin(const FIX::Message& message, const FIX::SessionID& sessionId) throw(FIX::FieldNotFound, FIX::IncorrectDataFormat, FIX::IncorrectTagValue, FIX::RejectLogon)
{
if (auto session = _ws_session.lock()) {

auto json_obj = distributed_ats::fix_json::fix_to_json(message);

json_obj["session_qualifier"] = sessionId.getSessionQualifier();
json_obj["data_type"] = "FIX";
json_obj["success"] = true;

auto json_str = boost::json::serialize(json_obj);

session->send_string(json_str);
}
// Convert FIX message to JSON
auto json_obj = distributed_ats::fix_json::fix_to_json(message);
json_obj["session_qualifier"] = sessionId.getSessionQualifier();
json_obj["data_type"] = "FIX";
json_obj["success"] = true;

auto json_str = boost::json::serialize(json_obj);

// Send JSON to WebSocket session
if (auto session = _ws_session.lock()) {
session->send_string(json_str);
}

// Log the JSON message using QuickFIX logger
if (auto fix_session = FIX::Session::lookupSession(sessionId)) {
if (auto log = fix_session->getLog()) {
log->onEvent(json_str); // Log as a general event
// If you prefer, you could also log as incoming FIX message:
// log->onIncoming(message, sessionId);
}
}
}


void fix_application::fromApp(const FIX::Message& message, const FIX::SessionID& sessionId) throw(FIX::FieldNotFound, FIX::IncorrectDataFormat, FIX::IncorrectTagValue, FIX::UnsupportedMessageType)
{
if (auto session = _ws_session.lock()) {

auto json_obj = distributed_ats::fix_json::fix_to_json(message);

json_obj["session_qualifier"] = sessionId.getSessionQualifier();
json_obj["data_type"] = "FIX";
json_obj["success"] = true;

auto json_str = boost::json::serialize(json_obj);

session->send_string(json_str);
}
// Convert FIX message to JSON
auto json_obj = distributed_ats::fix_json::fix_to_json(message);
json_obj["session_qualifier"] = sessionId.getSessionQualifier();
json_obj["data_type"] = "FIX";
json_obj["success"] = true;

auto json_str = boost::json::serialize(json_obj);

// Send to your WebSocket session
if (auto session = _ws_session.lock()) {
session->send_string(json_str);
}

// Log the JSON message using QuickFIX logger
if (auto fix_session = FIX::Session::lookupSession(sessionId)) {
if (auto log = fix_session->getLog()) {
log->onEvent(json_str); // Log as an event
// log->onIncoming(message, sessionId);
}
}
}
5 changes: 1 addition & 4 deletions MiscClients/cpp_ws_reactjs/fix_ws_proxy/fix_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ class fix_application : public FIX::Application {

public:

fix_application(const std::weak_ptr<Session>& ws_session) : _ws_session(ws_session)
{
//std::cout << "Pending Logon : " << _ws_logon << std::endl;
}
fix_application(const std::weak_ptr<Session>& ws_session) : _ws_session(ws_session) {}

void onCreate(const FIX::SessionID& id) override {
std::cout << "Created: " << id << std::endl;
Expand Down
5 changes: 4 additions & 1 deletion MiscClients/cpp_ws_reactjs/fix_ws_proxy/fix_ws_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ int main(int argc, char** argv) {
if (!fix_client_config.empty())
std::cout << "Using FIX client config file: " << fix_client_config << "\n";


auto fix_session_setttings = std::make_shared<FIX::SessionSettings>(fix_client_config);


asio::io_context ioc{1}; // number of ws threads
std::make_shared<Listener>(ioc, tcp::endpoint{address, port})->run();
std::make_shared<Listener>(ioc, tcp::endpoint{address, port}, fix_session_setttings)->run();

std::cout << "WebSocket echo server listening on port " << port << "\n";
ioc.run();
Expand Down
2 changes: 1 addition & 1 deletion MiscClients/cpp_ws_reactjs/fix_ws_proxy/fixproxy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ TargetCompID=FIX_GWY_1
SocketConnectHost=127.0.0.1
SocketConnectPort=15001
HeartBtInt=30
DataDictionary=FIX44.xml
DataDictionary=/Users/mkipnis/git/DistributedATS/FIXGateway/spec/FIX44.xml
12 changes: 9 additions & 3 deletions MiscClients/cpp_ws_reactjs/fix_ws_proxy/ws_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ void Session::on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::json::object replay;
if ( !FIX::Session::doesSessionExist(qualified_id) )
{
settings_ = std::make_shared<FIX::SessionSettings>("/Users/mkipnis/Documents/oms/oms/fixproxy.ini");
settings_->set(qualified_id, FIX::Dictionary());

pending_logon_ = fix_message;

storeFactory_ = std::make_shared<FIX::NullStoreFactory>();
logFactory_ = std::make_shared<FIX::ScreenLogFactory>( *settings_ );

std::cout << "FIX Message : " << fix_message << std::endl;
auto json_obj = distributed_ats::fix_json::fix_to_json(fix_message);
auto json_str = boost::json::serialize(json_obj);
std::cout << "FIX Message : " << json_str << std::endl;

application_ = std::make_shared<fix_application>(shared_from_this());
socket_initator_ = std::make_shared<FIX::ThreadedSocketInitiator>( *application_, *storeFactory_, *settings_, *logFactory_ );
Expand All @@ -55,7 +56,12 @@ void Session::on_read(beast::error_code ec, std::size_t bytes_transferred) {
replay["success"] = true;
} else {

std::cout << "About to send: " << fix_message << std::endl;
//std::cout << "About to send: " << fix_message << std::endl;

auto json_obj = distributed_ats::fix_json::fix_to_json(fix_message);
auto json_str = boost::json::serialize(json_obj);
std::cout << "FIX Message : " << json_str << std::endl;

replay["token"] = fix_session_qualifier_;

bool success = FIX::Session::sendToTarget(fix_message, qualified_id);
Expand Down
19 changes: 13 additions & 6 deletions MiscClients/cpp_ws_reactjs/fix_ws_proxy/ws_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ class fix_application;
// A single WebSocket session (handles one client)
class Session : public std::enable_shared_from_this<Session> {
public:
explicit Session(tcp::socket socket)
: ws_(std::move(socket)) {}
explicit Session(tcp::socket socket, const std::stringstream& session_settings_stream)
: ws_(std::move(socket))
{
std::stringstream ss(session_settings_stream.str());
settings_ = std::make_shared<FIX::SessionSettings>(ss);
}

void start() {
// Accept the websocket handshake
Expand Down Expand Up @@ -62,7 +66,6 @@ class Session : public std::enable_shared_from_this<Session> {
std::string fix_session_qualifier_;
std::string ws_session_qualifier_;


std::shared_ptr<FIX::SessionSettings> settings_;
std::shared_ptr<FIX::NullStoreFactory> storeFactory_;
std::shared_ptr<FIX::ScreenLogFactory> logFactory_;
Expand Down Expand Up @@ -172,8 +175,11 @@ class Session : public std::enable_shared_from_this<Session> {
// Accepts incoming connections and launches sessions
class Listener : public std::enable_shared_from_this<Listener> {
public:
Listener(asio::io_context& ioc, tcp::endpoint endpoint)
: ioc_(ioc), acceptor_(ioc) {
Listener(asio::io_context& ioc, tcp::endpoint endpoint, std::shared_ptr<FIX::SessionSettings>& fix_session_settings)
: ioc_(ioc), acceptor_(ioc)
{
session_settings_stream_ << *fix_session_settings;

beast::error_code ec;

acceptor_.open(endpoint.protocol(), ec);
Expand All @@ -196,6 +202,7 @@ class Listener : public std::enable_shared_from_this<Listener> {
private:
asio::io_context& ioc_;
tcp::acceptor acceptor_;
std::stringstream session_settings_stream_;

void do_accept() {
acceptor_.async_accept(
Expand All @@ -208,7 +215,7 @@ class Listener : public std::enable_shared_from_this<Listener> {
std::cerr << "accept failed: " << ec.message() << "\n";
} else {
// Create and run session
std::make_shared<Session>(std::move(socket))->start();
std::make_shared<Session>(std::move(socket), session_settings_stream_)->start();
}
do_accept();
}
Expand Down
Loading