|
3 | 3 | #define _GNU_SOURCE
|
4 | 4 | #include <poll.h>
|
5 | 5 | #include <unistd.h>
|
| 6 | +#include <assert.h> |
6 | 7 | #include <signal.h>
|
7 | 8 | #include <pthread.h>
|
8 | 9 | #include <sys/epoll.h>
|
@@ -3136,4 +3137,149 @@ TEST(epoll59)
|
3136 | 3137 | close(ctx.sfd[0]);
|
3137 | 3138 | }
|
3138 | 3139 |
|
enum {
	/* Number of eventfds (and waiter threads) used by epoll60. */
	EPOLL60_EVENTS_NR = 10,
};

/*
 * Shared state for the epoll60 lost-wakeup stress test.  All counters
 * are manipulated with __atomic builtins by the main thread and the
 * waiter threads; 'stopped' is only ever flipped 0 -> 1 by main.
 */
struct epoll60_ctx {
	volatile int stopped;		/* set by main to tell waiters to exit */
	int ready;			/* rendezvous counter: waiters ready to start a round */
	int waiters;			/* number of threads currently inside epoll_pwait() */
	int epfd;			/* shared epoll instance */
	int evfd[EPOLL60_EVENTS_NR];	/* one eventfd per waiter thread (ET mode) */
};
| 3151 | + |
/*
 * Waiter thread for the epoll60 test.  Repeatedly rendezvouses with the
 * main thread, blocks in epoll_pwait() on the shared epfd, and consumes
 * exactly one eventfd value per round.  Exits when ctx->stopped is set
 * and SIGUSR1 interrupts the wait.
 */
static void *epoll60_wait_thread(void *ctx_)
{
	struct epoll60_ctx *ctx = ctx_;
	struct epoll_event e;
	sigset_t sigmask;
	uint64_t v;
	int ret;

	/* Block SIGUSR1 */
	sigemptyset(&sigmask);
	sigaddset(&sigmask, SIGUSR1);
	sigprocmask(SIG_SETMASK, &sigmask, NULL);

	/*
	 * Prepare empty mask for epoll_pwait(): SIGUSR1 is only
	 * deliverable while blocked inside the syscall, so pthread_kill()
	 * from main is guaranteed to interrupt the wait, not user code.
	 */
	sigemptyset(&sigmask);

	while (!ctx->stopped) {
		/* Mark we are ready */
		__atomic_fetch_add(&ctx->ready, 1, __ATOMIC_ACQUIRE);

		/*
		 * Start when all are ready: main resets 'ready' to zero
		 * once every waiter has checked in; spin until then.
		 */
		while (__atomic_load_n(&ctx->ready, __ATOMIC_ACQUIRE) &&
		       !ctx->stopped);

		/* Account this waiter */
		__atomic_fetch_add(&ctx->waiters, 1, __ATOMIC_ACQUIRE);

		ret = epoll_pwait(ctx->epfd, &e, 1, 2000, &sigmask);
		if (ret != 1) {
			/* We expect only signal delivery on stop */
			assert(ret < 0 && errno == EINTR && "Lost wakeup!\n");
			assert(ctx->stopped);
			break;
		}

		ret = read(e.data.fd, &v, sizeof(v));
		/* Since we are on ET mode, thus each thread gets its own fd. */
		assert(ret == sizeof(v));

		/* Round complete; main spins until 'waiters' drops to 0. */
		__atomic_fetch_sub(&ctx->waiters, 1, __ATOMIC_RELEASE);
	}

	return NULL;
}
| 3196 | + |
| 3197 | +static inline unsigned long long msecs(void) |
| 3198 | +{ |
| 3199 | + struct timespec ts; |
| 3200 | + unsigned long long msecs; |
| 3201 | + |
| 3202 | + clock_gettime(CLOCK_REALTIME, &ts); |
| 3203 | + msecs = ts.tv_sec * 1000ull; |
| 3204 | + msecs += ts.tv_nsec / 1000000ull; |
| 3205 | + |
| 3206 | + return msecs; |
| 3207 | +} |
| 3208 | + |
| 3209 | +static inline int count_waiters(struct epoll60_ctx *ctx) |
| 3210 | +{ |
| 3211 | + return __atomic_load_n(&ctx->waiters, __ATOMIC_ACQUIRE); |
| 3212 | +} |
| 3213 | + |
/*
 * Stress test for lost wakeups: N eventfds in edge-triggered mode are
 * armed on one epoll instance while N threads block in epoll_pwait().
 * Each round, all fds are signalled at once and every waiter must wake
 * up and consume exactly one event within 1 second; a waiter left
 * sleeping indicates a lost wakeup in the epoll implementation.
 */
