diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 6ce9b83ff..ed403a34e 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -334,13 +334,19 @@ class MySQL_Monitor_State_Data { template class WorkItem { public: - T *data; - void *(*routine) (void *); - WorkItem(T*_data, void *(*start_routine) (void *)) { - data=_data; - routine=start_routine; - } - ~WorkItem() {} + std::vector data; + using entry_point = void *(*)(const std::vector& data); + entry_point start_routine; + WorkItem(T*_data, entry_point _start_routine) { + data.push_back(_data); + start_routine = _start_routine; + } + WorkItem(std::vector&& _data, entry_point _start_routine) + : data(std::move(_data)), start_routine(_start_routine) {} + WorkItem(const std::vector& _data, entry_point _start_routine) + : data(_data), start_routine(_start_routine) { + } + ~WorkItem() = default; }; struct p_mon_counter { diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index e324f4278..07ddd318f 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -119,7 +119,7 @@ class ConsumerThread : public Thread { // available to process. for (int i = 0; (thrn ? i < thrn : 1); i++) { //VALGRIND_DISABLE_ERROR_REPORTING; - WorkItem* item = (WorkItem*)m_queue.remove(); + WorkItem* item = static_cast*>(m_queue.remove()); //VALGRIND_ENABLE_ERROR_REPORTING; if (item == NULL) { if (thrn) { @@ -132,7 +132,7 @@ class ConsumerThread : public Thread { } - if (item->routine) { // NULL is allowed, do nothing for it + if (item->start_routine) { // NULL is allowed, do nothing for it bool me = true; if (check_monitor_enabled_flag) { @@ -142,10 +142,13 @@ class ConsumerThread : public Thread { } if (me) { - item->routine((void *)item->data); + item->start_routine(item->data); } } - delete item->data; + for (auto ptr : item->data) { + delete ptr; + } + item->data.clear(); delete item; } cleanup: @@ -264,22 +267,10 @@ class MySQL_Monitor_Connection_Pool { MYSQL *my1 = (MYSQL *)conns->index(i); // 'my1' can be NULL due to connection cleanup if (my1 == nullptr) continue; - assert(my!=my1); - //assert(my->net.fd!=my1->net.fd); // FIXME: we changed this with the next section of code - if (my->net.fd == my1->net.fd) { - // FIXME: we need to identify still why a connection with error 2013 is here - if (my1->net.last_errno == 2013) { - // we remove the connection - conns->remove_index_fast(i); - goto __conn_register_label; // we return to the loop - } else { - // we crash again, as in the old logic - assert(my->net.fd!=my1->net.fd); - } - } } - //proxy_info("Registering MYSQL with FD %d from mmsd %p and MYSQL %p\n", my->net.fd, mmsd, mmsd->mysql); + proxy_debug(PROXY_DEBUG_MONITOR, 7, + "Registering MYSQL with FD %d from mmsd %p and MYSQL %p\n", my->net.fd, mmsd, mmsd->mysql); conns->add(my); pthread_mutex_unlock(&m2); #endif // DEBUG @@ -308,7 +299,8 @@ class MySQL_Monitor_Connection_Pool { MYSQL *my1 = (MYSQL *)conns->index(i); if (my1 == my) { conns->remove_index_fast(i); - //proxy_info("Un-registering MYSQL with FD %d\n", my->net.fd); + proxy_debug(PROXY_DEBUG_MONITOR, 7, + "Un-registering MYSQL with FD %d from mmsd %p and MYSQL %p\n", my->net.fd, mmsd, mmsd->mysql); pthread_mutex_unlock(&m2); return; } @@ -361,7 +353,6 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port, if (k!=j) { MYSQL *my2 = (MYSQL *)srv->conns->index(k); assert(my1!=my2); - assert(my1->net.fd!=my2->net.fd); } } } @@ -393,7 +384,6 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port, if (!my1) continue; assert(my!=my1); - assert(my->net.fd!=my1->net.fd); } //proxy_info("Registering MYSQL with FD %d from mmsd %p and MYSQL %p\n", my->net.fd, mmsd, my); @@ -1356,9 +1346,10 @@ void MySQL_Monitor::update_monitor_proxysql_servers(SQLite3_result* resultset) { pthread_mutex_unlock(&GloMyMon->proxysql_servers_mutex); } -void * monitor_connect_thread(void *arg) { +void * monitor_connect_thread(const std::vector& mmsds) { + assert(!mmsds.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = mmsds.front(); if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1417,9 +1408,10 @@ void * monitor_connect_thread(void *arg) { return NULL; } -void * monitor_ping_thread(void *arg) { +void * monitor_ping_thread(const std::vector& mmsds) { + assert(!mmsds.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = mmsds.front(); if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1654,10 +1646,11 @@ bool MySQL_Monitor_State_Data::create_new_connection() { return true; } -void * monitor_read_only_thread(void *arg) { +void * monitor_read_only_thread(const std::vector& data) { + assert(!data.empty()); mysql_close(mysql_init(NULL)); bool timeout_reached = false; - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = data.front(); if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1935,9 +1928,10 @@ VALGRIND_ENABLE_ERROR_REPORTING; return NULL; } -void * monitor_group_replication_thread(void *arg) { +void * monitor_group_replication_thread(const std::vector& data) { + assert(!data.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = data.front(); MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); mysql_thr->refresh_variables(); @@ -2286,9 +2280,10 @@ void * monitor_group_replication_thread(void *arg) { return NULL; } -void * monitor_galera_thread(void *arg) { +void * monitor_galera_thread(const std::vector& data) { + assert(!data.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = data.front(); MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); mysql_thr->refresh_variables(); @@ -2686,9 +2681,10 @@ void * monitor_galera_thread(void *arg) { return NULL; } -void * monitor_replication_lag_thread(void *arg) { +void * monitor_replication_lag_thread(const std::vector& data) { + assert(!data.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = data.front(); if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -4756,9 +4752,9 @@ std::string debug_iplisttostring(const T& ips) { return sstr.str(); } -void* monitor_dns_resolver_thread(void* args) { - - DNS_Resolve_Data* dns_resolve_data = static_cast(args); +void* monitor_dns_resolver_thread(const std::vector& dns_resolve_data_list) { + assert(!dns_resolve_data_list.empty()); + DNS_Resolve_Data* dns_resolve_data = dns_resolve_data_list.front(); struct addrinfo hints, *res = NULL; @@ -5186,11 +5182,11 @@ void * MySQL_Monitor::run() { __monitor_run: while (queue->size()) { // this is a clean up in case Monitor was restarted - WorkItem* item = (WorkItem*)queue->remove(); + WorkItem* item = static_cast*>(queue->remove()); if (item) { - if (item->data) { - delete item->data; - } + for (auto ptr : item->data) + delete ptr; + item->data.clear(); delete item; } } @@ -7034,9 +7030,10 @@ class Monitor_Poll { public: using process_ready_tasks_cb = bool (MySQL_Monitor::*)(const std::vector& mmsds); - Process_Ready_Task_Callback_Args(unsigned int min_tasks_to_process, float percentage, + Process_Ready_Task_Callback_Args(unsigned int min_tasks_to_process, unsigned int max_task_to_send, float percentage, process_ready_tasks_cb callback, MySQL_Monitor* mysql_monitor) : - min_task_to_process_(min_tasks_to_process), process_task_percentage_(percentage / 100.00), process_ready_tasks_cb_(callback), + min_task_to_process_(min_tasks_to_process), max_task_to_send_(max_task_to_send), + process_task_percentage_(percentage / 100.00), process_ready_tasks_cb_(callback), mysql_monitor_(mysql_monitor) { assert(mysql_monitor_); assert(process_ready_tasks_cb_); @@ -7053,19 +7050,24 @@ class Monitor_Poll { friend class Monitor_Poll; unsigned int min_task_to_process_; + unsigned int max_task_to_send_; float process_task_percentage_; process_ready_tasks_cb process_ready_tasks_cb_; MySQL_Monitor* mysql_monitor_; }; - Monitor_Poll(unsigned int capacity) { + Monitor_Poll(unsigned int capacity, bool owns_task_memory = false) { len_ = 0; + owns_task_memory_ = owns_task_memory; // if true, this object takes ownership of task memory and will delete unprocessed tasks on destruction capacity_ = capacity; fds_ = (struct pollfd*)malloc(capacity_ * sizeof(struct pollfd)); mmsds_ = (MySQL_Monitor_State_Data**)malloc(capacity_ * sizeof(MySQL_Monitor_State_Data*)); } ~Monitor_Poll() { + if (owns_task_memory_) { + cleanup_unprocessed_tasks(); // free remaining unprocessed tasks + } free(fds_); free(mmsds_); } @@ -7081,19 +7083,16 @@ class Monitor_Poll { void add(short _events, MySQL_Monitor_State_Data* mmsd) { assert(mmsd); - assert(mmsd->mysql); - + assert(_events); if (len_ == capacity_) { expand(1); } - fds_[len_].fd = mysql_get_socket(mmsd->mysql); + fds_[len_].fd = -1; // will be set in event_loop phase 1 fds_[len_].events = _events; fds_[len_].revents = 0; mmsds_[len_] = mmsd; len_++; - mmsd->init_async(); - mmsd->task_handler(-1, _events); } void remove_index_fast(unsigned int i) { @@ -7120,13 +7119,17 @@ class Monitor_Poll { bool event_loop(int poll_timeout_ms, Process_Ready_Task_Callback_Args& process_ready_task_callback_arg) { - if (len_ == 0) - return false; + if (len_ == 0) return false; - int rc = 0; + // Snapshot the number of tasks that are currently queued for this event loop iteration. + // len_ will change as tasks complete and are removed below. + const unsigned int total_tasks = len_; + + // Number of tasks to send in each batch. + const unsigned int send_batch_size = process_ready_task_callback_arg.max_task_to_send_; - // number of tasks to process based on provided percentage - unsigned int tasks_to_process_count = len_ * process_ready_task_callback_arg.process_task_percentage_; + // Determine how many tasks to process before flushing the ready queue to the callback. + unsigned int tasks_to_process_count = total_tasks * process_ready_task_callback_arg.process_task_percentage_; // if number of task to process is less than minimum task to process, overwrite it if (tasks_to_process_count < process_ready_task_callback_arg.min_task_to_process_) { @@ -7136,14 +7139,41 @@ class Monitor_Poll { std::vector ready_tasks; ready_tasks.reserve(tasks_to_process_count); + unsigned int total_sent = 0; while (len_) { if (GloMyMon->shutdown) { return false; } - rc = poll(fds_, len_, poll_timeout_ms); + // Phase 1: proactively arm inactive sockets by initializing their poll entries + // and triggering the first async step for up to `min_task_to_send` tasks. + unsigned int sockets_armed = 0; + for (unsigned int idx = 0; idx < len_ && sockets_armed < send_batch_size; ++idx) { + struct pollfd& poll_entry = fds_[idx]; + + // Skip already-armed entries + if (poll_entry.fd != -1) + continue; + + // Arm socket + poll_entry.fd = mysql_get_socket(mmsds_[idx]->mysql); + assert(poll_entry.fd != -1); + poll_entry.revents = 0; + + // Kick off the task state machine + mmsds_[idx]->task_handler(-1, poll_entry.events); + ++sockets_armed; + } + total_sent += sockets_armed; + + proxy_debug(PROXY_DEBUG_MONITOR, 7, + "Phase 1: armed %u sockets (this batch), total armed=%u, total tasks=%u\n", + sockets_armed, total_sent, total_tasks); + // If we sent all tasks, use the caller poll timeout; otherwise poll immediately (timeout=0) + int poll_timeout = (total_sent == total_tasks) ? poll_timeout_ms : 0; + int rc = poll(fds_, len_, poll_timeout); if (rc == -1) { if (errno == EINTR) { continue; @@ -7152,6 +7182,8 @@ class Monitor_Poll { } } + // Phase 2: collect completed tasks and batch deliver to the callback once the + // tasks_to_process_count threshold is met (or no tasks remain). for (unsigned int i = 0; i < len_;) { if (mmsds_[i]->task_handler(fds_[i].revents, fds_[i].events) != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING) { @@ -7164,31 +7196,32 @@ class Monitor_Poll { tasks_to_process_count--; + // Flush the batch when threshold reached or no more tasks remain. if (tasks_to_process_count == 0 || len_ == 0) { - + proxy_debug(PROXY_DEBUG_MONITOR, 7, "Phase 2: Starting processing of %zu ready tasks\n", ready_tasks.size()); if (process_ready_task_callback_arg.process_ready_tasks(ready_tasks) == false) { return false; } - ready_tasks.clear(); + // Recompute threshold against the current remaining queue length. tasks_to_process_count = len_ * process_ready_task_callback_arg.process_task_percentage_; - if (tasks_to_process_count < process_ready_task_callback_arg.min_task_to_process_) { tasks_to_process_count = process_ready_task_callback_arg.min_task_to_process_; } } continue; } else { - assert(fds_[i].events != 0); + // Not ready yet; ensure we keep polling for its events. + if (fds_[i].fd != -1) + assert(fds_[i].events != 0); } fds_[i].revents = 0; i++; } - } - + assert(ready_tasks.empty()); return true; } @@ -7204,10 +7237,22 @@ class Monitor_Poll { return i ? i : n; } + // Deletes any task objects that were not processed + inline + void cleanup_unprocessed_tasks() { + if (len_ == 0) return; + for (unsigned int i = 0; i < len_; ++i) { + delete mmsds_[i]; + mmsds_[i] = nullptr; + } + len_ = 0; + } + unsigned int len_; unsigned int capacity_; struct pollfd* fds_; MySQL_Monitor_State_Data** mmsds_; + bool owns_task_memory_; }; MySQL_Monitor_State_Data_Task_Result MySQL_Monitor_State_Data::task_handler(short event_, short& wait_event) { @@ -7222,7 +7267,7 @@ MySQL_Monitor_State_Data_Task_Result MySQL_Monitor_State_Data::task_handler(shor #else const unsigned long long now = monotonic_time(); #endif - if (now > task_expiry_time_) { + if (task_expiry_time_ > 0 && now > task_expiry_time_) { #ifdef DEBUG mark_task_as_timeout((GloMyMon->proxytest_forced_timeout == false) ? now : monotonic_time()); #else @@ -7300,81 +7345,121 @@ MySQL_Monitor_State_Data_Task_Result MySQL_Monitor_State_Data::ping_handler(shor return result; } -bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector& mmsds) { - - for (auto& mmsd : mmsds) { - const auto task_result = mmsd->get_task_result(); +void* monitor_ping_process_ready_task_thread(const std::vector& ready_mmsds) { + if (ready_mmsds.empty()) return NULL; + + SQLite3DB* monitor_db = ready_mmsds.front()->mondb; + int rc; + sqlite3_stmt* statement1 = NULL; + sqlite3_stmt* statement32 = NULL; + + const char* query1 = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1, ?2, ?3, ?4, ?5)"; + std::string query32s = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES " + generate_multi_rows_query(32, 5); + const char* query32 = query32s.c_str(); + rc = monitor_db->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, monitor_db); + + rc = monitor_db->prepare_v2(query32, &statement32); + ASSERT_SQLITE_OK(rc, monitor_db); + + size_t row_idx = 0; + size_t max_bulk_row_idx = (ready_mmsds.size() / 32) * 32; + + for (const auto& mmsd : ready_mmsds) { + const auto task_result = mmsd->get_task_result(); assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING); - string server_version = ""; - if (mmsd->mysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version); + std::string server_version{}; + if (mmsd->mysql && mmsd->mysql->server_version) + server_version = std::string(mmsd->mysql->server_version); if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) { - __sync_fetch_and_add(&ping_check_OK, 1); - My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); + __sync_fetch_and_add(&GloMyMon->ping_check_OK, 1); + GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); mmsd->mysql = NULL; } else { - __sync_fetch_and_add(&ping_check_ERR, 1); + __sync_fetch_and_add(&GloMyMon->ping_check_ERR, 1); if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_TIMEOUT) { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_PING_TIMEOUT); - proxy_error("Timeout on ping check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_ping_timeout.\n", mmsd->hostname, mmsd->port, (mmsd->t2 - mmsd->t1) / 1000); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, + mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_PING_TIMEOUT); + proxy_error("Timeout on ping check for %s:%d after %lldms.\n", + mmsd->hostname, mmsd->port, (mmsd->t2 - mmsd->t1) / 1000); } else { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); -#ifdef DEBUG - proxy_error("Error after %lldms: server %s:%d , mmsd %p , MYSQL %p , FD %d : %s\n", (mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port, mmsd, mmsd->mysql, mmsd->mysql->net.fd, (mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "")); -#else - proxy_error("Error after %lldms on server %s:%d : %s\n", (mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port, (mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "")); -#endif // DEBUG + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, + mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + proxy_error("Error after %lldms on server %s:%d : %s\n", + (mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port, + (mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "")); } -//#ifdef DEBUG -// My_Conn_Pool->conn_unregister(mmsd); -//#endif // DEBUG mysql_close(mmsd->mysql); mmsd->mysql = NULL; } - if (shutdown == true) { - return false; - } + if (GloMyMon->shutdown) + return NULL; - sqlite3_stmt* statement = NULL; - const char* query = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; - int rc = mmsd->mondb->prepare_v2(query, &statement); - ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb); unsigned long long time_now = realtime_time(); time_now = time_now - (mmsd->t2 - mmsd->t1); - rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_text)(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); - SAFE_SQLITE3_STEP2(statement); - rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); - (*proxy_sqlite3_finalize)(statement); - if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) != NULL) { - ReadySet_Servers.insert(ServerInfo(mmsd->hostname, mmsd->port)); - } else { - if (ReadySet_Servers.size() > 0) { // optimization . The following section is skipped if there are no servers - ServerInfo searchServer(mmsd->hostname, mmsd->port); - if (ReadySet_Servers.count(searchServer) > 0) { - ReadySet_Servers.erase(searchServer); - } + unsigned long long duration = (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1); + const char* errmsg = mmsd->mysql_error_msg; + + size_t idx = row_idx % 32; + + if (row_idx < max_bulk_row_idx) { + // --- bulk insert path --- + rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 5) + 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int)(statement32, (idx * 5) + 2, mmsd->port); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 5) + 3, time_now); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 5) + 4, duration); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 5) + 5, errmsg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db); + + // execute when we reach 32 rows + if (idx == 31) { + SAFE_SQLITE3_STEP2(statement32); + rc = (*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, monitor_db); } + } else { + // --- single insert path --- + rc = (*proxy_sqlite3_bind_text)(statement1, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int)(statement1, 2, mmsd->port); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 3, time_now); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 4, duration); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_text)(statement1, 5, errmsg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db); + SAFE_SQLITE3_STEP2(statement1); + rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, monitor_db); } + + if (strcasestr(server_version.c_str(), (const char*)SERVER_VERSION_READYSET) != NULL) + ReadySet_Servers.insert(ServerInfo(mmsd->hostname, mmsd->port)); + else if (!ReadySet_Servers.empty()) + ReadySet_Servers.erase(ServerInfo(mmsd->hostname, mmsd->port)); + + row_idx++; } + (*proxy_sqlite3_finalize)(statement1); + (*proxy_sqlite3_finalize)(statement32); + + return NULL; +} + +bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector& ready_mmsds) { + // Ensure no null tasks are dispatched to the thread pool + if (!ready_mmsds.empty()) { + WorkItem* item = new WorkItem(ready_mmsds, monitor_ping_process_ready_task_thread); + queue->add(item); + } return true; } void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) { assert(resultset); - std::vector> mmsds; - mmsds.reserve(resultset->rows_count); - Monitor_Poll monitor_poll(resultset->rows_count); + Monitor_Poll monitor_poll(resultset->rows_count, true); for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { const SQLite3_row* r = *it; @@ -7385,8 +7470,8 @@ void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) { mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); if (mmsd->mysql) { - monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); - mmsds.push_back(std::move(mmsd)); + // Register the task; don't dispatch it yet. + monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.release()); } else { WorkItem* item = new WorkItem(mmsd.release(), monitor_ping_thread); @@ -7396,9 +7481,9 @@ void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) { if (shutdown) return; } - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_ping_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_ping_process_ready_tasks, this); - if (monitor_poll.event_loop(mysql_thread___monitor_ping_timeout, args) == false) { + if (monitor_poll.event_loop(10, args) == false) { return; } } @@ -7717,6 +7802,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); if (mmsd->mysql) { + // Register the task; don't dispatch it yet. monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { @@ -7729,7 +7815,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d if (shutdown) return; } - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_read_only_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_read_only_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_read_only_timeout, args) == false) { return; @@ -7923,6 +8009,7 @@ void MySQL_Monitor::monitor_group_replication_async() { mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); if (mmsd->mysql) { + // Register the task; don't dispatch it yet. monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { @@ -7939,7 +8026,7 @@ void MySQL_Monitor::monitor_group_replication_async() { } pthread_mutex_unlock(&group_replication_mutex); - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_group_replication_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_group_replication_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_groupreplication_healthcheck_timeout, args) == false) { return; @@ -7964,11 +8051,12 @@ void MySQL_Monitor::monitor_gr_async_actions_handler( Monitor_Poll monitor_poll(mmsds.size()); for (const unique_ptr& mmsd : mmsds) { + // Register the task; don't dispatch it yet. monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); } Monitor_Poll::Process_Ready_Task_Callback_Args args( - mmsds.size(), 100, &MySQL_Monitor::monitor_group_replication_process_ready_tasks_2, this + mmsds.size(), mmsds.size(), 100, &MySQL_Monitor::monitor_group_replication_process_ready_tasks_2, this ); if (monitor_poll.event_loop(mysql_thread___monitor_groupreplication_healthcheck_timeout, args) == false) { @@ -8181,6 +8269,7 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3_result* resultset) { mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); if (mmsd->mysql) { + // Register the task; don't dispatch it yet. monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { @@ -8193,7 +8282,7 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3_result* resultset) { if (shutdown) return; } - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_replication_lag_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_replication_lag_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_replication_lag_timeout, args) == false) { return; @@ -8489,6 +8578,7 @@ void MySQL_Monitor::monitor_galera_async() { mmsd->mondb = monitordb; if (mmsd->mysql) { + // Register the task; don't dispatch it yet. monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { @@ -8505,7 +8595,7 @@ void MySQL_Monitor::monitor_galera_async() { } pthread_mutex_unlock(&galera_mutex); - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_galera_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_galera_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_galera_healthcheck_timeout, args) == false) { return;