@@ -399,188 +399,3 @@ size_t HeapRootSegments::segment_offset(size_t seg_idx) {
399399 return _base_offset + seg_idx * _max_size_in_bytes;
400400}
401401
402- ArchiveWorkers ArchiveWorkers::_workers;
403-
404- ArchiveWorkers::ArchiveWorkers () :
405- _start_semaphore(0 ),
406- _end_semaphore(0 ),
407- _num_workers(0 ),
408- _started_workers(0 ),
409- _waiting_workers(0 ),
410- _running_workers(0 ),
411- _state(NOT_READY),
412- _task(nullptr ) {
413- }
414-
415- void ArchiveWorkers::initialize () {
416- assert (Atomic::load (&_state) == NOT_READY, " Should be" );
417-
418- Atomic::store (&_num_workers, max_workers ());
419- Atomic::store (&_state, READY);
420-
421- // Kick off pool startup by creating a single worker.
422- start_worker_if_needed ();
423- }
424-
425- int ArchiveWorkers::max_workers () {
426- // The pool is used for short-lived bursty tasks. We do not want to spend
427- // too much time creating and waking up threads unnecessarily. Plus, we do
428- // not want to overwhelm large machines. This is why we want to be very
429- // conservative about the number of workers actually needed.
430- return MAX2 (0 , log2i_graceful (os::active_processor_count ()));
431- }
432-
433- bool ArchiveWorkers::is_parallel () {
434- return _num_workers > 0 ;
435- }
436-
437- void ArchiveWorkers::shutdown () {
438- while (true ) {
439- State state = Atomic::load (&_state);
440- if (state == SHUTDOWN) {
441- // Already shut down.
442- return ;
443- }
444- if (Atomic::cmpxchg (&_state, state, SHUTDOWN, memory_order_relaxed) == state) {
445- if (is_parallel ()) {
446- // Execute a shutdown task and block until all workers respond.
447- run_task (&_shutdown_task);
448- }
449- }
450- }
451- }
452-
453- void ArchiveWorkers::start_worker_if_needed () {
454- while (true ) {
455- int cur = Atomic::load (&_started_workers);
456- if (cur >= _num_workers) {
457- return ;
458- }
459- if (Atomic::cmpxchg (&_started_workers, cur, cur + 1 , memory_order_relaxed) == cur) {
460- new ArchiveWorkerThread (this );
461- return ;
462- }
463- }
464- }
465-
466- void ArchiveWorkers::signal_worker_if_needed () {
467- while (true ) {
468- int cur = Atomic::load (&_waiting_workers);
469- if (cur == 0 ) {
470- return ;
471- }
472- if (Atomic::cmpxchg (&_waiting_workers, cur, cur - 1 , memory_order_relaxed) == cur) {
473- _start_semaphore.signal (1 );
474- return ;
475- }
476- }
477- }
478-
479- void ArchiveWorkers::run_task (ArchiveWorkerTask* task) {
480- assert ((Atomic::load (&_state) == READY) ||
481- ((Atomic::load (&_state) == SHUTDOWN) && (task == &_shutdown_task)),
482- " Should be in correct state" );
483- assert (Atomic::load (&_task) == nullptr , " Should not have running tasks" );
484-
485- if (is_parallel ()) {
486- run_task_multi (task);
487- } else {
488- run_task_single (task);
489- }
490- }
491-
492- void ArchiveWorkers::run_task_single (ArchiveWorkerTask* task) {
493- // Single thread needs no chunking.
494- task->configure_max_chunks (1 );
495-
496- // Execute the task ourselves, as there are no workers.
497- task->work (0 , 1 );
498- }
499-
500- void ArchiveWorkers::run_task_multi (ArchiveWorkerTask* task) {
501- // Multiple threads can work with multiple chunks.
502- task->configure_max_chunks (_num_workers * CHUNKS_PER_WORKER);
503-
504- // Set up the run and publish the task.
505- Atomic::store (&_waiting_workers, _num_workers);
506- Atomic::store (&_running_workers, _num_workers);
507- Atomic::release_store (&_task, task);
508-
509- // Kick off pool wakeup by signaling a single worker, and proceed
510- // immediately to executing the task locally.
511- signal_worker_if_needed ();
512-
513- // Execute the task ourselves, while workers are catching up.
514- // This allows us to hide parts of task handoff latency.
515- task->run ();
516-
517- // Done executing task locally, wait for any remaining workers to complete,
518- // and then do the final housekeeping.
519- _end_semaphore.wait ();
520- Atomic::store (&_task, (ArchiveWorkerTask *) nullptr );
521- OrderAccess::fence ();
522-
523- assert (Atomic::load (&_waiting_workers) == 0 , " All workers were signaled" );
524- assert (Atomic::load (&_running_workers) == 0 , " No workers are running" );
525- }
526-
527- void ArchiveWorkerTask::run () {
528- while (true ) {
529- int chunk = Atomic::load (&_chunk);
530- if (chunk >= _max_chunks) {
531- return ;
532- }
533- if (Atomic::cmpxchg (&_chunk, chunk, chunk + 1 , memory_order_relaxed) == chunk) {
534- assert (0 <= chunk && chunk < _max_chunks, " Sanity" );
535- work (chunk, _max_chunks);
536- }
537- }
538- }
539-
540- void ArchiveWorkerTask::configure_max_chunks (int max_chunks) {
541- if (_max_chunks == 0 ) {
542- _max_chunks = max_chunks;
543- }
544- }
545-
546- bool ArchiveWorkers::run_as_worker () {
547- assert (is_parallel (), " Should be in parallel mode" );
548- _start_semaphore.wait ();
549-
550- // Avalanche wakeups: each worker signals two others.
551- signal_worker_if_needed ();
552- signal_worker_if_needed ();
553-
554- ArchiveWorkerTask* task = Atomic::load_acquire (&_task);
555- task->run ();
556-
557- // All work done in threads should be visible to caller.
558- OrderAccess::fence ();
559-
560- // Signal the pool the tasks are complete, if this is the last worker.
561- if (Atomic::sub (&_running_workers, 1 , memory_order_relaxed) == 0 ) {
562- _end_semaphore.signal ();
563- }
564-
565- // Continue if task was not a termination task.
566- return (task != &_shutdown_task);
567- }
568-
569- ArchiveWorkerThread::ArchiveWorkerThread (ArchiveWorkers* pool) : NamedThread(), _pool(pool) {
570- set_name (" ArchiveWorkerThread" );
571- os::create_thread (this , os::os_thread);
572- os::start_thread (this );
573- }
574-
575- void ArchiveWorkerThread::run () {
576- // Avalanche thread startup: each starting worker starts two others.
577- _pool->start_worker_if_needed ();
578- _pool->start_worker_if_needed ();
579-
580- // Set ourselves up.
581- os::set_priority (this , NearMaxPriority);
582-
583- while (_pool->run_as_worker ()) {
584- // Work until terminated.
585- }
586- }
0 commit comments