66#include " Concurrent.h"
77#include " TableImp.h"
88#include " DolphinDB.h"
9+ #include " EventHandler.h"
910#include " Util.h"
1011#ifdef _MSC_VER
1112 #ifdef _USRDLL
@@ -76,6 +77,7 @@ using MessageQueueSP = SmartPointer<MessageQueue>;
7677using MessageTableQueueSP = SmartPointer<MessageTableQueue>;
7778using MessageHandler = std::function<void (Message)>;
7879using MessageBatchHandler = std::function<void (vector<Message>)>;
80+ using EventMessageHandler = std::function<void (const std::string&, std::vector<ConstantSP>&)>;
7981
8082#define DEFAULT_ACTION_NAME " cppStreamingAPI"
8183constexpr int DEFAULT_QUEUE_CAPACITY = 65536 ;
@@ -102,7 +104,8 @@ typedef SmartPointer<StreamDeserializer> StreamDeserializerSP;
102104
103105struct SubscribeInfo {
104106 SubscribeInfo ()
105- : host(" INVAILD" ),
107+ : ID(" INVALID" ),
108+ host (" INVAILD" ),
106109 port(-1 ),
107110 tableName(" INVALID" ),
108111 actionName(" INVALID" ),
@@ -116,11 +119,32 @@ struct SubscribeInfo {
116119 tqueue(nullptr ),
117120 userName(" " ),
118121 password(" " ),
119- streamDeserializer(nullptr ) {}
120- explicit SubscribeInfo (const string &host, int port, const string &tableName, const string &actionName, long long offset, bool resub,
121- const VectorSP &filter, bool msgAsTable, bool allowExists, int batchSize,
122- const string &userName, const string &password, const StreamDeserializerSP &blobDeserializer, const bool istqueue)
123- : host(move(host)),
122+ streamDeserializer(nullptr ),
123+ currentSiteIndex(-1 ),
124+ isEvent_(false ),
125+ resubTimeout(100 ),
126+ subOnce(false ),
127+ lastSiteIndex(-1 ) {}
128+ explicit SubscribeInfo (const string &id,
129+ const string &host,
130+ int port,
131+ const string &tableName,
132+ const string &actionName,
133+ long long offset,
134+ bool resub,
135+ const VectorSP &filter,
136+ bool msgAsTable,
137+ bool allowExists,
138+ int batchSize,
139+ const string &userName,
140+ const string &password,
141+ const StreamDeserializerSP &blobDeserializer,
142+ const bool istqueue,
143+ bool isEvent,
144+ int resubTimeout,
145+ bool subOnce)
146+ : ID(move(id)),
147+ host(move(host)),
124148 port(port),
125149 tableName(move(tableName)),
126150 actionName(move(actionName)),
@@ -136,9 +160,15 @@ struct SubscribeInfo {
136160 userName(move(userName)),
137161 password(move(password)),
138162 istqueue(istqueue),
139- streamDeserializer(blobDeserializer){
163+ streamDeserializer(blobDeserializer),
164+ currentSiteIndex(-1 ),
165+ isEvent_(isEvent),
166+ resubTimeout(resubTimeout),
167+ subOnce(subOnce),
168+ lastSiteIndex(-1 ) {
140169 }
141170
171+ string ID;
142172 string host;
143173 int port;
144174 string tableName;
@@ -158,6 +188,12 @@ struct SubscribeInfo {
158188 SocketSP socket;
159189
160190 vector<ThreadSP> handleThread;
191+ vector<pair<string, int >> availableSites;
192+ int currentSiteIndex;
193+ bool isEvent_;
194+ int resubTimeout;
195+ bool subOnce;
196+ int lastSiteIndex;
161197 void setExitFlag () {
162198 if (istqueue) {
163199 tqueue->setExitFlag ();
@@ -181,6 +217,22 @@ struct SubscribeInfo {
181217 }
182218 handleThread.clear ();
183219 }
220+
221+ void updateByReconnect (int currentReconnSiteIndex, const std::string &topic) {
222+ auto thisTopicLastSuccessfulNode = this ->lastSiteIndex ;
223+ if (this ->subOnce && thisTopicLastSuccessfulNode != currentReconnSiteIndex) {
224+ // update currentSiteIndex
225+ if (thisTopicLastSuccessfulNode < currentReconnSiteIndex) {
226+ currentReconnSiteIndex--;
227+ }
228+ // update info
229+ this ->availableSites .erase (this ->availableSites .begin () + thisTopicLastSuccessfulNode);
230+ this ->currentSiteIndex = currentReconnSiteIndex;
231+
232+ // update lastSuccessfulNode
233+ this ->lastSiteIndex = currentReconnSiteIndex;
234+ }
235+ }
184236};
185237
186238
@@ -198,13 +250,27 @@ class EXPORT_DECL StreamingClient {
198250 int64_t offset = -1 , bool resubscribe = true , const VectorSP &filter = nullptr ,
199251 bool msgAsTable = false , bool allowExists = false , int batchSize = 1 ,
200252 string userName=" " , string password=" " ,
201- const StreamDeserializerSP &blobDeserializer = nullptr , bool istqueue = false );
253+ const StreamDeserializerSP &blobDeserializer = nullptr , bool istqueue = false ,
254+ const std::vector<std::string> &backupSites = std::vector<std::string>(), bool isEvent = false,
255+ int resubTimeout = 100, bool subOnce = false);
202256 void unsubscribeInternal (string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);
203257
204258protected:
205259 SmartPointer<StreamingClientImpl> impl_;
206260};
207261
262+ class EventClient : public StreamingClient {
263+ public:
264+ EventClient (const std::vector<EventSchema>& eventSchemes, const std::vector<std::string>& eventTimeKeys, const std::vector<std::string>& commonKeys);
265+ ThreadSP subscribe (const string& host, int port, const EventMessageHandler &handler, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME, int64_t offset = -1 ,
266+ bool resub = true , const string& userName=" " , const string& password=" " );
267+ void unsubscribe (const string& host, int port, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME);
268+
269+ private:
270+ EventHandler eventHandler_;
271+ };
272+
273+
208274class EXPORT_DECL ThreadedClient : public StreamingClient {
209275public:
210276 // listeningPort > 0 : listen mode, wait for server connection
@@ -215,13 +281,17 @@ class EXPORT_DECL ThreadedClient : public StreamingClient {
215281 string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1 , bool resub = true ,
216282 const VectorSP &filter = nullptr , bool msgAsTable = false , bool allowExists = false ,
217283 string userName=" " , string password=" " ,
218- const StreamDeserializerSP &blobDeserializer = nullptr );
284+ const StreamDeserializerSP &blobDeserializer = nullptr ,
285+ const std::vector<std::string> &backupSites = std::vector<std::string>(),
286+ int resubTimeout = 100, bool subOnce = false);
219287 ThreadSP subscribe (string host, int port, const MessageBatchHandler &handler, string tableName,
220288 string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1 , bool resub = true ,
221289 const VectorSP &filter = nullptr , bool allowExists = false , int batchSize = 1 ,
222290 double throttle = 1 ,bool msgAsTable = false ,
223291 string userName = " " , string password = " " ,
224- const StreamDeserializerSP &blobDeserializer = nullptr );
292+ const StreamDeserializerSP &blobDeserializer = nullptr ,
293+ const std::vector<std::string> &backupSites = std::vector<std::string>(),
294+ int resubTimeout = 100, bool subOnce = false);
225295 size_t getQueueDepth (const ThreadSP &thread);
226296 void unsubscribe (string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);
227297};
@@ -236,7 +306,9 @@ class EXPORT_DECL ThreadPooledClient : public StreamingClient {
236306 string actionName, int64_t offset = -1 , bool resub = true ,
237307 const VectorSP &filter = nullptr , bool msgAsTable = false , bool allowExists = false ,
238308 string userName = " " , string password = " " ,
239- const StreamDeserializerSP &blobDeserializer = nullptr );
309+ const StreamDeserializerSP &blobDeserializer = nullptr ,
310+ const std::vector<std::string> &backupSites = std::vector<std::string>(),
311+ int resubTimeout = 100, bool subOnce = false);
240312 void unsubscribe (string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);
241313 size_t getQueueDepth (const ThreadSP &thread);
242314
@@ -254,7 +326,9 @@ class EXPORT_DECL PollingClient : public StreamingClient {
254326 int64_t offset = -1 , bool resub = true , const VectorSP &filter = nullptr ,
255327 bool msgAsTable = false , bool allowExists = false ,
256328 string userName=" " , string password=" " ,
257- const StreamDeserializerSP &blobDeserializer = nullptr );
329+ const StreamDeserializerSP &blobDeserializer = nullptr ,
330+ const std::vector<std::string> &backupSites = std::vector<std::string>(),
331+ int resubTimeout = 100, bool subOnce = false);
258332 void unsubscribe (string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);
259333};
260334
0 commit comments