Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions blosc/blosc2.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <sys/types.h>
#include <assert.h>
#include <math.h>
#include <pthread.h>

#include "blosc2.h"
#include "blosc-private.h"
Expand Down Expand Up @@ -3055,10 +3056,10 @@ static void t_blosc_do_job(void *ctxt)
}
else {
// Use dynamic schedule via a queue. Get the next block.
pthread_mutex_lock(&context->count_mutex);
pthread_spin_lock(&context->count_spin);
context->thread_nblock++;
nblock_ = context->thread_nblock;
pthread_mutex_unlock(&context->count_mutex);
pthread_spin_unlock(&context->count_spin);
tblock = nblocks;
}

Expand Down Expand Up @@ -3119,15 +3120,15 @@ static void t_blosc_do_job(void *ctxt)
/* Check results for the compressed/decompressed block */
if (cbytes < 0) { /* compr/decompr failure */
/* Set giveup_code error */
pthread_mutex_lock(&context->count_mutex);
pthread_spin_lock(&context->count_spin);
context->thread_giveup_code = cbytes;
pthread_mutex_unlock(&context->count_mutex);
pthread_spin_unlock(&context->count_spin);
break;
}

if (compress && !memcpyed) {
/* Start critical section */
pthread_mutex_lock(&context->count_mutex);
pthread_spin_lock(&context->count_spin);
ntdest = context->output_bytes;
// Note: do not use a typical local dict_training variable here
// because it is probably cached from previous calls if the number of
Expand All @@ -3138,13 +3139,13 @@ static void t_blosc_do_job(void *ctxt)

if ((cbytes == 0) || (ntdest + cbytes > maxbytes)) {
context->thread_giveup_code = 0; /* incompressible buf */
pthread_mutex_unlock(&context->count_mutex);
pthread_spin_unlock(&context->count_spin);
break;
}
context->thread_nblock++;
nblock_ = context->thread_nblock;
context->output_bytes += cbytes;
pthread_mutex_unlock(&context->count_mutex);
pthread_spin_unlock(&context->count_spin);
/* End of critical section */

/* Copy the compressed buffer to destination */
Expand All @@ -3154,22 +3155,22 @@ static void t_blosc_do_job(void *ctxt)
nblock_++;
}
else {
pthread_mutex_lock(&context->count_mutex);
pthread_spin_lock(&context->count_spin);
context->thread_nblock++;
nblock_ = context->thread_nblock;
context->output_bytes += cbytes;
pthread_mutex_unlock(&context->count_mutex);
pthread_spin_unlock(&context->count_spin);
}

} /* closes while (nblock_) */

if (static_schedule) {
pthread_mutex_lock(&context->count_mutex);
pthread_spin_lock(&context->count_spin);
context->output_bytes = context->sourcesize;
if (compress) {
context->output_bytes += context->header_overhead;
}
pthread_mutex_unlock(&context->count_mutex);
pthread_spin_unlock(&context->count_spin);
}

}
Expand Down Expand Up @@ -3208,7 +3209,7 @@ int init_threadpool(blosc2_context *context) {
int rc2;

/* Initialize mutex and condition variable objects */
pthread_mutex_init(&context->count_mutex, NULL);
pthread_spin_init(&context->count_spin, PTHREAD_PROCESS_SHARED);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTHREAD_PROCESS_SHARED why we need this flag?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTHREAD_PROCESS_SHARED why we need this flag?

int pthread_spin_init(pthread_spinlock_t *lock, int pshared);

The pshared must be one of the following.
PTHREAD_PROCESS_PRIVATE/PTHREAD_PROCESS_SHARED
The 1st one has limited to the threads that are forked from the same process.
The 2nd one could be obtained by all the threads.

Reference Link: https://man7.org/linux/man-pages/man3/pthread_spin_init.3.html

pthread_mutex_init(&context->delta_mutex, NULL);
pthread_mutex_init(&context->nchunk_mutex, NULL);
pthread_cond_init(&context->delta_cv, NULL);
Expand Down Expand Up @@ -3613,7 +3614,7 @@ int release_threadpool(blosc2_context *context) {
}

/* Release mutex and condition variable objects */
pthread_mutex_destroy(&context->count_mutex);
pthread_spin_destroy(&context->count_spin);
pthread_mutex_destroy(&context->delta_mutex);
pthread_mutex_destroy(&context->nchunk_mutex);
pthread_cond_destroy(&context->delta_cv);
Expand Down
2 changes: 1 addition & 1 deletion blosc/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ struct blosc2_context_s {
int16_t end_threads;
pthread_t *threads;
struct thread_context *thread_contexts; /* only for user-managed threads */
pthread_mutex_t count_mutex;
pthread_spinlock_t count_spin;
pthread_mutex_t nchunk_mutex;
#ifdef BLOSC_POSIX_BARRIERS
pthread_barrier_t barr_init;
Expand Down