Skip to content

Commit fb722db

Browse files
authored
Merge pull request #65 from insa-unyte/feature/monitoring-by-pid
Feature/monitoring by generator id
2 parents ee264ae + 4d225fa commit fb722db

32 files changed

+1188
-368
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ test_version
99
sender_json
1010
libunyte-udp-notif.so
1111
scripts/resources
12+
sender_continuous
13+
client_continuous
14+
client_monitoring
1215

1316
docker/tmp
1417

Makefile

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ USE_LIB=
1515
###### c-collector source code ######
1616
SDIR=src
1717
ODIR=obj
18-
_OBJS=hexdump.o listening_worker.o unyte_utils.o queue.o parsing_worker.o unyte_collector.o segmentation_buffer.o cleanup_worker.o unyte_sender.o
18+
_OBJS=hexdump.o listening_worker.o unyte_udp_utils.o unyte_udp_queue.o parsing_worker.o unyte_udp_collector.o segmentation_buffer.o cleanup_worker.o unyte_sender.o monitoring_worker.o
1919
OBJS=$(patsubst %,$(ODIR)/%,$(_OBJS))
2020

2121
###### c-collector source headers ######
22-
_DEPS=hexdump.h listening_worker.h unyte_utils.h queue.h parsing_worker.h unyte_collector.h segmentation_buffer.h cleanup_worker.h unyte_sender.h
22+
_DEPS=hexdump.h listening_worker.h unyte_udp_utils.h unyte_udp_queue.h parsing_worker.h unyte_udp_collector.h segmentation_buffer.h cleanup_worker.h unyte_sender.h monitoring_worker.h
2323
DEPS=$(patsubst %,$(SDIR)/%,$(_DEPS))
2424

2525
###### c-collector examples ######
@@ -29,7 +29,7 @@ SAMPLES_ODIR=samples/obj
2929
###### c-collector test files ######
3030
TDIR=test
3131

32-
BINS=client_sample sender_sample sender_json
32+
BINS=client_sample sender_sample sender_json client_monitoring
3333
TESTBINS=test_malloc test_queue test_seg test_listener test_version
3434

3535
all: libunyte-udp-notif.so $(BINS)
@@ -52,7 +52,10 @@ sender_sample: $(SAMPLES_ODIR)/sender_sample.o $(OBJS)
5252
sender_json: $(SAMPLES_ODIR)/sender_json.o $(OBJS)
5353
$(CC) -pthread -o $@ $^ $(LDFLAGS)
5454

55-
## test files
55+
client_monitoring: $(SAMPLES_ODIR)/client_monitoring.o $(OBJS)
56+
$(CC) -pthread -o $@ $^ $(LDFLAGS)
57+
58+
## own test files
5659
test_listener: $(TDIR)/test_listener.o $(OBJS)
5760
$(CC) -pthread -o $@ $^ $(LDFLAGS)
5861

@@ -71,6 +74,9 @@ test_version: $(TDIR)/test_version.o $(OBJS)
7174
install: libunyte-udp-notif.so
7275
./install.sh
7376

77+
uninstall:
78+
./uninstall.sh
79+
7480
build: libunyte-udp-notif.so
7581

7682
clean:

README.md

