Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
StringEncodings = "69024149-9ee7-55f6-a4c4-859efe599b68"
TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
ThreadPools = "b189fb0b-2eb5-4ed4-bc0c-d34c51242431"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
UnicodePlots = "b8865327-cd53-5732-bb35-84acbb429228"
Expand Down
66 changes: 66 additions & 0 deletions example/CompareSaveMethods.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using MPIMeasurements
using FFTW
using Statistics

println("="^70)
println("MPI Data Storage Methods Comparison")
println("="^70)

config = (numSamples=1632, numChannels=3, numPeriods=500, numFrames=100,
numPeriodGrouping=5, selectedFrequencies=50)
testData = randn(Float32, config.numSamples, config.numChannels, config.numPeriods, config.numFrames)

println("\nMethod 1: Traditional (Full Time Domain)")
println("-"^70)
traditionalSize = sizeof(testData)
println("Storage: $(round(traditionalSize / 1024^2, digits=2)) MB")
println("Post-processing: Period grouping, FFT, frequency filtering needed")

println("\nMethod 2: Frequency Filtered During Acquisition")
println("-"^70)
mutable struct FilteredStorage <: StorageBuffer
frequencyData::Vector{Any}
end
FilteredStorage() = FilteredStorage(Any[])
Base.push!(buffer::FilteredStorage, data) = (push!(buffer.frequencyData, data); (start=1, stop=size(data, 4)))
MPIMeasurements.sinks!(buffer::FilteredStorage, sinks::Vector{SinkBuffer}) = sinks

filteredBuffer = FilteredStorage()
frequencyIndices = collect(1:config.selectedFrequencies)
rfftBuffer = RFFTBuffer(filteredBuffer, frequencyIndices)
periodGroupingBuffer = PeriodGroupingBuffer(rfftBuffer, config.numPeriodGrouping)
push!(periodGroupingBuffer, testData)

filteredSize = sizeof(filteredBuffer.frequencyData[1])
println("Storage: $(round(filteredSize / 1024^2, digits=2)) MB")
println("Post-processing: None, ready for reconstruction")

savings = traditionalSize - filteredSize
savingsPercent = (1 - filteredSize / traditionalSize) * 100
compressionRatio = traditionalSize / filteredSize

println("\n"*"="^70)
println("Analysis")
println("="^70)
println("Space saved: $(round(savings / 1024^2, digits=2)) MB ($(round(savingsPercent, digits=2))%)")
println("Compression ratio: $(round(compressionRatio, digits=1))×")

positionsInScan = 3000
totalFrames = config.numFrames * positionsInScan
traditionalTotalSize = traditionalSize / config.numFrames * totalFrames
filteredTotalSize = filteredSize / config.numFrames * totalFrames
totalSavings = traditionalTotalSize - filteredTotalSize

println("\nFull 3D Scan Projection ($positionsInScan positions):")
println(" Traditional: $(round(traditionalTotalSize / 1024^3, digits=2)) GB")
println(" Filtered: $(round(filteredTotalSize / 1024^3, digits=2)) GB")
println(" Savings: $(round(totalSavings / 1024^3, digits=2)) GB")

archiveUploadSpeed = 100 * 1024^2
traditionalUploadTime = traditionalTotalSize / archiveUploadSpeed
filteredUploadTime = filteredTotalSize / archiveUploadSpeed
println("\nNetwork Transfer (100 MB/s):")
println(" Traditional: $(round(traditionalUploadTime / 60, digits=1)) min")
println(" Filtered: $(round(filteredUploadTime / 60, digits=1)) min")
println(" Time saved: $(round((traditionalUploadTime - filteredUploadTime) / 60, digits=1)) min")
println("="^70)
101 changes: 101 additions & 0 deletions example/FrequencyFilteringDemo.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
using MPIMeasurements
using FFTW
using Statistics

println("="^70)
println("Frequency Filtering Demonstration")
println("="^70)

