Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion client/sub_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ bool connack_received = false;
#ifdef WIN32
static HANDLE timeout_h = NULL;
#endif
static int mid_sent = 0;
static int last_mid_sent = -1;
static bool connected = true;

#ifdef WIN32
void CALLBACK timeout_cb(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
Expand Down Expand Up @@ -198,6 +201,102 @@ static void my_log_callback(struct mosquitto *mosq, void *obj, int level, const
printf("%s\n", str);
}

void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
{
UNUSED(mosq);
UNUSED(obj);
UNUSED(reason_code);
UNUSED(properties);

last_mid_sent = mid;
if(mid == last_mid){
printf("published\n");
}
}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties)
{
UNUSED(mosq);
UNUSED(obj);
UNUSED(rc);
UNUSED(properties);

connected = false;
}

static int pub_stdin_line_loop(struct mosquitto *mosq)
{
char *buf, *buf2;
int buf_len = 1024;
int buf_len_actual;
int read_len;
int pos;
int rc;

buf = malloc((size_t)buf_len);
if(!buf){
fprintf(stderr, "Error: Out of memory.\n");
return 1;
}

do {
pos = 0;
read_len = buf_len;
while(fgets(&buf[pos], read_len, stdin)){
buf_len_actual = (int)strlen(buf);
if(buf[buf_len_actual-1] == '\n'){
char *topic, *token;
int qos,retain;
rc = MOSQ_ERR_INVAL;
topic = strtok(buf,":");
token = strtok(NULL,":");
if(!token)
goto pub_err;
qos = atoi(token);
token = strtok(NULL,":");
if(!token)
goto pub_err;
retain = atoi(token);
buf[buf_len_actual-1] = '\0';
buf += strlen(topic)+5;
buf_len_actual = (int)strlen(buf);
rc = mosquitto_publish(mosq, &mid_sent, topic, buf_len_actual, buf, qos, retain);
pub_err:
if(rc){
fprintf(stderr, "Error: Publish returned %d\n", rc);
}
break;
}else{
buf_len += 1024;
pos += 1023;
read_len = 1024;
buf2 = realloc(buf, (size_t)buf_len);
if(!buf2){
free(buf);
fprintf(stderr, "Publish Error: Out of memory.\n");
buf = malloc((size_t)buf_len);
if(!buf){
fprintf(stderr, "Error: Out of memory.\n");
return 1;
}
}
buf = buf2;
}
}

if(feof(stdin) && (last_mid != -1))
last_mid = mid_sent;

struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 100000000;
nanosleep(&ts, NULL);
} while(connected);

free(buf);
return MOSQ_ERR_SUCCESS;
}

static void print_version(void)
{
int major, minor, revision;
Expand Down Expand Up @@ -397,7 +496,9 @@ int main(int argc, char *argv[])
}
mosquitto_subscribe_callback_set(g_mosq, my_subscribe_callback);
mosquitto_connect_v5_callback_set(g_mosq, my_connect_callback);
mosquitto_disconnect_v5_callback_set(g_mosq, my_disconnect_callback);
mosquitto_message_v5_callback_set(g_mosq, my_message_callback);
mosquitto_publish_v5_callback_set(g_mosq, my_publish_callback);

output_init(&cfg);

Expand Down Expand Up @@ -438,8 +539,11 @@ int main(int argc, char *argv[])
}
#endif

rc = mosquitto_loop_forever(g_mosq, -1, 1);
mosquitto_loop_start(g_mosq);

rc = pub_stdin_line_loop(g_mosq);

mosquitto_loop_stop(g_mosq, false);
mosquitto_destroy(g_mosq);
mosquitto_lib_cleanup();

Expand Down