Skip to content
This repository was archived by the owner on Jan 25, 2024. It is now read-only.

Commit 7a58f6e

Browse files
committed
Finished Inital Latice work for all supported service types. Service Needs to be refactored to take a Pattern in but that will be harder then it sounds so for now people using Service will need to also standup a patern and Service will simply handle announcments and call backs for new services. I'll stream line this later.
1 parent 08f43db commit 7a58f6e

File tree

26 files changed

+1556
-85
lines changed

26 files changed

+1556
-85
lines changed

framework.sublime-project

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33
[
44
{
55
"path": "projects"
6+
},
7+
{
8+
"path": "cmake"
9+
},
10+
{
11+
"path": "share"
612
}
713
]
814
}

projects/libpfc_net/cpp/net/Service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ Service::Service(const Config config, const std::string& multicast_bind_address,
6464
pfc_service_announcment service;
6565
service._name = config.name;
6666
service._protacol = (config.style == Config::pub_sub) ? pfc_protocol::pub_sub : pfc_protocol::req_req;
67-
service._address = config.address;
67+
service._address = config.address.c_str();
6868
service._brief = config.brief;
6969
service._port = config.port;
7070

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#ifndef SUSTAIN_FRAMEWORK_NET_NANOMSG_HELPER_H
2+
#define SUSTAIN_FRAMEWORK_NET_NANOMSG_HELPER_H
3+
4+
#include <sustain/framework/util/Error.h>
5+
6+
#include <nanomsg/nn.h>
7+
8+
namespace pfc {
9+
inline Error nano_to_Error(int code)
10+
{
11+
Error l_ec;
12+
switch (code) {
13+
14+
case EAFNOSUPPORT:
15+
l_ec = Error::PFC_ADDRESS_FAMILY_NOT_SUPPORTED;
16+
break;
17+
case EINVAL:
18+
l_ec = Error::PFC_PROTOCOL_NOT_SUPPORTED;
19+
break;
20+
case EMFILE:
21+
l_ec = Error::PFC_SOCKET_LIMIT_REACHED;
22+
break;
23+
case ETERM:
24+
l_ec = Error::PFC_LIBRARY_SHUTDOWN;
25+
break;
26+
case EBADF:
27+
l_ec = Error::PFC_INVALID_SOCKET;
28+
break;
29+
case ENAMETOOLONG:
30+
l_ec = Error::PFC_BIND_ERROR;
31+
break;
32+
case EPROTONOSUPPORT:
33+
l_ec = Error::PFC_PROTOCOL_NOT_SUPPORTED;
34+
break;
35+
case EADDRNOTAVAIL:
36+
l_ec = Error::PFC_INVALID_ENDPOINT;
37+
break;
38+
case ENODEV:
39+
l_ec = Error::PFC_BIND_ERROR;
40+
break;
41+
case EADDRINUSE:
42+
l_ec = Error::PFC_ADDRESS_IN_USE;
43+
break;
44+
case ENOTSUP:
45+
l_ec = Error::PFC_BAD_OPERATION;
46+
break;
47+
case EFSM:
48+
l_ec = Error::PFC_BAD_OPERATION;
49+
break;
50+
case EAGAIN:
51+
l_ec = Error::PFC_BAD_OPERATION;
52+
break;
53+
case EINTR:
54+
l_ec = Error::PFC_INTERUPT;
55+
break;
56+
case ETIMEDOUT:
57+
l_ec = Error::PFC_TIMEOUT;
58+
break;
59+
default:
60+
break;
61+
}
62+
return l_ec;
63+
}
64+
//-------------------------------------------------------------------------------
65+
}
66+
#endif //SUSTAIN_FRAMEWORK_NET_NANOMSG_HELPER_H
Lines changed: 122 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,128 @@
11
#include <sustain/framework/net/patterns/pub_sub/Publisher.h>
22

3+
#include <iostream>
4+
#include <thread>
5+
6+
#include <nanomsg/pubsub.h>
7+
8+
#include "../nanomsg_helper.h"
9+
310
namespace pfc {
4-
PubSub_Publisher::~PubSub_Publisher() {}
511

6-
void PubSub_Publisher::broadcast(std::function<void(void)>) {}
7-
void PubSub_Publisher::async_broadcast(std::function<void(void)>) {}
12+
struct nn_buffer : std::streambuf {
13+
nn_buffer(char* begin, char* end)
14+
{
15+
this->setg(begin, begin, end);
16+
}
17+
};
18+
19+
struct PubSub_Publisher::Implementation {
20+
Implementation(URI&&);
21+
~Implementation();
22+
23+
Implementation(const Implementation&) = delete;
24+
Implementation(Implementation&&) = delete;
25+
Implementation& operator==(const Implementation&) = delete;
26+
Implementation& operator==(Implementation&&) = delete;
27+
28+
URI uri;
29+
int socket;
30+
int rv;
31+
char* msg_buffer;
32+
bool running;
33+
34+
void publish();
35+
36+
std::thread pubsub_main_thread;
37+
BroadcastFunc generate_message_func;
38+
39+
Error ec;
40+
};
41+
//-------------------------------------------------------------------------------
42+
PubSub_Publisher::Implementation::~Implementation()
43+
{
44+
if(rv && socket)
45+
{
46+
nn_shutdown(socket,rv);
47+
}
848

9-
void PubSub_Publisher::standup() {}
10-
void PubSub_Publisher::shutdown() {}
49+
if(socket)
50+
{
51+
nn_close(socket);
52+
}
53+
if(msg_buffer)
54+
{
55+
nn_freemsg(msg_buffer);
56+
msg_buffer = nullptr;
57+
}
58+
}
59+
//-------------------------------------------------------------------------------
60+
PubSub_Publisher::Implementation::Implementation(URI&& u)
61+
: uri(std::move(u))
62+
, socket(0)
63+
, rv(0)
64+
, msg_buffer(nullptr)
65+
, running(false)
66+
{
67+
if ((socket = nn_socket(AF_SP, NN_PUB)) < 0) {
68+
ec = nano_to_Error(socket);
69+
}
70+
if ((rv = nn_bind(socket, uri.c_str())) < 0) {
71+
ec = nano_to_Error(rv);
72+
}
73+
}
74+
//-----------------------------------------------------------------------------
75+
void PubSub_Publisher::Implementation::publish()
76+
{
77+
do {
78+
int bytes = 0;
79+
auto buffer = generate_message_func();
80+
if ((bytes = nn_send(socket, &buffer[0], buffer.size(), 0)) < 0) {
81+
ec = nano_to_Error(bytes);
82+
}
83+
} while (running);
84+
}
85+
//-----------------------------------------------------------------------------
86+
PubSub_Publisher::PubSub_Publisher(URI uri)
87+
: _impl(std::make_unique<Implementation>(std::move(uri)))
88+
{
89+
}
90+
//-----------------------------------------------------------------------------
91+
PubSub_Publisher::~PubSub_Publisher()
92+
{
93+
nn_close(_impl->socket);
94+
_impl = nullptr;
95+
}
96+
//-----------------------------------------------------------------------------
97+
void PubSub_Publisher::set_response_callaback_func(CallbackFunc func)
98+
{
99+
}
100+
//-----------------------------------------------------------------------------
101+
void PubSub_Publisher::broadcast(BroadcastFunc func)
102+
{
103+
_impl->generate_message_func = func;
104+
_impl->running = false;
105+
_impl->publish();
106+
}
107+
//-----------------------------------------------------------------------------
108+
void PubSub_Publisher::async_broadcast(BroadcastFunc func)
109+
{
110+
_impl->generate_message_func = func;
111+
_impl->running = true;
112+
_impl->pubsub_main_thread = std::thread(&Implementation::publish, _impl.get());
113+
}
114+
//-----------------------------------------------------------------------------
115+
void PubSub_Publisher::standup()
116+
{
117+
}
118+
//-----------------------------------------------------------------------------
119+
void PubSub_Publisher::shutdown()
120+
{
121+
_impl->running = true;
122+
if (_impl->socket) {
123+
nn_close(_impl->socket);
124+
_impl->socket = 0;
125+
}
126+
}
127+
//-----------------------------------------------------------------------------
11128
}
Lines changed: 104 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,111 @@
11
#include <sustain/framework/net/patterns/pub_sub/Subscriber.h>
22

3+
#include <thread>
4+
5+
#include <nanomsg/pubsub.h>
6+
7+
#include "../nanomsg_helper.h"
38
#include <sustain/framework/util/Error.h>
49

510
namespace pfc {
6-
PubSub_Subscriber::~PubSub_Subscriber() {}
11+
struct PubSub_Subscriber::Implementation {
12+
Implementation(URI&&);
13+
~Implementation();
14+
15+
URI uri;
16+
int socket;
17+
int rv;
18+
char* msg_buffer;
19+
bool running;
20+
21+
void listen();
22+
23+
std::thread pubsub_main_thread;
24+
ListenFunc message_process_function;
725

8-
void PubSub_Subscriber::listen(std::function<void(void)>) {}
9-
void PubSub_Subscriber::async_listen(std::function<void(void)>) {}
26+
Error ec;
27+
};
28+
//-------------------------------------------------------------------------------
29+
PubSub_Subscriber::Implementation::~Implementation()
30+
{
31+
if (rv && socket) {
32+
nn_shutdown(socket, rv);
33+
}
1034

11-
void PubSub_Subscriber::standup() {}
12-
void PubSub_Subscriber::shutdown() {}
13-
}
35+
if (socket) {
36+
nn_close(socket);
37+
}
38+
if (msg_buffer) {
39+
nn_freemsg(msg_buffer);
40+
msg_buffer = nullptr;
41+
}
42+
}
43+
//-------------------------------------------------------------------------------
44+
PubSub_Subscriber::Implementation::Implementation(URI&& u)
45+
: uri(std::move(u))
46+
, socket(0)
47+
, rv(0)
48+
, msg_buffer(nullptr)
49+
, running(false)
50+
{
51+
if ((socket = nn_socket(AF_SP, NN_SUB)) < 0) {
52+
ec = nano_to_Error(socket);
53+
}
54+
if (nn_setsockopt(socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
55+
ec = nano_to_Error(socket);
56+
}
57+
if (nn_connect(socket, uri.c_str()) < 0) {
58+
ec = nano_to_Error(socket);
59+
}
60+
}
61+
//-------------------------------------------------------------------------------
62+
void PubSub_Subscriber::Implementation::listen()
63+
{
64+
do {
65+
char* l_buffer = NULL;
66+
int bytes;
67+
if ((bytes = nn_recv(socket, &l_buffer, NN_MSG, 0)) < 0) {
68+
ec = nano_to_Error(bytes);
69+
}
70+
std::vector<char> response = message_process_function(l_buffer, bytes);
71+
nn_freemsg(l_buffer);
72+
} while (running);
73+
}
74+
//-------------------------------------------------------------------------------
75+
PubSub_Subscriber::PubSub_Subscriber(URI uri)
76+
: _impl(std::make_unique<Implementation>(std::move(uri)))
77+
{
78+
}
79+
//-------------------------------------------------------------------------------
80+
PubSub_Subscriber::~PubSub_Subscriber()
81+
{
82+
if (_impl->socket) {
83+
nn_close(_impl->socket);
84+
}
85+
_impl->socket = 0;
86+
}
87+
//-------------------------------------------------------------------------------
88+
void PubSub_Subscriber::listen(ListenFunc func)
89+
{
90+
_impl->message_process_function = func;
91+
_impl->running = false;
92+
_impl->listen();
93+
}
94+
//-------------------------------------------------------------------------------
95+
void PubSub_Subscriber::async_listen(ListenFunc func)
96+
{
97+
_impl->message_process_function = func;
98+
_impl->running = true;
99+
_impl->pubsub_main_thread = std::thread(&Implementation::listen, _impl.get());
100+
}
101+
//-------------------------------------------------------------------------------
102+
void PubSub_Subscriber::standup()
103+
{
104+
}
105+
//-------------------------------------------------------------------------------
106+
void PubSub_Subscriber::shutdown()
107+
{
108+
_impl->running = false;
109+
}
110+
//-------------------------------------------------------------------------------
111+
}

0 commit comments

Comments
 (0)