@@ -32,6 +32,7 @@ import (
3232 "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
3333)
3434
35+ const pipLogFlushInterval time.Duration = 15 * time .Second
3536const unrecoverableURL string = "https://beam.apache.org/documentation/sdks/python-unrecoverable-errors/index.html#pip-dependency-resolution-failures"
3637
3738// pipInstallRequirements installs the given requirement, if present.
@@ -40,15 +41,15 @@ func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files []s
4041 if err != nil {
4142 return err
4243 }
43- bufLogger := tools .NewBufferedLogger ( logger )
44+ bufLogger := tools .NewBufferedLoggerWithFlushInterval ( ctx , logger , pipLogFlushInterval )
4445 for _ , file := range files {
4546 if file == name {
4647 // We run the install process in two rounds in order to avoid as much
4748 // as possible PyPI downloads. In the first round the --find-links
4849 // option will make sure that only things staged in the worker will be
4950 // used without following their dependencies.
5051 args := []string {"-m" , "pip" , "install" , "-r" , filepath .Join (dir , name ), "--no-cache-dir" , "--disable-pip-version-check" , "--no-index" , "--no-deps" , "--find-links" , dir }
51- if err := execx .Execute ( pythonVersion , args ... ); err != nil {
52+ if err := execx .ExecuteEnvWithIO ( nil , os . Stdin , bufLogger , bufLogger , pythonVersion , args ... ); err != nil {
5253 bufLogger .Printf (ctx , "Some packages could not be installed solely from the requirements cache. Installing packages from PyPI." )
5354 }
5455 // The second install round opens up the search for packages on PyPI and
@@ -79,8 +80,6 @@ func isPackageInstalled(pkgName string) bool {
7980 return true
8081}
8182
82- const pipLogFlushInterval time.Duration = 15 * time .Second
83-
8483// pipInstallPackage installs the given package, if present.
8584func pipInstallPackage (ctx context.Context , logger * tools.Logger , files []string , dir , name string , force , optional bool , extras []string ) error {
8685 pythonVersion , err := expansionx .GetPythonVersion ()
@@ -150,7 +149,7 @@ func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string
150149// installExtraPackages installs all the packages declared in the extra
151150// packages manifest file.
152151func installExtraPackages (ctx context.Context , logger * tools.Logger , files []string , extraPackagesFile , dir string ) error {
153- bufLogger := tools .NewBufferedLogger ( logger )
152+ bufLogger := tools .NewBufferedLoggerWithFlushInterval ( ctx , logger , pipLogFlushInterval )
154153 // First check that extra packages manifest file is present.
155154 for _ , file := range files {
156155 if file != extraPackagesFile {
@@ -179,7 +178,7 @@ func installExtraPackages(ctx context.Context, logger *tools.Logger, files []str
179178}
180179
181180func findBeamSdkWhl (ctx context.Context , logger * tools.Logger , files []string , acceptableWhlSpecs []string ) string {
182- bufLogger := tools .NewBufferedLogger ( logger )
181+ bufLogger := tools .NewBufferedLoggerWithFlushInterval ( ctx , logger , pipLogFlushInterval )
183182 for _ , file := range files {
184183 if strings .HasPrefix (file , "apache_beam" ) {
185184 for _ , s := range acceptableWhlSpecs {
@@ -200,7 +199,7 @@ func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, a
200199// SDK from source tarball provided in sdkSrcFile.
201200func installSdk (ctx context.Context , logger * tools.Logger , files []string , workDir string , sdkSrcFile string , acceptableWhlSpecs []string , required bool ) error {
202201 sdkWhlFile := findBeamSdkWhl (ctx , logger , files , acceptableWhlSpecs )
203- bufLogger := tools .NewBufferedLogger ( logger )
202+ bufLogger := tools .NewBufferedLoggerWithFlushInterval ( ctx , logger , pipLogFlushInterval )
204203 if sdkWhlFile != "" {
205204 // by default, pip rejects to install wheel if same version already installed
206205 isDev := strings .Contains (sdkWhlFile , ".dev" )
0 commit comments