|
| 1 | +/** |
| 2 | + * Copyright (c) 2016-present, Facebook, Inc. |
| 3 | + * All rights reserved. |
| 4 | + * |
| 5 | + * This source code is licensed under the BSD-style license found in the |
| 6 | + * LICENSE file in the root directory of this source tree. An additional grant |
| 7 | + * of patent rights can be found in the PATENTS file in the same directory. |
| 8 | + */ |
| 9 | + |
| 10 | + |
| 11 | +/* ====== Dependencies ======= */ |
| 12 | +#include <stddef.h> /* size_t */ |
| 13 | +#include <stdlib.h> /* malloc, calloc, free */ |
| 14 | +#include "pool.h" |
| 15 | + |
| 16 | +/* ====== Compiler specifics ====== */ |
| 17 | +#if defined(_MSC_VER) |
| 18 | +# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ |
| 19 | +#endif |
| 20 | + |
| 21 | + |
| 22 | +#ifdef ZSTD_MULTITHREAD |
| 23 | + |
| 24 | +#include "threading.h" /* pthread adaptation */ |
| 25 | + |
| 26 | +/* A job is a function and an opaque argument */ |
| 27 | +typedef struct POOL_job_s { |
| 28 | + POOL_function function; |
| 29 | + void *opaque; |
| 30 | +} POOL_job; |
| 31 | + |
| 32 | +struct POOL_ctx_s { |
| 33 | + /* Keep track of the threads */ |
| 34 | + pthread_t *threads; |
| 35 | + size_t numThreads; |
| 36 | + |
| 37 | + /* The queue is a circular buffer */ |
| 38 | + POOL_job *queue; |
| 39 | + size_t queueHead; |
| 40 | + size_t queueTail; |
| 41 | + size_t queueSize; |
| 42 | + /* The mutex protects the queue */ |
| 43 | + pthread_mutex_t queueMutex; |
| 44 | + /* Condition variable for pushers to wait on when the queue is full */ |
| 45 | + pthread_cond_t queuePushCond; |
| 46 | + /* Condition variables for poppers to wait on when the queue is empty */ |
| 47 | + pthread_cond_t queuePopCond; |
| 48 | + /* Indicates if the queue is shutting down */ |
| 49 | + int shutdown; |
| 50 | +}; |
| 51 | + |
| 52 | +/* POOL_thread() : |
| 53 | + Work thread for the thread pool. |
| 54 | + Waits for jobs and executes them. |
| 55 | + @returns : NULL on failure else non-null. |
| 56 | +*/ |
| 57 | +static void* POOL_thread(void* opaque) { |
| 58 | + POOL_ctx* const ctx = (POOL_ctx*)opaque; |
| 59 | + if (!ctx) { return NULL; } |
| 60 | + for (;;) { |
| 61 | + /* Lock the mutex and wait for a non-empty queue or until shutdown */ |
| 62 | + pthread_mutex_lock(&ctx->queueMutex); |
| 63 | + while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { |
| 64 | + pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); |
| 65 | + } |
| 66 | + /* empty => shutting down: so stop */ |
| 67 | + if (ctx->queueHead == ctx->queueTail) { |
| 68 | + pthread_mutex_unlock(&ctx->queueMutex); |
| 69 | + return opaque; |
| 70 | + } |
| 71 | + /* Pop a job off the queue */ |
| 72 | + { POOL_job const job = ctx->queue[ctx->queueHead]; |
| 73 | + ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; |
| 74 | + /* Unlock the mutex, signal a pusher, and run the job */ |
| 75 | + pthread_mutex_unlock(&ctx->queueMutex); |
| 76 | + pthread_cond_signal(&ctx->queuePushCond); |
| 77 | + job.function(job.opaque); |
| 78 | + } |
| 79 | + } |
| 80 | + /* Unreachable */ |
| 81 | +} |
| 82 | + |
| 83 | +POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { |
| 84 | + POOL_ctx *ctx; |
| 85 | + /* Check the parameters */ |
| 86 | + if (!numThreads || !queueSize) { return NULL; } |
| 87 | + /* Allocate the context and zero initialize */ |
| 88 | + ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx)); |
| 89 | + if (!ctx) { return NULL; } |
| 90 | + /* Initialize the job queue. |
| 91 | + * It needs one extra space since one space is wasted to differentiate empty |
| 92 | + * and full queues. |
| 93 | + */ |
| 94 | + ctx->queueSize = queueSize + 1; |
| 95 | + ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job)); |
| 96 | + ctx->queueHead = 0; |
| 97 | + ctx->queueTail = 0; |
| 98 | + pthread_mutex_init(&ctx->queueMutex, NULL); |
| 99 | + pthread_cond_init(&ctx->queuePushCond, NULL); |
| 100 | + pthread_cond_init(&ctx->queuePopCond, NULL); |
| 101 | + ctx->shutdown = 0; |
| 102 | + /* Allocate space for the thread handles */ |
| 103 | + ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t)); |
| 104 | + ctx->numThreads = 0; |
| 105 | + /* Check for errors */ |
| 106 | + if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } |
| 107 | + /* Initialize the threads */ |
| 108 | + { size_t i; |
| 109 | + for (i = 0; i < numThreads; ++i) { |
| 110 | + if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { |
| 111 | + ctx->numThreads = i; |
| 112 | + POOL_free(ctx); |
| 113 | + return NULL; |
| 114 | + } } |
| 115 | + ctx->numThreads = numThreads; |
| 116 | + } |
| 117 | + return ctx; |
| 118 | +} |
| 119 | + |
| 120 | +/*! POOL_join() : |
| 121 | + Shutdown the queue, wake any sleeping threads, and join all of the threads. |
| 122 | +*/ |
| 123 | +static void POOL_join(POOL_ctx *ctx) { |
| 124 | + /* Shut down the queue */ |
| 125 | + pthread_mutex_lock(&ctx->queueMutex); |
| 126 | + ctx->shutdown = 1; |
| 127 | + pthread_mutex_unlock(&ctx->queueMutex); |
| 128 | + /* Wake up sleeping threads */ |
| 129 | + pthread_cond_broadcast(&ctx->queuePushCond); |
| 130 | + pthread_cond_broadcast(&ctx->queuePopCond); |
| 131 | + /* Join all of the threads */ |
| 132 | + { size_t i; |
| 133 | + for (i = 0; i < ctx->numThreads; ++i) { |
| 134 | + pthread_join(ctx->threads[i], NULL); |
| 135 | + } } |
| 136 | +} |
| 137 | + |
| 138 | +void POOL_free(POOL_ctx *ctx) { |
| 139 | + if (!ctx) { return; } |
| 140 | + POOL_join(ctx); |
| 141 | + pthread_mutex_destroy(&ctx->queueMutex); |
| 142 | + pthread_cond_destroy(&ctx->queuePushCond); |
| 143 | + pthread_cond_destroy(&ctx->queuePopCond); |
| 144 | + if (ctx->queue) free(ctx->queue); |
| 145 | + if (ctx->threads) free(ctx->threads); |
| 146 | + free(ctx); |
| 147 | +} |
| 148 | + |
| 149 | +void POOL_add(void *ctxVoid, POOL_function function, void *opaque) { |
| 150 | + POOL_ctx *ctx = (POOL_ctx *)ctxVoid; |
| 151 | + if (!ctx) { return; } |
| 152 | + |
| 153 | + pthread_mutex_lock(&ctx->queueMutex); |
| 154 | + { POOL_job const job = {function, opaque}; |
| 155 | + /* Wait until there is space in the queue for the new job */ |
| 156 | + size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; |
| 157 | + while (ctx->queueHead == newTail && !ctx->shutdown) { |
| 158 | + pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); |
| 159 | + newTail = (ctx->queueTail + 1) % ctx->queueSize; |
| 160 | + } |
| 161 | + /* The queue is still going => there is space */ |
| 162 | + if (!ctx->shutdown) { |
| 163 | + ctx->queue[ctx->queueTail] = job; |
| 164 | + ctx->queueTail = newTail; |
| 165 | + } |
| 166 | + } |
| 167 | + pthread_mutex_unlock(&ctx->queueMutex); |
| 168 | + pthread_cond_signal(&ctx->queuePopCond); |
| 169 | +} |
| 170 | + |
| 171 | +#else /* ZSTD_MULTITHREAD not defined */ |
| 172 | +/* No multi-threading support */ |
| 173 | + |
| 174 | +/* We don't need any data, but if it is empty malloc() might return NULL. */ |
| 175 | +struct POOL_ctx_s { |
| 176 | + int data; |
| 177 | +}; |
| 178 | + |
| 179 | +POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { |
| 180 | + (void)numThreads; |
| 181 | + (void)queueSize; |
| 182 | + return (POOL_ctx *)malloc(sizeof(POOL_ctx)); |
| 183 | +} |
| 184 | + |
| 185 | +void POOL_free(POOL_ctx *ctx) { |
| 186 | + if (ctx) free(ctx); |
| 187 | +} |
| 188 | + |
| 189 | +void POOL_add(void *ctx, POOL_function function, void *opaque) { |
| 190 | + (void)ctx; |
| 191 | + function(opaque); |
| 192 | +} |
| 193 | + |
| 194 | +#endif /* ZSTD_MULTITHREAD */ |
0 commit comments