Skip to content

Commit 824cd51

Browse files
feat: background workers = non-HTTP workers with shared state
1 parent 006f37f commit 824cd51

53 files changed

Lines changed: 2861 additions & 24 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

background_worker.go

Lines changed: 400 additions & 0 deletions
Large diffs are not rendered by default.

background_worker_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package frankenphp
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/dunglas/frankenphp/internal/state"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
type backgroundWorkerTestMetrics struct {
13+
readyCalls int
14+
stopCalls []StopReason
15+
}
16+
17+
func (m *backgroundWorkerTestMetrics) StartWorker(string) {}
18+
19+
func (m *backgroundWorkerTestMetrics) ReadyWorker(string) {
20+
m.readyCalls++
21+
}
22+
23+
func (m *backgroundWorkerTestMetrics) StopWorker(_ string, reason StopReason) {
24+
m.stopCalls = append(m.stopCalls, reason)
25+
}
26+
27+
func (m *backgroundWorkerTestMetrics) TotalWorkers(string, int) {}
28+
29+
func (m *backgroundWorkerTestMetrics) TotalThreads(int) {}
30+
31+
func (m *backgroundWorkerTestMetrics) StartRequest() {}
32+
33+
func (m *backgroundWorkerTestMetrics) StopRequest() {}
34+
35+
func (m *backgroundWorkerTestMetrics) StopWorkerRequest(string, time.Duration) {}
36+
37+
func (m *backgroundWorkerTestMetrics) StartWorkerRequest(string) {}
38+
39+
func (m *backgroundWorkerTestMetrics) Shutdown() {}
40+
41+
func (m *backgroundWorkerTestMetrics) QueuedWorkerRequest(string) {}
42+
43+
func (m *backgroundWorkerTestMetrics) DequeuedWorkerRequest(string) {}
44+
45+
func (m *backgroundWorkerTestMetrics) QueuedRequest() {}
46+
47+
func (m *backgroundWorkerTestMetrics) DequeuedRequest() {}
48+
49+
func TestStartBackgroundWorkerFailureIsRetryable(t *testing.T) {
50+
lookup := newBackgroundWorkerLookupWithCatchAll(testDataPath + "/background-worker-with-argv.php")
51+
backgroundLookups = map[string]*backgroundWorkerLookup{"": lookup}
52+
thread := newPHPThread(0)
53+
thread.state.Set(state.Ready)
54+
thread.handler = &workerThread{
55+
thread: thread,
56+
worker: &worker{backgroundLookup: lookup},
57+
}
58+
phpThreads = []*phpThread{thread}
59+
t.Cleanup(func() {
60+
phpThreads = nil
61+
})
62+
63+
registry := lookup.Resolve("retryable-background-worker")
64+
65+
err := startBackgroundWorker(thread, "retryable-background-worker")
66+
require.EqualError(t, err, "no available PHP thread for background worker (increase max_threads)")
67+
assert.Empty(t, registry.workers)
68+
69+
err = startBackgroundWorker(thread, "retryable-background-worker")
70+
require.EqualError(t, err, "no available PHP thread for background worker (increase max_threads)")
71+
assert.Empty(t, registry.workers)
72+
}
73+
74+
func TestBackgroundWorkerSetVarsMarksWorkerReady(t *testing.T) {
75+
originalMetrics := metrics
76+
testMetrics := &backgroundWorkerTestMetrics{}
77+
metrics = testMetrics
78+
t.Cleanup(func() {
79+
metrics = originalMetrics
80+
})
81+
82+
handler := &backgroundWorkerThread{
83+
thread: newPHPThread(0),
84+
worker: &worker{name: "background-worker", fileName: "background-worker.php", maxConsecutiveFailures: -1},
85+
isBootingScript: true,
86+
}
87+
88+
handler.markBackgroundReady()
89+
handler.markBackgroundReady()
90+
91+
assert.False(t, handler.isBootingScript)
92+
assert.Equal(t, 0, handler.failureCount)
93+
assert.Equal(t, 1, testMetrics.readyCalls)
94+
}
95+
96+
func TestBackgroundWorkerBootFailureStaysBootFailureUntilReady(t *testing.T) {
97+
originalMetrics := metrics
98+
testMetrics := &backgroundWorkerTestMetrics{}
99+
metrics = testMetrics
100+
t.Cleanup(func() {
101+
metrics = originalMetrics
102+
})
103+
104+
handler := &backgroundWorkerThread{
105+
thread: newPHPThread(0),
106+
worker: &worker{
107+
name: "background-worker",
108+
fileName: "background-worker.php",
109+
maxConsecutiveFailures: -1,
110+
},
111+
isBootingScript: true,
112+
}
113+
114+
handler.afterScriptExecution(1)
115+
require.Len(t, testMetrics.stopCalls, 1)
116+
assert.Equal(t, StopReason(StopReasonBootFailure), testMetrics.stopCalls[0])
117+
118+
testMetrics.stopCalls = nil
119+
handler.isBootingScript = true
120+
handler.markBackgroundReady()
121+
handler.afterScriptExecution(1)
122+
require.Len(t, testMetrics.stopCalls, 1)
123+
assert.Equal(t, StopReason(StopReasonCrash), testMetrics.stopCalls[0])
124+
}

