Commit e2fcfd9

derrickburns and claude committed
Enhance broadcast threshold diagnostics
Implements #2 priority from CLAUDE.md backlog: "Better broadcast-threshold diagnostics (include k × dim and configured threshold)".

Changes:

1. **AutoAssignment** (Strategies.scala:316-377)
   - Added formatBroadcastSize() helper for human-readable sizes (B/KB/MB/GB)
   - Enhanced BroadcastUDF selection log with k, dim, size in elements and bytes
   - Comprehensive chunked broadcast warning with:
     * k×dim calculation vs threshold, with overage %
     * Number of data scans required (Math.ceil(k / chunkSize))
     * 4 actionable suggestions to improve performance
     * Calculation of the max k supported by the current configuration
2. **BroadcastUDFAssignment** (Strategies.scala:45-95)
   - Added formatBroadcastSize() helper
   - Enhanced debug logging with k, dim, and broadcast size
   - Proactive warning when the broadcast exceeds 100MB (~12.5M elements)
   - Warning includes potential issues and 4 actionable mitigations
3. **BroadcastDiagnosticsSuite.scala** (new)
   - 7 comprehensive tests validating diagnostic messages:
     * AutoAssignment threshold exceeded → chunked selection
     * AutoAssignment below threshold → broadcast selection
     * BroadcastUDFAssignment large broadcast warning (>100MB)
     * formatBroadcastSize correctness across scales
     * Chunk count calculation (k=250, chunkSize=100 → 3 passes)
     * Threshold increase suggestions
     * Max k calculation for a given dimensionality
4. **README.md** (lines 130-185)
   - Enhanced "Scaling & Assignment Strategy" section
   - Documented all assignment strategy options (auto/crossJoin/broadcastUDF/chunked)
   - Added "Broadcast Diagnostics" subsection with example warning output
   - Guidance on interpreting warnings and tuning configurations

Validation:
- All 7 new tests pass
- Existing tests pass (verified with sbt test)
- Diagnostic messages confirmed in test output
- README examples match the actual log output format

Risk: Low - diagnostic messages only, no algorithm changes
Compatibility: No API surface or persistence changes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent c0af6c0 commit e2fcfd9
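The chunk-count, overage, and max-k arithmetic that the commit message cites can be checked in isolation. A minimal standalone sketch (the object and method names here are illustrative, not the library's API; it assumes 8-byte doubles and the documented 200,000-element default threshold):

```scala
object DiagnosticArithmetic {
  // Documented default broadcast threshold, in elements (not bytes)
  val broadcastThreshold: Int = 200000

  // Chunked broadcast scans the data ceil(k / chunkSize) times
  def passes(k: Int, chunkSize: Int): Int =
    math.ceil(k.toDouble / chunkSize).toInt

  // How far k×dim overshoots the threshold, as a percentage
  def overagePercent(kTimesDim: Long): Int =
    ((kTimesDim.toDouble / broadcastThreshold - 1.0) * 100).toInt

  // Largest k that still fits under the threshold at a given dimensionality
  def maxK(dim: Int): Int =
    math.max(1, broadcastThreshold / dim)

  def main(args: Array[String]): Unit = {
    println(passes(250, 100))        // 3 (the test case from the suite)
    println(overagePercent(500000L)) // 150 (the README example: k=500, dim=1000)
    println(maxK(1000))              // 200
  }
}
```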

File tree

3 files changed: +352 -14 lines changed


README.md

Lines changed: 47 additions & 8 deletions
@@ -130,20 +130,59 @@ libraryDependencies += "com.massivedatascience" %% "massivedatascience-clusterer
 ## Scaling & Assignment Strategy (important)
 
 Different divergences require different assignment mechanics at scale:
-- Squared Euclidean (SE) fast path — expression/codegen route:
+- **Squared Euclidean (SE) fast path** — expression/codegen route:
   1. Cross-join points with centers
   2. Compute squared distance column
   3. Prefer groupBy(rowId).min(distance) → join to pick argmin (scales better than window sorts)
   4. Requires a stable rowId; we provide a RowIdProvider.
-- General Bregman — broadcast + UDF route:
-  - Broadcast the centers; compute argmin via a tight JVM UDF.
-  - Broadcast ceiling: you'll hit executor/memory limits if k × dim is too large to broadcast.
+- **General Bregman** — broadcast + UDF route:
+  - Broadcast the centers; compute argmin via a tight JVM UDF.
+  - Broadcast ceiling: you'll hit executor/memory limits if k × dim is too large to broadcast.
 
 **Parameters**
-- assignmentStrategy: StringParam = auto | crossJoin | broadcastUDF
-  - auto chooses SE fast path when divergence == SE and feasible; otherwise broadcastUDF.
-- broadcastThreshold: IntParam (elements, not bytes)
-  - Heuristic ceiling for k × dim to guard broadcasts. If exceeded for non-SE, we warn and keep the broadcastUDF path (no DF fallback exists for general Bregman).
+- `assignmentStrategy: StringParam = auto | crossJoin | broadcastUDF | chunked`
+  - `auto` (recommended): Chooses SE fast path when divergence == SE; otherwise selects between broadcastUDF and chunked based on k×dim size
+  - `crossJoin`: Forces SE expression-based path (only works with Squared Euclidean)
+  - `broadcastUDF`: Forces broadcast + UDF (works with any divergence, but may OOM on large k×dim)
+  - `chunked`: Processes centers in chunks to avoid OOM (multiple data scans, but safe for large k×dim)
+- `broadcastThreshold: IntParam` (elements, not bytes)
+  - Default: 200,000 elements (~1.5MB)
+  - Heuristic ceiling for k × dim. If exceeded for non-SE divergences, AutoAssignment switches to chunked broadcast.
+- `chunkSize: IntParam` (for chunked strategy)
+  - Default: 100 clusters per chunk
+  - Controls how many centers are processed in each scan when using chunked broadcast
+
+**Broadcast Diagnostics**
+
+The library provides detailed diagnostics to help you tune performance and avoid OOM errors:
+
+```scala
+// Example: Large cluster configuration
+val gkm = new GeneralizedKMeans()
+  .setK(500)           // 500 clusters
+  .setDivergence("kl") // Non-SE divergence
+// If your data has dim=1000, then k×dim = 500,000 elements
+
+// AutoAssignment will log:
+// [WARN] AutoAssignment: Broadcast size exceeds threshold
+//   Current: k=500 × dim=1000 = 500000 elements ≈ 3.8MB
+//   Threshold: 200000 elements ≈ 1.5MB
+//   Overage: +150%
+//
+//   Using ChunkedBroadcast (chunkSize=100) to avoid OOM.
+//   This will scan the data 5 times.
+//
+//   To avoid chunking overhead, consider:
+//   1. Reduce k (number of clusters)
+//   2. Reduce dimensionality (current: 1000 dimensions)
+//   3. Increase broadcastThreshold (suggested: k=500 would need ~500000 elements)
+//   4. Use Squared Euclidean divergence if appropriate (enables fast SE path)
+```
+
+**When you see these warnings:**
+- **Chunked broadcast selected**: Your configuration will work but may be slower due to multiple data scans. Follow the suggestions to improve performance.
+- **Large broadcast warning** (>100MB): Risk of executor OOM errors. Consider reducing k or dimensionality, or increasing executor memory.
+- **No warning**: Your configuration is well-sized for broadcasting.
 
 ---
 

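The auto-selection rule documented above can be sketched as a standalone function. This is a simplified reimplementation for illustration only, not the library's code path (the real logic lives in AutoAssignment in Strategies.scala, and the divergence name "squaredEuclidean" here is an assumption):

```scala
object StrategyChoiceSketch {
  /** Simplified sketch of the documented selection rule (illustrative only). */
  def chooseStrategy(
      divergence: String,
      k: Int,
      dim: Int,
      broadcastThreshold: Int = 200000): String = {
    val kTimesDim = k.toLong * dim
    if (divergence == "squaredEuclidean") "SECrossJoin"     // SE fast path
    else if (kTimesDim < broadcastThreshold) "BroadcastUDF" // small enough to broadcast whole
    else "ChunkedBroadcast"                                 // too large: chunk to avoid OOM
  }

  def main(args: Array[String]): Unit = {
    println(chooseStrategy("kl", 500, 1000)) // ChunkedBroadcast (500,000 >= 200,000)
    println(chooseStrategy("kl", 100, 100))  // BroadcastUDF (10,000 < 200,000)
  }
}
```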
src/main/scala/com/massivedatascience/clusterer/ml/df/Strategies.scala

Lines changed: 81 additions & 6 deletions
@@ -42,6 +42,20 @@ trait AssignmentStrategy extends Serializable {
   */
 class BroadcastUDFAssignment extends AssignmentStrategy with Logging {
 
+  /** Format broadcast size with human-readable units. */
+  private def formatBroadcastSize(elements: Long): String = {
+    val bytes = elements * 8 // doubles are 8 bytes
+    if (bytes < 1024) {
+      f"${bytes}B"
+    } else if (bytes < 1024 * 1024) {
+      f"${bytes / 1024.0}%.1fKB"
+    } else if (bytes < 1024 * 1024 * 1024) {
+      f"${bytes / (1024.0 * 1024.0)}%.1fMB"
+    } else {
+      f"${bytes / (1024.0 * 1024.0 * 1024.0)}%.1fGB"
+    }
+  }
+
   override def assign(
       df: DataFrame,
       featuresCol: String,
@@ -50,7 +64,36 @@ class BroadcastUDFAssignment extends AssignmentStrategy with Logging {
       kernel: BregmanKernel
   ): DataFrame = {
 
-    logDebug(s"BroadcastUDFAssignment: assigning ${centers.length} clusters")
+    val k = centers.length
+    val dim = centers.headOption.map(_.length).getOrElse(0)
+    val kTimesDim = k * dim
+    val sizeStr = formatBroadcastSize(kTimesDim)
+
+    logDebug(
+      s"BroadcastUDFAssignment: broadcasting k=$k clusters × dim=$dim = $kTimesDim elements ≈ $sizeStr"
+    )
+
+    // Warn if broadcast size is very large (>100MB)
+    val warningThreshold = 12500000 // ~100MB
+    if (kTimesDim > warningThreshold) {
+      val warningStr = formatBroadcastSize(warningThreshold)
+      logWarning(
+        s"""BroadcastUDFAssignment: Large broadcast detected
+           |  Size: k=$k × dim=$dim = $kTimesDim elements ≈ $sizeStr
+           |  This exceeds the recommended size for broadcasting ($warningStr).
+           |
+           |  Potential issues:
+           |  - Executor OOM errors during broadcast
+           |  - Slow broadcast distribution across cluster
+           |  - Driver memory pressure
+           |
+           |  Consider:
+           |  1. Using ChunkedBroadcastAssignment for large k×dim
+           |  2. Reducing k or dimensionality
+           |  3. Increasing executor memory
+           |  4. Using AutoAssignment strategy (automatically selects best approach)""".stripMargin
+      )
+    }
 
     val spark = df.sparkSession
     val bcCenters = spark.sparkContext.broadcast(centers)
@@ -273,6 +316,20 @@ class AutoAssignment(broadcastThresholdElems: Int = 200000, chunkSize: Int = 100
   private val seStrategy = new SECrossJoinAssignment()
   private val chunkedStrategy = new ChunkedBroadcastAssignment(chunkSize)
 
+  /** Format broadcast size with human-readable units. */
+  private def formatBroadcastSize(elements: Long): String = {
+    val bytes = elements * 8 // doubles are 8 bytes
+    if (bytes < 1024) {
+      f"${bytes}B"
+    } else if (bytes < 1024 * 1024) {
+      f"${bytes / 1024.0}%.1fKB"
+    } else if (bytes < 1024 * 1024 * 1024) {
+      f"${bytes / (1024.0 * 1024.0)}%.1fMB"
+    } else {
+      f"${bytes / (1024.0 * 1024.0 * 1024.0)}%.1fGB"
+    }
+  }
+
   override def assign(
       df: DataFrame,
       featuresCol: String,
@@ -289,16 +346,34 @@ class AutoAssignment(broadcastThresholdElems: Int = 200000, chunkSize: Int = 100
       logInfo(s"AutoAssignment: strategy=SECrossJoin (kernel=${kernel.name})")
       seStrategy.assign(df, featuresCol, weightCol, centers, kernel)
     } else if (kTimesDim < broadcastThresholdElems) {
+      val sizeStr = formatBroadcastSize(kTimesDim)
       logInfo(
-        s"AutoAssignment: strategy=BroadcastUDF (kernel=${kernel.name}, k×dim=$kTimesDim < $broadcastThresholdElems)"
+        s"AutoAssignment: strategy=BroadcastUDF (kernel=${kernel.name}, k=$k, dim=$dim, " +
+          s"broadcast_size=$kTimesDim elements ≈ $sizeStr < threshold=$broadcastThresholdElems)"
       )
       broadcastStrategy.assign(df, featuresCol, weightCol, centers, kernel)
     } else {
+      val sizeStr = formatBroadcastSize(kTimesDim)
+      val thresholdStr = formatBroadcastSize(broadcastThresholdElems)
+      val overagePercent = ((kTimesDim.toDouble / broadcastThresholdElems - 1.0) * 100).toInt
+      val suggestedChunkK = math.max(1, broadcastThresholdElems / dim)
+
       logWarning(
-        s"AutoAssignment: k×dim=$kTimesDim exceeds threshold=$broadcastThresholdElems, using ChunkedBroadcast to avoid OOM"
-      )
-      logInfo(
-        s"AutoAssignment: strategy=ChunkedBroadcast (kernel=${kernel.name}, k=$k, dim=$dim, chunkSize=$chunkSize)"
+        s"""AutoAssignment: Broadcast size exceeds threshold
+           |  Current: k=$k × dim=$dim = $kTimesDim elements ≈ $sizeStr
+           |  Threshold: $broadcastThresholdElems elements ≈ $thresholdStr
+           |  Overage: +$overagePercent%
+           |
+           |  Using ChunkedBroadcast (chunkSize=$chunkSize) to avoid OOM.
+           |  This will scan the data ${math.ceil(k.toDouble / chunkSize).toInt} times.
+           |
+           |  To avoid chunking overhead, consider:
+           |  1. Reduce k (number of clusters)
+           |  2. Reduce dimensionality (current: $dim dimensions)
+           |  3. Increase broadcastThreshold (suggested: k=$k would need ~${kTimesDim} elements)
+           |  4. Use Squared Euclidean divergence if appropriate (enables fast SE path)
+           |
+           |  Current configuration can broadcast up to k≈$suggestedChunkK clusters of $dim dimensions.""".stripMargin
      )
      chunkedStrategy.assign(df, featuresCol, weightCol, centers, kernel)
    }
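The formatBroadcastSize helper added to both classes in this diff is self-contained and easy to check in isolation. A standalone copy of the same logic (the enclosing object is for the sketch only; the helper is private in the library):

```scala
object FormatSizeDemo {
  /** Same logic as the diff's helper: elements are 8-byte doubles. */
  def formatBroadcastSize(elements: Long): String = {
    val bytes = elements * 8
    if (bytes < 1024) f"${bytes}B"
    else if (bytes < 1024 * 1024) f"${bytes / 1024.0}%.1fKB"
    else if (bytes < 1024 * 1024 * 1024) f"${bytes / (1024.0 * 1024.0)}%.1fMB"
    else f"${bytes / (1024.0 * 1024.0 * 1024.0)}%.1fGB"
  }

  def main(args: Array[String]): Unit = {
    println(formatBroadcastSize(100L))    // 800 bytes: shown directly as B
    println(formatBroadcastSize(200000L)) // the default threshold (~1.5MB)
    println(formatBroadcastSize(500000L)) // the README example (~3.8MB)
  }
}
```

Note that the 12,500,000-element warning threshold formats as roughly 95MB under this 1024-based conversion, which is why the code comments describe it as "~100MB".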