numSamples, numChannels, numPeriods, numFrames = 1632, 3, 500, 100
numPeriodGrouping = 5

function createSyntheticMPIData(samples, channels, periods, frames)
data = zeros(Float32, samples, channels, periods, frames)
for freq in [1, 2, 3, 5, 7, 11]
amplitude = 1.0 / freq
for t in 1:samples
data[t, :, :, :] .+= amplitude * sin(2π * freq * (t-1) / samples)
end
end
data .+= 0.1f0 * randn(Float32, size(data)...)
return data
end

originalData = createSyntheticMPIData(numSamples, numChannels, numPeriods, numFrames)
originalSize = sizeof(originalData)
println("\nOriginal: $(size(originalData)), $(round(originalSize / 1024^2, digits=2)) MB")

println("\nApplying frequency filtering...")

selectedFrequencies = 1:50
numFrequenciesAfterGrouping = div(numSamples * numPeriodGrouping, 2) + 1
println(" Selected: $(length(selectedFrequencies)) / $numFrequenciesAfterGrouping frequencies")

mutable struct CaptureBuffer <: StorageBuffer
data::Vector{Any}
end
CaptureBuffer() = CaptureBuffer(Any[])
Base.push!(buffer::CaptureBuffer, data) = (push!(buffer.data, data); (start=1, stop=size(data, 4)))
MPIMeasurements.sinks!(buffer::CaptureBuffer, sinks::Vector{SinkBuffer}) = sinks

captureBuffer = CaptureBuffer()
rfftBuffer = RFFTBuffer(captureBuffer, collect(selectedFrequencies))
periodGroupingBuffer = PeriodGroupingBuffer(rfftBuffer, numPeriodGrouping)
push!(periodGroupingBuffer, originalData)

filteredData = captureBuffer.data[1]
filteredSize = sizeof(filteredData)

println("\nFiltered: $(size(filteredData)), $(round(filteredSize / 1024^2, digits=2)) MB")
reductionPercent = (1 - filteredSize / originalSize) * 100
compressionRatio = originalSize / filteredSize

println("\n"*"="^70)
println("Results")
println("="^70)
println("Data reduction: $(round(reductionPercent, digits=2))%")
println("Compression ratio: $(round(compressionRatio, digits=1))×")
println("Storage saved: $(round((originalSize - filteredSize) / 1024^2, digits=2)) MB")

fullSpectrum = abs.(rfft(originalData[:, 1, 1, 1]))
filteredSpectrum = abs.(filteredData[:, 1, 1, 1])
println("\nSpectrum analysis:")
println(" Full length: $(length(fullSpectrum)), strongest at $(argmax(fullSpectrum))")
println(" Filtered length: $(length(filteredSpectrum)), strongest at $(argmax(filteredSpectr um))")
println("="^70)

println(" Filtered data shape: $(size(filteredData))")
println(" Filtered data size: $(round(filteredSize / 1024^2, digits=2)) MB")
println(" Data type: $(eltype(filteredData))")

# ============================================================================
# Part 3: Calculate Savings
# ============================================================================

println("\n[3/4] Analyzing storage efficiency...")

reductionPercent = (1 - filteredSize / originalSize) * 100
compressionRatio = originalSize / filteredSize

println(" ✓ Data reduction: $(round(reductionPercent, digits=2))%")
println(" ✓ Compression ratio: $(round(compressionRatio, digits=1))×")
println(" ✓ Storage saved: $(round((originalSize - filteredSize) / 1024^2, digits=2)) MB")

# ============================================================================
# Part 4: Frequency Spectrum Analysis
# ============================================================================

println("\n[4/4] Frequency spectrum analysis...")

# Compute full spectrum for comparison
fullSpectrum = rfft(originalData[:, 1, 1, 1]) # First channel, first period, first frame
fullSpectrumMagnitude = abs.(fullSpectrum)

