Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 24 additions & 73 deletions db/api_history.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@
#include <pthread.h>
#include <string.h>

#include "comdb2.h"
#include "logmsg.h"
#include "sql.h"

#include "api_history.h"

struct api_history {
hash_t *entries;
pthread_rwlock_t lock;
};

static int hash_entry(api_driver_t *entry, int len)
{
return hash_default_fixedwidth((unsigned char*)entry->name, strlen(entry->name));
Expand Down Expand Up @@ -73,101 +70,55 @@ static int free_entry(api_driver_t *entry, void *arg)
return 0;
}

void acquire_api_history_lock(api_history_t *api_history, int write)
{
if (write) {
Pthread_rwlock_wrlock(&api_history->lock);
} else {
Pthread_rwlock_rdlock(&api_history->lock);
}
}

void release_api_history_lock(api_history_t *api_history)
{
Pthread_rwlock_unlock(&api_history->lock);
}

api_history_t *init_api_history()
{
api_history_t *api_history = malloc(sizeof(api_history_t));
assert(api_history);
Pthread_rwlock_init(&api_history->lock, NULL);

api_history->entries = hash_init_user((hashfunc_t *)hash_entry, (cmpfunc_t *)compare_entries, 0, 0);
if (!api_history->entries) {
api_history_t *api_history = hash_init_user((hashfunc_t *)hash_entry, (cmpfunc_t *)compare_entries, 0, 0);
if (!api_history) {
logmsg(LOGMSG_FATAL, "Unable to initialize api history.\n");
abort();
}

return api_history;
}

int free_api_history(api_history_t *api_history)
void free_api_history(api_history_t *api_history)
{
assert(api_history);
acquire_api_history_lock(api_history, 1);

int rc = hash_for(api_history->entries, (hashforfunc_t *)free_entry, NULL);
if (rc) {
logmsg(LOGMSG_FATAL, "Unable to free api history entries.\n");
release_api_history_lock(api_history);
return rc;
}

hash_free(api_history->entries);
release_api_history_lock(api_history);
Pthread_rwlock_destroy(&api_history->lock);
free(api_history);
return 0;
hash_for(api_history, (hashforfunc_t *)free_entry, NULL);
hash_free(api_history);
}

api_driver_t *get_next_api_history_entry(api_history_t *api_history, void **curr, unsigned int *iter)
int get_num_api_history_entries(struct rawnodestats *stats)
{
assert(api_history);
acquire_api_history_lock(api_history, 0);
api_driver_t *entry;

if (!*curr && !*iter) {
entry = (api_driver_t *)hash_first(api_history->entries, curr, iter);
} else {
entry = (api_driver_t *)hash_next(api_history->entries, curr, iter);
}

release_api_history_lock(api_history);
return entry;
}

int get_num_api_history_entries(api_history_t *api_history) {
assert(api_history);
acquire_api_history_lock(api_history, 0);

int num = hash_get_num_entries(api_history->entries);

release_api_history_lock(api_history);
assert(stats);
Pthread_mutex_lock(&stats->lk);
int num = stats->api_history ? hash_get_num_entries(stats->api_history) : 0;
Pthread_mutex_unlock(&stats->lk);
return num;
}

int update_api_history(api_history_t *api_history, char *api_driver_name, char *api_driver_version)
int update_api_history(struct sqlclntstate *clnt)
{
assert(api_history);
acquire_api_history_lock(api_history, 1);

api_driver_t *search_entry = create_entry(api_driver_name, api_driver_version);
api_driver_t *found_entry = hash_find(api_history->entries, search_entry);

assert(clnt && clnt->rawnodestats);
struct rawnodestats *stats = clnt->rawnodestats;

Pthread_mutex_lock(&stats->lk);
api_driver_t *search_entry = create_entry(clnt->api_driver_name, clnt->api_driver_version);
api_driver_t *found_entry = hash_find(stats->api_history, search_entry);

if (found_entry) {
time(&found_entry->last_seen);
free_entry(search_entry, NULL);
} else {
time(&search_entry->last_seen);
int rc = hash_add(api_history->entries, search_entry);
int rc = hash_add(stats->api_history, search_entry);
if (rc) {
Pthread_mutex_unlock(&stats->lk);
logmsg(LOGMSG_FATAL, "Unable to add api history entry.\n");
release_api_history_lock(api_history);
return rc;
}
}

release_api_history_lock(api_history);
Pthread_mutex_unlock(&stats->lk);
return 0;
}
15 changes: 8 additions & 7 deletions db/api_history.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
#define _API_HISTORY_H_

#include <sys/time.h>
#include <plhash_glue.h>

typedef struct api_driver {
char *name;
char *version;
time_t last_seen;
} api_driver_t;

typedef struct api_history api_history_t;
typedef hash_t api_history_t; // holds api_driver_t entries, keyed by name+version

struct sqlclntstate;
struct rawnodestats;

void acquire_api_history_lock(api_history_t*, int);
void release_api_history_lock(api_history_t*);
api_history_t *init_api_history();
int free_api_history(api_history_t*);
api_driver_t *get_next_api_history_entry(api_history_t*, void**, unsigned int*);
int get_num_api_history_entries(api_history_t*);
int update_api_history(api_history_t*, char*, char*);
void free_api_history(api_history_t *);
int get_num_api_history_entries(struct rawnodestats *);
int update_api_history(struct sqlclntstate *);

#endif
2 changes: 2 additions & 0 deletions db/comdb2.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ void berk_memp_sync_alarm_ms(int);
#include "sql.h"
#include "logmsg.h"
#include "reqlog.h"
#include "reqlog_int.h"

#include "comdb2_trn_intrl.h"
#include "history.h"
Expand Down Expand Up @@ -1800,6 +1801,7 @@ void clean_exit(void)

backend_cleanup(thedb);
net_cleanup();
cleanup_clientstats();

if (gbl_ssl_ctx != NULL) {
SSL_CTX_free(gbl_ssl_ctx);
Expand Down
48 changes: 27 additions & 21 deletions db/reqlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -2550,6 +2550,7 @@ static void update_clientstats_cache(nodestats_t *entry) {
Pthread_mutex_unlock(&clientstats_cache_mtx);
}

static int free_nodestats_entry(void *elem, void *unused);
static nodestats_t *add_clientstats(unsigned checksum, const char *whoami, int task_len, int stack_len, int id_len,
char *host, int node, int fd)
{
Expand Down Expand Up @@ -2582,15 +2583,7 @@ static nodestats_t *add_clientstats(unsigned checksum, const char *whoami, int t
nodestats_t *old_entry = listc_rtl(&clientstats_cache);
if (old_entry) {
hash_del(clientstats, old_entry);
if (old_entry->rawtotals.fingerprints) {
hash_for(old_entry->rawtotals.fingerprints, hash_free_element, NULL);
hash_free(old_entry->rawtotals.fingerprints);
}
free_api_history(old_entry->rawtotals.api_history);
time_metric_free(old_entry->rawtotals.svc_time);
Pthread_mutex_destroy(&old_entry->mtx);
Pthread_mutex_destroy(&old_entry->rawtotals.lk);
free(old_entry);
free_nodestats_entry(old_entry, NULL);
} else {
break;
}
Expand Down Expand Up @@ -2664,7 +2657,6 @@ static int release_clientstats(unsigned checksum, int node)
rc = -1;
goto done;
}

Pthread_mutex_lock(&entry->mtx);
entry->ref--;
update_clientstats_cache(entry);
Expand All @@ -2676,20 +2668,34 @@ static int release_clientstats(unsigned checksum, int node)
return rc;
}

nodestats_t *get_next_clientstats_entry(void **curr, unsigned int *iter)
static int free_nodestats_entry(void *elem, void *unused)
{
assert(clientstats);
acquire_clientstats_lock(0);
nodestats_t *entry;

if (!*curr && !*iter) {
entry = (nodestats_t *)hash_first(clientstats, curr, iter);
} else {
entry = (nodestats_t *)hash_next(clientstats, curr, iter);
nodestats_t *entry = (nodestats_t *)elem;
if (entry->rawtotals.fingerprints) {
hash_for(entry->rawtotals.fingerprints, hash_free_element, NULL);
hash_free(entry->rawtotals.fingerprints);
}

free_api_history(entry->rawtotals.api_history);
time_metric_free(entry->rawtotals.svc_time);
Pthread_mutex_destroy(&entry->mtx);
Pthread_mutex_destroy(&entry->rawtotals.lk);
free(entry);
return 0;
}

void cleanup_clientstats(void)
{
acquire_clientstats_lock(1);
hash_for(clientstats, free_nodestats_entry, NULL);
hash_free(clientstats);
clientstats = NULL;
release_clientstats_lock();
return entry;
}

int hash_for_clientstats(hashforfunc_t *func, void *arg)
{
assert(clientstats);
return hash_for(clientstats, func, arg);
}

struct rawnodestats *get_raw_node_stats(const char *task, const char *stack, const char *id, char *host, int fd,
Expand Down
4 changes: 3 additions & 1 deletion db/reqlog_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "cson.h"
#include "sql.h"
#include "reqlog.h"
#include <plhash_glue.h>

/* This used to be private to reqlog. Moving to a shared header since
eventlog also needs access to reqlog internals. I am not sure
Expand Down Expand Up @@ -242,7 +243,8 @@ typedef struct nodestats {

void acquire_clientstats_lock(int);
void release_clientstats_lock();
nodestats_t *get_next_clientstats_entry(void**, unsigned int*);
void cleanup_clientstats(void);
int hash_for_clientstats(hashforfunc_t *func, void *arg);

extern int gbl_time_fdb;

Expand Down
2 changes: 1 addition & 1 deletion db/sqlinterfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ static void sql_statement_done(struct sql_thread *thd, struct reqlogger *logger,
if (have_fingerprint)
add_fingerprint_to_rawstats(clnt->rawnodestats, fingerprint, cost,
rows, time);
update_api_history(clnt->rawnodestats->api_history, clnt->api_driver_name, clnt->api_driver_version);
update_api_history(clnt);
}

reset_sql_steps(thd);
Expand Down
2 changes: 1 addition & 1 deletion lua/sp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2371,7 +2371,7 @@ static void lua_end_step(struct sqlclntstate *clnt, SP sp,
put_ref(&sql_ref);
if (clnt->rawnodestats) {
add_fingerprint_to_rawstats(clnt->rawnodestats, fingerprint, cost, pVdbe->luaRows, timeMs);
update_api_history(clnt->rawnodestats->api_history, clnt->api_driver_name, clnt->api_driver_version);
update_api_history(clnt);
}

clnt->spcost.cost += cost;
Expand Down
Loading