Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions net/net_appsock.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,6 @@ void rem_appsock_connection_evbuffer(void);
#define container_of(ptr, type) (type *)((uint8_t *)ptr - offsetof(type, ptr))
#endif

#define timeval_to_ms(x) x.tv_sec * 1000 + x.tv_usec / 1000

#endif
1 change: 0 additions & 1 deletion net/net_evbuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,6 @@ static void *do_pstack(void *arg)
return NULL;
}

#define timeval_to_ms(x) x.tv_sec * 1000 + x.tv_usec / 1000
int gbl_timer_warn_interval = 1500; //msec. To disable check, set to 0.
int gbl_timer_pstack_threshold = 5000; //msec.
int gbl_timer_pstack_interval = 0; //sec. To disable pstack, but keep monitoring, set to 0.
Expand Down
25 changes: 16 additions & 9 deletions net/sqlwriter.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
#include <sqlwriter.h>
#include <ssl_evbuffer.h>

//send heartbeat if no data every (seconds)
#define min_hb_time 1
//send heartbeat if no data every (msec)
#define min_hb_time 500

struct sqlwriter {
sql_dispatch_timeout_fn *dispatch_timeout;
Expand All @@ -44,7 +44,7 @@ struct sqlwriter {
struct event_base *timer_base;
pthread_t timer_thd;
struct event_base *wr_base;
time_t sent_at;
struct timeval sent_at;
int64_t blocked_at;
sql_pack_fn *pack;
sql_pack_fn *pack_hb;
Expand Down Expand Up @@ -208,7 +208,7 @@ static void sql_flush_cb(int fd, short what, void *arg)
LOCK_WR_LOCK_ONLY_IF_NOT_PACKING(writer);
while (evbuffer_get_length(writer->wr_buf)) {
if ((n = wr_evbuffer(writer, fd)) <= 0) break;
writer->sent_at = time(NULL);
gettimeofday(&writer->sent_at, NULL);
update_writer_state(writer, WRITE_SUCCEEDED);
}
if (evbuffer_get_length(writer->wr_buf) == 0) {
Expand Down Expand Up @@ -389,7 +389,11 @@ static void sql_trickle_int(struct sqlwriter *writer, int fd)
}
const int outstanding = evbuffer_get_length(writer->wr_buf);
if (!outstanding) {
if (difftime(time(NULL), writer->sent_at) >= min_hb_time && !writer->timed_out) {
struct timeval now, diff;
gettimeofday(&now, NULL);
timersub(&now, &writer->sent_at, &diff);
int diffms = timeval_to_ms(diff);
if (diffms >= min_hb_time && !writer->timed_out) {
sql_pack_heartbeat(writer);
} else {
return;
Expand All @@ -403,7 +407,7 @@ static void sql_trickle_int(struct sqlwriter *writer, int fd)
return;
}
update_writer_state(writer, WRITE_SUCCEEDED);
writer->sent_at = time(NULL);
gettimeofday(&writer->sent_at, NULL);
}

static void sql_trickle_cb(int fd, short what, void *arg)
Expand All @@ -424,8 +428,11 @@ static void sql_heartbeat_cb(int fd, short what, void *arg)
if (pthread_mutex_trylock(&writer->wr_lock)) return;
if (writer->wr_continue) {
int len = evbuffer_get_length(writer->wr_buf);
time_t now = time(NULL);
if (len || difftime(now, writer->sent_at) >= min_hb_time) {
struct timeval now, diff;
gettimeofday(&now, NULL);
timersub(&now, &writer->sent_at, &diff);
int diffms = timeval_to_ms(diff);
if (len || diffms >= min_hb_time) {
event_add(writer->heartbeat_trickle_ev, NULL);
}
}
Expand All @@ -437,7 +444,7 @@ void sql_reset(struct sqlwriter *writer)
writer->bad = 0;
writer->dispatch_timeout = NULL;
writer->done = 0;
writer->sent_at = time(NULL);
gettimeofday(&writer->sent_at, NULL);
writer->timed_out = 0;
writer->wr_continue = 1;
}
Expand Down
Loading