# Filtered spectrum (what we actually store)
filteredSpectrum = filteredData[:, 1, 1, 1] # Same selection
filteredSpectrumMagnitude = abs.(filteredSpectrum)

println(" Full spectrum length: $(length(fullSpectrum))")
println(" Filtered spectrum length: $(length(filteredSpectrum))")
println(" Strongest at: $(argmax(filteredSpectrum))")
println("="^70)
73 changes: 73 additions & 0 deletions example/VisualizeFilteringPipeline.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using MPIMeasurements
using FFTW
using Statistics

println("="^70)
println("Frequency Filtering Pipeline Visualization")
println("="^70)

numSamples, numChannels, numPeriods, numFrames = 64, 3, 12, 4
numPeriodGrouping, selectedFrequencies = 3, [1, 3, 5, 7, 9, 11, 13, 15]

t = range(0, 2π, length=numSamples)
signal = sin.(t .* 3) .+ 0.5 .* sin.(t .* 7) .+ 0.2 .* randn(numSamples)
inputData = zeros(Float32, numSamples, numChannels, numPeriods, numFrames)
for c in 1:numChannels, p in 1:numPeriods, f in 1:numFrames
inputData[:, c, p, f] .= signal .* (1 + 0.1 * randn())
end

println("\n"*"="^70)
println("Stage 1: Time Domain Input")
println("="^70)
println("Shape: $(size(inputData))")
println("Memory: $(sizeof(inputData)) bytes")
println("Type: $(eltype(inputData))")

println("\n"*"="^70)
println("Stage 2: Period Grouping (×$numPeriodGrouping)")
println("="^70)
tmp = permutedims(inputData, (1, 3, 2, 4))
tmp2 = reshape(tmp, numSamples * numPeriodGrouping, div(numPeriods, numPeriodGrouping), numChannels, numFrames)
groupedData = permutedims(tmp2, (1, 3, 2, 4))
println("Shape: $(size(inputData)) → $(size(groupedData))")
println("Memory: $(sizeof(groupedData)) bytes")

println("\n"*"="^70)
println("Stage 3: Real FFT")
println("="^70)
fftData = rfft(groupedData, 1)
numFrequencies = size(fftData, 1)
println("Shape: $(size(groupedData)) → $(size(fftData))")
println("Memory: $(sizeof(fftData)) bytes")
println("Type: $(eltype(fftData))")

spectrum = abs.(fftData[:, 1, 1, 1])
topFreqs = sortperm(spectrum, rev=true)[1:3]
println("Top frequencies: $(topFreqs)")

println("\n"*"="^70)
println("Stage 4: Frequency Selection")
println("="^70)
filteredData = fftData[selectedFrequencies, :, :, :]
println("Shape: $(size(fftData)) → $(size(filteredData))")
println("Memory: $(sizeof(fftData)) → $(sizeof(filteredData)) bytes")
println("Reduction: $(round((1 - sizeof(filteredData) / sizeof(fftData)) * 100, digits=1))%")

println("\n"*"="^70)
println("Stage 5: MDF Storage")
println("="^70)
println("Shape: $(size(filteredData))")
println("Type: $(eltype(filteredData))")
println("MDF metadata: isFourierTransformed=true, isFrequencySelection=true")

totalReduction = (1 - sizeof(filteredData) / sizeof(inputData)) * 100
compressionRatio = sizeof(inputData) / sizeof(filteredData)

println("\n"*"="^70)
println("Pipeline Summary")
println("="^70)
println("Original: $(sizeof(inputData)) bytes")
println("Filtered: $(sizeof(filteredData)) bytes")
println("Reduction: $(round(totalReduction, digits=2))%")
println("Compression: $(round(compressionRatio, digits=1))×")
println("="^70)
46 changes: 45 additions & 1 deletion src/Protocols/Storage/ChainableBuffer.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Export new frequency filtering buffers
export PeriodGroupingBuffer, RFFTBuffer

