Skip to content

Commit 25be5af

Browse files
authored
[GoSDK + Prism] Support Process env execution. (#33651)
1 parent 7d17f2a commit 25be5af

File tree

4 files changed

+62
-11
lines changed

4 files changed

+62
-11
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,14 @@
7474
* External, Process based Worker Pool support added to the Go SDK container. ([#33572](https://github.com/apache/beam/pull/33572))
7575
* This is used to enable sidecar containers to run SDK workers for some runners.
7676
* See https://beam.apache.org/documentation/runtime/sdk-harness-config/ for details.
77+
* Support the Process Environment for execution in the Go SDK. ([#33651](https://github.com/apache/beam/pull/33651))
7778
* Prism
7879
* Prism now uses the same single port for both pipeline submission and execution on workers. Requests are differentiated by worker-id. ([#33438](https://github.com/apache/beam/pull/33438))
7980
* This avoids port starvation and provides clarity on port use when running Prism in non-local environments.
8081
* Support for @RequiresTimeSortedInputs added. ([#33513](https://github.com/apache/beam/issues/33513))
8182
* Initial support for AllowedLateness added. ([#33542](https://github.com/apache/beam/pull/33542))
8283
* The Go SDK's inprocess Prism runner (AKA the Go SDK default runner) now supports non-loopback mode environment types. ([#33572](https://github.com/apache/beam/pull/33572))
84+
* Support the Process Environment for execution in Prism ([#33651](https://github.com/apache/beam/pull/33651))
8385

8486
## Breaking Changes
8587

sdks/go/pkg/beam/core/runtime/graphx/translate.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package graphx
1717

1818
import (
1919
"context"
20+
"encoding/json"
2021
"fmt"
2122
"sort"
2223
"strings"
@@ -122,8 +123,12 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
122123
var serializedPayload []byte
123124
switch urn {
124125
case URNEnvProcess:
125-
// TODO Support process based SDK Harness.
126-
return nil, errors.Errorf("unsupported environment %v", urn)
126+
config := extractEnvironmentConfig(ctx)
127+
payload := &pipepb.ProcessPayload{}
128+
if err := json.Unmarshal([]byte(config), payload); err != nil {
129+
return nil, fmt.Errorf("unable to json unmarshal --environment_config: %w", err)
130+
}
131+
serializedPayload = protox.MustEncode(payload)
127132
case URNEnvExternal:
128133
config := extractEnvironmentConfig(ctx)
129134
payload := &pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: config}}

sdks/go/pkg/beam/core/runtime/graphx/translate_test.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -296,22 +296,22 @@ func (fn *splitPickFn) ProcessElement(_ *testRT, a int, small, big func(int)) {
296296
}
297297

298298
func TestCreateEnvironment(t *testing.T) {
299-
t.Run("process", func(t *testing.T) {
300-
const wantEnv = "process"
299+
t.Run("processBadConfig", func(t *testing.T) {
301300
urn := graphx.URNEnvProcess
302-
got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return wantEnv })
301+
got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return "not a real json" })
303302
if err == nil {
304-
t.Errorf("CreateEnvironment(%v) = %v error, want error since it's unsupported", urn, err)
303+
t.Errorf("CreateEnvironment(%v) = %v error, want error since parsing should fail", urn, err)
305304
}
306305
want := (*pipepb.Environment)(nil)
307306
if !proto.Equal(got, want) {
308-
t.Errorf("CreateEnvironment(%v) = %v, want %v since it's unsupported", urn, got, want)
307+
t.Errorf("CreateEnvironment(%v) = %v, want %v since creation should have failed", urn, got, want)
309308
}
310309
})
311310
tests := []struct {
312-
name string
313-
urn string
314-
payload func(name string) []byte
311+
name string
312+
configOverride string
313+
urn string
314+
payload func(name string) []byte
315315
}{
316316
{
317317
name: "external",
@@ -331,12 +331,25 @@ func TestCreateEnvironment(t *testing.T) {
331331
ContainerImage: name,
332332
})
333333
},
334+
}, {
335+
name: "process",
336+
configOverride: "{ \"command\": \"process\" }",
337+
urn: graphx.URNEnvProcess,
338+
payload: func(name string) []byte {
339+
return protox.MustEncode(&pipepb.ProcessPayload{
340+
Command: name,
341+
})
342+
},
334343
},
335344
}
336345
for _, test := range tests {
337346
test := test
338347
t.Run(test.name, func(t *testing.T) {
339-
got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return test.name })
348+
config := test.name
349+
if test.configOverride != "" {
350+
config = test.configOverride
351+
}
352+
got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return config })
340353
if err != nil {
341354
t.Errorf("CreateEnvironment(%v) = %v error, want nil", test.urn, err)
342355
}

sdks/go/pkg/beam/runners/prism/internal/environments.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"io"
2323
"log/slog"
2424
"os"
25+
"os/exec"
26+
"time"
2527

2628
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
2729
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -66,6 +68,16 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
6668
logger.Error("unmarshing docker environment payload", "error", err)
6769
}
6870
return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint())
71+
case urns.EnvProcess:
72+
pp := &pipepb.ProcessPayload{}
73+
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil {
74+
logger.Error("unmarshing docker environment payload", "error", err)
75+
}
76+
go func() {
77+
processEnvironment(ctx, pp, wk)
78+
logger.Debug("environment stopped", slog.String("job", j.String()))
79+
}()
80+
return nil
6981
default:
7082
return fmt.Errorf("environment %v with urn %v unimplemented", env, e.GetUrn())
7183
}
@@ -231,3 +243,22 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
231243

232244
return nil
233245
}
246+
247+
func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *worker.W) {
248+
cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id="+wk.ID, "--provision_endpoint="+wk.Endpoint())
249+
250+
cmd.WaitDelay = time.Millisecond * 100
251+
cmd.Stderr = os.Stderr
252+
cmd.Stdout = os.Stdout
253+
cmd.Env = os.Environ()
254+
255+
for k, v := range pp.GetEnv() {
256+
cmd.Env = append(cmd.Environ(), fmt.Sprintf("%v=%v", k, v))
257+
}
258+
if err := cmd.Start(); err != nil {
259+
return
260+
}
261+
// Job processing happens here, but orchestrated by other goroutines
262+
// This call blocks until the context is cancelled, or the command exits.
263+
cmd.Wait()
264+
}

0 commit comments

Comments
 (0)