Skip to content
2 changes: 1 addition & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ endif
libdiagnostic_la_CPPFLAGS = -I$(top_srcdir)/source/diagnostic/include -I$(top_srcdir)/source/diagnostic/BbhmDiagIpPing -I$(top_srcdir)/source/dmltad -I$(top_srcdir)/source/TandDSsp
libdiagnostic_la_DEPENDENCIES = $(EXTRA_DEPENDENCIES)
libdiagnostic_la_LIBADD = $(libdiagnostic_la_DEPENDENCIES)
libdiagnostic_la_LDFLAGS = -lccsp_common
libdiagnostic_la_LDFLAGS = -lccsp_common -lmosquitto
2 changes: 1 addition & 1 deletion source/LatencyMeasurement/xNetDP/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ bin_PROGRAMS = xNetDP

xNetDP_CPPFLAGS = -I${PKG_CONFIG_SYSROOT_DIR}$(includedir)/rbus
xNetDP_SOURCES = xNetDP.c
xNetDP_LDFLAGS = -lpcap -lrbus -lsyscfg -lm -lpthread
xNetDP_LDFLAGS = -lpcap -lrbus -lsyscfg -lm -lpthread -lmosquitto
64 changes: 64 additions & 0 deletions source/LatencyMeasurement/xNetDP/xNetDP.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <errno.h>
#include <math.h>
#include <rbus/rbus.h>
#include <mosquitto.h>
#include "syscfg/syscfg.h"
#define ADD_MAX_SAMPLE 10
#define MAX_SAMPLE 50
Expand Down Expand Up @@ -74,6 +75,11 @@ pthread_mutex_t latency_report_lock = PTHREAD_MUTEX_INITIALIZER;
#define TRUE 1
#define FALSE 0
#define BUF_SIZE 200
#define MQTT_LOCAL_MQTT_BROKER_IP_ADDR "192.168.245.254"
#define MQTT_LOCAL_MQTT_BROKER_PORT_VAL 1883
#define TCP_LAN_latency_TOPIC "device/TCP_LAN_latency"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep topic as local/tcplatency

#define MQTT_KEEPALIVE_TIME 60

enum ip_family
{
IPV4=0,
Expand Down Expand Up @@ -1049,6 +1055,55 @@ void* LatencyReportThread(void* arg)


#if 1

// Function that takes MAC + LAN latency (microseconds)
int send_latency_message(const char *mac, long long lan_latency_sec, long long lan_latency_usec) {
struct mosquitto *mosq;
char payload[256];
int rc =0;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init and connect we need to do just once. only publish api needs to be called while sending the latency data

// JSON payload: you can format however you need
snprintf(payload, sizeof(payload),
"{\"mac\":\"%s\", \"lan_latency_sec\":%lld.%06lld}", mac, lan_latency_sec, lan_latency_usec);

mosquitto_lib_init();
mosq = mosquitto_new("LatencyPublisher", true, NULL);
if (!mosq) {
dbg_log("Failed to create Mosquitto client");
mosquitto_lib_cleanup();
return -1;
}

rc = mosquitto_connect(mosq,
MQTT_LOCAL_MQTT_BROKER_IP_ADDR,
MQTT_LOCAL_MQTT_BROKER_PORT_VAL,
MQTT_KEEPALIVE_TIME);

if (rc != MOSQ_ERR_SUCCESS) {
dbg_log("Failed to connect to MQTT broker: %s", mosquitto_strerror(rc));

mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 1;
}

// Publish message
rc = mosquitto_publish(mosq, NULL, TCP_LAN_latency_TOPIC,
strlen(payload), payload,
1, false);

if (rc == MOSQ_ERR_SUCCESS) {
dbg_log("Published: %s", payload);
} else {
dbg_log("Failed to publish message: %s", mosquitto_strerror(rc));
}

mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return rc;
}

void* LatencyReportThreadPerSession(void* arg)
{
// detach the current thread
Expand Down Expand Up @@ -1081,6 +1136,15 @@ void* LatencyReportThreadPerSession(void* arg)
if(hashArray[i].bComputed == TRUE)
{
tempCount = snprintf(str1,sizeof(str1),"%s,%u,%lld.%lld,%lld.%06lld|",hashArray[i].mac,hashArray[i].TcpInfo[INDEX_SYN].th_seq,hashArray[i].latency_sec,hashArray[i].latency_usec,hashArray[i].Lan_latency_sec,hashArray[i].Lan_latency_usec);
//Send LAN side Latency Notification to HCM module
int rc = send_latency_message(hashArray[i].mac,
hashArray[i].Lan_latency_sec, hashArray[i].Lan_latency_usec);

if (rc != MOSQ_ERR_SUCCESS) {
dbg_log("MQTT publish failed for MAC %s: %s",
hashArray[i].mac, mosquitto_strerror(rc));
}

if(tempCount)
{
if((byteCount+tempCount) < MAX_REPORT_SIZE)
Expand Down