Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion conda/dev-environment-unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ dependencies:
- libarrow=16
- libboost>=1.80.0
- libboost-headers>=1.80.0
- libprotobuf
- librdkafka
- lz4-c
- mamba
Expand Down
1 change: 0 additions & 1 deletion conda/dev-environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ dependencies:
- libarrow=16
- libboost>=1.80.0
- libboost-headers>=1.80.0
- libprotobuf
- librdkafka
- lz4-c
- make
Expand Down
7 changes: 0 additions & 7 deletions cpp/cmake/modules/FindDepsBase.cmake
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
cmake_minimum_required(VERSION 3.7.2)

# ABSL
find_package(absl CONFIG REQUIRED)
include_directories(${ABSL_INCLUDE_DIRS})

# RapidJson (for adapter utils)
find_package(RapidJSON CONFIG REQUIRED)
include_directories(${RapidJSON_INCLUDE_DIRS})

# For EXPRTK node
find_path(EXPRTK_INCLUDE_DIRS "exprtk.hpp")

# For adapter utils
find_package(Protobuf REQUIRED)
19 changes: 5 additions & 14 deletions cpp/csp/adapters/kafka/KafkaPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,11 @@ KafkaPublisher::KafkaPublisher( KafkaAdapterManager * mgr, const Dictionary & pr
m_topic( std::move( topic ) )

{
utils::MsgProtocol protocol = utils::MsgProtocol( properties.get<std::string>( "protocol" ) );
switch( protocol )
{
case utils::MsgProtocol::JSON:
m_msgWriter = std::make_shared<utils::JSONMessageWriter>( properties );
break;

case utils::MsgProtocol::RAW_BYTES:
break;

default:
CSP_THROW( NotImplemented, "msg protocol " << protocol << " not currently supported for kafka output adapters" );
break;
}
auto protocol = properties.get<std::string>( "protocol" );
if( protocol == "JSON" )
m_msgWriter = std::make_shared<utils::JSONMessageWriter>( properties );
else if( protocol != "RAW_BYTES" )
CSP_THROW( NotImplemented, "msg protocol " << protocol << " not currently supported for kafka output adapters" );
}

KafkaPublisher::~KafkaPublisher()
Expand Down
15 changes: 1 addition & 14 deletions cpp/csp/adapters/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ set(ADAPTER_UTILS_PUBLIC_HEADERS
MessageEnums.h
MessageWriter.h
MessageStructConverter.h
ProtobufHelper.h
ProtobufMessageStructConverter.h
RawBytesMessageStructConverter.h
StructAdapterInfo.h
ValueDispatcher.h
Expand All @@ -16,23 +14,12 @@ set(ADAPTER_UTILS_FILES
MessageWriter.cpp
MessageEnums.cpp
MessageStructConverter.cpp
ProtobufHelper.cpp
ProtobufMessageStructConverter.cpp
RawBytesMessageStructConverter.cpp
)
# See https://github.com/Point72/csp/issues/454
if(WIN32)
set(CSP_LIB_TYPE STATIC)
else()
set(CSP_LIB_TYPE SHARED)
endif()

add_library(csp_adapter_utils ${CSP_LIB_TYPE} ${ADAPTER_UTILS_FILES})
add_library(csp_adapter_utils STATIC ${ADAPTER_UTILS_FILES})
set_target_properties(csp_adapter_utils PROPERTIES PUBLIC_HEADER "${ADAPTER_UTILS_PUBLIC_HEADERS}" PREFIX lib)

find_package(Protobuf REQUIRED)
target_link_libraries(csp_adapter_utils PRIVATE protobuf::libprotoc protobuf::libprotobuf protobuf::libprotobuf-lite)

install(TARGETS csp_adapter_utils
PUBLIC_HEADER DESTINATION include/csp/adapters/utils
RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR}
Expand Down
2 changes: 0 additions & 2 deletions cpp/csp/adapters/utils/JSONMessageStructConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class JSONMessageStructConverter: public MessageStructConverter

csp::StructPtr asStruct( void * bytes, size_t size ) final;

MsgProtocol protocol() const override { return MsgProtocol::JSON; }

static MessageStructConverter * create( const CspTypePtr & type, const Dictionary & properties )
{
return new JSONMessageStructConverter( type, properties );
Expand Down
2 changes: 1 addition & 1 deletion cpp/csp/adapters/utils/JSONMessageWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class JSONMessageWriter : public MessageWriter
csp::CspType::Type::STRING
>;

JSONMessageWriter( const Dictionary & properties ) : MessageWriter( MsgProtocol::JSON )
JSONMessageWriter( const Dictionary & properties )
{
m_doc.SetObject();
m_datetimeWireType = utils::DateTimeWireType( properties.get<std::string>( "datetime_type" ) );
Expand Down
7 changes: 0 additions & 7 deletions cpp/csp/adapters/utils/MessageEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,4 @@ INIT_CSP_ENUM( csp::adapters::utils::DateTimeWireType,
"UINT64_SECONDS"
);

INIT_CSP_ENUM( csp::adapters::utils::MsgProtocol,
"UNKNOWN",
"JSON",
"PROTOBUF",
"RAW_BYTES"
);

}
18 changes: 0 additions & 18 deletions cpp/csp/adapters/utils/MessageEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,6 @@ struct DateTimeWireTypeTraits

using DateTimeWireType = csp::Enum<DateTimeWireTypeTraits>;

//Note this should match enum defined in python
struct MsgProtocolTraits
{
enum _enum : unsigned char
{
UNKNOWN = 0,
JSON = 1,
PROTOBUF = 2,
RAW_BYTES = 3,
NUM_TYPES
};

protected:
_enum m_value;
};

using MsgProtocol = csp::Enum<MsgProtocolTraits>;

};

#endif
18 changes: 8 additions & 10 deletions cpp/csp/adapters/utils/MessageStructConverter.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <csp/adapters/utils/MessageStructConverter.h>
#include <csp/adapters/utils/JSONMessageStructConverter.h>
#include <csp/adapters/utils/ProtobufMessageStructConverter.h>
#include <csp/adapters/utils/RawBytesMessageStructConverter.h>

namespace csp::adapters::utils
Expand All @@ -14,14 +13,13 @@ MessageStructConverter::MessageStructConverter( const CspTypePtr & type, const D

MessageStructConverterCache::MessageStructConverterCache()
{
registerConverter( MsgProtocol::RAW_BYTES, &RawBytesMessageStructConverter::create );
registerConverter( MsgProtocol::JSON, &JSONMessageStructConverter::create );
registerConverter( MsgProtocol::PROTOBUF, &ProtobufMessageStructConverter::create );
registerConverter( "RAW_BYTES", &RawBytesMessageStructConverter::create );
registerConverter( "JSON", &JSONMessageStructConverter::create );
}

bool MessageStructConverterCache::registerConverter( MsgProtocol protocol, Creator creator )
bool MessageStructConverterCache::registerConverter( std::string protocol, Creator creator )
{
if( m_creators[ protocol ] )
if( m_creators.find( protocol ) != m_creators.end() )
CSP_THROW( RuntimeException, "Attempted to register creator for MessageStructConverter type " << protocol << " more than once" );

m_creators[ protocol ] = creator;
Expand All @@ -42,12 +40,12 @@ MessageStructConverterPtr MessageStructConverterCache::create( const CspTypePtr
if( !rv.second )
return rv.first -> second;

auto protocol = MsgProtocol( properties.get<std::string>( "protocol" ) );
auto creator = m_creators[ protocol ];
if( !creator )
auto protocol = properties.get<std::string>( "protocol" );
auto creatorIt = m_creators.find( protocol );
if( creatorIt == m_creators.end() )
CSP_THROW( ValueError, "MessageStructConverter for type " << protocol << " is not defined" );

auto result = std::shared_ptr<MessageStructConverter>( creator( type, properties ) );
auto result = std::shared_ptr<MessageStructConverter>( creatorIt -> second( type, properties ) );
rv.first -> second = result;
return rv.first -> second;
}
Expand Down
11 changes: 5 additions & 6 deletions cpp/csp/adapters/utils/MessageStructConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

namespace csp::adapters::utils
{
Expand All @@ -21,8 +22,6 @@ class MessageStructConverter

virtual csp::StructPtr asStruct( void * bytes, size_t size ) = 0;

virtual MsgProtocol protocol() const = 0;

StructMetaPtr structMeta() { return m_structMeta; }

protected:
Expand Down Expand Up @@ -51,15 +50,15 @@ class MessageStructConverterCache

using Creator = std::function<MessageStructConverter*( const CspTypePtr &, const Dictionary & )>;

bool registerConverter( MsgProtocol protocol, Creator creator );
bool registerConverter( std::string protocol, Creator creator );

private:
using CacheKey = std::pair<const CspType*,Dictionary>;
using Cache = std::unordered_map<CacheKey,MessageStructConverterPtr,csp::hash::hash_pair>;

std::mutex m_cacheMutex;
Cache m_cache;
Creator m_creators[ MsgProtocol::NUM_TYPES ];
std::unordered_map<std::string,Creator> m_creators;
std::mutex m_cacheMutex;
Cache m_cache;
};

}
Expand Down
5 changes: 1 addition & 4 deletions cpp/csp/adapters/utils/MessageWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,9 @@ class MessageWriter
public:
using FieldEntry = OutputDataMapper::FieldEntry;

MessageWriter( MsgProtocol protocol ) : m_protocol( protocol ) {}
MessageWriter() {}
virtual ~MessageWriter();

MsgProtocol protocol() const { return m_protocol; }

//returns the finalized message as bytes
virtual std::pair<const void *,size_t> finalize() = 0;

Expand All @@ -83,7 +81,6 @@ class MessageWriter

private:
virtual void processTickImpl( const OutputDataMapper & dataMapper, const TimeSeriesProvider * sourcets ) = 0;
MsgProtocol m_protocol;
};

using MessageWriterPtr=std::shared_ptr<MessageWriter>;
Expand Down
Loading
Loading