Skip to content

Commit b598bd3

Browse files
committed
Adds support for thread transitioning.
1 parent ec8aeb7 commit b598bd3

File tree

7 files changed

+145
-40
lines changed

7 files changed

+145
-40
lines changed

β€Žphpmainthread.goβ€Ž

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package frankenphp
44
import "C"
55
import (
66
"fmt"
7-
"net/http"
87
"sync"
98
)
109

@@ -36,12 +35,7 @@ func initPHPThreads(numThreads int) error {
3635

3736
// initialize all threads as inactive
3837
for i := 0; i < numThreads; i++ {
39-
phpThreads[i] = &phpThread{
40-
threadIndex: i,
41-
drainChan: make(chan struct{}),
42-
requestChan: make(chan *http.Request),
43-
state: newThreadState(),
44-
}
38+
phpThreads[i] = newPHPThread(i)
4539
convertToInactiveThread(phpThreads[i])
4640
}
4741

@@ -66,13 +60,15 @@ func drainPHPThreads() {
6660
doneWG := sync.WaitGroup{}
6761
doneWG.Add(len(phpThreads))
6862
for _, thread := range phpThreads {
63+
thread.mu.Lock()
6964
thread.state.set(stateShuttingDown)
7065
close(thread.drainChan)
7166
}
7267
close(mainThread.done)
7368
for _, thread := range phpThreads {
7469
go func(thread *phpThread) {
7570
thread.state.waitFor(stateDone)
71+
thread.mu.Unlock()
7672
doneWG.Done()
7773
}(thread)
7874
}

β€Žphpmainthread_test.goβ€Ž

Lines changed: 106 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package frankenphp
22

33
import (
4+
"io"
5+
"math/rand/v2"
6+
"net/http/httptest"
47
"path/filepath"
8+
"sync"
9+
"sync/atomic"
510
"testing"
11+
"time"
612

713
"github.com/stretchr/testify/assert"
814
"go.uber.org/zap"
@@ -20,57 +26,132 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) {
2026
assert.Nil(t, phpThreads)
2127
}
2228

23-
func TestTransition2RegularThreadsToWorkerThreadsAndBack(t *testing.T) {
24-
numThreads := 2
25-
logger, _ = zap.NewDevelopment()
26-
assert.NoError(t, initPHPThreads(numThreads))
29+
func TestTransitionRegularThreadToWorkerThread(t *testing.T) {
30+
logger = zap.NewNop()
31+
assert.NoError(t, initPHPThreads(1))
2732

28-
// transition to worker thread
29-
for i := 0; i < numThreads; i++ {
30-
convertToRegularThread(phpThreads[i])
31-
assert.IsType(t, &regularThread{}, phpThreads[i].handler)
32-
}
33+
// transition to regular thread
34+
convertToRegularThread(phpThreads[0])
35+
assert.IsType(t, &regularThread{}, phpThreads[0].handler)
3336

3437
// transition to worker thread
35-
worker := getDummyWorker()
36-
for i := 0; i < numThreads; i++ {
37-
convertToWorkerThread(phpThreads[i], worker)
38-
assert.IsType(t, &workerThread{}, phpThreads[i].handler)
39-
}
40-
assert.Len(t, worker.threads, numThreads)
38+
worker := getDummyWorker("worker-transition-1.php")
39+
convertToWorkerThread(phpThreads[0], worker)
40+
assert.IsType(t, &workerThread{}, phpThreads[0].handler)
41+
assert.Len(t, worker.threads, 1)
4142

4243
// transition back to regular thread
43-
for i := 0; i < numThreads; i++ {
44-
convertToRegularThread(phpThreads[i])
45-
assert.IsType(t, &regularThread{}, phpThreads[i].handler)
46-
}
44+
convertToRegularThread(phpThreads[0])
45+
assert.IsType(t, &regularThread{}, phpThreads[0].handler)
4746
assert.Len(t, worker.threads, 0)
4847

4948
drainPHPThreads()
5049
assert.Nil(t, phpThreads)
5150
}
5251

5352
func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) {
54-
logger, _ = zap.NewDevelopment()
53+
logger = zap.NewNop()
5554
assert.NoError(t, initPHPThreads(1))
55+
firstWorker := getDummyWorker("worker-transition-1.php")
56+
secondWorker := getDummyWorker("worker-transition-2.php")
5657

5758
// convert to first worker thread
58-
firstWorker := getDummyWorker()
5959
convertToWorkerThread(phpThreads[0], firstWorker)
6060
firstHandler := phpThreads[0].handler.(*workerThread)
6161
assert.Same(t, firstWorker, firstHandler.worker)
62+
assert.Len(t, firstWorker.threads, 1)
63+
assert.Len(t, secondWorker.threads, 0)
6264

6365
// convert to second worker thread
64-
secondWorker := getDummyWorker()
6566
convertToWorkerThread(phpThreads[0], secondWorker)
6667
secondHandler := phpThreads[0].handler.(*workerThread)
6768
assert.Same(t, secondWorker, secondHandler.worker)
69+
assert.Len(t, firstWorker.threads, 0)
70+
assert.Len(t, secondWorker.threads, 1)
6871

6972
drainPHPThreads()
7073
assert.Nil(t, phpThreads)
7174
}
7275

73-
func getDummyWorker() *worker {
74-
path, _ := filepath.Abs("./testdata/index.php")
75-
return &worker{fileName: path}
76+
func TestTransitionThreadsWhileDoingRequests(t *testing.T) {
77+
numThreads := 10
78+
numRequestsPerThread := 100
79+
isRunning := atomic.Bool{}
80+
isRunning.Store(true)
81+
wg := sync.WaitGroup{}
82+
worker1Path, _ := filepath.Abs("./testdata/transition-worker-1.php")
83+
worker2Path, _ := filepath.Abs("./testdata/transition-worker-2.php")
84+
85+
Init(
86+
WithNumThreads(numThreads),
87+
WithWorkers(worker1Path, 4, map[string]string{"ENV1": "foo"}, []string{}),
88+
WithWorkers(worker2Path, 4, map[string]string{"ENV1": "foo"}, []string{}),
89+
WithLogger(zap.NewNop()),
90+
)
91+
92+
// randomly transition threads between regular and 2 worker threads
93+
go func() {
94+
for {
95+
for i := 0; i < numThreads; i++ {
96+
switch rand.IntN(3) {
97+
case 0:
98+
convertToRegularThread(phpThreads[i])
99+
case 1:
100+
convertToWorkerThread(phpThreads[i], workers[worker1Path])
101+
case 2:
102+
convertToWorkerThread(phpThreads[i], workers[worker2Path])
103+
}
104+
time.Sleep(time.Millisecond)
105+
if !isRunning.Load() {
106+
return
107+
}
108+
}
109+
}
110+
}()
111+
112+
// randomly do requests to the 3 endpoints
113+
wg.Add(numThreads)
114+
for i := 0; i < numThreads; i++ {
115+
go func(i int) {
116+
for j := 0; j < numRequestsPerThread; j++ {
117+
switch rand.IntN(3) {
118+
case 0:
119+
assertRequestBody(t, "http://localhost/transition-worker-1.php", "Hello from worker 1")
120+
case 1:
121+
assertRequestBody(t, "http://localhost/transition-worker-2.php", "Hello from worker 2")
122+
case 2:
123+
assertRequestBody(t, "http://localhost/transition-regular.php", "Hello from regular thread")
124+
}
125+
}
126+
wg.Done()
127+
}(i)
128+
}
129+
130+
wg.Wait()
131+
isRunning.Store(false)
132+
Shutdown()
133+
}
134+
135+
func getDummyWorker(fileName string) *worker {
136+
if workers == nil {
137+
workers = make(map[string]*worker)
138+
}
139+
absFileName, _ := filepath.Abs("./testdata/" + fileName)
140+
worker, _ := newWorker(workerOpt{
141+
fileName: absFileName,
142+
num: 1,
143+
})
144+
return worker
145+
}
146+
147+
func assertRequestBody(t *testing.T, url string, expected string) {
148+
r := httptest.NewRequest("GET", url, nil)
149+
w := httptest.NewRecorder()
150+
req, err := NewRequestWithContext(r, WithRequestDocumentRoot("/go/src/app/testdata", false))
151+
assert.NoError(t, err)
152+
err = ServeHTTP(w, req)
153+
assert.NoError(t, err)
154+
resp := w.Result()
155+
body, _ := io.ReadAll(resp.Body)
156+
assert.Equal(t, expected, string(body))
76157
}

β€Žphpthread.goβ€Ž

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ import "C"
55
import (
66
"net/http"
77
"runtime"
8+
"sync"
89
"unsafe"
9-
10-
"go.uber.org/zap"
1110
)
1211

1312
// representation of the actual underlying PHP thread
@@ -21,6 +20,7 @@ type phpThread struct {
2120
drainChan chan struct{}
2221
handler threadHandler
2322
state *threadState
23+
mu *sync.Mutex
2424
}
2525

2626
// interface that defines how the callbacks from the C thread should be handled
@@ -30,16 +30,30 @@ type threadHandler interface {
3030
getActiveRequest() *http.Request
3131
}
3232

33+
func newPHPThread(threadIndex int) *phpThread {
34+
return &phpThread{
35+
threadIndex: threadIndex,
36+
drainChan: make(chan struct{}),
37+
requestChan: make(chan *http.Request),
38+
mu: &sync.Mutex{},
39+
state: newThreadState(),
40+
}
41+
}
42+
3343
func (thread *phpThread) getActiveRequest() *http.Request {
3444
return thread.handler.getActiveRequest()
3545
}
3646

3747
// change the thread handler safely
3848
func (thread *phpThread) setHandler(handler threadHandler) {
39-
logger.Debug("transitioning thread", zap.Int("threadIndex", thread.threadIndex))
49+
thread.mu.Lock()
50+
defer thread.mu.Unlock()
51+
if thread.state.is(stateShuttingDown) {
52+
return
53+
}
4054
thread.state.set(stateTransitionRequested)
4155
close(thread.drainChan)
42-
thread.state.waitFor(stateTransitionInProgress)
56+
thread.state.waitFor(stateTransitionInProgress, stateShuttingDown)
4357
thread.handler = handler
4458
thread.drainChan = make(chan struct{})
4559
thread.state.set(stateTransitionComplete)

β€Žtestdata/sleep.phpβ€Ž

Lines changed: 0 additions & 4 deletions
This file was deleted.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
<?php
2+
3+
echo "Hello from regular thread";
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
while (frankenphp_handle_request(function () {
4+
echo "Hello from worker 1";
5+
})) {
6+
7+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
while (frankenphp_handle_request(function () {
4+
echo "Hello from worker 2";
5+
usleep(1000);
6+
})) {
7+
8+
}

0 commit comments

Comments
Β (0)