From 3695d6e19ba7591a57886da54e47fd57b277df36 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Fri, 5 Dec 2025 20:14:20 -0500
Subject: [PATCH 1/8] add some debugging msg.

---
 sdks/python/apache_beam/pipeline.py                  | 34 ++++++++++---------
 .../fn_api_runner/worker_handlers.py                 |  1 +
 sdks/python/setup.py                                 |  1 +
 3 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 6ef06abb7436..8fa47b40760e 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -561,9 +561,11 @@ def run(self, test_runner_api: Union[bool, str] = 'AUTO') -> 'PipelineResult':
     """Runs the pipeline. Returns whatever our runner returns after running."""
     # All pipeline options are finalized at this point.
     # Call get_all_options to print warnings on invalid options.
-    self.options.get_all_options(
+    opts = self.options.get_all_options(
         retain_unknown_options=True, display_warnings=True)
 
+    logging.exception("all_options:" + str(opts))
+    logging.exception('runner class:' + str(self.runner))
     for error_handler in self._error_handlers:
       error_handler.verify_closed()
 
@@ -867,7 +869,7 @@ def _infer_result_type(
       inputs: Sequence[Union[pvalue.PBegin, pvalue.PCollection]],
       result_pcollection: Union[pvalue.PValue, pvalue.DoOutputsTuple]) -> None:
     """Infer and set the output element type for a PCollection.
- 
+
     This function determines the output types of transforms by combining:
     1. Concrete input types from previous transforms
     2. Type hints declared on the current transform
@@ -878,43 +880,43 @@ def _infer_result_type(
     Type variables (K, V, T, etc.) act as placeholders that get bound to
     concrete types through pattern matching. This requires both an input
     pattern and an output template:
- 
+
     Input Pattern (from .with_input_types()):
       Defines where in the input to find each type variable
       Example: Tuple[K, V] means "K is the first element, V is the second"
- 
+
     Output Template (from .with_output_types()):
       Defines how to use the bound variables in the output
      Example: Tuple[V, K] means "swap the positions"
- 
+
     CONCRETE TYPES VS TYPE VARIABLES
     ---------------------------------
     The system handles these differently:
- 
+
     Concrete Types (e.g., str, int, Tuple[str, int]):
     - Used as-is without any binding
     - Do not fall back to Any
    - Example: .with_output_types(Tuple[str, int]) → Tuple[str, int]
- 
+
     Type Variables (e.g., K, V, T):
     - Must be bound through pattern matching
     - Require .with_input_types() to provide the pattern
     - Fall back to Any if not bound
     - Example without pattern: Tuple[K, V] → Tuple[Any, Any]
     - Example with pattern: Tuple[K, V] → Tuple[str, int]
- 
+
     BINDING ALGORITHM
     -----------------
     1. Match: Compare input pattern to concrete input
        Pattern:  Tuple[K, V]
        Concrete: Tuple[str, int]
        Result:   {K: str, V: int}  ← Bindings created
- 
+
     2. Substitute: Apply bindings to output template
        Template: Tuple[V, K]  ← Note: swapped!
        Bindings: {K: str, V: int}
        Result:   Tuple[int, str]  ← Swapped concrete types
- 
+
     Each transform operates in its own type inference scope. Type variables
     declared in a parent composite transform do NOT automatically propagate
     to child transforms.
@@ -925,36 +927,36 @@ class MyComposite(PTransform):
         def expand(self, pcoll):
           # Child scope - parent's K, V are NOT available
           return pcoll | ChildTransform()
- 
+
     Type variables that remain unbound after inference fall back to Any:
- 
+
     EXAMPLES
     --------
     Example 1: Concrete types (no variables)
       Input:     Tuple[str, int]
       Transform: .with_output_types(Tuple[str, int])
       Output:    Tuple[str, int]  ← Used as-is
- 
+
     Example 2: Type variables with pattern (correct)
       Input:     Tuple[str, int]
       Transform: .with_input_types(Tuple[K, V])
                  .with_output_types(Tuple[V, K])
       Binding:   {K: str, V: int}
       Output:    Tuple[int, str]  ← Swapped!
- 
+
     Example 3: Type variables without pattern (falls back to Any)
       Input:     Tuple[str, int]
       Transform: .with_output_types(Tuple[K, V])  ← No input pattern!
       Binding:   None (can't match)
       Output:    Tuple[Any, Any]  ← Fallback
- 
+
     Example 4: Mixed concrete and variables
       Input:     Tuple[str, int]
       Transform: .with_input_types(Tuple[str, V])
                  .with_output_types(Tuple[str, V])
       Binding:   {V: int}  ← Only V needs binding
       Output:    Tuple[str, int]  ← str passed through, V bound to int
- 
+
     Args:
       transform: The PTransform being applied
       inputs: Input PCollections (provides concrete types)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index d79b381f2d78..5d5f9f5c7ac6 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -475,6 +475,7 @@ def __init__(
         ("grpc.http2.min_ping_interval_without_data_ms", 20_000),
     ]
 
+    _LOGGER.exception("Starting a few grpc server with options:" + str(options))
     self.state = state
     self.provision_info = provision_info
     self.control_server = grpc.server(
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 176c84c9966b..24e63e2d0604 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+# Modify setup.py to trigger more tests.
 """Apache Beam SDK for Python setup file."""
 
 import glob
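The `_infer_result_type` docstring carried along as context above describes the pattern/template binding in prose. Below is a minimal runnable sketch of its Example 2 (assuming only `apache_beam` is installed; the pipeline and its element values are illustrative, not taken from the patch):

```python
# Sketch of the binding algorithm from the docstring: the input pattern
# Tuple[K, V] is matched against the concrete input type Tuple[str, int],
# binding {K: str, V: int}; the output template Tuple[V, K] then resolves
# to Tuple[int, str].
from typing import Tuple, TypeVar

import apache_beam as beam

K = TypeVar('K')
V = TypeVar('V')

with beam.Pipeline() as p:
  swapped = (
      p
      | beam.Create([('a', 1), ('b', 2)])  # concrete type: Tuple[str, int]
      | beam.Map(lambda kv: (kv[1], kv[0])).with_input_types(
          Tuple[K, V]).with_output_types(Tuple[V, K]))
  print(swapped.element_type)  # expected: Tuple[int, str]
```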
From d50898cc80dbd22ebdf638853c29c2c08c1bab6d Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Fri, 5 Dec 2025 20:50:58 -0500
Subject: [PATCH 2/8] Make an error.

---
 sdks/python/apache_beam/dataframe/io_test.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index 92bb10225c78..4ecd47207910 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -179,6 +179,7 @@ def test_read_write(
       requires=()):
     self._run_read_write_test(
         format, read_kwargs, write_kwargs, check_options, requires)
+    self.assertEqual(1, 2)
 
   # pylint: disable=dangerous-default-value
   def _run_read_write_test(

From 5cb8ac711aca37138034e36375a3b2b16e36d0f6 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Fri, 5 Dec 2025 21:41:03 -0500
Subject: [PATCH 3/8] Add logging msg in SwitchingDirectRunner

---
 sdks/python/apache_beam/runners/direct/direct_runner.py | 9 +++++----
 sdks/python/apache_beam/runners/worker/data_plane.py    | 2 ++
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 73b0321b5de4..e7e80ad9ad7f 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -206,7 +206,7 @@ def visit_transform(self, applied_ptransform):
       # Check whether all transforms used in the pipeline are supported by the
       # PrismRunner
       if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
-        _LOGGER.info('Running pipeline with PrismRunner.')
+        _LOGGER.exception('Running pipeline with PrismRunner.')
         from apache_beam.runners.portability import prism_runner
 
         runner = prism_runner.PrismRunner()
@@ -216,7 +216,7 @@ def visit_transform(self, applied_ptransform):
         # probably failed on job submission.
         if (PipelineState.is_terminal(pr.state) and
             pr.state != PipelineState.DONE):
-          _LOGGER.info(
+          _LOGGER.exception(
              'Pipeline failed on PrismRunner, falling back to DirectRunner.')
           runner = BundleBasedDirectRunner()
         else:
@@ -225,8 +225,8 @@ def visit_transform(self, applied_ptransform):
       # If prism fails in Preparing the portable job, then the PortableRunner
       # code raises an exception. Catch it, log it, and use the Direct runner
       # instead.
-      _LOGGER.info('Exception with PrismRunner:\n %s\n' % (e))
-      _LOGGER.info('Falling back to DirectRunner')
+      _LOGGER.exception('Exception with PrismRunner:\n %s\n' % (e))
+      _LOGGER.exception('Falling back to DirectRunner')
       runner = BundleBasedDirectRunner()
 
     # Check whether all transforms used in the pipeline are supported by the
@@ -240,6 +240,7 @@ def visit_transform(self, applied_ptransform):
       provision_info = fn_runner.ExtendedProvisionInfo(
           beam_provision_api_pb2.ProvisionInfo(
               pipeline_options=encoded_options))
+      _LOGGER.exception("Use FnApiRunner")
       runner = fn_runner.FnApiRunner(provision_info=provision_info)
 
     return runner.run_pipeline(pipeline, options)
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index cbd28f8b0a3f..a00e6cd6ea1f 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -472,6 +472,8 @@ def __init__(self, data_buffer_time_limit_ms=0):
     self._closed = False
     self._exception = None  # type: Optional[Exception]
 
+    _LOGGER.exception("start grpc data channel")
+
   def close(self):
     # type: () -> None
     self._to_send.put(self._WRITES_FINISHED)

From 92e3a94051d98e5ff251fe2cea9e2e479fe8a6ce Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Fri, 5 Dec 2025 22:01:06 -0500
Subject: [PATCH 4/8] Add msg for result state

---
 sdks/python/apache_beam/runners/direct/direct_runner.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index e7e80ad9ad7f..bc3cbdd91d79 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -214,6 +214,7 @@ def visit_transform(self, applied_ptransform):
         pr = runner.run_pipeline(pipeline, options)
         # This is non-blocking, so if the state is *already* finished, something
         # probably failed on job submission.
+        _LOGGER.exception('PrismRunner state:' + str(pr.state))
         if (PipelineState.is_terminal(pr.state) and
             pr.state != PipelineState.DONE):
           _LOGGER.exception(

From 610bca85e79c9bc4e7c2e1407ad54fb3e8e2e7f1 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Sat, 6 Dec 2025 13:30:45 -0500
Subject: [PATCH 5/8] Revert "Make an error."

This reverts commit d50898cc80dbd22ebdf638853c29c2c08c1bab6d.

---
 sdks/python/apache_beam/dataframe/io_test.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index 4ecd47207910..92bb10225c78 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -179,7 +179,6 @@ def test_read_write(
       requires=()):
     self._run_read_write_test(
         format, read_kwargs, write_kwargs, check_options, requires)
-    self.assertEqual(1, 2)
 
   # pylint: disable=dangerous-default-value
   def _run_read_write_test(
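A note on the `_LOGGER.exception(...)` calls that patches 1-4 sprinkle around: `logging.exception` is `logging.error` with `exc_info=True`, so it always surfaces at ERROR severity regardless of the configured level, which is presumably why it was picked for these temporary debug messages. Outside an `except` block, though, there is no active exception and the appended traceback renders as `NoneType: None`. A standalone demonstration:

```python
import logging

logging.basicConfig(level=logging.ERROR)

# Intended use: inside an except block, where the active traceback is appended.
try:
    1 / 0
except ZeroDivisionError:
    logging.exception('division failed')  # ERROR line plus a real traceback

# Outside a handler there is no exception in flight, so the "traceback"
# renders as "NoneType: None" -- the price of using it for plain debug output.
logging.exception('no active exception here')
```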
From c27030cc77271b9af32f2d5217523eb778910a9b Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Sat, 6 Dec 2025 13:32:07 -0500
Subject: [PATCH 6/8] Apply patch on prism grpc server option.

---
 .../go/pkg/beam/runners/prism/internal/jobservices/server.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
index f7d6ba5ad361..2399fd726dae 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -30,6 +30,7 @@ import (
 	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 type Server struct {
@@ -80,6 +81,10 @@ func NewServer(port int, execute func(*Job)) *Server {
 	s.logger.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
 	opts := []grpc.ServerOption{
 		grpc.MaxRecvMsgSize(math.MaxInt32),
+		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             20 * time.Second, // Minimum duration a client should wait before sending a keepalive ping
+			PermitWithoutStream: true,             // Allow pings even if there are no active streams
+		}),
 	}
 	s.server = grpc.NewServer(opts...)
 	jobpb.RegisterJobServiceServer(s.server, s)
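The enforcement policy added above tells the Prism job-management server to tolerate client pings as frequent as every 20s, even on idle connections; it pairs with the keepalive channel options visible in worker_handlers.py in patch 1. A hedged Python sketch of client-side options that stay within this policy (the option keys are standard gRPC channel arguments; the endpoint is a placeholder, not taken from the patch):

```python
import grpc

# Keepalive settings compatible with the server's EnforcementPolicy above:
# ping no more often than every 20s (>= MinTime) and allow pings while no
# RPC is in flight (matches PermitWithoutStream: true).
channel_options = [
    ('grpc.keepalive_time_ms', 20_000),          # send a ping every 20s
    ('grpc.keepalive_timeout_ms', 10_000),       # wait up to 10s for the ack
    ('grpc.keepalive_permit_without_calls', 1),  # ping on idle channels too
]

# 'localhost:8073' is a placeholder job-service endpoint.
channel = grpc.insecure_channel('localhost:8073', options=channel_options)
```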
From 42d3c3b51a2c6680237a9025872557bf0685cc28 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Sat, 6 Dec 2025 14:32:22 -0500
Subject: [PATCH 7/8] Revert "Reduce weighing overhead for caching blocks
 (#36897)"

This reverts commit 84684aef490e03163295050e3b6e51aa6c6a2ee6.

---
 .../apache/beam/sdk/fn/data/WeightedList.java |  11 ++-
 .../harness/state/StateFetchingIterators.java |  83 +++++++-------
 2 files changed, 39 insertions(+), 55 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java
index 5eb317fc2875..ad5e131cb2d7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java
@@ -20,7 +20,6 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.util.Weighted;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
 
 /** Facade for a {@link List} that keeps track of weight, for cache limit reasons. */
 public class WeightedList<T> implements Weighted {
@@ -72,6 +71,14 @@ public void addAll(List<T> values, long weight) {
   }
 
   public void accumulateWeight(long weight) {
-    this.weight.accumulateAndGet(weight, LongMath::saturatedAdd);
+    this.weight.accumulateAndGet(
+        weight,
+        (first, second) -> {
+          try {
+            return Math.addExact(first, second);
+          } catch (ArithmeticException e) {
+            return Long.MAX_VALUE;
+          }
+        });
   }
 }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
index 339ddad4061e..1e06c98f2e31 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -49,8 +49,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
 
 /**
  * Adapters which convert a logical series of chunks using continuation tokens over the Beam Fn
@@ -251,11 +249,15 @@ static class BlocksPrefix<T> extends Blocks<T> implements Shrinkable<BlocksPrefix<T>>
 
     @Override
     public long getWeight() {
-      long sum = 8 + blocks.size() * 8L;
-      for (Block<T> block : blocks) {
-        sum = LongMath.saturatedAdd(sum, block.getWeight());
+      try {
+        long sum = 8 + blocks.size() * 8L;
+        for (Block<T> block : blocks) {
+          sum = Math.addExact(sum, block.getWeight());
+        }
+        return sum;
+      } catch (ArithmeticException e) {
+        return Long.MAX_VALUE;
       }
-      return sum;
     }
 
     BlocksPrefix(List<Block<T>> blocks) {
@@ -280,7 +282,8 @@ public List<Block<T>> getBlocks() {
 
   @AutoValue
   abstract static class Block<T> implements Weighted {
-    private static final Block<Object> EMPTY = fromValues(ImmutableList.of(), 0, null);
+    private static final Block<Object> EMPTY =
+        fromValues(WeightedList.of(Collections.emptyList(), 0), null);
 
     @SuppressWarnings("unchecked") // Based upon as Collections.emptyList()
     public static <T> Block<T> emptyBlock() {
@@ -296,37 +299,21 @@ public static <T> Block<T> mutatedBlock(WeightedList<T> values) {
     }
 
     public static <T> Block<T> fromValues(List<T> values, @Nullable ByteString nextToken) {
-      if (values.isEmpty() && nextToken == null) {
-        return emptyBlock();
-      }
-      ImmutableList<T> immutableValues = ImmutableList.copyOf(values);
-      long listWeight = immutableValues.size() * Caches.REFERENCE_SIZE;
-      for (T value : immutableValues) {
-        listWeight = LongMath.saturatedAdd(listWeight, Caches.weigh(value));
-      }
-      return fromValues(immutableValues, listWeight, nextToken);
+      return fromValues(WeightedList.of(values, Caches.weigh(values)), nextToken);
     }
 
     public static <T> Block<T> fromValues(
         WeightedList<T> values, @Nullable ByteString nextToken) {
-      if (values.isEmpty() && nextToken == null) {
-        return emptyBlock();
-      }
-      return fromValues(ImmutableList.copyOf(values.getBacking()), values.getWeight(), nextToken);
-    }
-
-    private static <T> Block<T> fromValues(
-        ImmutableList<T> values, long listWeight, @Nullable ByteString nextToken) {
-      long weight = LongMath.saturatedAdd(listWeight, 24);
+      long weight = values.getWeight() + 24;
       if (nextToken != null) {
         if (nextToken.isEmpty()) {
           nextToken = ByteString.EMPTY;
         } else {
-          weight = LongMath.saturatedAdd(weight, Caches.weigh(nextToken));
+          weight += Caches.weigh(nextToken);
         }
       }
       return new AutoValue_StateFetchingIterators_CachingStateIterable_Block<>(
-          values, nextToken, weight);
+          values.getBacking(), nextToken, weight);
     }
 
     abstract List<T> getValues();
@@ -385,12 +372,10 @@ public void remove(Set<Object> toRemoveStructuralValues) {
         totalSize += tBlock.getValues().size();
       }
 
-      ImmutableList.Builder<T> allValues = ImmutableList.builderWithExpectedSize(totalSize);
-      long weight = 0;
-      List<T> blockValuesToKeep = new ArrayList<>();
+      WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
       for (Block<T> block : blocks) {
-        blockValuesToKeep.clear();
         boolean valueRemovedFromBlock = false;
+        List<T> blockValuesToKeep = new ArrayList<>();
         for (T value : block.getValues()) {
           if (!toRemoveStructuralValues.contains(valueCoder.structuralValue(value))) {
             blockValuesToKeep.add(value);
@@ -402,19 +387,13 @@ public void remove(Set<Object> toRemoveStructuralValues) {
         // If any value was removed from this block, need to estimate the weight again.
         // Otherwise, just reuse the block's weight.
         if (valueRemovedFromBlock) {
-          allValues.addAll(blockValuesToKeep);
-          for (T value : blockValuesToKeep) {
-            weight = LongMath.saturatedAdd(weight, Caches.weigh(value));
-          }
+          allValues.addAll(blockValuesToKeep, Caches.weigh(block.getValues()));
         } else {
-          allValues.addAll(block.getValues());
-          weight = LongMath.saturatedAdd(weight, block.getWeight());
+          allValues.addAll(block.getValues(), block.getWeight());
         }
       }
 
-      cache.put(
-          IterableCacheKey.INSTANCE,
-          new MutatedBlocks<>(Block.fromValues(allValues.build(), weight, null)));
+      cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
     }
 
     /**
@@ -505,24 +484,21 @@ private void appendHelper(List<T> newValues, long newWeight) {
       for (Block<T> block : blocks) {
         totalSize += block.getValues().size();
       }
-      ImmutableList.Builder<T> allValues = ImmutableList.builderWithExpectedSize(totalSize);
-      long weight = 0;
+      WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
       for (Block<T> block : blocks) {
-        allValues.addAll(block.getValues());
-        weight = LongMath.saturatedAdd(weight, block.getWeight());
+        allValues.addAll(block.getValues(), block.getWeight());
       }
       if (newWeight < 0) {
-        newWeight = 0;
-        for (T value : newValues) {
-          newWeight = LongMath.saturatedAdd(newWeight, Caches.weigh(value));
+        if (newValues.size() == 1) {
+          // Optimize weighing of the common value state as single single-element bag state.
+          newWeight = Caches.weigh(newValues.get(0));
+        } else {
+          newWeight = Caches.weigh(newValues);
        }
       }
-      allValues.addAll(newValues);
-      weight = LongMath.saturatedAdd(weight, newWeight);
+      allValues.addAll(newValues, newWeight);
 
-      cache.put(
-          IterableCacheKey.INSTANCE,
-          new MutatedBlocks<>(Block.fromValues(allValues.build(), weight, null)));
+      cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
     }
 
     class CachingStateIterator implements PrefetchableIterator<T> {
@@ -604,7 +580,8 @@ public boolean hasNext() {
             return false;
           }
           // Release the block while we are loading the next one.
-          currentBlock = Block.emptyBlock();
+          currentBlock =
+              Block.fromValues(WeightedList.of(Collections.emptyList(), 0L), ByteString.EMPTY);
 
           @Nullable Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
           boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);
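Both sides of the revert above compute a saturating 64-bit add for cache weights: Guava's `LongMath.saturatedAdd` on the reverted side, `Math.addExact` with an `ArithmeticException` fallback to `Long.MAX_VALUE` on the restored side. A small Python sketch of the same idea (Python ints never overflow, so the 64-bit bound has to be imposed explicitly):

```python
INT64_MAX = 2**63 - 1


def saturated_add(a: int, b: int) -> int:
    """Add two non-negative int64 weights, clamping at INT64_MAX.

    Mirrors LongMath.saturatedAdd / the addExact-with-fallback pattern in
    the Java diff above.
    """
    total = a + b  # arbitrary precision, so compute first and clamp after
    return total if total <= INT64_MAX else INT64_MAX


assert saturated_add(40, 2) == 42
assert saturated_add(INT64_MAX, 1) == INT64_MAX
```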
From bb31810a243dc1f35ff1c1d1d2b6efffc491a2ec Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Sat, 6 Dec 2025 14:35:55 -0500
Subject: [PATCH 8/8] Modify logging.

---
 sdks/python/apache_beam/pipeline.py                  |  6 +++---
 .../apache_beam/runners/direct/direct_runner.py      | 12 ++++++------
 .../portability/fn_api_runner/worker_handlers.py     |  2 +-
 sdks/python/apache_beam/runners/worker/data_plane.py |  2 +-
 sdks/python/setup.py                                 |  2 +-
 5 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8fa47b40760e..2d28a03d091b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -561,11 +561,11 @@ def run(self, test_runner_api: Union[bool, str] = 'AUTO') -> 'PipelineResult':
     """Runs the pipeline. Returns whatever our runner returns after running."""
     # All pipeline options are finalized at this point.
     # Call get_all_options to print warnings on invalid options.
-    opts = self.options.get_all_options(
+    self.options.get_all_options(
         retain_unknown_options=True, display_warnings=True)
 
-    logging.exception("all_options:" + str(opts))
-    logging.exception('runner class:' + str(self.runner))
+    # logging.exception("all_options:" + str(opts))
+    logging.error('runner class:' + str(self.runner))
     for error_handler in self._error_handlers:
       error_handler.verify_closed()
 
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index bc3cbdd91d79..8a2f37a9f583 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -206,7 +206,7 @@ def visit_transform(self, applied_ptransform):
       # Check whether all transforms used in the pipeline are supported by the
       # PrismRunner
       if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
-        _LOGGER.exception('Running pipeline with PrismRunner.')
+        _LOGGER.error('Running pipeline with PrismRunner.')
         from apache_beam.runners.portability import prism_runner
 
         runner = prism_runner.PrismRunner()
@@ -214,10 +214,10 @@ def visit_transform(self, applied_ptransform):
         pr = runner.run_pipeline(pipeline, options)
         # This is non-blocking, so if the state is *already* finished, something
         # probably failed on job submission.
-        _LOGGER.exception('PrismRunner state:' + str(pr.state))
+        _LOGGER.error('PrismRunner state:' + str(pr.state))
         if (PipelineState.is_terminal(pr.state) and
             pr.state != PipelineState.DONE):
-          _LOGGER.exception(
+          _LOGGER.error(
               'Pipeline failed on PrismRunner, falling back to DirectRunner.')
           runner = BundleBasedDirectRunner()
         else:
@@ -226,8 +226,8 @@ def visit_transform(self, applied_ptransform):
       # If prism fails in Preparing the portable job, then the PortableRunner
       # code raises an exception. Catch it, log it, and use the Direct runner
       # instead.
-      _LOGGER.exception('Exception with PrismRunner:\n %s\n' % (e))
-      _LOGGER.exception('Falling back to DirectRunner')
+      _LOGGER.error('Exception with PrismRunner:\n %s\n' % (e))
+      _LOGGER.error('Falling back to DirectRunner')
       runner = BundleBasedDirectRunner()
 
     # Check whether all transforms used in the pipeline are supported by the
@@ -241,7 +241,7 @@ def visit_transform(self, applied_ptransform):
       provision_info = fn_runner.ExtendedProvisionInfo(
           beam_provision_api_pb2.ProvisionInfo(
               pipeline_options=encoded_options))
-      _LOGGER.exception("Use FnApiRunner")
+      _LOGGER.error("Use FnApiRunner")
       runner = fn_runner.FnApiRunner(provision_info=provision_info)
 
     return runner.run_pipeline(pipeline, options)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 5d5f9f5c7ac6..3cbc6878887f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -475,7 +475,7 @@ def __init__(
         ("grpc.http2.min_ping_interval_without_data_ms", 20_000),
     ]
 
-    _LOGGER.exception("Starting a few grpc server with options:" + str(options))
+    _LOGGER.error("Starting a few grpc server with options:" + str(options))
     self.state = state
     self.provision_info = provision_info
     self.control_server = grpc.server(
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index a00e6cd6ea1f..92b713ae8831 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -472,7 +472,7 @@ def __init__(self, data_buffer_time_limit_ms=0):
     self._closed = False
     self._exception = None  # type: Optional[Exception]
 
-    _LOGGER.exception("start grpc data channel")
+    _LOGGER.error("start grpc data channel")
 
   def close(self):
     # type: () -> None
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 24e63e2d0604..a0f177803de1 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-# Modify setup.py to trigger more tests.
 """Apache Beam SDK for Python setup file."""
 
 import glob
@@ -187,6 +186,7 @@ def find_by_ext(root_dir, ext):
 
 
 # We must generate protos after setup_requires are installed.
+# Add some comments here.
 def generate_protos_first():
   try:
     # Pyproject toml build happens in isolated environemnts. In those envs,
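For reference on the `get_all_options` call that patches 1 and 8 touch: it flattens every registered flag (plus, with `retain_unknown_options=True`, any unrecognized ones) into a dict, and `display_warnings=True` is what prints the invalid-option warnings mentioned in the comment. A standalone sketch (flag values are illustrative; `--my_custom_flag` is a made-up unknown option):

```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--runner=DirectRunner', '--my_custom_flag=1'])
all_options = options.get_all_options(
    retain_unknown_options=True, display_warnings=True)

print(all_options['runner'])          # DirectRunner
print(all_options['my_custom_flag'])  # kept because of retain_unknown_options
```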