diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go index e42e3e7ca666..f3e97406027e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go @@ -34,7 +34,9 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer if err != nil { return err } + s.mu.RLock() job := s.jobs[in.GetStagingToken()] + s.mu.RUnlock() envs := job.Pipeline.GetComponents().GetEnvironments() for _, env := range envs { diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go index fb55fc54bf93..f7d6ba5ad361 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go @@ -43,7 +43,7 @@ type Server struct { server *grpc.Server // Job Management - mu sync.Mutex + mu sync.RWMutex index uint32 // Use with atomics. jobs map[string]*Job