diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
index f7d6ba5ad361..2399fd726dae 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -30,6 +30,7 @@ import (
 	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 type Server struct {
@@ -80,6 +81,10 @@ func NewServer(port int, execute func(*Job)) *Server {
 	s.logger.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
 	opts := []grpc.ServerOption{
 		grpc.MaxRecvMsgSize(math.MaxInt32),
+		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             20 * time.Second, // Minimum duration a client should wait before sending a keepalive ping
+			PermitWithoutStream: true,             // Allow pings even if there are no active streams
+		}),
 	}
 	s.server = grpc.NewServer(opts...)
 	jobpb.RegisterJobServiceServer(s.server, s)
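Note on the keepalive change: the enforcement policy above lets the Prism job-management server accept client keepalive pings as often as every 20s, with or without active streams. The Python-side servers later in this patch (worker_handlers.py) express the same tuning through raw gRPC channel arguments; a minimal sketch of the equivalent server setup in Python, assuming standard gRPC core option names (the executor size is illustrative):

```python
from concurrent import futures

import grpc

# Rough channel-arg equivalents of the Go keepalive.EnforcementPolicy:
#   grpc.http2.min_ping_interval_without_data_ms  ~ MinTime (20s)
#   grpc.keepalive_permit_without_calls           ~ PermitWithoutStream
options = [
    ("grpc.http2.min_ping_interval_without_data_ms", 20_000),
    ("grpc.keepalive_permit_without_calls", 1),
]
server = grpc.server(futures.ThreadPoolExecutor(max_workers=4), options=options)
```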
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java
index 5eb317fc2875..ad5e131cb2d7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java
@@ -20,7 +20,6 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.util.Weighted;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
 
 /** Facade for a {@link List} that keeps track of weight, for cache limit reasons. */
 public class WeightedList<T> implements Weighted {
@@ -72,6 +71,14 @@ public void addAll(List<T> values, long weight) {
   }
 
   public void accumulateWeight(long weight) {
-    this.weight.accumulateAndGet(weight, LongMath::saturatedAdd);
+    this.weight.accumulateAndGet(
+        weight,
+        (first, second) -> {
+          try {
+            return Math.addExact(first, second);
+          } catch (ArithmeticException e) {
+            return Long.MAX_VALUE;
+          }
+        });
   }
 }
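The inlined lambda reproduces the overflow behavior of Guava's `LongMath::saturatedAdd` without the vendored dependency: `Math.addExact` throws `ArithmeticException` on signed 64-bit overflow, and the handler clamps the result to `Long.MAX_VALUE`. One corner case differs: negative overflow also clamps to `Long.MAX_VALUE` rather than `Long.MIN_VALUE`, which is acceptable here because weights are non-negative byte estimates. A conceptual sketch of the same semantics (Python integers never overflow, so the clamp is explicit):

```python
INT64_MAX = 2**63 - 1  # Long.MAX_VALUE

def saturated_add(first: int, second: int) -> int:
  # Mirrors the Java lambda: Math.addExact raises on overflow and the
  # catch block substitutes Long.MAX_VALUE.
  return min(first + second, INT64_MAX)

assert saturated_add(INT64_MAX - 1, 5) == INT64_MAX  # clamps, never wraps
assert saturated_add(40, 2) == 42
```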
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
index 339ddad4061e..1e06c98f2e31 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -49,8 +49,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
 
 /**
  * Adapters which convert a logical series of chunks using continuation tokens over the Beam Fn
@@ -251,11 +249,15 @@ static class BlocksPrefix<T> extends Blocks<T> implements Shrinkable<BlocksPrefix<T>>, Weighted {
 
     @Override
     public long getWeight() {
-      long sum = 8 + blocks.size() * 8L;
-      for (Block<T> block : blocks) {
-        sum = LongMath.saturatedAdd(sum, block.getWeight());
+      try {
+        long sum = 8 + blocks.size() * 8L;
+        for (Block<T> block : blocks) {
+          sum = Math.addExact(sum, block.getWeight());
+        }
+        return sum;
+      } catch (ArithmeticException e) {
+        return Long.MAX_VALUE;
       }
-      return sum;
     }
 
     BlocksPrefix(List<Block<T>> blocks) {
@@ -280,7 +282,8 @@ public List<Block<T>> getBlocks() {
 
   @AutoValue
   abstract static class Block<T> implements Weighted {
-    private static final Block<?> EMPTY = fromValues(ImmutableList.of(), 0, null);
+    private static final Block<?> EMPTY =
+        fromValues(WeightedList.of(Collections.emptyList(), 0), null);
 
     @SuppressWarnings("unchecked") // Based upon as Collections.emptyList()
    public static <T> Block<T> emptyBlock() {
@@ -296,37 +299,21 @@ public static <T> Block<T> mutatedBlock(WeightedList<T> values) {
     }
 
     public static <T> Block<T> fromValues(List<T> values, @Nullable ByteString nextToken) {
-      if (values.isEmpty() && nextToken == null) {
-        return emptyBlock();
-      }
-      ImmutableList<T> immutableValues = ImmutableList.copyOf(values);
-      long listWeight = immutableValues.size() * Caches.REFERENCE_SIZE;
-      for (T value : immutableValues) {
-        listWeight = LongMath.saturatedAdd(listWeight, Caches.weigh(value));
-      }
-      return fromValues(immutableValues, listWeight, nextToken);
+      return fromValues(WeightedList.of(values, Caches.weigh(values)), nextToken);
     }
 
     public static <T> Block<T> fromValues(
         WeightedList<T> values, @Nullable ByteString nextToken) {
-      if (values.isEmpty() && nextToken == null) {
-        return emptyBlock();
-      }
-      return fromValues(ImmutableList.copyOf(values.getBacking()), values.getWeight(), nextToken);
-    }
-
-    private static <T> Block<T> fromValues(
-        ImmutableList<T> values, long listWeight, @Nullable ByteString nextToken) {
-      long weight = LongMath.saturatedAdd(listWeight, 24);
+      long weight = values.getWeight() + 24;
       if (nextToken != null) {
         if (nextToken.isEmpty()) {
           nextToken = ByteString.EMPTY;
         } else {
-          weight = LongMath.saturatedAdd(weight, Caches.weigh(nextToken));
+          weight += Caches.weigh(nextToken);
         }
       }
       return new AutoValue_StateFetchingIterators_CachingStateIterable_Block<>(
-          values, nextToken, weight);
+          values.getBacking(), nextToken, weight);
     }
 
     abstract List<T> getValues();
@@ -385,12 +372,10 @@ public void remove(Set<Object> toRemoveStructuralValues) {
         totalSize += tBlock.getValues().size();
       }
 
-      ImmutableList.Builder<T> allValues = ImmutableList.builderWithExpectedSize(totalSize);
-      long weight = 0;
-      List<T> blockValuesToKeep = new ArrayList<>();
+      WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
       for (Block<T> block : blocks) {
-        blockValuesToKeep.clear();
         boolean valueRemovedFromBlock = false;
+        List<T> blockValuesToKeep = new ArrayList<>();
         for (T value : block.getValues()) {
           if (!toRemoveStructuralValues.contains(valueCoder.structuralValue(value))) {
             blockValuesToKeep.add(value);
@@ -402,19 +387,13 @@ public void remove(Set<Object> toRemoveStructuralValues) {
         }
         // If any value was removed from this block, need to estimate the weight again.
         // Otherwise, just reuse the block's weight.
         if (valueRemovedFromBlock) {
-          allValues.addAll(blockValuesToKeep);
-          for (T value : blockValuesToKeep) {
-            weight = LongMath.saturatedAdd(weight, Caches.weigh(value));
-          }
+          allValues.addAll(blockValuesToKeep, Caches.weigh(blockValuesToKeep));
         } else {
-          allValues.addAll(block.getValues());
-          weight = LongMath.saturatedAdd(weight, block.getWeight());
+          allValues.addAll(block.getValues(), block.getWeight());
         }
       }
-      cache.put(
-          IterableCacheKey.INSTANCE,
-          new MutatedBlocks<>(Block.fromValues(allValues.build(), weight, null)));
+      cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
     }
 
     /**
@@ -505,24 +484,21 @@ private void appendHelper(List<T> newValues, long newWeight) {
       for (Block<T> block : blocks) {
         totalSize += block.getValues().size();
       }
-      ImmutableList.Builder<T> allValues = ImmutableList.builderWithExpectedSize(totalSize);
-      long weight = 0;
+      WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
       for (Block<T> block : blocks) {
-        allValues.addAll(block.getValues());
-        weight = LongMath.saturatedAdd(weight, block.getWeight());
+        allValues.addAll(block.getValues(), block.getWeight());
       }
       if (newWeight < 0) {
-        newWeight = 0;
-        for (T value : newValues) {
-          newWeight = LongMath.saturatedAdd(newWeight, Caches.weigh(value));
+        if (newValues.size() == 1) {
+          // Optimize weighing of the common case of value state as a single-element bag state.
+          newWeight = Caches.weigh(newValues.get(0));
+        } else {
+          newWeight = Caches.weigh(newValues);
         }
       }
-      allValues.addAll(newValues);
-      weight = LongMath.saturatedAdd(weight, newWeight);
+      allValues.addAll(newValues, newWeight);
 
-      cache.put(
-          IterableCacheKey.INSTANCE,
-          new MutatedBlocks<>(Block.fromValues(allValues.build(), weight, null)));
+      cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
     }
 
     class CachingStateIterator implements PrefetchableIterator<T> {
@@ -604,7 +580,8 @@ public boolean hasNext() {
           return false;
         }
 
         // Release the block while we are loading the next one.
-        currentBlock = Block.emptyBlock();
+        currentBlock =
+            Block.fromValues(WeightedList.of(Collections.emptyList(), 0L), ByteString.EMPTY);
         @Nullable Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
         boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);
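The `remove` and `appendHelper` rewrites above share one pattern: concatenate the cached blocks into a single `WeightedList`, reusing each block's precomputed weight instead of re-weighing every element on each mutation. A language-neutral sketch of that pattern (plain `(values, weight)` tuples stand in for the Java `Block` type; names are illustrative):

```python
INT64_MAX = 2**63 - 1

def merge_blocks(blocks):
  """blocks: iterable of (values, weight) pairs, mirroring the Java
  Block.getValues()/Block.getWeight() accessors."""
  all_values, total_weight = [], 0
  for values, weight in blocks:
    all_values.extend(values)
    # Reuse the cached per-block weight; saturate instead of overflowing.
    total_weight = min(total_weight + weight, INT64_MAX)
  return all_values, total_weight
```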
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 6ef06abb7436..2d28a03d091b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -564,6 +564,8 @@ def run(self, test_runner_api: Union[bool, str] = 'AUTO') -> 'PipelineResult':
     self.options.get_all_options(
         retain_unknown_options=True, display_warnings=True)
 
+    # logging.exception("all_options:" + str(opts))
+    logging.error('runner class:' + str(self.runner))
     for error_handler in self._error_handlers:
       error_handler.verify_closed()
 
@@ -867,7 +869,7 @@ def _infer_result_type(
       inputs: Sequence[Union[pvalue.PBegin, pvalue.PCollection]],
       result_pcollection: Union[pvalue.PValue, pvalue.DoOutputsTuple]) -> None:
     """Infer and set the output element type for a PCollection.
-
+
     This function determines the output types of transforms by combining:
     1. Concrete input types from previous transforms
     2. Type hints declared on the current transform
@@ -878,43 +880,43 @@ def _infer_result_type(
     Type variables (K, V, T, etc.) act as placeholders that get bound to
     concrete types through pattern matching. This requires both an input
     pattern and an output template:
-
+
     Input Pattern (from .with_input_types()):
       Defines where in the input to find each type variable
       Example: Tuple[K, V] means "K is the first element, V is the second"
-
+
     Output Template (from .with_output_types()):
       Defines how to use the bound variables in the output
       Example: Tuple[V, K] means "swap the positions"
-
+
     CONCRETE TYPES VS TYPE VARIABLES
     ---------------------------------
     The system handles these differently:
-
+
     Concrete Types (e.g., str, int, Tuple[str, int]):
     - Used as-is without any binding
     - Do not fall back to Any
     - Example: .with_output_types(Tuple[str, int]) → Tuple[str, int]
-
+
     Type Variables (e.g., K, V, T):
     - Must be bound through pattern matching
     - Require .with_input_types() to provide the pattern
     - Fall back to Any if not bound
     - Example without pattern: Tuple[K, V] → Tuple[Any, Any]
     - Example with pattern: Tuple[K, V] → Tuple[str, int]
-
+
     BINDING ALGORITHM
     -----------------
     1. Match: Compare input pattern to concrete input
        Pattern: Tuple[K, V]
        Concrete: Tuple[str, int]
        Result: {K: str, V: int} ← Bindings created
-
+
     2. Substitute: Apply bindings to output template
        Template: Tuple[V, K] ← Note: swapped!
        Bindings: {K: str, V: int}
        Result: Tuple[int, str] ← Swapped concrete types
-
+
     Each transform operates in its own type inference scope.
     Type variables declared in a parent composite transform do NOT
     automatically propagate to child transforms.
@@ -925,36 +927,36 @@ class MyComposite(PTransform):
         def expand(self, pcoll):
           # Child scope - parent's K, V are NOT available
           return pcoll | ChildTransform()
-
+
     Type variables that remain unbound after inference fall back to Any:
-
+
     EXAMPLES
     --------
     Example 1: Concrete types (no variables)
       Input: Tuple[str, int]
       Transform: .with_output_types(Tuple[str, int])
      Output: Tuple[str, int] ← Used as-is
-
+
     Example 2: Type variables with pattern (correct)
       Input: Tuple[str, int]
       Transform: .with_input_types(Tuple[K, V])
                  .with_output_types(Tuple[V, K])
       Binding: {K: str, V: int}
       Output: Tuple[int, str] ← Swapped!
-
+
     Example 3: Type variables without pattern (falls back to Any)
      Input: Tuple[str, int]
       Transform: .with_output_types(Tuple[K, V]) ← No input pattern!
       Binding: None (can't match)
       Output: Tuple[Any, Any] ← Fallback
-
+
     Example 4: Mixed concrete and variables
       Input: Tuple[str, int]
       Transform: .with_input_types(Tuple[str, V])
                  .with_output_types(Tuple[str, V])
       Binding: {V: int} ← Only V needs binding
       Output: Tuple[str, int] ← str passed through, V bound to int
-
+
     Args:
       transform: The PTransform being applied
       inputs: Input PCollections (provides concrete types)
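The docstring's Example 2 corresponds to a runnable pipeline like the following sketch (the element values are illustrative):

```python
import typing

import apache_beam as beam

K = typing.TypeVar('K')
V = typing.TypeVar('V')

with beam.Pipeline() as p:
  swapped = (
      p
      | beam.Create([('a', 1), ('b', 2)])  # element type: Tuple[str, int]
      | beam.Map(lambda kv: (kv[1], kv[0])).with_input_types(
          typing.Tuple[K, V]).with_output_types(typing.Tuple[V, K]))
  # Matching Tuple[K, V] against Tuple[str, int] binds {K: str, V: int};
  # substituting into Tuple[V, K] yields the inferred type Tuple[int, str].
```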
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 73b0321b5de4..8a2f37a9f583 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -206,7 +206,7 @@ def visit_transform(self, applied_ptransform):
     # Check whether all transforms used in the pipeline are supported by the
     # PrismRunner
     if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
-      _LOGGER.info('Running pipeline with PrismRunner.')
+      _LOGGER.error('Running pipeline with PrismRunner.')
       from apache_beam.runners.portability import prism_runner
 
       runner = prism_runner.PrismRunner()
@@ -214,9 +214,10 @@ def visit_transform(self, applied_ptransform):
       pr = runner.run_pipeline(pipeline, options)
       # This is non-blocking, so if the state is *already* finished, something
       # probably failed on job submission.
+      _LOGGER.error('PrismRunner state:' + str(pr.state))
       if (PipelineState.is_terminal(pr.state) and
           pr.state != PipelineState.DONE):
-        _LOGGER.info(
+        _LOGGER.error(
            'Pipeline failed on PrismRunner, falling back to DirectRunner.')
         runner = BundleBasedDirectRunner()
       else:
@@ -225,8 +226,8 @@ def visit_transform(self, applied_ptransform):
       # If prism fails in Preparing the portable job, then the PortableRunner
       # code raises an exception. Catch it, log it, and use the Direct runner
       # instead.
-      _LOGGER.info('Exception with PrismRunner:\n %s\n' % (e))
-      _LOGGER.info('Falling back to DirectRunner')
+      _LOGGER.error('Exception with PrismRunner:\n %s\n' % (e))
+      _LOGGER.error('Falling back to DirectRunner')
       runner = BundleBasedDirectRunner()
 
   # Check whether all transforms used in the pipeline are supported by the
@@ -240,6 +241,7 @@ def visit_transform(self, applied_ptransform):
       provision_info = fn_runner.ExtendedProvisionInfo(
          beam_provision_api_pb2.ProvisionInfo(
               pipeline_options=encoded_options))
+      _LOGGER.error("Use FnApiRunner")
       runner = fn_runner.FnApiRunner(provision_info=provision_info)
 
   return runner.run_pipeline(pipeline, options)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index d79b381f2d78..3cbc6878887f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -475,6 +475,7 @@ def __init__(
         ("grpc.http2.min_ping_interval_without_data_ms", 20_000),
     ]
 
+    _LOGGER.error("Starting a few gRPC servers with options:" + str(options))
     self.state = state
     self.provision_info = provision_info
     self.control_server = grpc.server(
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index cbd28f8b0a3f..92b713ae8831 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -472,6 +472,8 @@ def __init__(self, data_buffer_time_limit_ms=0):
     self._closed = False
     self._exception = None  # type: Optional[Exception]
 
+    _LOGGER.error("start grpc data channel")
+
   def close(self):
     # type: () -> None
     self._to_send.put(self._WRITES_FINISHED)
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 176c84c9966b..a0f177803de1 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -186,6 +186,7 @@ def find_by_ext(root_dir, ext):
 
 
 # We must generate protos after setup_requires are installed.
+# Add some comments here.
 def generate_protos_first():
   try:
     # Pyproject toml build happens in isolated environemnts. In those envs,