
Auto-size num.stream.threads from topology and replica count #119

Open
suresh-prakash wants to merge 8 commits into main from dynamic-stream-threads

@suresh-prakash suresh-prakash commented Jun 1, 2026

Summary

Introduces a per-instance auto-sizing path for num.stream.threads. When an app opts in by (a) overriding getStreamThreadsCountResolver() and (b) setting dynamic.num.stream.threads = true in its streams config, the framework derives the right thread count from the topology's partition layout and the deployment's replica count — keeping every stream task active without idle threads or noisy under-allocation.

The new threading package contains:

  • DynamicStreamThreadsCountCalculator — sums the maximum partition count across each sub-topology's source topics (one task per partition per sub-topology) and divides by the replica count to get per-instance threads. Returns OptionalInt.empty() when the topology contains a regex/pattern source or no source partitions are resolvable on the broker — caller keeps the configured value.
  • StreamThreadsCountResolver — bridges topology + AdminClient to the calculator. Returns OptionalInt.empty() on any failure (broker outage, missing replica count, AdminClient errors) so the app keeps the configured num.stream.threads.
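
The calculator's arithmetic can be sketched in plain Java without a broker. This is a minimal illustration, not the PR's code: the names `ThreadMathSketch` and `threadsPerInstance` are hypothetical, partition counts are passed in as a map instead of being fetched through AdminClient, and rounding up when the task count does not divide evenly is an assumption that the summary does not state.

```java
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;

final class ThreadMathSketch {

  /**
   * @param subTopologySources source topic names, one set per sub-topology
   * @param partitionsByTopic partition count per topic; absent topics count as 0
   * @param replicas number of app instances; must be positive
   */
  static OptionalInt threadsPerInstance(
      List<Set<String>> subTopologySources, Map<String, Integer> partitionsByTopic, int replicas) {
    if (replicas <= 0) {
      throw new IllegalArgumentException("replicas must be positive: " + replicas);
    }
    int totalTasks = 0;
    for (Set<String> sources : subTopologySources) {
      // One task per partition per sub-topology: take the max across its source topics.
      int maxPartitions = 0;
      for (String topic : sources) {
        maxPartitions = Math.max(maxPartitions, partitionsByTopic.getOrDefault(topic, 0));
      }
      totalTasks += maxPartitions;
    }
    if (totalTasks == 0) {
      // Nothing resolvable: the caller keeps the configured num.stream.threads.
      return OptionalInt.empty();
    }
    // Assumption (not stated in the PR summary): round up so no task lacks a thread.
    return OptionalInt.of((totalTasks + replicas - 1) / replicas);
  }
}
```

For example, with source topics a (12 partitions) and b (6) in one sub-topology, c (8) in another, and 3 replicas, the sketch counts 12 + 8 = 20 tasks and returns 7 threads per instance.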

KafkaStreamsApp exposes:

  • A new overrideable hook getStreamThreadsCountResolver() returning Optional.empty() by default. Apps that want auto-sizing override it.
  • A new framework-only config key KafkaStreamsApp.DYNAMIC_NUM_STREAM_THREADS_CONFIG = "dynamic.num.stream.threads" (boolean). The framework reads and strips this flag before the config reaches Kafka Streams, so it never hits StreamsConfig validation.
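
A rough sketch of what "reads and strips" could look like, assuming the streams config is a mutable `Map<String, Object>`. The class and method names here are hypothetical, and accepting either a `Boolean` or the string "true" is an assumption about how the value arrives from config.

```java
import java.util.HashMap;
import java.util.Map;

final class DynamicFlagSketch {
  static final String DYNAMIC_NUM_STREAM_THREADS_CONFIG = "dynamic.num.stream.threads";

  /** Removes the framework-only flag from the map and reports whether it was enabled. */
  static boolean readAndStripDynamicFlag(Map<String, Object> streamsConfig) {
    Object raw = streamsConfig.remove(DYNAMIC_NUM_STREAM_THREADS_CONFIG);
    // Assumption: accept Boolean.TRUE or the string "true"; anything else (or absent) is false.
    return raw != null && Boolean.parseBoolean(raw.toString());
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put("num.stream.threads", 4);
    config.put(DYNAMIC_NUM_STREAM_THREADS_CONFIG, "true");
    boolean enabled = readAndStripDynamicFlag(config);
    // The flag is gone from the map; num.stream.threads is untouched.
    System.out.println(enabled + " " + config);
  }
}
```

Because the key is removed before the map is handed on, later consumers of the map only ever see keys that Kafka Streams knows about.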

Why

Today every Hypertrace Kafka Streams app hardcodes num.stream.threads per-cluster in its Helm values. As topics grow partitions or as replica counts change, the hardcoded value drifts from the optimal — leading to idle threads or under-provisioned consumption. This change lets ops flip dynamic.num.stream.threads = true per-cluster and have the right number computed at startup based on actual broker state.

Why a separate boolean flag (vs. a "DYNAMIC" sentinel on num.stream.threads)

Per review discussion (#119 (comment)):

  • Type safety. A string sentinel on num.stream.threads works only because nothing in the current pipeline constructs StreamsConfig from the map before doInit() substitutes the int. Any future caller that touches the map (validation, logging, a feature-flag read) would throw ConfigException. A separate boolean key removes that landmine.
  • No hardcoded static fallback. With a sentinel, "DYNAMIC set but resolver returned empty" needs an arbitrary integer fallback (the prior PR used 8, wrong for any app whose static value isn't 8). With the boolean flag, the configured numeric num.stream.threads is the fallback — by definition the value the app would have run with had they not opted in.
  • Dual opt-in preserved. Service overrides getStreamThreadsCountResolver() AND config sets dynamic.num.stream.threads = true. Per-cluster rollout/rollback intact.

Safety

This is a library change consumed by every Hypertrace Kafka Streams app. The flow is engineered to be invisible to apps that don't opt in:

  • Apps that don't set dynamic.num.stream.threads (everyone today) take the same code path as before — num.stream.threads flows through unchanged.
  • The entire doInit() body is wrapped in try { ... } catch (Exception e) { log; System.exit(1); } (existing envelope). Any unexpected failure is handled there.
  • When the flag is true but resolution can't produce a value (no resolver, resolver throws, broker unreachable, regex source, replica count missing), the configured numeric num.stream.threads is left untouched.
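
The fallback behavior in these bullets can be illustrated with a small stand-alone sketch. It is not the framework code: `OptInFlowSketch` and `apply` are hypothetical names, and a `Supplier<OptionalInt>` stands in for the real resolver, which needs the topology and an AdminClient.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Supplier;

final class OptInFlowSketch {
  static final String NUM_STREAM_THREADS = "num.stream.threads";

  /** Returns a copy of the config, overriding num.stream.threads only when resolution succeeds. */
  static Map<String, Object> apply(
      Map<String, Object> config,
      boolean dynamicEnabled,
      Optional<Supplier<OptionalInt>> resolver) {
    Map<String, Object> result = new HashMap<>(config);
    if (!dynamicEnabled || resolver.isEmpty()) {
      return result; // not opted in: the configured value flows through unchanged
    }
    OptionalInt resolved;
    try {
      resolved = resolver.get().get();
    } catch (Exception e) {
      // Resolver failure: keep the configured value. Errors are deliberately not caught.
      resolved = OptionalInt.empty();
    }
    resolved.ifPresent(threads -> result.put(NUM_STREAM_THREADS, threads));
    return result;
  }
}
```

Every "cannot resolve" path ends in the same place: the numeric value that was already in the config.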

Other changes

  • Fixes a typo'd import Optional; (missing java.util.) in KafkaStreamsApp.java.
  • Adds mockito-junit-jupiter (via commonLibs.mockito.junit) so @ExtendWith(MockitoExtension.class) resolves.

Rebase notes

Rebased onto post-BOM main (#120 merged as 94e63d8). The conflict in kafka-streams-framework/build.gradle.kts was resolved by keeping the new commonLibs.* / localLibs.* form and routing the new mockito-junit dep through commonLibs.mockito.junit. Per-module gradle.lockfiles were regenerated via ./gradlew resolveAndLockAll --write-locks.

Test plan

  • Unit test: single sub-topology partition→threads math
  • Unit test: multi sub-topology summation
  • Unit test: absent topic counts as zero partitions
  • Unit test: zero/negative replicas throws (calculator)
  • Unit test: total tasks zero returns empty (configured value preserved)
  • Unit test: pattern subscription returns empty
  • Unit test: resolver returns empty on calculator throw / missing-zero-negative replica count
  • Unit test: resolver delegates to calculator with configured replicas
  • SampleAppTest: dynamic enabled with no resolver / pattern source / throwing resolver — all preserve configured num.stream.threads and strip the framework-only flag
  • ./gradlew clean build green locally
  • Reviewer to validate the safety guarantee for non-opted-in apps (no behavior change expected)


github-actions Bot commented Jun 1, 2026

Test Results

 17 files  + 2   17 suites  +2   28s ⏱️ -2s
 82 tests +14   82 ✅ +14  0 💤 ±0  0 ❌ ±0 
100 runs  +14  100 ✅ +14  0 💤 ±0  0 ❌ ±0 

Results for commit c5a7bd0. ± Comparison against base commit 94e63d8.

This pull request removes 7 and adds 21 tests. Note that renamed tests count towards both.
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [1] {application.id=app-1, rocksdb.compaction.style=LEVEL, rocksdb.max.write.buffers=2, rocksdb.direct.reads.enabled=true, rocksdb.write.buffer.size=8388608, rocksdb.block.size=8388608, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.log.level=INFO_LEVEL}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [1] {rocksdb.compaction.style=UNIVERSAL, rocksdb.periodic.compaction.seconds=60, application.id=app-2, rocksdb.compaction.universal.max.size.amplification.percent=50, rocksdb.compaction.universal.compression.size.percent=40}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [2] {application.id=app-2, rocksdb.compaction.style=UNIVERSAL, rocksdb.max.write.buffers=3, rocksdb.direct.reads.enabled=true, rocksdb.write.buffer.size=8388607, rocksdb.block.size=8388609, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.log.level=DEBUG_LEVEL}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [3] {application.id=app-3, rocksdb.compaction.style=FIFO, rocksdb.max.write.buffers=4, rocksdb.direct.reads.enabled=false, rocksdb.write.buffer.size=8388609, rocksdb.block.size=8388607, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.log.level=ERROR_LEVEL}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [3] {rocksdb.cache.high.priority.pool.ratio=-0.1, application.id=app-3}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [4] {rocksdb.cache.high.priority.pool.ratio=1.1, application.id=app-2}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [5] {application.id=app-5, rocksdb.cache.write.buffers.ratio=0.9, rocksdb.cache.high.priority.pool.ratio=0.2}
org.hypertrace.core.kafkastreams.framework.SampleAppTest ‑ dynamicWithPatternSourceKeepsConfiguredValue()
org.hypertrace.core.kafkastreams.framework.SampleAppTest ‑ dynamicWithThrowingResolverKeepsConfiguredValue()
org.hypertrace.core.kafkastreams.framework.SampleAppTest ‑ dynamicWithoutResolverKeepsConfiguredValue()
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [1] {rocksdb.periodic.compaction.seconds=60, rocksdb.compaction.style=UNIVERSAL, rocksdb.compaction.universal.compression.size.percent=40, rocksdb.compaction.universal.max.size.amplification.percent=50, application.id=app-2}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [1] {rocksdb.write.buffer.size=8388608, rocksdb.direct.reads.enabled=true, rocksdb.max.write.buffers=2, rocksdb.compaction.style=LEVEL, application.id=app-1, rocksdb.log.level=INFO_LEVEL, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.block.size=8388608}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [2] {rocksdb.write.buffer.size=8388607, rocksdb.direct.reads.enabled=true, rocksdb.max.write.buffers=3, rocksdb.compaction.style=UNIVERSAL, application.id=app-2, rocksdb.log.level=DEBUG_LEVEL, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.block.size=8388609}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [3] {application.id=app-3, rocksdb.cache.high.priority.pool.ratio=-0.1}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [3] {rocksdb.write.buffer.size=8388609, rocksdb.direct.reads.enabled=false, rocksdb.max.write.buffers=4, rocksdb.compaction.style=FIFO, application.id=app-3, rocksdb.log.level=ERROR_LEVEL, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.block.size=8388607}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [4] {application.id=app-2, rocksdb.cache.high.priority.pool.ratio=1.1}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [5] {application.id=app-5, rocksdb.cache.high.priority.pool.ratio=0.2, rocksdb.cache.write.buffers.ratio=0.9}
…

♻️ This comment has been updated with latest results.

@suresh-prakash suresh-prakash marked this pull request as ready for review June 1, 2026 06:29
@suresh-prakash suresh-prakash requested a review from a team as a code owner June 1, 2026 06:29
suresh-prakash and others added 2 commits June 2, 2026 12:36
Introduce StreamThreadsCountResolver and DynamicStreamThreadsCountCalculator
under a new `threading` package, and wire them into KafkaStreamsApp via
an overrideable getStreamThreadsCountResolver() hook.

When num.stream.threads is set to the sentinel "DYNAMIC", the framework
sums the maximum partition count across each sub-topology's source topics
and divides by the configured replica count, producing a per-instance
thread count that keeps every task active without idle threads.

Apps that don't opt in are unaffected: isDynamic() short-circuits unless
the value is literally "DYNAMIC", and the resolution path is wrapped in
try/catch so any unexpected failure logs and leaves the streams config
untouched. Bad broker calls or absent topics fall back to a safe default
of 8 threads instead of preventing startup.

Also fixes a typo'd `import Optional;` left from an earlier refactor.

Tests cover the partition math, multi-subtopology summation, absent-topic
handling, replica-count edge cases, sentinel detection, and the resolver's
fallback behavior.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Adds a concrete code snippet showing how a consumer app wires its own
replica-count source through StreamThreadsCountResolver, so adopters
don't have to read the calculator/resolver internals to integrate.
Also clarifies that apps not overriding the hook are unaffected.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@suresh-prakash suresh-prakash force-pushed the dynamic-stream-threads branch from 9187236 to 6299517 Compare June 2, 2026 07:10
@suresh-prakash suresh-prakash changed the title from "Add dynamic num.stream.threads sizing for Kafka Streams apps" to "Auto-size num.stream.threads from topology and replica count" Jun 2, 2026
@suresh-prakash
Contributor Author

traceable-java-code-review summary

Scope: 5 source/test files, 509 additions, 0 deletions. New threading package + opt-in hook in KafkaStreamsApp. Build/lockfile delta is just adding commonLibs.mockito.junit (needed by MockitoExtension).

Lockfile scan

scan-gradle-lockfiles.sh ran clean: no skew, no downgrades, no settings-gradle.lockfile.

Substantive findings

  1. CLASSIST: StreamThreadsCountResolverTest mocks an internal class. @Mock private DynamicStreamThreadsCountCalculator calculator; mocks a same-package collaborator, and StreamThreadsCountResolver exposes a package-private constructor StreamThreadsCountResolver(calculator, supplier) purely so the test can inject the mock. Both Rule 1 ("mock ONLY external dependencies") and Rule 4 ("never modify production code — visibility, factory hooks — for testing convenience") apply.

    The actual external is AdminClient, which the resolver builds internally inside resolve(...). Fix: invert the dependency so StreamThreadsCountResolver accepts a Function<Map<String,Object>, AdminClient> (or Supplier<AdminClient> once you've already factored in streamsProperties). Then the test uses the real DynamicStreamThreadsCountCalculator and stubs AdminClient.describeTopics(...) — i.e. exercises the actual computation path against a mocked external. The package-private constructor goes away.

    This is a code-quality flag for OSS, not a blocker.

  2. catch (Throwable) is too broad in KafkaStreamsApp.resolveDynamicStreamThreads. The defensive intent is correct, but Throwable also swallows Error (OOM, LinkageError, StackOverflowError). The JVM should die in those cases, not start in a partially-initialized state with the streams config silently untouched. catch (Exception) covers the documented failure modes: resolver bug, AdminClient timeout, NPE. Classloader misses for missing transitive deps are the one case it does not cover, because NoClassDefFoundError is an Error; reconsider whether you actually want that swallowed. My read is no: fail loudly.

  3. Side-effecting parameter in resolveDynamicStreamThreads(Map streamsProperties). The method's name suggests a query, but it mutates the caller's map via .put(NUM_STREAM_THREADS_CONFIG, resolved). Either rename to applyDynamicStreamThreads / overrideDynamicStreamThreads, or return OptionalInt and let doInit() apply the override at the call site (more readable, easier to reason about). The current call site happens to pass a freshly-built streamsConfig, so this is style not correctness — but the next reader has to read the body to know there's a side effect.
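
To make finding 2 concrete, here is a tiny stand-alone sketch of the difference in catch scope. The class and method names are hypothetical and the body is unrelated to the actual `resolveDynamicStreamThreads`; it only shows that `catch (Exception)` recovers from runtime failures while letting `Error` subclasses propagate.

```java
final class CatchScopeSketch {

  /** Runs the task; recovers from Exceptions and lets Errors propagate to the caller. */
  static String runGuarded(Runnable task) {
    try {
      task.run();
      return "ok";
    } catch (Exception e) {
      // Covers RuntimeException and checked exceptions, but not Error subclasses.
      return "recovered";
    }
  }

  public static void main(String[] args) {
    System.out.println(runGuarded(() -> { throw new IllegalStateException("resolver bug"); }));
    try {
      runGuarded(() -> { throw new NoClassDefFoundError("missing.Dep"); });
    } catch (NoClassDefFoundError expected) {
      System.out.println("Error propagated: " + expected.getMessage());
    }
  }
}
```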

Not flagged

  • catch (Throwable t) { log; } after inheritance/composition pattern — KafkaStreamsApp continues to extend PlatformService, but that's pre-existing and not in scope.
  • protected Topology topology; // Visible for testing only — pre-existing, not touched by this PR.
  • The getStreamThreadsCountResolver() hook returning Optional.empty() by default is a clean opt-in extension point. Good shape.

Other notes

  • Javadoc on getStreamThreadsCountResolver() is excellent — concrete override example with ConfigUtils.optionalInteger(...) + optionalReplicaCount(...). Reduces adoption friction for downstream apps.
  • DynamicStreamThreadsCountCalculator.compute correctly handles per-sub-topology max (not sum) of source-topic partitions. The "absent topic counts as zero" branch in describePartitions is the right call given the warn-and-restart semantic — alternative would have been to fail closed, which I think is wrong for dynamic.
  • StreamThreadsCountResolverTest matchers were tightened from anyInt() to eq(8) in the latest push (CLASSIST Rule 5 ✓).
  • DynamicStreamThreadsCountCalculatorTest is well-shaped: real calculator, only mocks AdminClient (the actual external). This is the model the resolver test should follow.

Reviewed using @claude traceable:traceable-java-code-review


💡 Want this skill on your own PRs? Install the Traceable plugin from the Harness internal marketplace:

/plugin marketplace add https://git0.harness.io/l7B_kbSEQD2wjrM7PShm5w/PROD/Harness_Commons/claude-code-marketplace.git
/plugin install traceable@harness-internal-claude-plugins

Then run /skill traceable:traceable-java-code-review <pr-url>. Includes catalog/lockfile scanning, gRPC → MongoDB index coverage checks, CLASSIST testing review, and more.

- Invert resolver dep: take AdminClient factory in constructor instead
  of building inside resolve(). Removes the package-private test-only
  constructor and lets the test mock the actual external (AdminClient)
  while exercising the real DynamicStreamThreadsCountCalculator.
- Drop @Mock on the calculator in the resolver test; route stubs
  through AdminClient.describeTopics(...) so the test asserts on the
  real partition→threads math instead of stubbed return values.
- Narrow catch (Throwable) to catch (Exception) in
  resolveDynamicStreamThreads so JVM Errors (OOM, LinkageError) keep
  surfacing instead of being silently swallowed at startup.
- Return OptionalInt from resolveDynamicStreamThreads and apply the
  override at the doInit() call site, so the helper no longer mutates
  the caller's map under a query-shaped name.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
return subtopology.nodes().stream()
    .filter(node -> node instanceof Source)
    .map(node -> (Source) node)
    .flatMap(source -> source.topicSet().stream())

Curious whether topic pattern subscriptions are in scope here — source.topicSet() returns empty when topics are subscribed via regex, which would make those sub-topologies contribute 0 partitions. Is this a known limitation or something worth handling?

Contributor Author

Good catch — Source.topicSet() is empty for regex subscriptions and topicPattern() would be the way to handle them. None of the current Hypertrace Kafka Streams apps subscribe via regex, so I've documented this as a known limitation in the class javadoc rather than handling it now (would need an extra AdminClient.listTopics() + Pattern match round-trip). Apps that adopt regex shouldn't opt into DYNAMIC until that's added (736f37d).

If any sub-topology uses a pattern subscription, the dynamic calculation is fundamentally incomplete: it will silently under-count tasks and under-provision threads. Rather than documenting this and hoping apps don't hit it, the resolver should detect it and return empty:

for (final Subtopology subtopology : description.subtopologies()) {
  boolean hasPatternSource = subtopology.nodes().stream()
      .filter(node -> node instanceof Source)
      .map(node -> (Source) node)
      .anyMatch(source -> source.topicPattern() != null);

  if (hasPatternSource) {
    logger.warn("Sub-topology uses pattern subscription; "
        + "dynamic thread calculation not supported. "
        + "Keeping configured num.stream.threads.");
    return OptionalInt.empty();
  }
}

And the caller simply doesn't override:

Optional<StreamThreadsCountResolver> resolver = getStreamThreadsCountResolver();
if (resolver.isPresent()) {
  OptionalInt computed = resolver.get().resolve(topology, streamsConfig);
  if (computed.isPresent()) {
    streamsConfig.put(NUM_STREAM_THREADS_CONFIG, computed.getAsInt());
  }
  // else: leave num.stream.threads as-is, so the configured value wins
}

Contributor Author

Good catch — bumping this from a doc note to detection is the right call. Currently a regex-only sub-topology silently contributes 0 tasks, and an all-regex topology lands on totalTasks == 0 → 1 thread (severe under-provisioning, no signal). Switched compute() to return OptionalInt, added the pattern pre-check, and the call site at the framework layer translates empty → FALLBACK_NUM_STREAM_THREADS via a single .orElse(...) (so all "cannot resolve" paths collapse there). Test added covering the regex-source path end-to-end (1c5c6ce).

result.topicNameValues().entrySet()) {
try {
final TopicDescription description =
entry.getValue().get(DESCRIBE_TOPICS_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

The get(timeout) calls on individual futures run sequentially in a loop, so with N source topics the worst-case wait at startup is N × 5 seconds. Would result.allTopicNames().get(timeout) work here to wait for all topics in one call instead?

Contributor Author

Excellent point — switched to KafkaFuture.allOf(futures).get(timeout) in 736f37d so the worst-case wait is one shared 5s deadline instead of N×5s. Per-future get() after allOf completes is non-blocking, so the per-topic UnknownTopicOrPartitionException handling (warn + treat as 0 partitions) is preserved.

On retries: AdminClient already retries the underlying RPC internally per its own retries / retry.backoff.ms config (defaults: Integer.MAX_VALUE retries, 100ms backoff, capped by request.timeout.ms / default.api.timeout.ms). Adding our own retry loop on top would double-retry. The 5s deadline is the connection cap; if Kafka isn't reachable in 5s, we fall back to 8 threads at the resolver layer and the app still starts.
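
The "one shared deadline, then non-blocking per-future reads" shape can be sketched with the JDK alone. This is only an analogy: `CompletableFuture` stands in for `KafkaFuture`, whose `allOf` may behave differently on failure, and the names `SharedDeadlineSketch` and `collect` are hypothetical. Treating any failed future as 0 is a simplification of the PR's narrower UnknownTopicOrPartitionException branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SharedDeadlineSketch {

  /** Waits once for all futures, then reads each result; a failed future counts as 0. */
  static Map<String, Integer> collect(
      Map<String, CompletableFuture<Integer>> futures, long timeoutMillis) {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
    Map<String, Integer> partitions = new HashMap<>();
    try {
      try {
        all.get(timeoutMillis, TimeUnit.MILLISECONDS); // one deadline shared by every future
      } catch (ExecutionException ignored) {
        // At least one future failed; the per-future handling below decides what that means.
      }
      for (Map.Entry<String, CompletableFuture<Integer>> entry : futures.entrySet()) {
        try {
          // CompletableFuture.allOf completes only after every input has completed,
          // so this get() returns immediately.
          partitions.put(entry.getKey(), entry.getValue().get());
        } catch (ExecutionException e) {
          partitions.put(entry.getKey(), 0); // simplified stand-in for the absent-topic branch
        }
      }
    } catch (TimeoutException e) {
      throw new IllegalStateException("Timed out waiting for futures", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted waiting for futures", e);
    }
    return partitions;
  }
}
```

With N futures the worst-case wait in this sketch is `timeoutMillis` once, not N times.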

public class StreamThreadsCountResolver {

public static final String DYNAMIC_SENTINEL = "DYNAMIC";
static final int FALLBACK_NUM_STREAM_THREADS = 8;

The fallback of 8 works as a safe default — would it be worth a brief comment explaining the reasoning behind this value? Also wondering if making it configurable would be useful for apps that might want a different fallback.

Contributor Author

Documented the rationale on the constant in 736f37d: it's a reasonable default for current Hypertrace Kafka Streams apps. On configurability — leaning YAGNI: apps that need a different value should configure a numeric num.stream.threads explicitly rather than opting into dynamic sizing. The constant is now public so apps can reference it from their own override if needed. Happy to make it configurable later if a real use-case shows up.

return OptionalInt.empty();
}
Optional<StreamThreadsCountResolver> resolver = getStreamThreadsCountResolver();
if (resolver.isEmpty()) {

If num.stream.threads = DYNAMIC but getStreamThreadsCountResolver() returns empty, the warning logs but DYNAMIC stays in the config — which Kafka Streams won't be able to parse as an integer on startup. Would it make sense to substitute FALLBACK_NUM_STREAM_THREADS here so the app can still start?

Contributor Author

Genuine bug — fixed in 736f37d. The resolver-missing branch now substitutes FALLBACK_NUM_STREAM_THREADS instead of leaving the literal "DYNAMIC" in config. Also restructured the helper around explicit gates per Suresh's earlier feedback (no more try/catch (Exception) swallowing prerequisites): the call site now guards on isDynamic(...) upfront, the helper always returns an int, and the only remaining catch is a narrow one around the user-overridable getStreamThreadsCountResolver() (which could throw if an app's override is buggy) — that path also returns the fallback. Net result: "DYNAMIC" never reaches Kafka Streams config.

suresh-prakash and others added 2 commits June 3, 2026 13:48
- Replace throw-and-catch in resolver with explicit if-gate: a
  non-positive replica count (zero, negative, or absent) logs and
  returns FALLBACK_NUM_STREAM_THREADS. optionalReplicaCount() now
  yields 0 instead of throwing.
- Calculator: replace per-topic get(timeout) loop with
  KafkaFuture.allOf(...).get(timeout) so worst-case startup wait is
  one deadline instead of N. Per-future UnknownTopicOrPartitionException
  handling is preserved.
- KafkaStreamsApp: when num.stream.threads = DYNAMIC and no resolver
  is provided (or the override throws), substitute
  FALLBACK_NUM_STREAM_THREADS so the literal "DYNAMIC" never reaches
  Kafka Streams config (it can't parse the string).
- Promote FALLBACK_NUM_STREAM_THREADS to public so callers can
  reference the same constant.
- Document regex/pattern subscription limitation in the calculator
  javadoc.
- Document fallback rationale on FALLBACK_NUM_STREAM_THREADS.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Cover the two regression cases surfaced in review: when num.stream.threads
is set to DYNAMIC but the app has no resolver wired up, and when the
resolver-supplier throws. In both cases the framework must replace the
literal sentinel with FALLBACK_NUM_STREAM_THREADS so Kafka Streams gets a
parseable integer.
kunalvaswani123 previously approved these changes Jun 3, 2026
@pavan-traceable

Minor suggestion on describePartitions — the two-phase approach (awaitAll + extract loop) works correctly but adds indirection that makes it harder to reason about at a glance. In particular, the swallowed ExecutionException in awaitAll looks like a bug on first read until you trace it to the per-topic handling below.

Since describeTopics() fires all requests concurrently at call time, a single loop with a shared deadline gives the same behavior — same total timeout, same parallelism — with a more linear flow:

private Map<String, Integer> describePartitions(AdminClient adminClient, Set<String> topics) {
  if (topics.isEmpty()) {
    return Map.of();
  }

  Map<String, KafkaFuture<TopicDescription>> futures =
      adminClient.describeTopics(topics).topicNameValues();

  long deadline = System.currentTimeMillis() + DESCRIBE_TOPICS_TIMEOUT_MILLIS;
  Map<String, Integer> partitions = new HashMap<>();

  for (Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
    long remaining = deadline - System.currentTimeMillis();
    if (remaining <= 0) {
      throw new RuntimeException("Timed out describing topics");
    }
    try {
      partitions.put(
          entry.getKey(),
          entry.getValue().get(remaining, TimeUnit.MILLISECONDS).partitions().size());
    } catch (TimeoutException e) {
      throw new RuntimeException("Timed out describing topic " + entry.getKey(), e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted describing topics", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof UnknownTopicOrPartitionException) {
        logger.warn("Topic absent on broker: {}. Treating as 0 partitions.", entry.getKey());
        partitions.put(entry.getKey(), 0);
      } else {
        throw new RuntimeException("Failed to describe topic " + entry.getKey(), e);
      }
    }
  }
  return Map.copyOf(partitions);
}

This keeps exception handling in one place and removes the implicit "must call A before B" contract between the two methods. Happy to hear if there's a specific reason the split was preferred — just thought I'd flag it as a readability improvement. 🙂

* Returns true if the {@code num.stream.threads} value in the given streams properties is the
* dynamic sentinel and should be resolved by this class.
*/
public static boolean isDynamic(final Map<String, Object> streamsProperties) {

Suggestion: drop the "DYNAMIC" sentinel — use the resolver presence as the signal

The sentinel puts a string where Kafka expects an integer, relying on interception before config parsing. If anyone passes this map through StreamsConfig earlier in the pipeline, it'll throw ConfigException at runtime.

Since apps must already override getStreamThreadsCountResolver() to opt in, that alone can be the trigger:

Optional<StreamThreadsCountResolver> resolver = getStreamThreadsCountResolver();
if (resolver.isPresent()) {
  streamsConfig.put(NUM_STREAM_THREADS_CONFIG, resolver.get().resolve(topology, streamsConfig));
}

This removes the type-unsafe sentinel, the isDynamic() check, and the fallback path for "DYNAMIC set but no resolver provided." One opt-in gate instead of two.

With the resolver-as-signal approach, num.stream.threads also always has a valid configured value, so the hardcoded FALLBACK_NUM_STREAM_THREADS = 8 becomes unnecessary.

Contributor Author

There are two reasons this wasn't done in the first place. Suppose we drop the "DYNAMIC" sentinel and the application opts in just by overriding getStreamThreadsCountResolver():

  1. The configuration could read "4" (static) while the actual resolved thread count is "8" (dynamically computed). To avoid this discrepancy, we want to be explicit at the config level.
  2. We cannot do an incremental rollout (or a rollback in case of an issue) if it's not config-driven.

Hence the dual opt-in mechanism: the service has to opt in, and then the config has to as well.

Makes sense — the incremental rollout and config-level clarity are valid reasons to keep it config-driven. I agree you don't want the resolver silently overriding a deliberately-set static value.

A small refinement to keep the dual opt-in but avoid the type-unsafe sentinel: could you use a separate boolean config key (e.g., stream.threads.dynamic.enabled = true) instead of overloading num.stream.threads with a string? That way:

  • num.stream.threads always has a valid integer (used as fallback if dynamic resolution fails)
  • You still get config-level opt-in/rollback per environment
  • No risk of ConfigException if the map hits StreamsConfig validation earlier in the pipeline

If you'd prefer to keep the current approach, it works — just wanted to flag the type-safety angle. Either way, the dual opt-in reasoning is sound.

Contributor Author

You're right — adopted in c5a7bd0. Concretely:

  • New framework-only flag dynamic.num.stream.threads (boolean) on KafkaStreamsApp. Stripped from the streams config before it reaches Kafka, so it never hits StreamsConfig validation regardless of where the map flows.
  • Removed DYNAMIC_SENTINEL / isDynamic() / FALLBACK_NUM_STREAM_THREADS = 8 entirely.
  • Calculator's totalTasks == 0 path now returns OptionalInt.empty() instead of OptionalInt.of(1) — consistent with "no silent static fallback".
  • All not-resolvable paths (no resolver, resolver throws, resolver returns empty) leave the configured numeric num.stream.threads untouched. The configured value is the fallback, by definition.

Dual opt-in preserved (override getStreamThreadsCountResolver() + set dynamic.num.stream.threads = true), so per-cluster rollout/rollback still works without code changes. Thanks for pushing on this — much cleaner.
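
For readers following the thread, the gate described above can be sketched roughly as follows. This is a minimal illustration, not the code from c5a7bd0: the class `DynamicThreadsGate`, the method `apply`, and the use of a plain `Supplier<OptionalInt>` in place of `StreamThreadsCountResolver` are all assumptions made so the snippet runs without Kafka on the classpath. Only the two config key names come from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Supplier;

/** Hypothetical stand-in for the framework's dual opt-in gate (not the PR's actual class). */
final class DynamicThreadsGate {
  static final String NUM_STREAM_THREADS = "num.stream.threads";
  static final String DYNAMIC_FLAG = "dynamic.num.stream.threads";

  /**
   * Returns a copy of the config with the framework-only flag stripped.
   * num.stream.threads is replaced only when the flag is true, a resolver
   * is present, and the resolver yields a value.
   */
  static Map<String, Object> apply(
      Map<String, Object> config, Optional<Supplier<OptionalInt>> resolver) {
    Map<String, Object> result = new HashMap<>(config);
    // Strip the flag unconditionally so it never reaches Kafka Streams validation.
    Object flag = result.remove(DYNAMIC_FLAG);
    boolean enabled = flag != null && Boolean.parseBoolean(flag.toString());
    if (!enabled || !resolver.isPresent()) {
      return result; // either opt-in missing: configured value stays
    }
    OptionalInt computed;
    try {
      computed = resolver.get().get();
    } catch (RuntimeException e) {
      computed = OptionalInt.empty(); // resolver threw: configured value stays
    }
    if (computed.isPresent()) {
      result.put(NUM_STREAM_THREADS, computed.getAsInt());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put(NUM_STREAM_THREADS, 4);
    config.put(DYNAMIC_FLAG, "true");
    Supplier<OptionalInt> eight = () -> OptionalInt.of(8);
    System.out.println(apply(config, Optional.of(eight)));
    System.out.println(apply(config, Optional.empty()));
  }
}
```

In this sketch the configured integer is never removed, which is what makes it usable as the fallback on every not-resolvable path.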

Thanks @suresh-prakash for patiently addressing :)

"{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; falling back to {}",
NUM_STREAM_THREADS_CONFIG,
StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS);
return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS;

If we use just the resolver's presence to decide whether it's dynamic, then we can always fall back to the configured numStreamThreads.

Contributor Author

return subtopology.nodes().stream()
.filter(node -> node instanceof Source)
.map(node -> (Source) node)
.flatMap(source -> source.topicSet().stream())

If any sub-topology uses a pattern subscription, the dynamic calculation is fundamentally incomplete: it will silently under-count tasks and under-provision threads. Rather than documenting this and hoping apps don't hit it, the resolver should detect it and return empty:

for (final Subtopology subtopology : description.subtopologies()) {
  boolean hasPatternSource = subtopology.nodes().stream()
      .filter(node -> node instanceof Source)
      .map(node -> (Source) node)
      .anyMatch(source -> source.topicPattern() != null);

  if (hasPatternSource) {
    logger.warn("Sub-topology uses pattern subscription; "
        + "dynamic thread calculation not supported. "
        + "Keeping configured num.stream.threads.");
    return OptionalInt.empty();
  }
}

And the caller simply doesn't override:

Optional<StreamThreadsCountResolver> resolver = getStreamThreadsCountResolver();
if (resolver.isPresent()) {
  OptionalInt computed = resolver.get().resolve(topology, streamsConfig);
  if (computed.isPresent()) {
    streamsConfig.put(NUM_STREAM_THREADS_CONFIG, computed.getAsInt());
  }
  // else: leave num.stream.threads as-is, so the configured value wins
}

// (UnknownTopicOrPartitionException → 0 partitions; everything else → fail).
}
}
}

Minor suggestion — the two-phase approach (awaitAll + extract loop) works correctly but adds indirection that's harder to reason about at a glance. The swallowed ExecutionException in awaitAll looks like a bug on first read until you trace it to the per-topic handling below.

Since describeTopics() fires all requests concurrently at call time, a single loop with a shared deadline gives the same behavior — same total timeout, same parallelism — with a more linear flow:

private Map<String, Integer> describePartitions(AdminClient adminClient, Set<String> topics) {
  if (topics.isEmpty()) {
    return Map.of();
  }

  Map<String, KafkaFuture<TopicDescription>> futures =
      adminClient.describeTopics(topics).topicNameValues();

  long deadline = System.currentTimeMillis() + DESCRIBE_TOPICS_TIMEOUT_MILLIS;
  Map<String, Integer> partitions = new HashMap<>();

  for (Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
    long remaining = deadline - System.currentTimeMillis();
    if (remaining <= 0) {
      throw new RuntimeException("Timed out describing topics");
    }
    try {
      partitions.put(
          entry.getKey(),
          entry.getValue().get(remaining, TimeUnit.MILLISECONDS).partitions().size());
    } catch (TimeoutException e) {
      throw new RuntimeException("Timed out describing topic " + entry.getKey(), e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted describing topics", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof UnknownTopicOrPartitionException) {
        logger.warn("Topic absent on broker: {}. Treating as 0 partitions.", entry.getKey());
        partitions.put(entry.getKey(), 0);
      } else {
        throw new RuntimeException("Failed to describe topic " + entry.getKey(), e);
      }
    }
  }
  return Map.copyOf(partitions);
}

This keeps exception handling in one place, removes the implicit "must call A before B" contract, and eliminates the awaitAll helper entirely. Happy to hear if there was a specific reason for the split — just flagging as a readability improvement. 🙂

Contributor Author

Verified equivalence and adopted (1c5c6ce). Kunal's earlier round-1 fix was specifically about the N×5s case where each future had its own independent timeout. Both allOf(...).get(timeout) and deadline = now + timeout; future.get(remaining, MS) share a single 5s budget across all topics — AdminClient.describeTopics() fires all RPCs concurrently before returning futures, so iteration order in the loop only consumes the shared budget, doesn't change parallelism. Pure readability swap: dropped awaitAll, kept the per-topic UnknownTopicOrPartitionException → 0 partitions handling.
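
The shared-budget argument can be checked in isolation. Below is a small sketch that uses plain `CompletableFuture` instead of Kafka's `KafkaFuture`, so it runs without a broker. The class and method names (`SharedDeadlineDemo`, `awaitWithSharedDeadline`) are hypothetical, and failures are wrapped in `IllegalStateException` purely to keep the example short.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo of one deadline shared across many already-started futures. */
final class SharedDeadlineDemo {

  /** Waits for every future; the whole loop may consume at most budgetMillis. */
  static <T> Map<String, T> awaitWithSharedDeadline(
      Map<String, CompletableFuture<T>> futures, long budgetMillis) {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
    Map<String, T> results = new LinkedHashMap<>();
    for (Map.Entry<String, CompletableFuture<T>> entry : futures.entrySet()) {
      // Each wait only gets whatever is left of the single shared budget.
      long remainingNanos = deadlineNanos - System.nanoTime();
      if (remainingNanos <= 0) {
        throw new IllegalStateException("Timed out before reaching " + entry.getKey());
      }
      try {
        results.put(entry.getKey(), entry.getValue().get(remainingNanos, TimeUnit.NANOSECONDS));
      } catch (TimeoutException e) {
        throw new IllegalStateException("Timed out waiting for " + entry.getKey(), e);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting", e);
      } catch (ExecutionException e) {
        throw new IllegalStateException("Failed: " + entry.getKey(), e.getCause());
      }
    }
    return results;
  }

  /** Starts a task that sleeps, then returns a value (stand-in for one RPC). */
  static CompletableFuture<Integer> slow(int value, long sleepMillis) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return value;
    });
  }

  public static void main(String[] args) {
    // All three tasks start before the loop begins, mirroring describeTopics().
    Map<String, CompletableFuture<Integer>> futures = new LinkedHashMap<>();
    futures.put("topic-a", slow(3, 200));
    futures.put("topic-b", slow(6, 200));
    futures.put("topic-c", slow(12, 200));
    long start = System.nanoTime();
    Map<String, Integer> partitions = awaitWithSharedDeadline(futures, 1000);
    long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    System.out.println(partitions + " after ~" + elapsedMillis + " ms");
  }
}
```

Because the futures are already running when the loop starts, waiting on them one by one does not serialize the work. The loop only decides how the remaining budget is spent.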

- Calculator returns OptionalInt; pre-pass on Source.topicPattern() returns
  empty when any sub-topology subscribes via regex (previously a silent
  undercount).
- Resolver also returns OptionalInt; non-positive replicas and AdminClient
  failures map to empty rather than each falling back independently.
- Framework collapses every "cannot resolve" path through one .orElse so
  FALLBACK_NUM_STREAM_THREADS is the only place the literal substitution
  happens.
- describePartitions inlines the awaitAll helper into a single deadline-
  driven loop. Behaviour identical (allOf + per-future get and now+timeout
  + per-future get(remaining) both share one 5s budget; AdminClient fires
  RPCs concurrently before returning futures), readability improved.
- New test covers regex-source topology -> empty all the way through.

Single source of truth for when the framework substitutes
FALLBACK_NUM_STREAM_THREADS — placed on the user-facing override point so
override authors don't have to chase the trail across resolver/calculator
to know what bypasses their wiring. Helper comment now points at this
javadoc instead of duplicating the list.

Per review: the "DYNAMIC" string sentinel for num.stream.threads is
type-unsafe: anything that constructs StreamsConfig from the map before doInit()
substitutes the int will throw ConfigException. And FALLBACK_NUM_STREAM_THREADS
= 8 is wrong for any app whose static value isn't 8.

- Drop DYNAMIC_SENTINEL / isDynamic() / FALLBACK_NUM_STREAM_THREADS.
- Add framework-only flag dynamic.num.stream.threads (boolean) on
  KafkaStreamsApp; stripped before the config reaches Kafka Streams.
- Calculator: totalTasks == 0 now returns OptionalInt.empty() instead of
  OptionalInt.of(1) — caller keeps the configured value.
- Resolver-empty / resolver-throws / no-resolver paths all leave the
  configured numeric num.stream.threads untouched. Configured value is the
  fallback by definition.

Dual opt-in preserved: app overrides getStreamThreadsCountResolver() AND
sets dynamic.num.stream.threads = true. Per-cluster rollout/rollback intact.
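
As a worked illustration of the calculator contract summarized in these commits, here is a minimal sketch. It is not the PR's DynamicStreamThreadsCountCalculator: the class name `ThreadCountSketch`, the method signature, and the topic names are hypothetical. Rounding up when dividing by the replica count is also an assumption, since the PR text only says the total is divided by the replica count.

```java
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;

/** Hypothetical sketch of the per-instance thread calculation described in the PR. */
final class ThreadCountSketch {

  /**
   * Sums, per sub-topology, the largest partition count among its source topics
   * (one task per partition per sub-topology), then divides by the replica count.
   */
  static OptionalInt threadsPerInstance(
      List<Set<String>> subtopologySourceTopics,
      Map<String, Integer> partitionsByTopic,
      int replicas) {
    if (replicas <= 0) {
      return OptionalInt.empty(); // non-positive replicas: caller keeps configured value
    }
    int totalTasks = 0;
    for (Set<String> topics : subtopologySourceTopics) {
      int maxPartitions = 0;
      for (String topic : topics) {
        // Topics absent on the broker count as 0 partitions.
        maxPartitions = Math.max(maxPartitions, partitionsByTopic.getOrDefault(topic, 0));
      }
      totalTasks += maxPartitions;
    }
    if (totalTasks == 0) {
      return OptionalInt.empty(); // nothing resolvable: caller keeps configured value
    }
    // Assumption: ceiling division, so no task is left without a thread.
    return OptionalInt.of((totalTasks + replicas - 1) / replicas);
  }

  public static void main(String[] args) {
    // Hypothetical topology: two sub-topologies, 12 + 6 = 18 tasks in total.
    List<Set<String>> subtopologies =
        List.of(Set.of("orders", "payments"), Set.of("orders-repartition"));
    Map<String, Integer> partitions =
        Map.of("orders", 12, "payments", 8, "orders-repartition", 6);
    System.out.println(threadsPerInstance(subtopologies, partitions, 4));
  }
}
```

With 18 tasks spread over 4 replicas, the sketch yields 5 threads per instance under the round-up assumption. An empty result always means "leave num.stream.threads as configured".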