Commit cfcb761

derrickburns and claude committed
refactor: Split large files into modular subpackages
Split Strategies.scala (1030 lines) into strategies/ subpackage:
- AssignmentStrategy.scala: Assignment strategy trait and implementations
- UpdateStrategy.scala: Update strategy trait and implementations
- EmptyClusterHandler.scala: Empty cluster handling strategies
- ConvergenceCheck.scala: Convergence checking strategies
- InputValidator.scala: Input validation strategies

Split BregmanKernel.scala (540 lines) into kernels/ subpackage:
- BregmanKernel.scala: Core kernel trait
- SquaredEuclideanKernel.scala
- KLDivergenceKernel.scala
- ItakuraSaitoKernel.scala
- GeneralizedIDivergenceKernel.scala
- LogisticLossKernel.scala
- L1Kernel.scala
- SphericalKernel.scala

Added package objects for df/ and subpackages to re-export types for backward compatibility. Existing code using wildcard imports continues to work unchanged.

Also added private[df] visibility modifiers to internal implementation classes, keeping only traits public.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2923f5e commit cfcb761

18 files changed: +1765 −1564 lines changed

src/main/scala/com/massivedatascience/clusterer/ml/df/BregmanKernel.scala

Lines changed: 16 additions & 537 deletions
Large diffs are not rendered by default.

src/main/scala/com/massivedatascience/clusterer/ml/df/Strategies.scala

Lines changed: 37 additions & 1027 deletions
Large diffs are not rendered by default.
src/main/scala/com/massivedatascience/clusterer/ml/df/kernels/BregmanKernel.scala

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
package com.massivedatascience.clusterer.ml.df.kernels

import org.apache.spark.ml.linalg.Vector

/** Kernel interface for Bregman divergences in DataFrame-based clustering.
  *
  * A Bregman divergence is defined by a strictly convex function F: S → ℝ where S ⊆ ℝ^d. The
  * divergence between points x and y is: D_F(x, y) = F(x) - F(y) - ⟨∇F(y), x - y⟩
  *
  * For clustering, we need:
  *   - grad(x): Gradient ∇F(x) (natural parameters)
  *   - invGrad(θ): Inverse gradient mapping θ → x where θ = ∇F(x) (expectation parameters)
  *   - divergence(x, μ): Compute D_F(x, μ)
  *   - validate(x): Check if x is in valid domain
  *
  * The cluster centers are computed as: μ = invGrad(∑_i w_i · grad(x_i) / ∑_i w_i)
  */
trait BregmanKernel extends Serializable {

  /** Compute the gradient ∇F(x) of the Bregman function at point x.
    *
    * @param x
    *   input point
    * @return
    *   gradient vector (natural parameters)
    */
  def grad(x: Vector): Vector

  /** Compute the inverse gradient mapping: given θ = ∇F(x), recover x.
    *
    * This maps from natural parameters to expectation parameters.
    *
    * @param theta
    *   natural parameters (gradient of some point)
    * @return
    *   point x such that ∇F(x) = theta
    */
  def invGrad(theta: Vector): Vector

  /** Compute the Bregman divergence D_F(x, mu) between point x and center mu.
    *
    * @param x
    *   data point
    * @param mu
    *   cluster center
    * @return
    *   D_F(x, mu) ≥ 0
    */
  def divergence(x: Vector, mu: Vector): Double

  /** Validate that point x is in the valid domain of the Bregman function.
    *
    * @param x
    *   point to validate
    * @return
    *   true if x is in valid domain
    */
  def validate(x: Vector): Boolean

  /** Name of the kernel for logging and identification. */
  def name: String

  /** Whether this kernel supports expression-based cross-join optimization. Only Squared Euclidean
    * can use this fast path.
    */
  def supportsExpressionOptimization: Boolean = false
}
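To make the trait's contract concrete: for squared Euclidean distance with F(x) = ½‖x‖², grad and invGrad are both the identity map, so averaging in gradient space is just the ordinary weighted mean. The commit's actual SquaredEuclideanKernel.scala is not rendered above; the following is only a minimal sketch of a conforming implementation under that assumption (the class name IdentityGradKernel is illustrative):

import org.apache.spark.ml.linalg.Vector

// Minimal illustrative kernel, NOT the SquaredEuclideanKernel.scala from this commit.
// With F(x) = 0.5 * ||x||^2 we get grad(x) = x and invGrad(theta) = theta,
// and D_F(x, mu) = 0.5 * ||x - mu||^2.
class IdentityGradKernel extends BregmanKernel {

  override def grad(x: Vector): Vector = x            // ∇F(x) = x
  override def invGrad(theta: Vector): Vector = theta // ∇F is its own inverse

  override def divergence(x: Vector, mu: Vector): Double = {
    val xArr  = x.toArray
    val muArr = mu.toArray
    var sum = 0.0
    var i = 0
    while (i < xArr.length) {
      val d = xArr(i) - muArr(i)
      sum += d * d
      i += 1
    }
    0.5 * sum
  }

  override def validate(x: Vector): Boolean =
    x.toArray.forall(v => !v.isNaN && !v.isInfinity)

  override def name: String = "IdentityGrad"
}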
src/main/scala/com/massivedatascience/clusterer/ml/df/kernels/GeneralizedIDivergenceKernel.scala

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
package com.massivedatascience.clusterer.ml.df.kernels

import org.apache.spark.ml.linalg.{ Vector, Vectors }

/** Generalized I-divergence kernel: F(x) = ∑_i x_i log(x_i) - x_i
  *
  * Similar to KL but without normalization constraint. Used for non-negative data like counts.
  *   - grad(x) = [log(x_i)]
  *   - invGrad(θ) = [exp(θ_i)]
  *   - divergence(x, μ) = ∑_i (x_i log(x_i/μ_i) - x_i + μ_i)
  *
  * @param smoothing
  *   small constant added to avoid log(0)
  */
private[df] class GeneralizedIDivergenceKernel(smoothing: Double = 1e-10) extends BregmanKernel {
  require(smoothing > 0, "Smoothing must be positive")

  override def grad(x: Vector): Vector = {
    val arr = x.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      result(i) = math.log(arr(i) + smoothing)
      i += 1
    }
    Vectors.dense(result)
  }

  override def invGrad(theta: Vector): Vector = {
    val arr = theta.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      result(i) = math.exp(arr(i))
      i += 1
    }
    Vectors.dense(result)
  }

  override def divergence(x: Vector, mu: Vector): Double = {
    val xArr = x.toArray
    val muArr = mu.toArray
    var sum = 0.0
    var i = 0
    while (i < xArr.length) {
      val xi = xArr(i) + smoothing
      val mui = muArr(i) + smoothing
      sum += xi * math.log(xi / mui) - xi + mui
      i += 1
    }
    sum
  }

  override def validate(x: Vector): Boolean = {
    val arr = x.toArray
    arr.forall(v => !v.isNaN && !v.isInfinity && v >= 0.0)
  }

  override def name: String = s"GeneralizedI(smoothing=$smoothing)"
}
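As a quick numeric check of the divergence formula above: for x = (2, 1) and μ = (1, 1), the only nonzero term is 2·ln 2 − 2 + 1 ≈ 0.386. An illustrative usage, not part of this commit (it must run inside the df package, since the class is private[df]):

import org.apache.spark.ml.linalg.Vectors

// Illustrative usage, not part of this commit (must run inside the df package,
// since GeneralizedIDivergenceKernel is private[df]).
val kernel = new GeneralizedIDivergenceKernel()
val x  = Vectors.dense(2.0, 1.0) // non-negative counts are in the valid domain
val mu = Vectors.dense(1.0, 1.0)
assert(kernel.validate(x))
val d = kernel.divergence(x, mu) // = 2*ln(2) - 2 + 1 + 0 ≈ 0.386 (up to smoothing)
assert(math.abs(d - (2.0 * math.log(2.0) - 1.0)) < 1e-6)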
src/main/scala/com/massivedatascience/clusterer/ml/df/kernels/ItakuraSaitoKernel.scala

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
package com.massivedatascience.clusterer.ml.df.kernels

import org.apache.spark.ml.linalg.{ Vector, Vectors }

/** Itakura-Saito divergence kernel: F(x) = -∑_i log(x_i)
  *
  * Used for spectral analysis and audio processing. Requires all components to be strictly
  * positive.
  *   - grad(x) = [-1/x_i]
  *   - invGrad(θ) = [-1/θ_i]
  *   - divergence(x, μ) = ∑_i (x_i/μ_i - log(x_i/μ_i) - 1)
  *
  * @param smoothing
  *   small constant added to avoid division by zero
  */
private[df] class ItakuraSaitoKernel(smoothing: Double = 1e-10) extends BregmanKernel {
  require(smoothing > 0, "Smoothing must be positive")

  override def grad(x: Vector): Vector = {
    val arr = x.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      result(i) = -1.0 / (arr(i) + smoothing)
      i += 1
    }
    Vectors.dense(result)
  }

  override def invGrad(theta: Vector): Vector = {
    val arr = theta.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      result(i) = -1.0 / arr(i)
      i += 1
    }
    Vectors.dense(result)
  }

  override def divergence(x: Vector, mu: Vector): Double = {
    val xArr = x.toArray
    val muArr = mu.toArray
    var sum = 0.0
    var i = 0
    while (i < xArr.length) {
      val xi = xArr(i) + smoothing
      val mui = muArr(i) + smoothing
      sum += xi / mui - math.log(xi / mui) - 1.0
      i += 1
    }
    sum
  }

  override def validate(x: Vector): Boolean = {
    val arr = x.toArray
    arr.forall(v => !v.isNaN && !v.isInfinity && v > 0.0)
  }

  override def name: String = s"ItakuraSaito(smoothing=$smoothing)"
}
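A property worth noting: because grad(x) = −1/(x + s) while invGrad(θ) = −1/θ, the round trip invGrad(grad(x)) returns x + smoothing exactly, so the two maps are inverses only up to the smoothing offset. An illustrative check, not part of this commit (must run inside the df package, since the class is private[df]):

import org.apache.spark.ml.linalg.Vectors

// Illustrative round-trip check, not part of this commit:
// invGrad(grad(x)) = -1 / (-1 / (x + s)) = x + s, i.e. exact up to smoothing.
val kernel = new ItakuraSaitoKernel(smoothing = 1e-10)
val x = Vectors.dense(0.5, 2.0, 10.0)
val roundTrip = kernel.invGrad(kernel.grad(x))
val maxErr = x.toArray.zip(roundTrip.toArray).map { case (a, b) => math.abs(a - b) }.max
assert(maxErr <= 1e-8) // off only by the smoothing constant plus rounding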
src/main/scala/com/massivedatascience/clusterer/ml/df/kernels/KLDivergenceKernel.scala

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
package com.massivedatascience.clusterer.ml.df.kernels

import org.apache.spark.ml.linalg.{ Vector, Vectors }

/** Kullback-Leibler divergence kernel: F(x) = ∑_i x_i log(x_i)
  *
  * Used for probability distributions and count data. Requires all components to be positive
  * (probability simplex).
  *   - grad(x) = [log(x_i) + 1]
  *   - invGrad(θ) = [exp(θ_i - 1)]
  *   - divergence(x, μ) = ∑_i x_i log(x_i / μ_i)
  *
  * @param smoothing
  *   small constant added to avoid log(0)
  */
private[df] class KLDivergenceKernel(smoothing: Double = 1e-10) extends BregmanKernel {
  require(smoothing > 0, "Smoothing must be positive")

  override def grad(x: Vector): Vector = {
    val arr = x.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      result(i) = math.log(arr(i) + smoothing) + 1.0
      i += 1
    }
    Vectors.dense(result)
  }

  override def invGrad(theta: Vector): Vector = {
    val arr = theta.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      result(i) = math.exp(arr(i) - 1.0)
      i += 1
    }
    Vectors.dense(result)
  }

  override def divergence(x: Vector, mu: Vector): Double = {
    val xArr = x.toArray
    val muArr = mu.toArray
    var sum = 0.0
    var i = 0
    while (i < xArr.length) {
      val xi = xArr(i) + smoothing
      val mui = muArr(i) + smoothing
      sum += xi * math.log(xi / mui)
      i += 1
    }
    sum
  }

  override def validate(x: Vector): Boolean = {
    val arr = x.toArray
    arr.forall(v => !v.isNaN && !v.isInfinity && v >= 0.0)
  }

  override def name: String = s"KL(smoothing=$smoothing)"
}
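The center formula from the BregmanKernel doc comment, μ = invGrad(∑_i w_i · grad(x_i) / ∑_i w_i), can be exercised against any kernel with a few lines of driver code. The helper below is an illustrative sketch, not an API from this commit; the actual update strategies live in the strategies/ subpackage, whose diffs are not shown here.

import org.apache.spark.ml.linalg.{ Vector, Vectors }

// Illustrative helper, not part of this commit: computes the cluster center
// mu = invGrad( sum_i w_i * grad(x_i) / sum_i w_i ) for any BregmanKernel.
def weightedCenter(kernel: BregmanKernel, points: Seq[(Vector, Double)]): Vector = {
  require(points.nonEmpty, "need at least one weighted point")
  val dim = points.head._1.size
  val acc = new Array[Double](dim)
  var totalWeight = 0.0
  for ((x, w) <- points) {
    val g = kernel.grad(x).toArray
    var i = 0
    while (i < dim) { acc(i) += w * g(i); i += 1 }
    totalWeight += w
  }
  var i = 0
  while (i < dim) { acc(i) /= totalWeight; i += 1 }
  kernel.invGrad(Vectors.dense(acc))
}

For a kernel whose gradient is the identity this reduces to the ordinary weighted mean; for the KL kernel above, whose gradient is log(x_i) + 1, it averages points in log space.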
src/main/scala/com/massivedatascience/clusterer/ml/df/kernels/L1Kernel.scala

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
package com.massivedatascience.clusterer.ml.df.kernels

import org.apache.spark.ml.linalg.{ Vector, Vectors }

/** L1 (Manhattan distance) kernel for K-Medians clustering.
  *
  * This is NOT a Bregman divergence, but we provide it for K-Medians support. K-Medians uses
  * component-wise medians instead of gradient-based means.
  *
  *   - Distance: ||x - μ||_1 = ∑_i |x_i - μ_i|
  *   - Centers: component-wise median (not via gradient)
  *   - More robust to outliers than squared Euclidean
  *
  * Note: This kernel should be paired with MedianUpdateStrategy, not GradMeanUDAFUpdate.
  */
private[df] class L1Kernel extends BregmanKernel {

  override def grad(x: Vector): Vector = {
    // L1 is not differentiable everywhere (non-differentiable at 0)
    // Use sign function as subgradient
    val arr = x.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      result(i) = if (arr(i) > 0) 1.0 else if (arr(i) < 0) -1.0 else 0.0
      i += 1
    }
    Vectors.dense(result)
  }

  override def invGrad(theta: Vector): Vector = {
    // Inverse of sign function is not well-defined
    // For K-Medians, we compute medians directly, not via gradient
    // Return theta as identity (will be overridden by MedianUpdateStrategy)
    theta
  }

  override def divergence(x: Vector, mu: Vector): Double = {
    // Manhattan distance (L1 norm)
    val xArr = x.toArray
    val muArr = mu.toArray
    var sum = 0.0
    var i = 0
    while (i < xArr.length) {
      sum += math.abs(xArr(i) - muArr(i))
      i += 1
    }
    sum
  }

  override def validate(x: Vector): Boolean = {
    val arr = x.toArray
    arr.forall(v => !v.isNaN && !v.isInfinity)
  }

  override def name: String = "L1"
}
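The component-wise median that the note above refers to can be computed as follows. This is only an illustrative standalone sketch; the MedianUpdateStrategy shipped in this commit lives in the strategies/ subpackage and its diff is not shown here.

import org.apache.spark.ml.linalg.{ Vector, Vectors }

// Illustrative sketch, not the MedianUpdateStrategy from this commit:
// the L1-optimal center of a set of points is the component-wise median.
def componentWiseMedian(points: Seq[Vector]): Vector = {
  require(points.nonEmpty, "need at least one point")
  val dim = points.head.size
  val result = new Array[Double](dim)
  var d = 0
  while (d < dim) {
    val column = points.map(p => p(d)).sorted
    val n = column.length
    result(d) =
      if (n % 2 == 1) column(n / 2)
      else (column(n / 2 - 1) + column(n / 2)) / 2.0
    d += 1
  }
  Vectors.dense(result)
}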
src/main/scala/com/massivedatascience/clusterer/ml/df/kernels/LogisticLossKernel.scala

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
package com.massivedatascience.clusterer.ml.df.kernels

import org.apache.spark.ml.linalg.{ Vector, Vectors }

/** Logistic loss kernel: F(x) = ∑_i (x_i log(x_i) + (1-x_i)log(1-x_i))
  *
  * Used for binary probability vectors. Requires all components in (0, 1).
  *   - grad(x) = [log(x_i / (1-x_i))]
  *   - invGrad(θ) = [1 / (1 + exp(-θ_i))] (sigmoid)
  *   - divergence(x, μ) = ∑_i (x_i log(x_i/μ_i) + (1-x_i)log((1-x_i)/(1-μ_i)))
  *
  * @param smoothing
  *   small constant to keep values away from 0 and 1
  */
private[df] class LogisticLossKernel(smoothing: Double = 1e-10) extends BregmanKernel {
  require(smoothing > 0 && smoothing < 0.5, "Smoothing must be in (0, 0.5)")

  override def grad(x: Vector): Vector = {
    val arr = x.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      val xi = math.max(smoothing, math.min(1.0 - smoothing, arr(i)))
      result(i) = math.log(xi / (1.0 - xi))
      i += 1
    }
    Vectors.dense(result)
  }

  override def invGrad(theta: Vector): Vector = {
    val arr = theta.toArray
    val result = new Array[Double](arr.length)
    var i = 0
    while (i < arr.length) {
      result(i) = 1.0 / (1.0 + math.exp(-arr(i)))
      i += 1
    }
    Vectors.dense(result)
  }

  override def divergence(x: Vector, mu: Vector): Double = {
    val xArr = x.toArray
    val muArr = mu.toArray
    var sum = 0.0
    var i = 0
    while (i < xArr.length) {
      val xi = math.max(smoothing, math.min(1.0 - smoothing, xArr(i)))
      val mui = math.max(smoothing, math.min(1.0 - smoothing, muArr(i)))
      sum += xi * math.log(xi / mui) + (1.0 - xi) * math.log((1.0 - xi) / (1.0 - mui))
      i += 1
    }
    sum
  }

  override def validate(x: Vector): Boolean = {
    val arr = x.toArray
    arr.forall(v => !v.isNaN && !v.isInfinity && v >= 0.0 && v <= 1.0)
  }

  override def name: String = s"LogisticLoss(smoothing=$smoothing)"
}
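Because both grad and divergence clamp components into [smoothing, 1 − smoothing], the endpoint values 0 and 1 pass validate and still yield finite divergences. An illustrative check, not part of this commit (must run inside the df package, since the class is private[df]):

import org.apache.spark.ml.linalg.Vectors

// Illustrative usage, not part of this commit: smoothing keeps endpoint
// components away from 0 and 1, so the divergence stays finite.
val kernel = new LogisticLossKernel()
val x  = Vectors.dense(0.0, 1.0) // validate accepts the closed interval [0, 1]
val mu = Vectors.dense(0.5, 0.5)
assert(kernel.validate(x))
val d = kernel.divergence(x, mu) // ≈ 2 * ln(2): about ln(2) per endpoint component
assert(!d.isInfinity && d >= 0.0)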
