diff --git a/blosc/blosc2.c b/blosc/blosc2.c index faff94972..1d9e1e337 100644 --- a/blosc/blosc2.c +++ b/blosc/blosc2.c @@ -16,6 +16,7 @@ #include #include #include +#include #include "blosc2.h" #include "blosc-private.h" @@ -3055,10 +3056,10 @@ static void t_blosc_do_job(void *ctxt) } else { // Use dynamic schedule via a queue. Get the next block. - pthread_mutex_lock(&context->count_mutex); + pthread_spin_lock(&context->count_spin); context->thread_nblock++; nblock_ = context->thread_nblock; - pthread_mutex_unlock(&context->count_mutex); + pthread_spin_unlock(&context->count_spin); tblock = nblocks; } @@ -3119,15 +3120,15 @@ static void t_blosc_do_job(void *ctxt) /* Check results for the compressed/decompressed block */ if (cbytes < 0) { /* compr/decompr failure */ /* Set giveup_code error */ - pthread_mutex_lock(&context->count_mutex); + pthread_spin_lock(&context->count_spin); context->thread_giveup_code = cbytes; - pthread_mutex_unlock(&context->count_mutex); + pthread_spin_unlock(&context->count_spin); break; } if (compress && !memcpyed) { /* Start critical section */ - pthread_mutex_lock(&context->count_mutex); + pthread_spin_lock(&context->count_spin); ntdest = context->output_bytes; // Note: do not use a typical local dict_training variable here // because it is probably cached from previous calls if the number of @@ -3138,13 +3139,13 @@ static void t_blosc_do_job(void *ctxt) if ((cbytes == 0) || (ntdest + cbytes > maxbytes)) { context->thread_giveup_code = 0; /* incompressible buf */ - pthread_mutex_unlock(&context->count_mutex); + pthread_spin_unlock(&context->count_spin); break; } context->thread_nblock++; nblock_ = context->thread_nblock; context->output_bytes += cbytes; - pthread_mutex_unlock(&context->count_mutex); + pthread_spin_unlock(&context->count_spin); /* End of critical section */ /* Copy the compressed buffer to destination */ @@ -3154,22 +3155,22 @@ static void t_blosc_do_job(void *ctxt) nblock_++; } else { - pthread_mutex_lock(&context->count_mutex); + pthread_spin_lock(&context->count_spin); context->thread_nblock++; nblock_ = context->thread_nblock; context->output_bytes += cbytes; - pthread_mutex_unlock(&context->count_mutex); + pthread_spin_unlock(&context->count_spin); } } /* closes while (nblock_) */ if (static_schedule) { - pthread_mutex_lock(&context->count_mutex); + pthread_spin_lock(&context->count_spin); context->output_bytes = context->sourcesize; if (compress) { context->output_bytes += context->header_overhead; } - pthread_mutex_unlock(&context->count_mutex); + pthread_spin_unlock(&context->count_spin); } } @@ -3208,7 +3209,7 @@ int init_threadpool(blosc2_context *context) { int rc2; /* Initialize mutex and condition variable objects */ - pthread_mutex_init(&context->count_mutex, NULL); + pthread_spin_init(&context->count_spin, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&context->delta_mutex, NULL); pthread_mutex_init(&context->nchunk_mutex, NULL); pthread_cond_init(&context->delta_cv, NULL); @@ -3613,7 +3614,7 @@ int release_threadpool(blosc2_context *context) { } /* Release mutex and condition variable objects */ - pthread_mutex_destroy(&context->count_mutex); + pthread_spin_destroy(&context->count_spin); pthread_mutex_destroy(&context->delta_mutex); pthread_mutex_destroy(&context->nchunk_mutex); pthread_cond_destroy(&context->delta_cv); diff --git a/blosc/context.h b/blosc/context.h index ee16613f0..e8718116c 100644 --- a/blosc/context.h +++ b/blosc/context.h @@ -121,7 +121,7 @@ struct blosc2_context_s { int16_t end_threads; pthread_t *threads; struct thread_context *thread_contexts; /* only for user-managed threads */ - pthread_mutex_t count_mutex; + pthread_spinlock_t count_spin; pthread_mutex_t nchunk_mutex; #ifdef BLOSC_POSIX_BARRIERS pthread_barrier_t barr_init;