Skip to content

Commit 4a10aa5

Browse files
committed
feat: Add GPU-accelerated RFI marking and decouple CPU/GPU implementations
refactor: Separate RFI marker into dedicated CPU/GPU modules; fix: Correct RFI application timing in CUDA dedispersion pipeline; perf: Implement parallel CUDA kernels for RFI zeroing operations; chore: Clean up debug logs and improve parameter handling
1 parent 6268bed commit 4a10aa5

File tree

11 files changed

+292
-95
lines changed

11 files changed

+292
-95
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ Feedback: use [Issues](https://github.com/lintian233/astroflow/issues) or [Discu
3838

3939
<h2 id="updates">Updates</h2>
4040

41+
42+
- **Aug 22, 2025** — feat: Add GPU-accelerated RFI marking and decouple CPU/GPU implementations
4143
- **Aug 21, 2025** — Public preview of `astroflow` CLI; CUDA dedispersion and YOLO-based detector integrated.
4244
- **Aug 20, 2025** — Docker image (CUDA 12.x, PyTorch) and end-to-end benchmark scripts.
43-
- **Aug 20, 2025** — Initial repo import and CI skeleton.
4445

4546
<!-- > [!NOTE]
4647
> Roadmap and milestones are tracked in [Projects](https://github.com/lintian233/astroflow/projects). -->

include/cpucal.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include "data.h"
77
#include "filterbank.h"
88
#include "marcoutils.h"
9-
#include "rfimarker.h"
9+
#include "rfimarker_cpu.h"
1010
#include <algorithm>
1111
#include <cmath>
1212
#include <cstddef>
@@ -228,7 +228,7 @@ Spectrum<T> dedispered_fil_with_dm(Filterbank *fil, float tstart, float tend,
228228
chan_start = std::max(static_cast<size_t>(0), chan_start);
229229
chan_end = std::min(static_cast<size_t>(fil->nchans - 1), chan_end);
230230

231-
RfiMarker<T> rfi_marker(maskfile);
231+
RfiMarkerCPU<T> rfi_marker(maskfile);
232232
Spectrum<T> result;
233233
result.nbits = fil->nbits;
234234
result.ntimes = t_len;
@@ -325,10 +325,10 @@ Spectrum<T> dedisperse_spec_with_dm(T *spec, Header header, float dm,
325325
chan_start = std::max(static_cast<size_t>(0), chan_start);
326326
chan_end = std::min(static_cast<size_t>(header.nchans - 1), chan_end);
327327

328-
RfiMarker <T> rfi_marker(maskfile);
328+
RfiMarkerCPU <T> rfi_marker(maskfile);
329329
rfi_marker.mark_rfi(spec, header.nchans, header.ndata);
330-
printf("RFI marked, chan_start: %zu, chan_end: %zu\n", chan_start,
331-
chan_end);
330+
// printf("RFI marked, chan_start: %zu, chan_end: %zu\n", chan_start,
331+
// chan_end);
332332

333333
Spectrum<T> result;
334334
result.nbits = header.nbits;

include/data.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -640,8 +640,8 @@ preprocess_typed_dedisperseddata_with_slicing(const DedispersedDataTyped<dedispe
640640
if (src_rows == 0 || src_cols == 0)
641641
throw std::runtime_error("DedispersedDataTyped.shape 非法");
642642

643-
printf("Input typed data shape: [%zu, %zu], original tsample: %.6f, time_downsample: %d\n",
644-
src_rows, src_cols, header.tsamp, time_downsample);
643+
printf("Input typed data shape: [%zu, %zu], original tsample: %.6f \n",
644+
src_rows, src_cols, header.tsamp);
645645

646646
// 计算切片参数
647647
const float downsampled_tsamp = header.tsamp * time_downsample;

include/rfimarker.h

Lines changed: 32 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,49 @@
1-
21
#ifndef _RFIMARKER_H
32
#define _RFIMARKER_H
43

54
#include <vector>
65
#include <string>
7-
#include <iostream>
8-
#include <fstream>
9-
#include <sstream>
10-
#include <omp.h>
11-
12-
6+
#include <cuda_runtime.h>
7+
#include <cstdint>
8+
9+
/**
10+
* GPU RFI marker:
11+
* - load_mask(): 在 CPU 读入坏道列表,并上传到 GPU
12+
* - mark_rfi(): 传入 device 指针 (d_data),在 kernel 中把坏道通道清零
13+
*
14+
* 数据布局假设为: data[sample * num_channels + chan] (row-major: time-major)
15+
*/
1316
template <typename T>
1417
class RfiMarker {
1518
public:
1619
RfiMarker();
17-
RfiMarker(const char* mask_file); // Constructor that takes a mask file
18-
RfiMarker(std::string mask_file) : RfiMarker(mask_file.c_str()) {} // Constructor that takes a string
19-
~RfiMarker() = default;
20+
explicit RfiMarker(const char* mask_file);
21+
explicit RfiMarker(const std::string& mask_file) : RfiMarker(mask_file.c_str()) {}
22+
~RfiMarker();
2023

21-
std::vector<int> bad_channels; // Vector to store bad channels
24+
// 将坏道置零(在 GPU 上执行)。d_data 必须是 device 指针
25+
void mark_rfi(T* d_data,
26+
unsigned int num_channels,
27+
unsigned int num_samples,
28+
cudaStream_t stream = 0);
2229

23-
// Method to load the RFI mask
30+
// 重新加载掩码文件(会同步上传到 GPU);文件不存在或为空则视为无坏道
2431
void load_mask(const char* mask_file);
2532

26-
void mark_rfi(T* data, uint num_channels, uint num_samples);
27-
33+
// Host 侧只读坏道列表
34+
const std::vector<int>& get_bad_channels() const { return bad_channels_; }
2835

29-
const std::vector<int>& get_bad_channels() const {
30-
return bad_channels;
31-
}
32-
};
36+
private:
37+
void upload_bad_channels_to_device();
3338

39+
std::vector<int> bad_channels_; // host: 坏道索引
40+
int* d_bad_channels_ = nullptr; // device: 坏道索引
41+
size_t n_bad_ = 0;
42+
};
3443

44+
// ------------ 显式实例化声明(由 .cu 文件提供定义) ------------
45+
extern template class RfiMarker<uint8_t>;
46+
extern template class RfiMarker<uint16_t>;
47+
extern template class RfiMarker<uint32_t>;
3548

36-
template <typename T>
37-
RfiMarker<T>::RfiMarker() {
38-
load_mask("mask.txt"); // Default mask file
39-
}
40-
41-
template <typename T>
42-
RfiMarker<T>::RfiMarker(const char* mask_file) {
43-
load_mask(mask_file); // Load the mask from the provided file
44-
}
45-
46-
template <typename T>
47-
void RfiMarker<T>::mark_rfi(T* data, uint num_channels, uint num_samples) {
48-
// Iterate through the bad channels and mark them in the data
49-
#pragma omp parallel for
50-
for (int chan : bad_channels) {
51-
if (chan >= 0 && chan < num_channels) {
52-
#pragma omp simd
53-
for (uint sample = 0; sample < num_samples; ++sample) {
54-
// Set the data for the bad channel to zero
55-
data[sample * num_channels + chan] = 0;
56-
}
57-
} else {
58-
std::cerr << "Warning: Bad channel index " << chan << " out of range." << std::endl;
59-
}
60-
}
61-
std::cout << "RFI marking completed. Bad channels: " << bad_channels.size() << std::endl;
62-
}
63-
64-
template <typename T>
65-
void RfiMarker<T>::load_mask(const char* mask_file) {
66-
// open the mask file
67-
std::ifstream file(mask_file);
68-
if (!file.is_open()) {
69-
std::cerr << "Error opening mask file: " << mask_file << std::endl
70-
<< "Please check the file path and try again." << std::endl;
71-
return;
72-
}
73-
// if tempty continue
74-
if (file.peek() == std::ifstream::traits_type::eof()) {
75-
return;
76-
}
77-
78-
std::string line;
79-
while (std::getline(file, line)) {
80-
std::stringstream ss(line);
81-
int chan;
82-
while (ss >> chan) {
83-
bad_channels.push_back(chan);
84-
}
85-
}
86-
file.close();
87-
}
88-
#endif //_RFIMARKER_H
49+
#endif // _RFIMARKER_H

include/rfimarker_cpu.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#ifndef _RFIMARKER_CPU_H
2+
#define _RFIMARKER_CPU_H
3+
4+
#include <vector>
5+
#include <string>
6+
#include <cstdint>
7+
8+
/**
9+
* CPU RFI marker:
10+
* - load_mask(): 在 CPU 读入坏道列表
11+
* - mark_rfi(): 传入 host 指针 (h_data),在 CPU 上把坏道通道清零
12+
*
13+
* 数据布局假设为: data[sample * num_channels + chan] (row-major: time-major)
14+
*/
15+
template <typename T>
16+
class RfiMarkerCPU {
17+
public:
18+
RfiMarkerCPU();
19+
explicit RfiMarkerCPU(const char* mask_file);
20+
explicit RfiMarkerCPU(const std::string& mask_file) : RfiMarkerCPU(mask_file.c_str()) {}
21+
~RfiMarkerCPU() = default;
22+
23+
// 在 CPU 上将坏道置零;h_data 必须是 host 指针
24+
void mark_rfi(T* h_data,
25+
unsigned int num_channels,
26+
unsigned int num_samples);
27+
28+
// 重新加载掩码;文件不存在或为空则视为无坏道
29+
void load_mask(const char* mask_file);
30+
31+
const std::vector<int>& get_bad_channels() const { return bad_channels_; }
32+
33+
private:
34+
std::vector<int> bad_channels_;
35+
};
36+
37+
// 显式实例化声明(由 .cpp 定义)
38+
extern template class RfiMarkerCPU<uint8_t>;
39+
extern template class RfiMarkerCPU<uint16_t>;
40+
extern template class RfiMarkerCPU<uint32_t>;
41+
42+
#endif // _RFIMARKER_CPU_H

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ build-backend = "scikit_build_core.build"
1212

1313
[project]
1414
name = "pulseflow"
15-
version = "0.1.1"
15+
version = "0.1.2"
1616
description = "High-performance radio astronomy single pulse Detect lib"
1717
requires-python = ">=3.10"
1818
readme = "README.md"

python/astroflow/dataset/generate.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def muti_pulsar_search_detect(
126126
config.dm_step,
127127
config.time_downsample,
128128
config.t_sample,
129+
maskfile=maskfile,
129130
)
130131

131132
# Setup output directories

src/core/dedispered.cu

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -543,8 +543,6 @@ dedisperseddata_uint8 dedispered_fil_cuda(Filterbank &fil, float dm_low,
543543
T *d_input;
544544
T *d_binned_input; // 存储分bin后的数据
545545
T *data = static_cast<T *>(fil.data);
546-
RfiMarker<T> rfi_marker(mask_file);
547-
rfi_marker.mark_rfi(data, fil.nchans, fil.ndata);
548546

549547
CHECK_CUDA(cudaMalloc(&d_input, fil.ndata * nchans * sizeof(T)));
550548
CHECK_CUDA(cudaMemcpy(d_input, data, fil.ndata * nchans * sizeof(T),
@@ -590,6 +588,9 @@ dedisperseddata_uint8 dedispered_fil_cuda(Filterbank &fil, float dm_low,
590588
CHECK_CUDA(cudaMalloc(&d_output, dm_steps * down_ndata * sizeof(dedispersion_output_t<T>)));
591589
CHECK_CUDA(cudaMemset(d_output, 0, dm_steps * down_ndata * sizeof(dedispersion_output_t<T>)));
592590

591+
RfiMarker<T> rfi_marker(mask_file);
592+
rfi_marker.mark_rfi(d_binned_input, nchans, down_ndata);
593+
593594
int THREADS_PER_BLOCK = 256;
594595
dim3 threads(THREADS_PER_BLOCK);
595596
dim3 grids((down_ndata + threads.x - 1) / threads.x, dm_steps);
@@ -780,8 +781,6 @@ dedisperseddata_uint8 dedisperse_spec(T *data, Header header, float dm_low,
780781
CHECK_CUDA(cudaGetLastError());
781782
CHECK_CUDA(cudaDeviceSynchronize());
782783

783-
RfiMarker<T> rfi_marker(mask_file);
784-
rfi_marker.mark_rfi(data, header.nchans, header.ndata);
785784

786785
T *d_input;
787786
T *d_binned_input; // 存储分bin后的数据
@@ -826,6 +825,9 @@ dedisperseddata_uint8 dedisperse_spec(T *data, Header header, float dm_low,
826825

827826
printf("Processing full data: DM steps = %zu, Time samples = %zu\n", dm_steps, down_ndata);
828827

828+
RfiMarker<T> rfi_marker(mask_file);
829+
rfi_marker.mark_rfi(d_binned_input, nchans, down_ndata);
830+
829831
dedispersion_output_t<T> *d_output;
830832
CHECK_CUDA(cudaMalloc(&d_output, dm_steps * down_ndata * sizeof(dedispersion_output_t<T>)));
831833
CHECK_CUDA(cudaMemset(d_output, 0, dm_steps * down_ndata * sizeof(dedispersion_output_t<T>)));

src/rfi/rfimarker.cpp

Lines changed: 0 additions & 11 deletions
This file was deleted.

0 commit comments

Comments
 (0)