@@ -73,7 +73,7 @@ class NetworkSourceModule : public ModuleManager::Instance {
7373 }
7474
7575 // Define protocols
76- protocols.define (" TCP (Server)" , PROTOCOL_TCP_SERVER);
76+ // protocols.define("TCP (Server)", PROTOCOL_TCP_SERVER);
7777 protocols.define (" TCP (Client)" , PROTOCOL_TCP_CLIENT);
7878 protocols.define (" UDP" , PROTOCOL_UDP);
7979
@@ -164,7 +164,31 @@ class NetworkSourceModule : public ModuleManager::Instance {
164164 NetworkSourceModule* _this = (NetworkSourceModule*)ctx;
165165 if (_this->running ) { return ; }
166166
167- // TODO
167+ // Depends on protocol
168+ try {
169+ if (_this->proto == PROTOCOL_TCP_SERVER) {
170+ // Create TCP listener
171+ // TODO
172+
173+ // Start listen worker
174+ // TODO
175+ }
176+ else if (_this->proto == PROTOCOL_TCP_CLIENT) {
177+ // Connect to TCP server
178+ _this->sock = net::connect (_this->hostname , _this->port );
179+ }
180+ else if (_this->proto == PROTOCOL_UDP) {
181+ // Open UDP socket
182+ _this->sock = net::openudp (" 0.0.0.0" , _this->port , _this->hostname , _this->port , true );
183+ }
184+ }
185+ catch (const std::exception& e) {
186+ flog::error (" Could not start Network Source: {}" , e.what ());
187+ return ;
188+ }
189+
190+ // Start receive worker
191+ _this->workerThread = std::thread (&NetworkSourceModule::worker, _this);
168192
169193 _this->running = true ;
170194 flog::info (" NetworkSourceModule '{0}': Start!" , _this->name );
@@ -174,8 +198,17 @@ class NetworkSourceModule : public ModuleManager::Instance {
174198 NetworkSourceModule* _this = (NetworkSourceModule*)ctx;
175199 if (!_this->running ) { return ; }
176200
201+ // Stop listen worker
177202 // TODO
178203
204+ // Close connection
205+ if (_this->sock ) { _this->sock ->close (); }
206+
207+ // Stop worker thread
208+ _this->stream .stopWriter ();
209+ if (_this->workerThread .joinable ()) { _this->workerThread .join (); }
210+ _this->stream .clearWriteStop ();
211+
179212 _this->running = false ;
180213 flog::info (" NetworkSourceModule '{0}': Stop!" , _this->name );
181214 }
@@ -254,12 +287,8 @@ class NetworkSourceModule : public ModuleManager::Instance {
254287
255288 while (true ) {
256289 // Read samples from socket
257- int bytes;
258- {
259- std::lock_guard lck (sockMtx);
260- bytes = sock->recv (buffer, frameSize, true );
261- if (bytes <= 0 ) { break ; }
262- }
290+ int bytes = sock->recv (buffer, frameSize, true );
291+ if (bytes <= 0 ) { break ; }
263292
264293 // Convert to CF32 (note: problem if partial sample)
265294 int count = bytes / sampleSize;
@@ -297,7 +326,7 @@ class NetworkSourceModule : public ModuleManager::Instance {
297326
298327 int samplerate = 1000000 ;
299328 int srId;
300- Protocol proto = PROTOCOL_TCP_SERVER ;
329+ Protocol proto = PROTOCOL_UDP ;
301330 int protoId;
302331 SampleType sampType = SAMPLE_TYPE_INT16;
303332 int sampTypeId;
@@ -308,6 +337,7 @@ class NetworkSourceModule : public ModuleManager::Instance {
308337 OptionList<std::string, Protocol> protocols;
309338 OptionList<std::string, SampleType> sampleTypes;
310339
340+ std::thread workerThread;
311341 std::thread listenWorkerThread;
312342
313343 std::mutex sockMtx;
0 commit comments