Skip to content

Commit d8c185c

Browse files
committed
Force all threads to call opcache_reset().
1 parent acf2a1c commit d8c185c

File tree

5 files changed

+89
-94
lines changed

5 files changed

+89
-94
lines changed

frankenphp.go

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ var (
5959
contextKey = contextKeyStruct{}
6060
serverHeader = []string{"FrankenPHP"}
6161

62-
isRunning bool
63-
restartCounter atomic.Int32
64-
onServerShutdown []func()
62+
isRunning bool
63+
threadsAreRestarting atomic.Bool
64+
onServerShutdown []func()
6565

6666
// Set default values to make Shutdown() idempotent
6767
globalMu sync.Mutex
@@ -759,49 +759,111 @@ func go_is_context_done(threadIndex C.uintptr_t) C.bool {
759759

760760
//export go_schedule_opcache_reset
761761
func go_schedule_opcache_reset(threadIndex C.uintptr_t) {
762-
if restartCounter.CompareAndSwap(0, 1) {
763-
go restartThreadsForOpcacheReset()
762+
if threadsAreRestarting.CompareAndSwap(false, true) {
763+
go restartThreadsAndOpcacheReset(true)
764764
}
765765
}
766766

767767
// restart all threads for an opcache_reset
768-
func restartThreadsForOpcacheReset() {
768+
func restartThreadsAndOpcacheReset(withRegularThreads bool) {
769769
// disallow scaling threads while restarting workers
770770
scalingMu.Lock()
771771
defer scalingMu.Unlock()
772772

773-
threadsToRestart := drainWorkerThreads(true)
773+
threadsToRestart := drainThreads(withRegularThreads)
774+
775+
opcacheResetWg := sync.WaitGroup{}
776+
for _, thread := range threadsToRestart {
777+
thread.state.Set(state.OpcacheResetting)
778+
opcacheResetWg.Go(func() {
779+
thread.state.WaitFor(state.OpcacheResettingDone)
780+
})
781+
}
782+
783+
opcacheResetWg.Wait()
774784

775785
for _, thread := range threadsToRestart {
776786
thread.drainChan = make(chan struct{})
777787
thread.state.Set(state.Ready)
778788
}
789+
790+
threadsAreRestarting.Store(false)
779791
}
780792

781-
func scheduleOpcacheReset(thread *phpThread) {
782-
restartCounter.Add(-1)
783-
if restartCounter.Load() != 1 {
784-
return // only the last restarting thread will trigger an actual opcache_reset
793+
func drainThreads(withRegularThreads bool) []*phpThread {
794+
var (
795+
ready sync.WaitGroup
796+
drainedThreads []*phpThread
797+
)
798+
799+
for _, worker := range workers {
800+
worker.threadMutex.RLock()
801+
ready.Add(len(worker.threads))
802+
803+
for _, thread := range worker.threads {
804+
if !thread.state.RequestSafeStateChange(state.Restarting) {
805+
ready.Done()
806+
807+
// no state change allowed == thread is shutting down
808+
// we'll proceed to restart all other threads anyway
809+
continue
810+
}
811+
close(thread.drainChan)
812+
drainedThreads = append(drainedThreads, thread)
813+
814+
go func(thread *phpThread) {
815+
thread.state.WaitFor(state.Yielding)
816+
ready.Done()
817+
}(thread)
818+
}
819+
820+
worker.threadMutex.RUnlock()
821+
}
822+
823+
if withRegularThreads {
824+
regularThreadMu.RLock()
825+
ready.Add(len(regularThreads))
826+
827+
for _, thread := range regularThreads {
828+
if !thread.state.RequestSafeStateChange(state.Restarting) {
829+
ready.Done()
830+
831+
// no state change allowed == thread is shutting down
832+
// we'll proceed to restart all other threads anyway
833+
continue
834+
}
835+
close(thread.drainChan)
836+
drainedThreads = append(drainedThreads, thread)
837+
838+
go func(thread *phpThread) {
839+
thread.state.WaitFor(state.Yielding)
840+
ready.Done()
841+
}(thread)
842+
}
843+
844+
regularThreadMu.RUnlock()
785845
}
786-
workerThread, ok := thread.handler.(*workerThread)
846+
847+
ready.Wait()
848+
849+
return drainedThreads
850+
}
851+
852+
func scheduleOpcacheReset(thread *phpThread) {
787853
fc, _ := newDummyContext("/opcache_reset")
788-
if ok && workerThread.worker != nil {
854+
855+
if workerThread, ok := thread.handler.(*workerThread); ok {
789856
workerThread.dummyFrankenPHPContext = fc
790857
defer func() { workerThread.dummyFrankenPHPContext = nil }()
791858
}
792859

793-
regularThread, ok := thread.handler.(*regularThread)
794-
if ok {
860+
if regularThread, ok := thread.handler.(*regularThread); ok {
795861
regularThread.contextHolder.frankenPHPContext = fc
796862
defer func() { regularThread.contextHolder.frankenPHPContext = nil }()
797863
}
798864

799865
globalLogger.Info("resetting opcache in all threads")
800866
C.frankenphp_reset_opcache()
801-
time.Sleep(200 * time.Millisecond) // opcache_reset grace period
802-
803-
// all threads should have restarted now
804-
restartCounter.Store(0)
805867
}
806868

807869
// ExecuteScriptCLI executes the PHP script passed as parameter.

internal/state/state.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const (
2424
// States necessary for restarting workers
2525
Restarting State = "restarting"
2626
Yielding State = "yielding"
27+
OpcacheResetting State = "opcache resetting"
28+
OpcacheResettingDone State = "opcache reset done"
2729

2830
// States necessary for transitioning between different handlers
2931
TransitionRequested State = "transition requested"

threadregular.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ func (handler *regularThread) beforeScriptExecution() string {
5050
return handler.waitForRequest()
5151
case state.Restarting:
5252
handler.state.Set(state.Yielding)
53+
handler.state.WaitFor(state.OpcacheResetting)
5354
scheduleOpcacheReset(handler.thread)
55+
handler.state.Set(state.OpcacheResettingDone)
5456
handler.state.WaitFor(state.Ready, state.ShuttingDown)
5557
return handler.beforeScriptExecution()
5658

threadworker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ func (handler *workerThread) beforeScriptExecution() string {
5151
handler.worker.onThreadShutdown(handler.thread.threadIndex)
5252
}
5353
handler.state.Set(state.Yielding)
54+
handler.state.WaitFor(state.OpcacheResetting)
5455
scheduleOpcacheReset(handler.thread)
56+
handler.state.Set(state.OpcacheResettingDone)
5557
handler.state.WaitFor(state.Ready, state.ShuttingDown)
5658
return handler.beforeScriptExecution()
5759
case state.Ready, state.TransitionComplete:

worker.go

Lines changed: 2 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -171,86 +171,13 @@ func newWorker(o workerOpt) (*worker, error) {
171171

172172
// EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown
173173
func DrainWorkers() {
174-
_ = drainWorkerThreads(false)
175-
}
176-
177-
func drainWorkerThreads(withRegularThreads bool) []*phpThread {
178-
var (
179-
ready sync.WaitGroup
180-
drainedThreads []*phpThread
181-
)
182-
183-
for _, worker := range workers {
184-
worker.threadMutex.RLock()
185-
ready.Add(len(worker.threads))
186-
187-
for _, thread := range worker.threads {
188-
if !thread.state.RequestSafeStateChange(state.Restarting) {
189-
ready.Done()
190-
191-
// no state change allowed == thread is shutting down
192-
// we'll proceed to restart all other threads anyway
193-
continue
194-
}
195-
restartCounter.Add(1)
196-
close(thread.drainChan)
197-
drainedThreads = append(drainedThreads, thread)
198-
199-
go func(thread *phpThread) {
200-
thread.state.WaitFor(state.Yielding)
201-
ready.Done()
202-
}(thread)
203-
}
204-
205-
worker.threadMutex.RUnlock()
206-
}
207-
208-
if withRegularThreads {
209-
regularThreadMu.RLock()
210-
ready.Add(len(regularThreads))
211-
212-
for _, thread := range regularThreads {
213-
if !thread.state.RequestSafeStateChange(state.Restarting) {
214-
ready.Done()
215-
216-
// no state change allowed == thread is shutting down
217-
// we'll proceed to restart all other threads anyway
218-
continue
219-
}
220-
restartCounter.Add(1)
221-
close(thread.drainChan)
222-
drainedThreads = append(drainedThreads, thread)
223-
224-
go func(thread *phpThread) {
225-
thread.state.WaitFor(state.Yielding)
226-
ready.Done()
227-
}(thread)
228-
}
229-
230-
regularThreadMu.RUnlock()
231-
}
232-
233-
ready.Wait()
234-
235-
return drainedThreads
174+
_ = drainThreads(false)
236175
}
237176

238177
// RestartWorkers attempts to restart all workers gracefully
239178
// All workers must be restarted at the same time to prevent issues with opcache resetting.
240179
func RestartWorkers() {
241-
if !restartCounter.CompareAndSwap(0, 1) {
242-
return // another restart is already in progress
243-
}
244-
// disallow scaling threads while restarting workers
245-
scalingMu.Lock()
246-
defer scalingMu.Unlock()
247-
248-
threadsToRestart := drainWorkerThreads(false)
249-
250-
for _, thread := range threadsToRestart {
251-
thread.drainChan = make(chan struct{})
252-
thread.state.Set(state.Ready)
253-
}
180+
restartThreadsAndOpcacheReset(false)
254181
}
255182

256183
func (worker *worker) attachThread(thread *phpThread) {

0 commit comments

Comments
 (0)