bg_worker_vars.h

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/* bg_worker_vars.h - Persistent zval helpers for background worker variables.
2+
*
3+
* Handles validation, deep-copy to/from persistent memory, immutable array
4+
* detection, interned string optimization, and enum serialization.
5+
*
6+
* Included by frankenphp.c - not a standalone compilation unit. */
7+
8+
#ifndef BG_WORKER_VARS_H
9+
#define BG_WORKER_VARS_H
10+
11+
typedef struct {
12+
zend_string *class_name;
13+
zend_string *case_name;
14+
} bg_worker_enum_t;
15+
16+
/* Forward declarations */
17+
static void bg_worker_free_persistent_zval(zval *z);
18+
19+
static void bg_worker_request_copy_zval(zval *dst, zval *src);
20+
21+
/* Check if a HashTable is an opcache immutable array - safe to share
22+
* across threads without copying. */
23+
static bool bg_worker_is_immutable(HashTable *ht) {
24+
return (GC_FLAGS(ht) & IS_ARRAY_IMMUTABLE) != 0;
25+
}
26+
27+
/* Free a stored vars pointer only if it's a persistent copy (not immutable). */
28+
static void bg_worker_free_stored_vars(void *ptr) {
29+
if (ptr != NULL) {
30+
HashTable *ht = (HashTable *)ptr;
31+
if (!bg_worker_is_immutable(ht)) {
32+
zval z;
33+
ZVAL_ARR(&z, ht);
34+
bg_worker_free_persistent_zval(&z);
35+
}
36+
}
37+
}
38+
39+
/* Copy or reference a stored vars pointer to request memory.
40+
* Immutable arrays are returned as zero-copy references. */
41+
static void bg_worker_read_stored_vars(zval *dst, void *ptr) {
42+
HashTable *ht = (HashTable *)ptr;
43+
if (bg_worker_is_immutable(ht)) {
44+
ZVAL_ARR(dst, ht); /* zero-copy: immutable = safe to share */
45+
} else {
46+
zval src;
47+
ZVAL_ARR(&src, ht);
48+
bg_worker_request_copy_zval(dst, &src);
49+
}
50+
}
51+
52+
/* Validate that a zval tree contains only scalars, arrays, and enums */
53+
static bool bg_worker_validate_zval(zval *z) {
54+
switch (Z_TYPE_P(z)) {
55+
case IS_NULL:
56+
case IS_FALSE:
57+
case IS_TRUE:
58+
case IS_LONG:
59+
case IS_DOUBLE:
60+
case IS_STRING:
61+
return true;
62+
case IS_OBJECT:
63+
return (Z_OBJCE_P(z)->ce_flags & ZEND_ACC_ENUM) != 0;
64+
case IS_ARRAY: {
65+
zval *val;
66+
ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(z), val) {
67+
if (!bg_worker_validate_zval(val))
68+
return false;
69+
}
70+
ZEND_HASH_FOREACH_END();
71+
return true;
72+
}
73+
default:
74+
return false;
75+
}
76+
}
77+
78+
/* Deep-copy a zval into persistent memory */
79+
static void bg_worker_persist_zval(zval *dst, zval *src) {
80+
switch (Z_TYPE_P(src)) {
81+
case IS_NULL:
82+
case IS_FALSE:
83+
case IS_TRUE:
84+
ZVAL_COPY_VALUE(dst, src);
85+
break;
86+
case IS_LONG:
87+
ZVAL_LONG(dst, Z_LVAL_P(src));
88+
break;
89+
case IS_DOUBLE:
90+
ZVAL_DOUBLE(dst, Z_DVAL_P(src));
91+
break;
92+
case IS_STRING: {
93+
zend_string *s = Z_STR_P(src);
94+
if (ZSTR_IS_INTERNED(s)) {
95+
ZVAL_STR(dst, s); /* interned = shared memory, no copy needed */
96+
} else {
97+
ZVAL_NEW_STR(dst, zend_string_init(ZSTR_VAL(s), ZSTR_LEN(s), 1));
98+
}
99+
break;
100+
}
101+
case IS_OBJECT: {
102+
/* Must be an enum (validated earlier) */
103+
zend_class_entry *ce = Z_OBJCE_P(src);
104+
bg_worker_enum_t *e = pemalloc(sizeof(bg_worker_enum_t), 1);
105+
e->class_name =
106+
ZSTR_IS_INTERNED(ce->name)
107+
? ce->name
108+
: zend_string_init(ZSTR_VAL(ce->name), ZSTR_LEN(ce->name), 1);
109+
zval *case_name_zval = zend_enum_fetch_case_name(Z_OBJ_P(src));
110+
zend_string *case_str = Z_STR_P(case_name_zval);
111+
e->case_name =
112+
ZSTR_IS_INTERNED(case_str)
113+
? case_str
114+
: zend_string_init(ZSTR_VAL(case_str), ZSTR_LEN(case_str), 1);
115+
ZVAL_PTR(dst, e);
116+
break;
117+
}
118+
case IS_ARRAY: {
119+
HashTable *src_ht = Z_ARRVAL_P(src);
120+
HashTable *dst_ht = pemalloc(sizeof(HashTable), 1);
121+
zend_hash_init(dst_ht, zend_hash_num_elements(src_ht), NULL, NULL, 1);
122+
ZVAL_ARR(dst, dst_ht);
123+
124+
zend_string *key;
125+
zend_ulong idx;
126+
zval *val;
127+
ZEND_HASH_FOREACH_KEY_VAL(src_ht, idx, key, val) {
128+
zval pval;
129+
bg_worker_persist_zval(&pval, val);
130+
if (key) {
131+
if (ZSTR_IS_INTERNED(key)) {
132+
zend_hash_add_new(dst_ht, key, &pval);
133+
} else {
134+
zend_string *pkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 1);
135+
zend_hash_add_new(dst_ht, pkey, &pval);
136+
zend_string_release(pkey);
137+
}
138+
} else {
139+
zend_hash_index_add_new(dst_ht, idx, &pval);
140+
}
141+
}
142+
ZEND_HASH_FOREACH_END();
143+
break;
144+
}
145+
default:
146+
ZVAL_NULL(dst);
147+
break;
148+
}
149+
}
150+
151+
/* Deep-free a persistent zval tree */
152+
static void bg_worker_free_persistent_zval(zval *z) {
153+
switch (Z_TYPE_P(z)) {
154+
case IS_STRING:
155+
if (!ZSTR_IS_INTERNED(Z_STR_P(z))) {
156+
zend_string_free(Z_STR_P(z));
157+
}
158+
break;
159+
case IS_PTR: {
160+
bg_worker_enum_t *e = (bg_worker_enum_t *)Z_PTR_P(z);
161+
if (!ZSTR_IS_INTERNED(e->class_name))
162+
zend_string_free(e->class_name);
163+
if (!ZSTR_IS_INTERNED(e->case_name))
164+
zend_string_free(e->case_name);
165+
pefree(e, 1);
166+
break;
167+
}
168+
case IS_ARRAY: {
169+
zval *val;
170+
ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(z), val) {
171+
bg_worker_free_persistent_zval(val);
172+
}
173+
ZEND_HASH_FOREACH_END();
174+
zend_hash_destroy(Z_ARRVAL_P(z));
175+
pefree(Z_ARRVAL_P(z), 1);
176+
break;
177+
}
178+
default:
179+
break;
180+
}
181+
}
182+
183+
/* Deep-copy a persistent zval tree into request memory */
184+
static void bg_worker_request_copy_zval(zval *dst, zval *src) {
185+
switch (Z_TYPE_P(src)) {
186+
case IS_NULL:
187+
case IS_FALSE:
188+
case IS_TRUE:
189+
ZVAL_COPY_VALUE(dst, src);
190+
break;
191+
case IS_LONG:
192+
ZVAL_LONG(dst, Z_LVAL_P(src));
193+
break;
194+
case IS_DOUBLE:
195+
ZVAL_DOUBLE(dst, Z_DVAL_P(src));
196+
break;
197+
case IS_STRING:
198+
if (ZSTR_IS_INTERNED(Z_STR_P(src))) {
199+
ZVAL_STR(dst, Z_STR_P(src));
200+
} else {
201+
ZVAL_STRINGL(dst, Z_STRVAL_P(src), Z_STRLEN_P(src));
202+
}
203+
break;
204+
case IS_PTR: {
205+
bg_worker_enum_t *e = (bg_worker_enum_t *)Z_PTR_P(src);
206+
zend_class_entry *ce = zend_lookup_class(e->class_name);
207+
if (!ce || !(ce->ce_flags & ZEND_ACC_ENUM)) {
208+
zend_throw_exception_ex(spl_ce_LogicException, 0,
209+
"Background worker enum class \"%s\" not found",
210+
ZSTR_VAL(e->class_name));
211+
ZVAL_NULL(dst);
212+
break;
213+
}
214+
zend_object *enum_obj = zend_enum_get_case_cstr(ce, ZSTR_VAL(e->case_name));
215+
if (!enum_obj) {
216+
zend_throw_exception_ex(
217+
spl_ce_LogicException, 0,
218+
"Background worker enum case \"%s::%s\" not found",
219+
ZSTR_VAL(e->class_name), ZSTR_VAL(e->case_name));
220+
ZVAL_NULL(dst);
221+
break;
222+
}
223+
ZVAL_OBJ_COPY(dst, enum_obj);
224+
break;
225+
}
226+
case IS_ARRAY: {
227+
HashTable *src_ht = Z_ARRVAL_P(src);
228+
array_init_size(dst, zend_hash_num_elements(src_ht));
229+
HashTable *dst_ht = Z_ARRVAL_P(dst);
230+
231+
zend_string *key;
232+
zend_ulong idx;
233+
zval *val;
234+
ZEND_HASH_FOREACH_KEY_VAL(src_ht, idx, key, val) {
235+
zval rval;
236+
bg_worker_request_copy_zval(&rval, val);
237+
if (EG(exception)) {
238+
zval_ptr_dtor(&rval);
239+
break;
240+
}
241+
if (key) {
242+
if (ZSTR_IS_INTERNED(key)) {
243+
zend_hash_add_new(dst_ht, key, &rval);
244+
} else {
245+
zend_string *rkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 0);
246+
ZSTR_H(rkey) = ZSTR_H(key);
247+
zend_hash_add_new(dst_ht, rkey, &rval);
248+
zend_string_release(rkey);
249+
}
250+
} else {
251+
zend_hash_index_add_new(dst_ht, idx, &rval);
252+
}
253+
}
254+
ZEND_HASH_FOREACH_END();
255+
break;
256+
}
257+
default:
258+
ZVAL_NULL(dst);
259+
break;
260+
}
261+
}
262+
#endif /* BG_WORKER_VARS_H */

0 commit comments

Comments
 (0)