Skip to content

Commit 62cbf83

Browse files
authored
Override localhost endpoint when a worker is running in docker on mac (#35964)
1 parent 4c97993 commit 62cbf83

File tree

3 files changed

+33
-6
lines changed

3 files changed

+33
-6
lines changed

sdks/go/pkg/beam/runners/prism/internal/environments.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
7979
logger.Error("unmarshaling docker environment payload", "error", err)
8080
return err
8181
}
82-
return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint())
82+
return dockerEnvironment(ctx, logger, dp, wk, wk.ArtifactEndpoint)
8383
case urns.EnvProcess:
8484
pp := &pipepb.ProcessPayload{}
8585
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil {

sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func (j *Job) MakeWorker(env string) *worker.W {
208208
wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
209209
wk.PipelineOptions = j.PipelineOptions()
210210
wk.JobKey = j.JobKey()
211-
wk.ArtifactEndpoint = j.ArtifactEndpoint()
212211

212+
wk.ResolveEndpoints(j.ArtifactEndpoint())
213213
return wk
214214
}

sdks/go/pkg/beam/runners/prism/internal/worker/worker.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import (
2424
"io"
2525
"log/slog"
2626
"net"
27+
"os"
28+
"runtime"
29+
"strings"
2730
"sync"
2831
"sync/atomic"
2932

@@ -58,9 +61,9 @@ type W struct {
5861

5962
ID, Env string
6063

61-
JobKey, ArtifactEndpoint string
62-
EnvPb *pipepb.Environment
63-
PipelineOptions *structpb.Struct
64+
JobKey, ArtifactEndpoint, endpoint string
65+
EnvPb *pipepb.Environment
66+
PipelineOptions *structpb.Struct
6467

6568
// These are the ID sources
6669
inst uint64
@@ -79,8 +82,32 @@ type controlResponder interface {
7982
Respond(*fnpb.InstructionResponse)
8083
}
8184

85+
// resolveEndpoint checks if the worker is running inside a docker container on mac or Windows and
86+
// if the endpoint is a "localhost" endpoint. If so, overrides it with "host.docker.internal".
87+
// Reference: https://docs.docker.com/desktop/features/networking/#networking-mode-and-dns-behaviour-for-mac-and-windows
88+
func (wk *W) resolveEndpoint(endpoint string) string {
89+
// The presence of an external environment does not guarantee execution within
90+
// Docker, as Python's LOOPBACK also runs in an external environment.
91+
// A specific check for the "BEAM_WORKER_POOL_IN_DOCKER_VM" environment variable is required to confirm
92+
// if the worker is running inside a Docker container.
93+
// Python LOOPBACK mode: https://github.com/apache/beam/blob/0589b14812ec52bff9d20d3bfcd96da393b9ebdb/sdks/python/apache_beam/runners/portability/portable_runner.py#L397
94+
// External Environment: https://beam.apache.org/documentation/runtime/sdk-harness-config/
95+
96+
workerInDocker := wk.EnvPb.GetUrn() == urns.EnvDocker ||
97+
(wk.EnvPb.GetUrn() == urns.EnvExternal && (os.Getenv("BEAM_WORKER_POOL_IN_DOCKER_VM") == "1"))
98+
if runtime.GOOS != "linux" && workerInDocker && strings.HasPrefix(endpoint, "localhost:") {
99+
return "host.docker.internal:" + strings.TrimPrefix(endpoint, "localhost:")
100+
}
101+
return endpoint
102+
}
103+
104+
func (wk *W) ResolveEndpoints(artifactEndpoint string) {
105+
wk.ArtifactEndpoint = wk.resolveEndpoint(artifactEndpoint)
106+
wk.endpoint = wk.resolveEndpoint(wk.parentPool.endpoint)
107+
}
108+
82109
func (wk *W) Endpoint() string {
83-
return wk.parentPool.endpoint
110+
return wk.endpoint
84111
}
85112

86113
func (wk *W) String() string {

0 commit comments

Comments
 (0)