import numpy as np
from numba import float64

from .base import BaseDatafit


class DoubleQuadratic(BaseDatafit):
    """Double Quadratic datafit with asymmetric loss.

    The datafit reads:

    .. math:: 1 / (2 \\times n_\\text{samples}) \\sum_i 2 (\\alpha + (1 - 2 \\alpha) \\cdot 1[\\epsilon_i > 0]) \\epsilon_i^2

    where :math:`\\epsilon_i = (Xw)_i - y_i` are the residuals. The factor 2
    normalizes the weights so that :math:`\\alpha = 0.5` recovers the standard
    Quadratic datafit.

    Parameters
    ----------
    alpha : float, default=0.5
        Asymmetry parameter controlling the relative weighting of positive vs
        negative residuals:

        - alpha = 0.5: symmetric loss (equivalent to the standard Quadratic)
        - alpha < 0.5: penalize positive residuals (overestimation) more heavily
        - alpha > 0.5: penalize negative residuals (underestimation) more heavily

    Attributes
    ----------
    Xty : array, shape (n_features,)
        Pre-computed quantity equal to ``X.T @ y``, set by ``initialize``.

    Note
    ----
    The class is JIT-compiled at fit time using the Numba compiler.
    This allows for faster computations.
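
    Examples
    --------
    A small illustration of the asymmetric weighting: with ``alpha=0.3`` a
    positive residual is weighted ``2 * 0.7 = 1.4`` and a negative one
    ``2 * 0.3 = 0.6``.

    >>> import numpy as np
    >>> X = np.eye(2)
    >>> y = np.zeros(2)
    >>> w = np.array([1., -1.])  # residuals Xw - y = [1, -1]
    >>> datafit = DoubleQuadratic(alpha=0.3)
    >>> print(datafit.value(y, w, X @ w))  # (1.4 * 1 + 0.6 * 1) / (2 * 2)
    0.5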
    """

    def __init__(self, alpha=0.5):
        if not 0 <= alpha <= 1:
            raise ValueError(f"alpha must be between 0 and 1, got {alpha}")
        self.alpha = alpha

    def get_spec(self):
        spec = (
            ('alpha', float64),
            ('Xty', float64[:]),
        )
        return spec

    def params_to_dict(self):
        return dict(alpha=self.alpha)

    def get_lipschitz(self, X, y):
        """Compute per-coordinate Lipschitz constants.

        With the weights scaled by 2, the Lipschitz constant for coordinate j
        is bounded by max_weight * ||X[:, j]||^2 / n_samples, where
        max_weight = 2 * max(alpha, 1 - alpha).
        """
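        # Sketch of the bound: away from zero residuals, the curvature of the
        # datafit w.r.t. w_j is (1 / n_samples) * sum_i weights_i * X[i, j] ** 2,
        # and weights_i <= max_weight, which yields the per-coordinate bound below.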
        n_features = X.shape[1]

        # Compute weight bounds (after scaling by 2)
        weight_pos = 2 * (1 - self.alpha)  # weight for positive residuals
        weight_neg = 2 * self.alpha        # weight for negative residuals
        max_weight = max(weight_pos, weight_neg)

        lipschitz = np.zeros(n_features, dtype=X.dtype)
        for j in range(n_features):
            lipschitz[j] = max_weight * (X[:, j] ** 2).sum() / len(y)

        return lipschitz

    def get_lipschitz_sparse(self, X_data, X_indptr, X_indices, y):
        """Sparse version of get_lipschitz."""
        n_features = len(X_indptr) - 1

        # Compute weight bounds (after scaling by 2)
        weight_pos = 2 * (1 - self.alpha)
        weight_neg = 2 * self.alpha
        max_weight = max(weight_pos, weight_neg)

        lipschitz = np.zeros(n_features, dtype=X_data.dtype)

        for j in range(n_features):
            nrm2 = 0.
            for idx in range(X_indptr[j], X_indptr[j + 1]):
                nrm2 += X_data[idx] ** 2

            lipschitz[j] = max_weight * nrm2 / len(y)

        return lipschitz

    def get_global_lipschitz(self, X, y):
        """Global Lipschitz constant."""
        weight_pos = 2 * (1 - self.alpha)
        weight_neg = 2 * self.alpha
        max_weight = max(weight_pos, weight_neg)

        from scipy.linalg import norm
        return max_weight * norm(X, ord=2) ** 2 / len(y)

    def get_global_lipschitz_sparse(self, X_data, X_indptr, X_indices, y):
        """Sparse version of the global Lipschitz constant."""
        weight_pos = 2 * (1 - self.alpha)
        weight_neg = 2 * self.alpha
        max_weight = max(weight_pos, weight_neg)

        from .utils import spectral_norm
        return max_weight * spectral_norm(X_data, X_indptr, X_indices, len(y)) ** 2 / len(y)

    def initialize(self, X, y):
        """Pre-compute X.T @ y for efficient gradient computation."""
        self.Xty = X.T @ y

    def initialize_sparse(self, X_data, X_indptr, X_indices, y):
        """Sparse version of initialize."""
        n_features = len(X_indptr) - 1
        self.Xty = np.zeros(n_features, dtype=X_data.dtype)

        for j in range(n_features):
            xty = 0.
            for idx in range(X_indptr[j], X_indptr[j + 1]):
                xty += X_data[idx] * y[X_indices[idx]]

            self.Xty[j] = xty

    def value(self, y, w, Xw):
        """Compute the asymmetric quadratic loss value.

        When alpha=0.5 this is identical to the Quadratic loss. The loss is
        (1 / (2 * n_samples)) * sum_i weights_i * ((Xw)_i - y_i) ** 2, with the
        weights normalized so that alpha=0.5 gives weight 1.0.
        """
        # Residuals (Xw)_i - y_i; their sign selects the weight
        prediction_residuals = Xw - y

        # Compute weights, normalized so alpha=0.5 gives weight 1.0.
        # Original formula: α + (1-2α) * 1[εᵢ>0]
        # At α=0.5 this gives 0.5, but we want 1.0, so scale by 2:
        # weights = 2 * (α + (1-2α) * 1[εᵢ>0])
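        # e.g. with α=0.3: positive residuals get weight 2 * 0.7 = 1.4, negative ones 2 * 0.3 = 0.6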
        weights = 2 * (self.alpha + (1 - 2 * self.alpha) * (prediction_residuals > 0))

        # Return normalized loss
        return np.sum(weights * prediction_residuals ** 2) / (2 * len(y))

    def gradient_scalar(self, X, y, w, Xw, j):
        """Compute gradient w.r.t. coordinate j."""
        prediction_residuals = Xw - y  # for gradient computation

        # Compute weights with same scaling as value()
        weights = 2 * (self.alpha + (1 - 2 * self.alpha) * (prediction_residuals > 0))

        # Gradient: X[:, j].T @ (weights * (Xw - y)) / n_samples
        return (X[:, j] @ (weights * prediction_residuals)) / len(y)

    def gradient_scalar_sparse(self, X_data, X_indptr, X_indices, y, Xw, j):
        """Sparse version of gradient_scalar."""
        prediction_residuals = Xw - y

        # Compute weights with same scaling
        weights = 2 * (self.alpha + (1 - 2 * self.alpha) * (prediction_residuals > 0))

        # Compute X[:, j].T @ (weights * prediction_residuals) for sparse X
        XjT_weighted_residuals = 0.
        for i in range(X_indptr[j], X_indptr[j + 1]):
            sample_idx = X_indices[i]
            XjT_weighted_residuals += X_data[i] * weights[sample_idx] * prediction_residuals[sample_idx]

        return XjT_weighted_residuals / len(y)

    def gradient(self, X, y, Xw):
        """Compute the full gradient vector."""
        prediction_residuals = Xw - y

        # Compute weights with same scaling as value()
        weights = 2 * (self.alpha + (1 - 2 * self.alpha) * (prediction_residuals > 0))

        # X.T @ (weights * prediction_residuals) / n_samples
        return X.T @ (weights * prediction_residuals) / len(y)

    def raw_grad(self, y, Xw):
        """Compute the gradient of the datafit w.r.t. Xw."""
        prediction_residuals = Xw - y

        # Compute weights with same scaling
        weights = 2 * (self.alpha + (1 - 2 * self.alpha) * (prediction_residuals > 0))

        return weights * prediction_residuals / len(y)

    def raw_hessian(self, y, Xw):
        """Compute the diagonal of the Hessian of the datafit w.r.t. Xw."""
        prediction_residuals = Xw - y

        # Compute weights with same scaling
        weights = 2 * (self.alpha + (1 - 2 * self.alpha) * (prediction_residuals > 0))

        return weights / len(y)

    def full_grad_sparse(self, X_data, X_indptr, X_indices, y, Xw):
        """Sparse version of the full gradient computation."""
        n_features = X_indptr.shape[0] - 1
        n_samples = y.shape[0]
        prediction_residuals = Xw - y

        # Compute weights with same scaling
        weights = 2 * (self.alpha + (1 - 2 * self.alpha) * (prediction_residuals > 0))

        grad = np.zeros(n_features, dtype=Xw.dtype)
        for j in range(n_features):
            XjT_weighted_residuals = 0.
            for i in range(X_indptr[j], X_indptr[j + 1]):
                sample_idx = X_indices[i]
                XjT_weighted_residuals += X_data[i] * weights[sample_idx] * prediction_residuals[sample_idx]
            grad[j] = XjT_weighted_residuals / n_samples
        return grad

    def intercept_update_step(self, y, Xw):
        """Compute the intercept update step."""
        prediction_residuals = Xw - y

        # Compute weights with same scaling
        weights = 2 * (self.alpha + (1 - 2 * self.alpha) * (prediction_residuals > 0))

        return np.mean(weights * prediction_residuals)


# Test function to validate the implementation
def _test_double_quadratic():
    """Test the DoubleQuadratic implementation."""
    from .single_task import Quadratic

    print("Testing DoubleQuadratic implementation...")

    # Test data
    np.random.seed(42)
    n_samples, n_features = 50, 10
    X = np.random.randn(n_samples, n_features)
    y = np.random.randn(n_samples)
    w = np.random.randn(n_features)
    Xw = X @ w

    # Test 1: alpha=0.5 should match the standard Quadratic
    print("\n=== Test 1: alpha=0.5 vs Quadratic ===")

    quad = Quadratic()
    quad.initialize(X, y)

    dquad = DoubleQuadratic(alpha=0.5)
    dquad.initialize(X, y)

    loss_quad = quad.value(y, w, Xw)
    loss_dquad = dquad.value(y, w, Xw)

    print(f"Quadratic loss: {loss_quad:.8f}")
    print(f"DoubleQuadratic: {loss_dquad:.8f}")
    print(f"Difference: {abs(loss_quad - loss_dquad):.2e}")

    # Test gradients
    grad_quad = quad.gradient(X, y, Xw)
    grad_dquad = dquad.gradient(X, y, Xw)
    grad_diff = np.linalg.norm(grad_quad - grad_dquad)

    print(f"Gradient difference: {grad_diff:.2e}")

    # Test 2: Asymmetric behavior
    print("\n=== Test 2: Asymmetric behavior ===")

    # Simple test case with known residuals
    X_simple = np.eye(4)  # identity matrix
    y_simple = np.array([0., 0., 0., 0.])
    w_simple = np.array([1., -1., 2., -2.])  # prediction_residuals = Xw - y = [1, -1, 2, -2]
    Xw_simple = X_simple @ w_simple

    dquad_asym = DoubleQuadratic(alpha=0.3)  # penalize positive residuals more
    dquad_asym.initialize(X_simple, y_simple)

    loss_asym = dquad_asym.value(y_simple, w_simple, Xw_simple)

    # Manual calculation:
    # prediction_residuals = Xw - y = [1, -1, 2, -2]
    # weights = 2 * (0.3 + 0.4 * [1, 0, 1, 0]) = [1.4, 0.6, 1.4, 0.6]
    # loss = (1/(2*4)) * (1.4*1² + 0.6*1² + 1.4*2² + 0.6*2²)
    expected = (1 / 8) * (1.4 * 1 + 0.6 * 1 + 1.4 * 4 + 0.6 * 4)

    print(f"Asymmetric loss: {loss_asym:.6f}")
    print(f"Expected: {expected:.6f}")
    print(f"Difference: {abs(loss_asym - expected):.2e}")

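    # Test 3 (illustrative sketch): central finite-difference check of gradient()
    # against value(); the step size 1e-6 and alpha=0.3 are arbitrary choices.
    print("\n=== Test 3: finite-difference gradient check ===")

    dquad_fd = DoubleQuadratic(alpha=0.3)
    dquad_fd.initialize(X, y)

    step = 1e-6
    grad_analytic = dquad_fd.gradient(X, y, Xw)
    grad_numeric = np.zeros(n_features)
    for j in range(n_features):
        w_plus, w_minus = w.copy(), w.copy()
        w_plus[j] += step
        w_minus[j] -= step
        grad_numeric[j] = (
            dquad_fd.value(y, w_plus, X @ w_plus)
            - dquad_fd.value(y, w_minus, X @ w_minus)
        ) / (2 * step)

    print(f"Max gradient error: {np.max(np.abs(grad_analytic - grad_numeric)):.2e}")
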
    print("\n=== All tests completed ===")

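
# Illustrative sketch (not part of the datafit API): a plain proximal gradient
# loop for an L1-penalized problem,
#     min_w  value(y, w, X @ w) + lam * ||w||_1,
# using only gradient() and get_global_lipschitz() from this class. The helper
# name and the defaults for `lam` and `n_iter` are arbitrary choices.
def _demo_proximal_gradient(X, y, alpha=0.3, lam=0.1, n_iter=100):
    datafit = DoubleQuadratic(alpha=alpha)
    datafit.initialize(X, y)

    # Constant step size from the global Lipschitz constant of the gradient
    step = 1.0 / datafit.get_global_lipschitz(X, y)

    w = np.zeros(X.shape[1])
    for _ in range(n_iter):
        grad = datafit.gradient(X, y, X @ w)
        z = w - step * grad
        # Soft-thresholding: proximal operator of step * lam * ||.||_1
        w = np.sign(z) * np.maximum(np.abs(z) - step * lam, 0.0)
    return w
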

if __name__ == "__main__":
    _test_double_quadratic()