Commit 003a81f

[#705]: feature: pass ww PID to the PHP workers
1 parent 040bb73 commit 003a81f

17 files changed: +691 -216 lines changed

.claude/CLAUDE.md

Lines changed: 163 additions & 0 deletions
@@ -0,0 +1,163 @@
1+
# CLAUDE.md
2+
3+
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
4+
5+
## Project Overview
6+
7+
RoadRunner Temporal Plugin - enables workflow and activity processing for PHP processes using Temporal. The plugin acts as a bridge between RoadRunner (Go) and Temporal SDK for PHP, handling communication via protobuf codec over goridge protocol.
8+
9+
## Build & Test Commands
10+
11+
### Go Tests
12+
```bash
13+
# Run all tests with race detection and coverage
14+
go test -timeout 20m -v -race -cover -tags=debug -failfast ./...
15+
16+
# Run specific test suites
17+
go test -timeout 20m -v -race -cover -tags=debug -failfast ./tests/general
18+
go test -timeout 20m -v -race -cover -tags=debug -failfast ./canceller
19+
go test -timeout 20m -v -race -cover -tags=debug -failfast ./dataconverter
20+
go test -timeout 20m -v -race -cover -tags=debug -failfast ./queue
21+
22+
# Run with coverage profile
23+
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=coverage.out -covermode=atomic ./general
24+
```
25+
26+
### PHP Tests Setup
27+
```bash
28+
cd tests/php_test_files
29+
composer install
30+
```
31+
32+
### Linting
33+
```bash
34+
# Run golangci-lint with project config
35+
golangci-lint run --timeout=10m --build-tags=safe
36+
```
37+
38+
## Architecture
39+
40+
### Core Components
41+
42+
**Plugin Structure** (`plugin.go`):
43+
- `Plugin` - main plugin struct, manages lifecycle and pools
44+
- Implements RoadRunner plugin interface: `Init()`, `Serve()`, `Stop()`, `RPC()`
45+
- Manages two worker pools: workflow pool (single worker) and activity pool (multiple workers)
46+
- Uses mutex-protected `temporal` struct to hold client, workers, and definitions
47+
48+
**Worker Pools**:
49+
- **Workflow Pool** (`wfP`): Single PHP worker dedicated to workflow execution
50+
- **Activity Pool** (`actP`): Multiple PHP workers for concurrent activity execution
51+
- Configured via `pool.Config` in config.go
52+
- Uses `static_pool.Pool` from roadrunner-server/pool
53+
54+
**Communication Flow**:
55+
1. Temporal SDK (Go) ↔ Plugin (`aggregatedpool/`) ↔ PHP Workers via codec
56+
2. Protocol messages defined in `internal/protocol.go`
57+
3. Codec implementation in `internal/codec/proto/`
58+
4. Uses goridge for Go↔PHP communication
59+
60+
### Key Abstractions
61+
62+
**Workflow Definition** (`aggregatedpool/workflow.go`):
63+
- Implements Temporal's `WorkflowDefinition` interface
64+
- Handles workflow execution, local activities, queries, signals, updates
65+
- Uses message queue for command/response exchange with PHP worker
66+
- Maintains ID registry, cancellation context, and callback management
67+
68+
**Activity Definition** (`aggregatedpool/activity.go`):
69+
- Implements Temporal's activity execution interface
70+
- Routes activity invocations to PHP activity pool
71+
- Handles activity context, headers, and heartbeats
72+
73+
**Protocol** (`internal/protocol.go`):
74+
- Defines command constants (e.g., `invokeActivityCommand`, `startWorkflowCommand`)
75+
- Message structure for Go↔PHP communication
76+
- Context includes task queue, replay flag, history info
77+
78+
### Worker Lifecycle
79+
80+
1. **Initialization** (`internal.go:initPool()`):
81+
- Creates activity and workflow pools
82+
- Initializes codec and definitions
83+
- Retrieves worker info from PHP via protobuf
84+
- Creates Temporal client with interceptors
85+
- Starts Temporal workers
86+
87+
2. **Reset Flow** (`plugin.go:Reset()`, `ResetAP()`):
88+
- Triggered by worker stop events
89+
- Stops Temporal workers, resets pools
90+
- Purges sticky workflow cache
91+
- Re-initializes workers with fresh PHP processes
92+
- Workflow worker PID tracked for targeted resets
93+
94+
3. **Event Handling**:
95+
- Subscribes to `EventWorkerStopped` events
96+
- Checks PID in event message to determine WF vs Activity worker
97+
- Executes full reset for WF worker, activity-only reset for activity workers
98+
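A minimal sketch of the PID check described in the event-handling step above. It is not the plugin's code: the `pid: <n>` message format, the `resetKind` type, and the `classifyStop` helper are all hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// resetKind says which pools should be re-created after a worker stops.
// Hypothetical type, used only in this sketch.
type resetKind int

const (
	resetNone       resetKind = iota // no PID found in the message
	resetActivities                  // an activity worker stopped
	resetAll                         // the workflow worker stopped
)

// pidRe matches the ASSUMED message format "pid: <number>".
var pidRe = regexp.MustCompile(`pid: (\d+)`)

// classifyStop compares the PID found in the event message with the
// tracked workflow worker PID and picks the reset scope.
func classifyStop(eventMsg string, wwPID int) resetKind {
	m := pidRe.FindStringSubmatch(eventMsg)
	if m == nil {
		return resetNone
	}
	pid, err := strconv.Atoi(m[1])
	if err != nil {
		return resetNone
	}
	if pid == wwPID {
		return resetAll
	}
	return resetActivities
}

func main() {
	const wwPID = 4242 // hypothetical workflow worker PID
	fmt.Println(classifyStop("worker stopped, pid: 4242", wwPID) == resetAll)
	fmt.Println(classifyStop("worker stopped, pid: 17", wwPID) == resetActivities)
}
```

Returning a distinct `resetNone` value for unparseable messages keeps a malformed event from triggering a reset by accident.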
99+
### Configuration
100+
101+
**Structure** (`config.go`):
102+
- `Address`: Temporal server address
103+
- `Namespace`: Temporal namespace
104+
- `CacheSize`: Sticky workflow cache size
105+
- `Activities`: Pool configuration for activity workers
106+
- `Metrics`: Optional Prometheus or StatsD metrics
107+
- `TLS`: Optional TLS configuration
108+
- `DisableActivityWorkers`: Flag to disable activity pool
109+
110+
**Environment Variables**:
111+
- `RR_MODE=temporal` - set for PHP workers
112+
- `RR_CODEC=protobuf` - codec identifier
113+
- `NO_PROXY` - respected for gRPC connections
114+
115+
### Interceptors & Extensions
116+
117+
The plugin supports interceptors via `api.Interceptor` interface:
118+
- Collected via Endure's dependency injection (`Collects()`)
119+
- Applied to Temporal workers during initialization
120+
- Stored in `temporal.interceptors` map
121+
122+
### Metrics
123+
124+
Two driver options:
125+
- **Prometheus**: Exposed on configured address
126+
- **StatsD**: Sent to statsd server with configurable prefix/tags
127+
128+
Metrics integrated via Temporal's `MetricsHandler` interface using uber-go/tally.
129+
130+
## Important Patterns
131+
132+
### Codec Usage
133+
- All PHP communication uses protobuf codec (`internal/codec/proto/`)
134+
- Wraps Temporal's data converter for payload serialization
135+
- PHP SDK version extracted from worker info to set gRPC headers
136+
137+
### Worker Restart Strategy
138+
- Workflow worker PID stored in `p.wwPID` for tracking
139+
- Worker stop events include PID in message
140+
- Targeted restarts based on PID matching prevent unnecessary full resets
141+
142+
### Client Headers
143+
- gRPC interceptor rewrites client-name to "temporal-php-2"
144+
- client-version set to PHP SDK version from worker info
145+
- API key dynamically loaded from atomic pointer
146+
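The header rules above can be sketched with the standard library alone. This is an illustration under assumptions, not the plugin's gRPC interceptor: `outgoingHeaders` and the `apiKey` variable are hypothetical names, and the `authorization: Bearer …` header shape is assumed rather than taken from this file.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// apiKey holds the current API key. It is swapped atomically, so readers
// never take a lock. Hypothetical variable for this sketch.
var apiKey atomic.Pointer[string]

// outgoingHeaders builds the header set for one outgoing call.
func outgoingHeaders(sdkVersion string) map[string]string {
	h := map[string]string{
		"client-name":    "temporal-php-2",
		"client-version": sdkVersion,
	}
	// Load the key on every call so a rotated key is picked up immediately.
	if k := apiKey.Load(); k != nil && *k != "" {
		h["authorization"] = "Bearer " + *k // ASSUMED header name and format
	}
	return h
}

func main() {
	fmt.Println(outgoingHeaders("2.11.0")) // version string is made up

	key := "secret"
	apiKey.Store(&key)
	fmt.Println(outgoingHeaders("2.11.0"))
}
```

Reading the pointer per call, instead of capturing the key once, is what makes the key "dynamically loaded".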
147+
### Pool Allocation
148+
- Activity pool: configurable workers via `Activities.NumWorkers`
149+
- Workflow pool: always 1 worker with 240h allocate timeout
150+
- Both use same command/env but different pool configs
151+
152+
## File Organization
153+
154+
- `plugin.go`, `internal.go` - plugin lifecycle and initialization
155+
- `config.go`, `tls.go`, `metrics.go` - configuration and setup
156+
- `rpc.go` - RPC methods for external control
157+
- `info.go`, `status.go` - worker and status information
158+
- `aggregatedpool/` - workflow/activity definitions bridging Go↔Temporal
159+
- `internal/` - protocol definitions and codec implementation
160+
- `api/` - interfaces and context utilities
161+
- `canceller/`, `queue/`, `registry/` - workflow execution utilities
162+
- `dataconverter/` - Temporal data converter wrapper
163+
- `tests/` - integration tests with PHP workers

.github/workflows/linux.yml

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,7 @@ jobs:
1717
timeout-minutes: 60
1818
strategy:
1919
matrix:
20-
php: [ "8.3" ]
20+
php: [ "8.4" ]
2121
go: [ stable ]
2222
os: [ "ubuntu-latest" ]
2323
steps:
@@ -86,7 +86,7 @@ jobs:
8686
timeout-minutes: 60
8787
strategy:
8888
matrix:
89-
php: [ "8.3" ]
89+
php: [ "8.4" ]
9090
go: [ stable ]
9191
os: [ "ubuntu-latest" ]
9292
steps:
@@ -179,7 +179,7 @@ jobs:
179179
timeout-minutes: 60
180180
strategy:
181181
matrix:
182-
php: [ "8.3" ]
182+
php: [ "8.4" ]
183183
go: [ stable ]
184184
os: [ "ubuntu-latest" ]
185185
steps:

.golangci.yml

Lines changed: 5 additions & 0 deletions
@@ -54,6 +54,11 @@ linters:
5454
for-loops: true
5555
wsl:
5656
allow-assign-and-anything: true
57+
revive:
58+
rules:
59+
- name: var-naming
60+
disabled: true
61+
5762
exclusions:
5863
generated: lax
5964
presets:

AGENTS.md

Lines changed: 0 additions & 40 deletions
This file was deleted.

aggregatedpool/handler.go

Lines changed: 27 additions & 11 deletions
@@ -82,6 +82,7 @@ func (wp *Workflow) handleUpdate(name string, id string, input *commonpb.Payload
8282
},
8383
input,
8484
header,
85+
wp.getWorkflowWorkerPid(),
8586
)
8687
}
8788

@@ -94,6 +95,7 @@ func (wp *Workflow) handleCancel() {
9495
internal.CancelWorkflow{RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID},
9596
nil,
9697
wp.header,
98+
wp.getWorkflowWorkerPid(),
9799
)
98100
}
99101

@@ -107,6 +109,7 @@ func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *
107109
},
108110
input,
109111
header,
112+
wp.getWorkflowWorkerPid(),
110113
)
111114

112115
return nil
@@ -223,7 +226,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
223226
return errors.E(op, err)
224227
}
225228

226-
wp.mq.PushResponse(msg.ID, result)
229+
wp.mq.PushResponse(msg.ID, result, wp.getWorkflowWorkerPid())
227230
err = wp.flushQueue()
228231
if err != nil {
229232
return errors.E(op, err)
@@ -279,7 +282,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
279282
case *internal.CompleteWorkflow:
280283
wp.log.Debug("complete workflow request", zap.Uint64("ID", msg.ID))
281284
result, _ := wp.env.GetDataConverter().ToPayloads(completed)
282-
wp.mq.PushResponse(msg.ID, result)
285+
wp.mq.PushResponse(msg.ID, result, wp.getWorkflowWorkerPid())
283286

284287
if msg.Failure == nil {
285288
wp.env.Complete(msg.Payloads, nil)
@@ -291,7 +294,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
291294
case *internal.ContinueAsNew:
292295
wp.log.Debug("continue-as-new request", zap.Uint64("ID", msg.ID), zap.String("name", command.Name))
293296
result, _ := wp.env.GetDataConverter().ToPayloads(completed)
294-
wp.mq.PushResponse(msg.ID, result)
297+
wp.mq.PushResponse(msg.ID, result, wp.getWorkflowWorkerPid())
295298

296299
wp.env.Complete(nil, &workflow.ContinueAsNewError{
297300
WorkflowType: &bindings.WorkflowType{
@@ -503,7 +506,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
503506
}
504507

505508
result, _ := wp.env.GetDataConverter().ToPayloads(completed)
506-
wp.mq.PushResponse(msg.ID, result)
509+
wp.mq.PushResponse(msg.ID, result, wp.getWorkflowWorkerPid())
507510

508511
err = wp.flushQueue()
509512
if err != nil {
@@ -540,12 +543,12 @@ func (wp *Workflow) createLocalActivityCallback(id uint64) bindings.LocalActivit
540543

541544
if lar.Err != nil {
542545
wp.log.Debug("error", zap.Error(lar.Err), zap.Int32("attempt", lar.Attempt), zap.Duration("backoff", lar.Backoff))
543-
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(lar.Err))
546+
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(lar.Err), wp.getWorkflowWorkerPid())
544547
return
545548
}
546549

547550
wp.log.Debug("pushing local activity response", zap.Uint64("ID", id))
548-
wp.mq.PushResponse(id, lar.Result)
551+
wp.mq.PushResponse(id, lar.Result, wp.getWorkflowWorkerPid())
549552
}
550553

551554
return func(lar *bindings.LocalActivityResultWrapper) {
@@ -571,13 +574,13 @@ func (wp *Workflow) createCallback(id uint64, t string) bindings.ResultHandler {
571574

572575
if err != nil {
573576
wp.log.Debug("error", zap.Error(err), zap.String("type", t))
574-
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err))
577+
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err), wp.getWorkflowWorkerPid())
575578
return
576579
}
577580

578581
wp.log.Debug("pushing response", zap.Uint64("ID", id), zap.String("type", t))
579582
// fetch original payload
580-
wp.mq.PushResponse(id, result)
583+
wp.mq.PushResponse(id, result, wp.getWorkflowWorkerPid())
581584
}
582585

583586
return func(result *commonpb.Payloads, err error) {
@@ -603,11 +606,11 @@ func (wp *Workflow) createContinuableCallback(id uint64, t string) bindings.Resu
603606
wp.canceller.Discard(id)
604607

605608
if err != nil {
606-
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err))
609+
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err), wp.getWorkflowWorkerPid())
607610
return
608611
}
609612

610-
wp.mq.PushResponse(id, result)
613+
wp.mq.PushResponse(id, result, wp.getWorkflowWorkerPid())
611614
err = wp.flushQueue()
612615
if err != nil {
613616
panic(err)
@@ -678,7 +681,9 @@ func (wp *Workflow) flushQueue() error {
678681
func (wp *Workflow) runCommand(cmd any, payloads *commonpb.Payloads, header *commonpb.Header) (*internal.Message, error) {
679682
const op = errors.Op("workflow_process_runcommand")
680683
msg := &internal.Message{}
681-
wp.mq.AllocateMessage(cmd, payloads, header, msg)
684+
// attempt to prevent sending the response from the dead worker
685+
686+
wp.mq.AllocateMessage(cmd, payloads, header, msg, wp.getWorkflowWorkerPid())
682687

683688
if wp.mh != nil {
684689
wp.mh.Gauge(RrMetricName).Update(float64(wp.pool.QueueSize()))
@@ -734,6 +739,17 @@ func (wp *Workflow) runCommand(cmd any, payloads *commonpb.Payloads, header *com
734739
return msgs[0], nil
735740
}
736741

742+
func (wp *Workflow) getWorkflowWorkerPid() int {
743+
wp.log.Debug("fetching workflow worker pid")
744+
wfw := wp.pool.Workers()
745+
if len(wfw) > 0 {
746+
wp.log.Debug("workflow worker pid found", zap.Int("pid", int(wfw[0].Pid())))
747+
return int(wfw[0].Pid())
748+
}
749+
wp.log.Debug("workflow worker pid not found")
750+
return 0
751+
}
752+
737753
func (wp *Workflow) getPld() *payload.Payload {
738754
return wp.pldPool.Get().(*payload.Payload)
739755
}
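
The hunks above stamp every queued message with the workflow worker PID, but this part of the diff does not show how the PID is consumed. One plausible use, sketched here purely as an assumption, is to drop messages stamped by a worker that has since been replaced. The `stamped` and `queue` types and the `Flush` method are hypothetical and do not mirror the real `queue` package.

```go
package main

import "fmt"

// stamped is a hypothetical queued message carrying the PID of the
// workflow worker that was alive when the message was queued.
type stamped struct {
	ID  uint64
	Pid int
}

// queue is a simplified, hypothetical stand-in for the message queue.
type queue struct {
	msgs []stamped
}

// PushResponse records a response together with the producing worker's PID.
func (q *queue) PushResponse(id uint64, pid int) {
	q.msgs = append(q.msgs, stamped{ID: id, Pid: pid})
}

// Flush empties the queue and returns only the messages stamped with
// currentPid; messages from a previous (dead) worker are discarded.
func (q *queue) Flush(currentPid int) []stamped {
	kept := make([]stamped, 0, len(q.msgs))
	for _, m := range q.msgs {
		if m.Pid == currentPid {
			kept = append(kept, m)
		}
	}
	q.msgs = q.msgs[:0]
	return kept
}

func main() {
	q := &queue{}
	q.PushResponse(1, 100) // queued while worker 100 was alive
	q.PushResponse(2, 200) // queued after worker 200 replaced it
	fmt.Println(len(q.Flush(200)))
}
```

Note that `getWorkflowWorkerPid` returns `0` when the pool has no workers, so a consumer following this approach would need to decide how to treat a zero PID.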

aggregatedpool/workers.go

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo
3636
)
3737
}
3838

39-
// interceptor used here to the headers
39+
// interceptor used here to the headers
4040
wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, NewWorkerInterceptor())
4141
for _, interceptor := range interceptors {
4242
wi[i].Options.Interceptors = append(wi[i].Options.Interceptors, interceptor.WorkerInterceptor())