Lines changed: 70 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# C-Collector for UDP-notif
2-
Library for collecting UDP-notif protocol messages.
2+
Library for collecting UDP-notif protocol messages defined in the IETF draft [draft-ietf-netconf-udp-notif-01](https://tools.ietf.org/html/draft-ietf-netconf-udp-notif-01).
33

44
## Build & install
5-
To build the project and test example clients, just `make` on root folder. Il will compile with gcc all dependences and the clients.
5+
To build the project and test example clients, just `make` on root folder. Il will compile with gcc all dependencies and the clients.
66

77
### Installing
88
To install the library on a machine, run `make install` with sudo and `export.sh` without sudo. Export script will export the LD_LIBRARY_PATH on user space.
@@ -14,20 +14,20 @@ $ ./export.sh
1414

1515
### Uninstalling
1616
```
17-
$ sudo ./uninstall.sh
17+
$ sudo make uninstall
1818
```
1919
You should remove the export of the lib in your bashrc manually yourself to fully remove the lib.
2020

2121
## Usage
2222
### Usage of the UDP-notif collector
2323
The collector allows to read and parse UDP-notif protocol messages from a ip/port specified on the parameters. It allows to get directly the buffer and the metadata of the message in a struct.
2424

25-
The api is in `unyte_collector.h` :
26-
- `unyte_collector_t *unyte_start_collector(unyte_options_t *options)` from `unyte_collector.h`: Initialize the UDP-notif messages collector. It accepts a struct with different options: address (the IP address to listen to), port (port to listen to), recvmmsg_vlen (vlen used on recvmmsg syscall meaning how many messages to receive on every syscall, by default 10)
27-
- `void *unyte_queue_read(queue_t *queue)` from `queue.h` : read from a queue a struct with all the message buffer and metadata.
28-
- `int unyte_free_all(unyte_seg_met_t *seg)` from `unyte_collector.h`: free all struct used on a message received.
25+
The api is in `unyte_udp_collector.h` :
26+
- `unyte_udp_collector_t *unyte_udp_start_collector(unyte_udp_options_t *options)` from `unyte_udp_collector.h`: Initialize the UDP-notif messages collector. It accepts a struct with different options: address (the IP address to listen to), port (port to listen to), recvmmsg_vlen (vlen used on recvmmsg syscall meaning how many messages to receive on every syscall, by default 10)
27+
- `void *unyte_udp_queue_read(unyte_udp_queue_t *queue)` from `unyte_udp_queue.h` : read from a queue a struct with all the message buffer and metadata.
28+
- `int unyte_udp_free_all(unyte_seg_met_t *seg)` from `unyte_udp_collector.h`: free all struct used on a message received.
2929

30-
Simple exemple of usage :
30+
Simple example of usage :
3131
```
3232
#include <stdio.h>
3333
#include <stdlib.h>
@@ -38,17 +38,17 @@ Simple exemple of usage :
3838
#include <unistd.h>
3939
4040
// include installed library headers
41-
#include <unyte-udp-notif/unyte_collector.h>
42-
#include <unyte-udp-notif/unyte_utils.h>
43-
#include <unyte-udp-notif/queue.h>
41+
#include <unyte-udp-notif/unyte_udp_collector.h>
42+
#include <unyte-udp-notif/unyte_udp_utils.h>
43+
#include <unyte-udp-notif/unyte_udp_queue.h>
4444
4545
#define PORT 10001
4646
#define ADDR "192.168.0.17"
4747
4848
int main()
4949
{
5050
// Initialize collector options
51-
unyte_options_t options = {0};
51+
unyte_udp_options_t options = {0};
5252
options.address = ADDR;
5353
options.port = PORT;
5454
// if argument set to 0, defaults are used
@@ -59,55 +59,47 @@ int main()
5959
options.parsers_queue_size = 0; // parser queue size. Default: 500
6060
6161
// Initialize collector
62-
unyte_collector_t *collector = unyte_start_collector(&options);
62+
unyte_udp_collector_t *collector = unyte_udp_start_collector(&options);
6363
64-
// Exemple with infinity loop, change the break condition to be able to free all gracefully
64+
// Example with infinity loop, change the break condition to be able to free all gracefully
6565
while (1)
6666
{
6767
// Read message on queue
68-
unyte_seg_met_t *seg = (unyte_seg_met_t *)unyte_queue_read(collector->queue);
68+
unyte_seg_met_t *seg = (unyte_seg_met_t *)unyte_udp_queue_read(collector->queue);
6969
7070
// TODO: Process the UDP-notif message here
71-
printf("get_version: %u\n", get_version(seg));
72-
printf("get_space: %u\n", get_space(seg));
73-
printf("get_encoding_type: %u\n", get_encoding_type(seg));
74-
printf("get_header_length: %u\n", get_header_length(seg));
75-
printf("get_message_length: %u\n", get_message_length(seg));
76-
printf("get_generator_id: %u\n", get_generator_id(seg));
77-
printf("get_message_id: %u\n", get_message_id(seg));
78-
printf("get_src_port: %u\n", get_src_port(seg));
79-
printf("get_src_addr: %u\n", get_src_addr(seg));
80-
printf("get_dest_addr: %u\n", get_dest_addr(seg));
81-
printf("get_payload: %s\n", get_payload(seg));
82-
printf("get_payload_length: %u\n", get_payload_length(seg));
71+
printf("unyte_udp_get_version: %u\n", unyte_udp_get_version(seg));
72+
printf("unyte_udp_get_space: %u\n", unyte_udp_get_space(seg));
73+
printf("unyte_udp_get_encoding_type: %u\n", unyte_udp_get_encoding_type(seg));
74+
printf("unyte_udp_get_header_length: %u\n", unyte_udp_get_header_length(seg));
75+
printf("unyte_udp_get_message_length: %u\n", unyte_udp_get_message_length(seg));
76+
printf("unyte_udp_get_generator_id: %u\n", unyte_udp_get_generator_id(seg));
77+
printf("unyte_udp_get_message_id: %u\n", unyte_udp_get_message_id(seg));
78+
printf("unyte_udp_get_src_port: %u\n", unyte_udp_get_src_port(seg));
79+
printf("unyte_udp_get_src_addr: %u\n", unyte_udp_get_src_addr(seg));
80+
printf("unyte_udp_get_dest_addr: %u\n", unyte_udp_get_dest_addr(seg));
81+
printf("unyte_udp_get_payload: %s\n", unyte_udp_get_payload(seg));
82+
printf("unyte_udp_get_payload_length: %u\n", unyte_udp_get_payload_length(seg));
8383
8484
// Free UDP-notif message after
85-
unyte_free_all(seg);
85+
unyte_udp_free_all(seg);
8686
}
8787
88-
// To shut down the collector, just shutdown and close the socket.
89-
shutdown(*collector->sockfd, SHUT_RDWR);
88+
// To shut down the collector, just close the socket.
9089
close(*collector->sockfd);
9190
9291
// wait for main_tread to finish
9392
pthread_join(*collector->main_thread, NULL);
9493
95-
// Free last packets in the queue
96-
while (is_queue_empty(collector->queue) != 0)
97-
{
98-
unyte_seg_met_t *seg = (unyte_seg_met_t *)unyte_queue_read(collector->queue);
99-
unyte_free_all(seg);
100-
}
101-
102-
// freeing collector mallocs
103-
unyte_free_collector(collector);
94+
// Freeing collector mallocs and last messages for every queue if there is any message not consumed
95+
unyte_udp_free_collector(collector);
10496
10597
return 0;
10698
}
10799
```
108100

109101
#### Segments data
110-
To process the message data, all the headers, meta-data and payload are found on the struct unyte_seg_met_t defined on unyte_utils.h:
102+
To process the message data, all the headers, meta-data and payload are found on the struct unyte_seg_met_t defined on unyte_udp_utils.h:
111103
```
112104
typedef struct unyte_segment_with_metadata
113105
{
@@ -117,18 +109,43 @@ typedef struct unyte_segment_with_metadata
117109
} unyte_seg_met_t;
118110
```
119111
##### Getters for segments data
120-
- `uint8_t get_version(unyte_seg_met_t *message);` : encoding version
121-
- `uint8_t get_space(unyte_seg_met_t *message);` : space of encoding version
122-
- `uint8_t get_encoding_type(unyte_seg_met_t *message);` : dentifier to indicate the encoding type used for the Notification Message
123-
- `uint16_t get_header_length(unyte_seg_met_t *message);` : length of the message header in octets
124-
- `uint16_t get_message_length(unyte_seg_met_t *message);` : total length of the message within one UDP datagram, measured in octets, including the message header
125-
- `uint32_t get_generator_id(unyte_seg_met_t *message);` : observation domain id of the message
126-
- `uint32_t get_message_id(unyte_seg_met_t *message);` : message id of the message
127-
- `uint16_t get_src_port(unyte_seg_met_t *message);` : source port of the message
128-
- `uint32_t get_src_addr(unyte_seg_met_t *message);` : source address of the message
129-
- `uint32_t get_dest_addr(unyte_seg_met_t *message);` : collector address
130-
- `char *get_payload(unyte_seg_met_t *message);` : payload buffer
131-
- `uint16_t get_payload_length(unyte_seg_met_t *message);` : payload length
112+
- `uint8_t unyte_udp_get_version(unyte_seg_met_t *message);` : encoding version
113+
- `uint8_t unyte_udp_get_space(unyte_seg_met_t *message);` : space of encoding version
114+
- `uint8_t unyte_udp_get_encoding_type(unyte_seg_met_t *message);` : dentifier to indicate the encoding type used for the Notification Message
115+
- `uint16_t unyte_udp_get_header_length(unyte_seg_met_t *message);` : length of the message header in octets
116+
- `uint16_t unyte_udp_get_message_length(unyte_seg_met_t *message);` : total length of the message within one UDP datagram, measured in octets, including the message header
117+
- `uint32_t unyte_udp_get_generator_id(unyte_seg_met_t *message);` : observation domain id of the message
118+
- `uint32_t unyte_udp_get_message_id(unyte_seg_met_t *message);` : message id of the message
119+
- `uint16_t unyte_udp_get_src_port(unyte_seg_met_t *message);` : source port of the message
120+
- `uint32_t unyte_udp_get_src_addr(unyte_seg_met_t *message);` : source address of the message
121+
- `uint32_t unyte_udp_get_dest_addr(unyte_seg_met_t *message);` : collector address
122+
- `char *unyte_udp_get_payload(unyte_seg_met_t *message);` : payload buffer
123+
- `uint16_t unyte_udp_get_payload_length(unyte_seg_met_t *message);` : payload length
124+
125+
#### Monitoring of the lib
126+
There is a monitoring thread that could be started to monitor packets loss and packets received in bad order.
127+
To activate this thread, you must initiate the monitoring thread queue size (`monitoring_queue_size`):
128+
```
129+
typedef struct
130+
{
131+
char *address;
132+
uint16_t port;
133+
...
134+
uint monitoring_queue_size; // monitoring queue size if wanted to activate the monitoring thread. Default: 0. Recommended: 500.
135+
uint monitoring_delay; // monitoring queue frequence in seconds. Default: 5 seconds
136+
} unyte_udp_options_t;
137+
```
138+
The thread will every `monitoring_delay` seconds send all generators id's counters.
139+
140+
##### Type of threads
141+
The threads types are defined in `monitoring_worker.h`:
142+
- `PARSER_WORKER`: worker in charge of parsing the segments. Reassembles or saves in memory the segmented messages.
143+
- `LISTENER_WORKER`: worker in charge of receiving the bytes from the socket. It calls `recvmmsg()` syscall to receive multiple messages at once.
144+
145+
##### Packets loss
146+
Two usecases are possible monitoring packets loss:
147+
- Drops on `PARSER_WORKER`: It means the client consuming the parsed messages is not consuming that fast. You may want to multithread the client consuming the `collector->queue` (output_queue) or increase the `output_queue_size` option to avoid packets drops on spikes.
148+
- Drops on `LISTENER_WORKER`: It means the `N` parsers are not consuming that fast and the `LISTENER_WORKER` is pushing to the `input_queue` faster than the parsers could read. You may want to increment the number of parsers instantiated or increase `parsers_queue_size` option to avoid packets drops on spikes.
132149

133150
### Usage of the sender
134151
The sender allows the user to send UDP-notif protocol to a IP/port specified. It cuts the message into segments of the protocol if it is larger than the MTU specified in parameters.
@@ -158,7 +175,7 @@ Simple usage of the sender :
158175
#include <stdlib.h>
159176
160177
#include <unyte-udp-notif/unyte_sender.h>
161-
#include <unyte-udp-notif/unyte_utils.h>
178+
#include <unyte-udp-notif/unyte_udp_utils.h>
162179
163180
#define PORT 10001
164181
#define ADDR "192.168.0.17"

install.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ LIB_DIR=lib
55
H_DIR=include
66
PKG_DIR=/usr/lib/pkgconfig
77

8+
export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:/usr/local/lib/pkgconfig
9+
810
if [ "$EUID" -ne 0 ]
911
then
1012
echo "Please run as root."
@@ -43,7 +45,7 @@ fi
4345

4446
echo "Copying pkg-config file to $PKG_DIR"
4547
sed -e "s/<<install>>/${INSTALL_DIR//\//\\/}/g" -e "s/<<include>>/$H_DIR/g" -e "s/<<lib>>/$LIB_DIR/g" unyte-pkg.pc > unyte-udp-notif.pc
46-
cp unyte-udp-notif.pc $PKG_DIR
48+
cp unyte-udp-notif.pc $PKG_DIR/unyte-udp-notif.pc
4749

4850
if [ $? -ne 0 ]
4951
then

samples/client_monitoring.c

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <pthread.h>
4+
#include <stdint.h>
5+
#include <string.h>
6+
#include <signal.h>
7+
#include <unistd.h>
8+
9+
#include "../src/hexdump.h"
10+
#include "../src/unyte_udp_collector.h"
11+
#include "../src/unyte_udp_utils.h"
12+
#include "../src/unyte_udp_queue.h"
13+
#include "../src/monitoring_worker.h"
14+
15+
#define USED_VLEN 10
16+
#define MAX_TO_RECEIVE 20
17+
18+
struct read_input
19+
{
20+
unyte_udp_queue_t *output_queue;
21+
int stop;
22+
};
23+
24+
void *t_read(void *input)
25+
{
26+
struct read_input *in = (struct read_input *)input;
27+
unyte_udp_queue_t *output_queue = in->output_queue;
28+
while (in->stop == 0)
29+
{
30+
unyte_seg_met_t *seg = (unyte_seg_met_t *)unyte_udp_queue_read(output_queue);
31+
unyte_udp_free_all(seg);
32+
}
33+
pthread_exit(NULL);
34+
}
35+
36+
int main(int argc, char *argv[])
37+
{
38+
if (argc != 3)
39+
{
40+
printf("Error: arguments not valid\n");
41+
printf("Usage: ./client_monitoring <ip> <port>\n");
42+
exit(1);
43+
}
44+
45+
// Initialize collector options
46+
unyte_udp_options_t options = {0};
47+
options.address = argv[1];
48+
options.port = atoi(argv[2]);
49+
options.recvmmsg_vlen = USED_VLEN;
50+
options.monitoring_delay = 2;
51+
options.monitoring_queue_size = 500;
52+
53+
printf("Listening on %s:%d\n", options.address, options.port);
54+
55+
/* Initialize collector */
56+
unyte_udp_collector_t *collector = unyte_udp_start_collector(&options);
57+
int recv_count = 0;
58+
int max = MAX_TO_RECEIVE;
59+
60+
struct read_input input = {0};
61+
input.output_queue = collector->queue;
62+
input.stop = 0;
63+
64+
// Thread simulating collector reading packets
65+
pthread_t *th_read = (pthread_t *)malloc(sizeof(pthread_t));
66+
pthread_create(th_read, NULL, t_read, (void *)&input);
67+
68+
while (recv_count < max)
69+
{
70+
void *counter_pointer = unyte_udp_queue_read(collector->monitoring_queue);
71+
if (counter_pointer == NULL)
72+
{
73+
printf("counter_pointer null\n");
74+
fflush(stdout);
75+
}
76+
unyte_udp_sum_counter_t *counter = (unyte_udp_sum_counter_t *)counter_pointer;
77+
78+
// Getters
79+
// printf("Thread id: %ld\n", unyte_udp_get_thread_id(counter));
80+
// printf("Thread type: %d\n", unyte_udp_get_th_type(counter));
81+
// printf("Generator id: %d\n", unyte_udp_get_gen_id(counter));
82+
// printf("Last msg id: %d\n", unyte_udp_get_last_msg_id(counter));
83+
// printf("Received OK: %d\n", unyte_udp_get_received_seg(counter));
84+
// printf("Dropped: %d\n", unyte_udp_get_dropped_seg(counter));
85+
// printf("Reordered: %d\n", unyte_udp_get_reordered_seg(counter));
86+
87+
unyte_udp_print_counters(counter, stdout);
88+
89+
// listening worker is losing segments
90+
if (counter->type == LISTENER_WORKER && unyte_udp_get_dropped_seg(counter) > 0)
91+
{
92+
// The input_queue used by the parser is becoming full and is dropping messages. Try instantiating more
93+
// parsers or increasing the parsers_queue_size.
94+
// printf("/!\\ Losing messages on input_queue. The parsers threads are not consuming the segments that fast.\n");
95+
}
96+
// parser worker is losing segments
97+
else if (counter->type == PARSER_WORKER && unyte_udp_get_dropped_seg(counter) > 0)
98+
{
99+
// The output_queue used by the client is becoming full and is dropping messages. Try multithreading the
100+
// client (this main thread) to consume faster or increasing the output_queue_size.
101+
// printf("/!\\ Losing messages on output_queue. The main client is not consuming the parsed messages that fast.\n");
102+
}
103+
104+
recv_count++;
105+
fflush(stdout);
106+
free(counter);
107+
}
108+
109+
printf("Shutdown the socket\n");
110+
close(*collector->sockfd);
111+
112+
input.stop = 1;
113+
pthread_join(*th_read, NULL);
114+
pthread_join(*collector->main_thread, NULL);
115+
116+
// freeing collector mallocs
117+
unyte_udp_free_collector(collector);
118+
free(th_read);
119+
fflush(stdout);
120+
return 0;
121+
}

0 commit comments

Comments
 (0)