-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathone_pass_stats.py
More file actions
88 lines (69 loc) · 3.18 KB
/
one_pass_stats.py
File metadata and controls
88 lines (69 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Copyright (c) 2015 Peter Gaultney — MIT License (see LICENSE)
from math import sqrt
# from https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
class ParallelDescriptiveStats:
    """Single-pass accumulator for count, mean, min, max and variance.

    Observations are folded in one at a time with ``add_value``; two
    accumulators built over separate data can be combined with ``merge``.
    """

    __slots__ = ("count", "mean", "min", "max", "_m2", "delta")

    def __init__(self) -> None:
        self.count: int = 0
        self.mean: float = 0.0
        # Extremes start at +/-inf so the first observation replaces both.
        self.min: float = float("inf")
        self.max: float = -float("inf")
        self._m2: float = 0.0  # second moment
        # Most recent (value - mean before the update); kept on the instance
        # so a separate covariance calculation can read it.
        self.delta: float = 0.0

    def add_value(self, value: float) -> None:
        """Fold a single observation into the running statistics."""
        self.max = max(self.max, value)
        self.min = min(self.min, value)
        self.count += 1
        offset = value - self.mean  # measured against the mean *before* updating
        self.delta = offset
        self.mean += offset / self.count
        # Second factor uses the already-updated mean.
        self._m2 += offset * (value - self.mean)

    def variance(self, sample: bool = True) -> float:
        """Return the variance, or 0.0 when fewer than two values were added.

        ``sample=True`` divides by ``count - 1``; ``sample=False`` divides
        by ``count``.
        """
        if self.count >= 2:
            divisor = self.count - 1 if sample else self.count
            return self._m2 / divisor
        return 0.0

    def stddev(self, sample: bool = True) -> float:
        """Return the square root of ``variance(sample)``."""
        spread = self.variance(sample)
        return sqrt(spread)

    def merge(self, other: "ParallelDescriptiveStats") -> "ParallelDescriptiveStats":
        """Combine two accumulators into a new one; neither input is modified.

        The returned instance keeps the default ``delta`` of 0.0.
        """
        combined = ParallelDescriptiveStats()
        n = self.count + other.count
        if n == 0:
            # Both sides are empty: hand back a fresh, empty accumulator.
            return combined
        gap = other.mean - self.mean
        # Correction term for the distance between the two partial means.
        between = gap * gap * self.count * other.count / n
        combined.count = n
        combined.mean = (self.count * self.mean + other.count * other.mean) / n
        combined._m2 = self._m2 + other._m2 + between
        combined.min = min(self.min, other.min)
        combined.max = max(self.max, other.max)
        return combined

    # camelCase aliases retained for backwards compatibility
    addValue = add_value
    getVariance = variance
    getStddev = stddev
# from http://prod.sandia.gov/techlib/access-control.cgi/2008/086212.pdf
class ParallelCovariance:
    """Single-pass accumulator for the covariance of paired (x, y) values.

    Per-variable statistics live in ``x`` and ``y``; the second co-moment is
    tracked alongside them. Two accumulators can be combined with ``merge``.
    """

    __slots__ = ("_co2", "x", "y")

    def __init__(self) -> None:
        self._co2: float = 0.0  # 2nd comoment
        self.x: ParallelDescriptiveStats = ParallelDescriptiveStats()
        self.y: ParallelDescriptiveStats = ParallelDescriptiveStats()

    def add_pair(self, x: float, y: float) -> None:
        """Fold one (x, y) observation into the running statistics."""
        self.x.add_value(x)
        self.y.add_value(y)
        # After add_value, each ``delta`` is the new value minus that
        # variable's mean *before* this pair; ``self.x.count`` already
        # includes this pair.
        self._co2 = self._co2 + (self.x.count - 1) * self.x.delta * self.y.delta / self.x.count

    def covariance(self, sample: bool = True) -> float:
        """Return the covariance, or 0.0 when fewer than two pairs were added.

        ``sample=True`` divides by ``count - 1``; ``sample=False`` divides
        by ``count``.
        """
        if self.x.count <= 1:
            return 0.0
        return self._co2 / (self.x.count - (1 if sample else 0))

    def pearson(self, sample: bool = True) -> float:
        """Return covariance divided by the product of the two stddevs.

        Raises:
            ZeroDivisionError: if either standard deviation is zero, which
                includes the case of fewer than two pairs.
        """
        return self.covariance(sample) / (self.x.stddev(sample) * self.y.stddev(sample))

    def merge(self, other: "ParallelCovariance") -> "ParallelCovariance":
        """Combine two accumulators into a new one; neither input is modified.

        Merging two empty accumulators returns an empty accumulator, the same
        way ``ParallelDescriptiveStats.merge`` treats two empty inputs.
        """
        result = ParallelCovariance()
        result.x = self.x.merge(other.x)
        result.y = self.y.merge(other.y)
        total_count = result.x.count
        if total_count == 0:
            # Nothing on either side: the correction term below would divide
            # by zero, and the co-moment of no data is simply 0.0.
            return result
        dx = other.x.mean - self.x.mean
        dy = other.y.mean - self.y.mean
        result._co2 = self._co2 + other._co2 + dx * dy * self.x.count * other.x.count / total_count
        return result