Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/libmqtt/ttq/inc/ttqLogging.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ extern "C" {
#define __attribute__(attrib)
#endif

int ttq_log(struct tmqtt *ttq, unsigned int level, const char *fmt, ...) __attribute__((format(printf, 3, 4)));
void ttq_log(struct tmqtt *ttq, unsigned int level, const char *fmt, ...) __attribute__((format(printf, 3, 4)));

#ifdef __cplusplus
}
Expand Down
2 changes: 1 addition & 1 deletion source/libs/tmqtt/mgmt/src/tmqttMgmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ void mqttMgmtStopMqttd() {
}
atomic_store_32(&pData->stopCalled, 1);
pData->needCleanUp = false;
uv_process_kill(&pData->process, SIGTERM);
UNUSED(uv_process_kill(&pData->process, SIGTERM));
uv_barrier_destroy(&pData->barrier);

if (uv_thread_join(&pData->thread) != 0) {
Expand Down
20 changes: 10 additions & 10 deletions source/libs/tmqtt/mqtt/src/tmqttContext.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

#include <time.h>

#include "tthash.h"
#include "ttqAlias.h"
#include "ttqMemory.h"
#include "ttqPacket.h"
#include "ttqProperty.h"
#include "ttqTime.h"
#include "tthash.h"
#include "ttqUtil.h"

struct tmqtt *ttqCxtInit(ttq_sock_t sock) {
Expand All @@ -35,7 +35,7 @@ struct tmqtt *ttqCxtInit(ttq_sock_t sock) {
#ifdef WITH_EPOLL
context->ident = id_client;
#endif
tmqtt__set_state(context, ttq_cs_new);
UNUSED(tmqtt__set_state(context, ttq_cs_new));
context->sock = sock;
context->last_msg_in = db.now_s;
context->next_msg_out = db.now_s + 60;
Expand Down Expand Up @@ -127,12 +127,12 @@ void ttqCxtCleanup(struct tmqtt *context, bool force_free) {
ttq_free(context->password);
context->password = NULL;

net__socket_close(context);
UNUSED(net__socket_close(context));
if (force_free) {
ttqSubCleanSession(context);
UNUSED(ttqSubCleanSession(context));
}

ttqDbMessageDelete(context, force_free);
UNUSED(ttqDbMessageDelete(context, force_free));

ttq_free(context->address);
context->address = NULL;
Expand Down Expand Up @@ -179,7 +179,7 @@ void ttqCxtDisconnect(struct tmqtt *context) {
// plugin__handle_disconnect(context, -1);

// ttqCxtSendWill(context);
net__socket_close(context);
UNUSED(net__socket_close(context));
{
if (context->session_expiry_interval == 0) {
/* Client session is due to be expired now */
Expand All @@ -188,17 +188,17 @@ void ttqCxtDisconnect(struct tmqtt *context) {
ttqCxtAddToDisused(context);
}
} else {
ttqSessionExpiryAdd(context);
UNUSED(ttqSessionExpiryAdd(context));
}
}
ttqKeepaliveRemove(context);
tmqtt__set_state(context, ttq_cs_disconnected);
UNUSED(ttqKeepaliveRemove(context));
UNUSED(tmqtt__set_state(context, ttq_cs_disconnected));
}

void ttqCxtAddToDisused(struct tmqtt *context) {
if (context->state == ttq_cs_disused) return;

tmqtt__set_state(context, ttq_cs_disused);
UNUSED(tmqtt__set_state(context, ttq_cs_disused));

if (context->id) {
ttqCxtRemoveFromById(context);
Expand Down
14 changes: 7 additions & 7 deletions source/libs/tmqtt/mqtt/src/tmqttLogging.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
#include <sys/stat.h>
#endif

#include "tmqttBrokerInt.h"
#include "ttqLogging.h"
#include "ttqMemory.h"
#include "ttqMisc.h"
#include "tmqttBrokerInt.h"
#include "ttqUtil.h"

static char log_fptr_buffer[BUFSIZ];
Expand Down Expand Up @@ -105,15 +105,15 @@ int ttqLogInit(struct tmqtt__config *config) {
if (log_destinations & MQTT3_LOG_FILE) {
config->log_fptr = tmqtt__fopen(config->log_file, "at", true);
if (config->log_fptr) {
setvbuf(config->log_fptr, log_fptr_buffer, _IOLBF, sizeof(log_fptr_buffer));
UNUSED(setvbuf(config->log_fptr, log_fptr_buffer, _IOLBF, sizeof(log_fptr_buffer)));
} else {
log_destinations = MQTT3_LOG_STDERR;
log_priorities = TTQ_LOG_ERR;
ttq_log(NULL, TTQ_LOG_ERR, "Error: Unable to open log file %s for writing.", config->log_file);
}
}
if (log_destinations & MQTT3_LOG_STDOUT) {
setvbuf(stdout, NULL, _IOLBF, 0);
UNUSED(setvbuf(stdout, NULL, _IOLBF, 0));
}
#ifdef WITH_DLT
if (log_destinations & MQTT3_LOG_DLT) {
Expand All @@ -133,7 +133,7 @@ int ttqLogClose(struct tmqtt__config *config) {
}
if (log_destinations & MQTT3_LOG_FILE) {
if (config->log_fptr) {
fclose(config->log_fptr);
UNUSED(fclose(config->log_fptr));
config->log_fptr = NULL;
}
}
Expand Down Expand Up @@ -251,7 +251,7 @@ static int log__vprintf(unsigned int priority, const char *fmt, va_list va) {
if (log_timestamp) {
if (log_timestamp_format) {
struct tm *ti = NULL;
get_time(&ti);
UNUSED(get_time(&ti));
log_line_pos = strftime(log_line, sizeof(log_line), log_timestamp_format, ti);
if (log_line_pos == 0) {
log_line_pos = (size_t)snprintf(log_line, sizeof(log_line), "Time error");
Expand Down Expand Up @@ -311,7 +311,7 @@ static int log__vprintf(unsigned int priority, const char *fmt, va_list va) {
return TTQ_ERR_SUCCESS;
}

int ttq_log(struct tmqtt *ttq, unsigned int priority, const char *fmt, ...) {
void ttq_log(struct tmqtt *ttq, unsigned int priority, const char *fmt, ...) {
va_list va;
int rc;

Expand All @@ -321,7 +321,7 @@ int ttq_log(struct tmqtt *ttq, unsigned int priority, const char *fmt, ...) {
rc = log__vprintf(priority, fmt, va);
va_end(va);

return rc;
UNUSED(rc);
}

void ttqLogInternal(const char *fmt, ...) {
Expand Down
8 changes: 4 additions & 4 deletions source/libs/tmqtt/mqtt/src/tmqttSocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ static void listeners__stop(void) {

for (i = 0; i < listensock_count; i++) {
if (listensock[i].sock != INVALID_SOCKET) {
COMPAT_CLOSE(listensock[i].sock);
UNUSED(COMPAT_CLOSE(listensock[i].sock));
}
}

Expand All @@ -152,7 +152,7 @@ static void listeners__stop(void) {
static void ttq_rand_init(void) {
struct timeval tv;

gettimeofday(&tv, NULL);
UNUSED(gettimeofday(&tv, NULL));
srand((unsigned int)(tv.tv_sec + tv.tv_usec));
}

Expand Down Expand Up @@ -322,7 +322,7 @@ static int ttq_init(int argc, char *argv[], struct tmqtt__config *config) {
}

static void ttq_cleanup(void) {
ttqMuxCleanup();
UNUSED(ttqMuxCleanup());

ttq_log_stopping();

Expand All @@ -333,7 +333,7 @@ static void ttq_cleanup(void) {
ttqSessionExpiryRemoveAll();
ttq_cxt_cleanup();
listeners__stop();
ttqDbClose();
UNUSED(ttqDbClose());
// tmqtt_security_module_cleanup();
ttqLogClose(db.config);
ttqConfigCleanup(db.config);
Expand Down
19 changes: 16 additions & 3 deletions source/libs/tmqtt/tools/src/topic-producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
#include "cJSON.h"
#include "taos.h"

#ifndef UNUSED
#define UNUSED(A) (void)(A)
#endif

volatile int thread_stop = 0;
static int running = 1;
static int count = 0;
Expand Down Expand Up @@ -117,7 +121,7 @@ static void* prep_data(void* arg) {
#ifdef WINDOWS
Sleep(1000);
#else
usleep(1);
UNUSED(usleep(1));
#endif
}
fprintf(stdout, "Prepare data thread exit\n");
Expand Down Expand Up @@ -546,7 +550,11 @@ int topic_prep(void) {
}

// thread_stop = 1;
pthread_join(thread_id, NULL);
int rc = pthread_join(thread_id, NULL);
if (rc) {
fprintf(stderr, "Failed to join thread in topic_prep: %s\n", strerror(rc));
return -1;
}

return 0;
}
Expand Down Expand Up @@ -1180,7 +1188,12 @@ int main(int argc, char* argv[]) {
}

thread_stop = 1;
pthread_join(thread_id, NULL);

rc = pthread_join(thread_id, NULL);
if (rc) {
fprintf(stderr, "Failed to join thread: %s\n", strerror(rc));
return -1;
}

if (drop_topic_without_connect(pConn) < 0) {
fprintf(stderr, "Failed to drop topic.\n");
Expand Down
Loading