-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathray.py
More file actions
83 lines (67 loc) · 2.01 KB
/
ray.py
File metadata and controls
83 lines (67 loc) · 2.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import ray
import dask.array as da
import numpy as np
# -------------------------------
# Ray initialization
# -------------------------------
# Start (or attach to) the Ray runtime as a module-level side effect, before
# any remote task is submitted. Per the Ray API docs, ignore_reinit_error=True
# makes a repeated ray.init() call in the same process a no-op instead of an
# error, so re-running this module in one session does not fail here.
# NOTE(review): if this module is itself saved as ray.py, `import ray` may
# resolve to this file rather than the Ray package when run as a script,
# and this call would then fail — confirm the filename.
ray.init(ignore_reinit_error=True)
# -------------------------------
# Dask-based feature extraction
# -------------------------------
def extract_low_freq_features(signal, cutoff_ratio=0.05):
    """
    Extract low-frequency components and delta-based features.

    The signal is low-pass filtered in the frequency domain: FFT bins whose
    absolute frequency is below ``cutoff_ratio`` are kept and all others are
    zeroed. The filtered signal is reconstructed with an inverse FFT and
    summarized by the mean and standard deviation of itself and of its
    first- and second-order differences.

    Args:
        signal: Array-like of samples. Expected to be 1-D, since the
            frequency bins are built from the length of the first axis.
        cutoff_ratio: Low-pass cutoff compared against ``fftfreq`` output
            with unit sample spacing (i.e. cycles per sample).

    Returns:
        A computed (non-lazy) array of six values, in order:
        ``[signal mean, signal std, delta-1 mean, delta-1 std,
        delta-2 mean, delta-2 std]``.
    """
    # Dask can only take an FFT along an axis that lives in a single chunk;
    # splitting the signal into 1024-sample chunks makes da.fft.fft raise
    # ValueError for any signal longer than one chunk. Keep it in one chunk.
    d_signal = da.from_array(signal, chunks=-1)
    n_samples = d_signal.shape[0]

    # FFT
    fft_vals = da.fft.fft(d_signal)
    # Build the frequency bins as one chunk too, so the masked spectrum keeps
    # a single chunk and the inverse FFT below stays valid.
    freqs = da.fft.fftfreq(n_samples, chunks=n_samples)

    # Low-frequency mask (boolean; multiplying zeroes the rejected bins)
    low_freq_mask = da.abs(freqs) < cutoff_ratio
    low_freq_fft = fft_vals * low_freq_mask

    # Reconstruct low-frequency signal; the imaginary part is discarded
    low_freq_signal = da.fft.ifft(low_freq_fft).real

    # Delta features (1st + 2nd order)
    delta_1 = da.diff(low_freq_signal)
    delta_2 = da.diff(delta_1)

    # Feature vector
    features = da.array([
        low_freq_signal.mean(),
        low_freq_signal.std(),
        delta_1.mean(),
        delta_1.std(),
        delta_2.mean(),
        delta_2.std(),
    ])
    # Everything above is lazy; this triggers the actual computation.
    return features.compute()
# -------------------------------
# Ray worker
# -------------------------------
@ray.remote
def ray_worker(worker_id, data_chunk):
    """
    Ray task: extract features from one signal and score it.

    Args:
        worker_id: Identifier echoed back unchanged in the result.
        data_chunk: Signal passed straight to ``extract_low_freq_features``.

    Returns:
        Dict with keys ``"worker_id"``, ``"feature_vector"`` (the extracted
        features) and ``"opportunity_score"`` (the Euclidean norm of the
        feature vector as a Python float).
    """
    feature_vector = extract_low_freq_features(data_chunk)
    # Collapse the feature vector into a single scalar score.
    score = float(np.linalg.norm(feature_vector))

    result = {}
    result["worker_id"] = worker_id
    result["feature_vector"] = feature_vector
    result["opportunity_score"] = score
    return result
# -------------------------------
# Coordinator
# -------------------------------
def run_pipeline(num_rays=61, signal_length=16384):
    """
    Generate simulated signals, fan them out to Ray workers, collect results.

    Each task receives the same slow sine wave (``sin(0.5 * t)`` for ``t``
    evenly spaced over [0, 10]) plus its own Gaussian noise of scale 0.1
    drawn from NumPy's global random state.

    Args:
        num_rays: Number of remote tasks (and signals) to launch.
        signal_length: Number of samples in each simulated signal.

    Returns:
        List of result dicts from ``ray_worker``, in submission order.
        Blocks until every task has finished.
    """
    # The time axis and the clean sine are identical for every task, so they
    # are computed once instead of once per loop iteration.
    t = np.linspace(0, 10, signal_length)
    clean_signal = np.sin(0.5 * t)

    # Simulated low-frequency noisy signal: only the noise differs per task.
    tasks = [
        ray_worker.remote(i, clean_signal + 0.1 * np.random.randn(signal_length))
        for i in range(num_rays)
    ]
    return ray.get(tasks)
# -------------------------------
# Run
# -------------------------------
# Runs at module load: execute the pipeline with its default settings and
# keep the full result list in the module-level name `results`.
results = run_pipeline()

# Print only the first three result dicts as a quick preview.
for r in results[:3]:
    print(r)