mutable struct AverageBuffer{T} <: IntermediateBuffer where {T<:Number}
target::StorageBuffer
buffer::Array{T,4}
Expand Down Expand Up @@ -221,4 +224,45 @@ function TxDAQControllerBuffer(tx::TxDAQController, sequence::ControlSequence)
end
update!(buffer::TxDAQControllerBuffer, start, stop) = insert!(buffer, calcControlMatrix(buffer.tx.cont), start, stop)
insert!(buffer::TxDAQControllerBuffer, applied::Matrix{ComplexF64}, start, stop) = buffer.applied[:, :, :, start:stop] .= applied
read(buffer::TxDAQControllerBuffer) = buffer.applied
read(buffer::TxDAQControllerBuffer) = buffer.applied

mutable struct PeriodGroupingBuffer{T} <: IntermediateBuffer where {T<:Number}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would then be:

mutable struct PeriodGroupingBuffer{T, T, S} <: IntermediateBuffer{T, T, S} where {T}
  target::S
  numGrouping::Int
end

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And as discussed, the IntermediateBuffer where {T<:Number} has no effect (which is a problem in the older buffers as well 🙈)

target::StorageBuffer
numGrouping::Int
end
PeriodGroupingBuffer(buffer::StorageBuffer, numGrouping::Int) = PeriodGroupingBuffer{Float32}(buffer, numGrouping)

function push!(buffer::PeriodGroupingBuffer{T}, frames::AbstractArray{T,4}) where {T<:Number}
if buffer.numGrouping == 1
return push!(buffer.target, frames)
end

numSamples, numChannels, numPeriods, numFrames = size(frames)

if mod(numPeriods, buffer.numGrouping) != 0
error("Periods cannot be grouped: $numPeriods periods cannot be divided by $(buffer.numGrouping)")
end

tmp = permutedims(frames, (1, 3, 2, 4))
newNumPeriods = div(numPeriods, buffer.numGrouping)
tmp2 = reshape(tmp, numSamples * buffer.numGrouping, newNumPeriods, numChannels, numFrames)
result = permutedims(tmp2, (1, 3, 2, 4))

return push!(buffer.target, result)
end

sinks!(buffer::PeriodGroupingBuffer, sinks::Vector{SinkBuffer}) = sinks!(buffer.target, sinks)

mutable struct RFFTBuffer{T} <: IntermediateBuffer where {T<:Complex}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we redfine our buffers, we can define the rfft buffer as follows:

mutable struct RFFTBuffer{T, Complex{T}, S <: IntermediateBuffer{Complex{T}}, M <: Union{Vector{Int32}, Nothing}} <: IntermediateBuffer{T, Complex{T}, S}
  target::S
  frequencyMask::M
end

And have its functions be for example:

function push!(buffer::RFFTBuffer{T, Complex{T}}, frames::AbstractArray{T,4}) where {T}
  dataFD = rfft(frames, 1)
  result = isnothing(buffer.frequencyMask) ? dataFD : dataFD[buffer.frequencyMask, :, :, :]
  return push!(buffer.target, result)
end

Or even do an FFTBuffer and decide between fft and rfft based on input data

target::StorageBuffer
frequencyMask::Union{Vector{Int}, Nothing}
end
RFFTBuffer(buffer::StorageBuffer, frequencyMask::Union{Vector{Int}, Nothing} = nothing) = RFFTBuffer{ComplexF32}(buffer, frequencyMask)

function push!(buffer::RFFTBuffer{T}, frames::AbstractArray{<:Real,4}) where {T<:Complex}
dataFD = rfft(frames, 1)
result = isnothing(buffer.frequencyMask) ? dataFD : dataFD[buffer.frequencyMask, :, :, :]
return push!(buffer.target, result)
end

sinks!(buffer::RFFTBuffer, sinks::Vector{SinkBuffer}) = sinks!(buffer.target, sinks)
Loading
Loading