File tree Expand file tree Collapse file tree 4 files changed +57
-3
lines changed
third_party/concurrentqueue Expand file tree Collapse file tree 4 files changed +57
-3
lines changed Original file line number Diff line number Diff line change 11#ifndef DUCKDB_PATCH_VERSION
2- #define DUCKDB_PATCH_VERSION " 1-dev221 "
2+ #define DUCKDB_PATCH_VERSION " 1-dev226 "
33#endif
44#ifndef DUCKDB_MINOR_VERSION
55#define DUCKDB_MINOR_VERSION 2
88#define DUCKDB_MAJOR_VERSION 1
99#endif
1010#ifndef DUCKDB_VERSION
11- #define DUCKDB_VERSION " v1.2.1-dev221 "
11+ #define DUCKDB_VERSION " v1.2.1-dev226 "
1212#endif
1313#ifndef DUCKDB_SOURCE_ID
14- #define DUCKDB_SOURCE_ID " a0b525cb6d "
14+ #define DUCKDB_SOURCE_ID " 28b8a398a4 "
1515#endif
1616#include " duckdb/function/table/system_functions.hpp"
1717#include " duckdb/main/database.hpp"
Original file line number Diff line number Diff line change @@ -68,6 +68,10 @@ class TaskScheduler {
6868 // ! Returns the number of threads
6969 DUCKDB_API int32_t NumberOfThreads ();
7070
71+ idx_t GetNumberOfTasks () const ;
72+ idx_t GetProducerCount () const ;
73+ idx_t GetTaskCountForProducer (ProducerToken &token) const ;
74+
7175 // ! Send signals to n threads, signalling for them to wake up and attempt to execute a task
7276 void Signal (idx_t n);
7377
Original file line number Diff line number Diff line change @@ -284,6 +284,39 @@ int32_t TaskScheduler::NumberOfThreads() {
284284 return current_thread_count.load ();
285285}
286286
287+ idx_t TaskScheduler::GetNumberOfTasks () const {
288+ #ifndef DUCKDB_NO_THREADS
289+ return queue->q .size_approx ();
290+ #else
291+ idx_t task_count = 0 ;
292+ for (auto &producer : queue->q ) {
293+ task_count += producer.second .size ();
294+ }
295+ return task_count;
296+ #endif
297+ }
298+
299+ idx_t TaskScheduler::GetProducerCount () const {
300+ #ifndef DUCKDB_NO_THREADS
301+ return queue->q .size_producers_approx ();
302+ #else
303+ return queue->q .size ();
304+ #endif
305+ }
306+
307+ idx_t TaskScheduler::GetTaskCountForProducer (ProducerToken &token) const {
308+ #ifndef DUCKDB_NO_THREADS
309+ lock_guard<mutex> producer_lock (token.producer_lock );
310+ return queue->q .size_producer_approx (token.token ->queue_token );
311+ #else
312+ const auto it = queue->q .find (std::ref (*token.token ));
313+ if (it == queue->q .end ()) {
314+ return 0 ;
315+ }
316+ return it->second .size ();
317+ #endif
318+ }
319+
287320void TaskScheduler::SetThreads (idx_t total_threads, idx_t external_threads) {
288321 if (total_threads == 0 ) {
289322 throw SyntaxException (" Number of threads must be positive!" );
Original file line number Diff line number Diff line change @@ -1254,6 +1254,23 @@ class ConcurrentQueue
12541254 return size;
12551255 }
12561256
1257+
1258+ // Returns the number of producers currently associated with the queue.
1259+ size_t size_producers_approx () const
1260+ {
1261+ size_t size = 0 ;
1262+ for (auto ptr = producerListTail.load (std::memory_order_acquire); ptr != nullptr ; ptr = ptr->next_prod ()) {
1263+ size += 1 ;
1264+ }
1265+ return size;
1266+ }
1267+
1268+ // Returns the number of elements currently in the queue for a specific producer.
1269+ size_t size_producer_approx (producer_token_t const & producer) const
1270+ {
1271+ return static_cast <ExplicitProducer*>(producer.producer )->size_approx ();
1272+ }
1273+
12571274
12581275 // Returns true if the underlying atomic variables used by
12591276 // the queue are lock-free (they should be on most platforms).
You can’t perform that action at this time.
0 commit comments