TEST(epoll60)
{
	struct epoll60_ctx ctx = { 0 };
	pthread_t waiters[ARRAY_SIZE(ctx.evfd)];
	struct epoll_event e;
	int i, n, ret;

	signal(SIGUSR1, signal_handler);

	ctx.epfd = epoll_create1(0);
	ASSERT_GE(ctx.epfd, 0);

	/* Create event fds */
	for (i = 0; i < ARRAY_SIZE(ctx.evfd); i++) {
		ctx.evfd[i] = eventfd(0, EFD_NONBLOCK);
		ASSERT_GE(ctx.evfd[i], 0);

		/* Edge-triggered so each write wakes at most one waiter. */
		e.events = EPOLLIN | EPOLLET;
		e.data.fd = ctx.evfd[i];
		ASSERT_EQ(epoll_ctl(ctx.epfd, EPOLL_CTL_ADD, ctx.evfd[i], &e), 0);
	}

	/* Create waiter threads */
	for (i = 0; i < ARRAY_SIZE(waiters); i++)
		ASSERT_EQ(pthread_create(&waiters[i], NULL,
					 epoll60_wait_thread, &ctx), 0);

	for (i = 0; i < 300; i++) {
		uint64_t v = 1, ms;

		/* Wait for all to be ready */
		while (__atomic_load_n(&ctx.ready, __ATOMIC_ACQUIRE) !=
		       ARRAY_SIZE(ctx.evfd))
			;

		/* Steady, go: resetting 'ready' releases the spinning waiters. */
		__atomic_fetch_sub(&ctx.ready, ARRAY_SIZE(ctx.evfd),
				   __ATOMIC_ACQUIRE);

		/* Wait all have gone to kernel */
		while (count_waiters(&ctx) != ARRAY_SIZE(ctx.evfd))
			;

		/* 1ms should be enough to schedule away */
		usleep(1000);

		/* Quickly signal all handles at once */
		for (n = 0; n < ARRAY_SIZE(ctx.evfd); n++) {
			ret = write(ctx.evfd[n], &v, sizeof(v));
			ASSERT_EQ(ret, sizeof(v));
		}

		/* Busy loop for 1s and wait for all waiters to wake up */
		ms = msecs();
		while (count_waiters(&ctx) && msecs() < ms + 1000)
			;

		/* Any waiter still asleep after 1s means a lost wakeup. */
		ASSERT_EQ(count_waiters(&ctx), 0);
	}
	ctx.stopped = 1;
	/* Stop waiters: SIGUSR1 interrupts epoll_pwait() with EINTR. */
	for (i = 0; i < ARRAY_SIZE(waiters); i++)
		ret = pthread_kill(waiters[i], SIGUSR1);
	for (i = 0; i < ARRAY_SIZE(waiters); i++)
		pthread_join(waiters[i], NULL);

	for (i = 0; i < ARRAY_SIZE(waiters); i++)
		close(ctx.evfd[i]);
	close(ctx.epfd);
}
| 3284 | + |
3139 | 3285 | TEST_HARNESS_MAIN
|
0 commit comments