Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
6bbd382
Remove dependency on Boost.IOStreams
arahlin Mar 21, 2025
f124112
new implementation
arahlin Mar 22, 2025
e3f9a67
Add gzip implementation
arahlin Mar 22, 2025
d308843
fix socket buffer implementation
arahlin Mar 22, 2025
a5e3956
Add bzip2 compression
arahlin Mar 23, 2025
bcbadb2
Add LZMA compression
arahlin Mar 23, 2025
2e1220f
Use templates to avoid code duplication
arahlin Mar 23, 2025
506f920
correct package
arahlin Mar 24, 2025
ad59524
Fix input stream counter
arahlin Mar 25, 2025
7e4359b
Handle seek consistently for compressed and remote input streams
arahlin Mar 25, 2025
f86c7a7
Use the output stream counter unconditionally, smarter counting with …
arahlin Mar 25, 2025
dfe851e
Check for valid file extensions
arahlin Mar 25, 2025
5c7613b
expose timeout and read buffer size parameters for arc reader
arahlin Mar 25, 2025
4456ce6
Correct extension
arahlin Mar 25, 2025
333b7a5
Use tellp
arahlin Mar 26, 2025
6bb913d
tweak diff
arahlin Mar 26, 2025
d6fdb7f
Correct type comparison
arahlin Mar 26, 2025
e0e0b2f
Simplify pword storage
arahlin Mar 26, 2025
9ee8a02
pass by reference
arahlin Mar 27, 2025
e8c76e1
Error handling
arahlin Mar 27, 2025
3704d9c
Separate counter from encoders
arahlin Mar 28, 2025
7dbe94d
Move compression classes to separate file
arahlin Mar 28, 2025
e4caa7d
Common close function
arahlin Mar 28, 2025
78dc525
Consistent filename checks
arahlin Mar 28, 2025
2bebd75
templates
arahlin Mar 28, 2025
6eb70ff
simplify
arahlin Mar 28, 2025
6cc52fc
Use registered callbacks and default destructors
arahlin Mar 28, 2025
937daa4
Merge branch 'master' into no_boost_iostreams
arahlin Mar 28, 2025
d4e524c
correct type
arahlin Mar 28, 2025
df94ffd
rearrange for readable diff
arahlin Mar 28, 2025
a6dea50
The registered callback is called after the object is already partially
arahlin Mar 28, 2025
76238c4
Addressing Chris' comments
arahlin Mar 29, 2025
960c043
Better file checks
arahlin Mar 29, 2025
38c1d68
Use xalloc to claim a valid pword index
arahlin Mar 29, 2025
cf03625
Missing includes
arahlin Mar 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
sudo apt update
sudo apt install gcc-14 g++-14 -y
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-14 14 --slave /usr/bin/g++ g++ /usr/bin/g++-14
sudo apt-get install libbz2-dev libboost-all-dev libflac-dev libnetcdf-dev libpython3-dev
sudo apt-get install libz-dev libbz2-dev liblzma-dev libboost-python-dev libflac-dev libnetcdf-dev libpython3-dev
sudo pip install numpy scipy matplotlib astropy healpy sphinx --break-system-packages

- name: Setup python on macOS
Expand All @@ -47,7 +47,7 @@ jobs:
- name: Install macOS Dependencies
if: runner.environment == 'github-hosted' && runner.os == 'macOS'
run: |
brew install --overwrite bzip2 boost boost-python3 flac netcdf
brew install --overwrite bzip2 xz boost-python3 flac netcdf
python3.13 -m pip install numpy scipy matplotlib astropy healpy sphinx --break-system-packages

- name: Create Build Environment
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ jobs:
uses: pypa/cibuildwheel@v2.23.0
env:
CIBW_BUILD: ${{ matrix.build }}
CIBW_BEFORE_ALL_LINUX: yum install -y bzip2-devel netcdf-devel
CIBW_BEFORE_ALL_LINUX: yum install -y zlib-devel bzip2-devel xz-devel netcdf-devel
CIBW_BEFORE_ALL_MACOS: brew install netcdf
CIBW_BEFORE_BUILD_MACOS: >
ln -s $(dirname $(readlink -f $(which python3)))/python3-config $(dirname $(which python3))/python3-config &&
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ set(Boost_USE_STATIC_LIBS OFF)
set(Boost_USE_MULTITHREADED ON)
set(Boost_USE_STATIC_RUNTIME OFF)
set(Boost_PYTHON_VERSION ${Python_VERSION})
find_package(Boost COMPONENTS iostreams python REQUIRED)
find_package(Boost COMPONENTS python REQUIRED)

# Interface library for flags and library dependencies
add_library(spt3g INTERFACE)
Expand Down
21 changes: 17 additions & 4 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ add_spt3g_library(core SHARED
src/G3Data.cxx src/G3Vector.cxx src/G3Map.cxx src/G3Timestream.cxx
src/G3Timesample.cxx
src/G3TriggeredBuilder.cxx src/G3MultiFileWriter.cxx src/dataio.cxx
src/crc32.c ${CORE_EXTRA_SRCS}
src/compression.cxx src/crc32.c ${CORE_EXTRA_SRCS}
src/G3NetworkSender.cxx src/G3SyslogLogger.cxx
src/G3PipelineInfo.cxx src/G3Quat.cxx src/G3Units.cxx
src/int_storage.cxx src/pybindings.cxx
Expand Down Expand Up @@ -40,8 +40,17 @@ if(FLAC_FOUND)
target_link_libraries(core PRIVATE FLAC::FLAC)
endif()

# Link Boost IOStreams
target_link_libraries(core PRIVATE Boost::iostreams)
# Link against zlib (gzip compression support).
# WITH_ZLIB defaults to TRUE unless it has already been defined. When it is
# enabled, zlib is searched for as an optional package; if it is found, the
# ZLIB_FOUND definition is added to the core sources and the library is
# linked privately.
if(NOT DEFINED WITH_ZLIB)
	set(WITH_ZLIB TRUE CACHE BOOL "Enable gzip file compression")
endif()
if(WITH_ZLIB)
	find_package(ZLIB)
endif()
if(ZLIB_FOUND)
	target_compile_definitions(core PRIVATE -DZLIB_FOUND)
	target_link_libraries(core PRIVATE ZLIB::ZLIB)
endif()

# Link against BZIP2 library
if(NOT DEFINED WITH_BZIP2)
Expand All @@ -52,6 +61,7 @@ if(WITH_BZIP2)
endif()
if(BZIP2_FOUND)
target_compile_definitions(core PRIVATE -DBZIP2_FOUND)
target_link_libraries(core PRIVATE BZip2::BZip2)
endif()

if(NOT DEFINED WITH_LZMA)
Expand All @@ -62,6 +72,7 @@ if(WITH_LZMA)
endif()
if(LIBLZMA_FOUND)
target_compile_definitions(core PRIVATE -DLZMA_FOUND)
target_link_libraries(core PRIVATE LibLZMA::LibLZMA)
endif()

link_python_dir()
Expand All @@ -88,7 +99,9 @@ add_spt3g_test(cuts)
add_spt3g_test(fileio)
add_spt3g_test(multifileio)
add_spt3g_test(splitfileio)
add_spt3g_test(compressedfileio)
if(ZLIB_FOUND)
add_spt3g_test(compressedfileio)
endif()
if(BZIP2_FOUND)
add_spt3g_test(bz2fileio)
endif()
Expand Down
8 changes: 4 additions & 4 deletions core/include/core/G3Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

class G3Reader : public G3Module {
public:
G3Reader(std::string filename, int n_frames_to_read = -1,
G3Reader(const std::string &filename, int n_frames_to_read = -1,
float timeout = -1., bool track_filename = false,
size_t buffersize = 1024*1024);
G3Reader(std::vector<std::string> filenames, int n_frames_to_read = -1,
G3Reader(const std::vector<std::string> &filenames, int n_frames_to_read = -1,
float timeout = -1., bool track_filename = false,
size_t buffersize = 1024*1024);

Expand All @@ -21,11 +21,11 @@ class G3Reader : public G3Module {
off_t Tell();

private:
void StartFile(std::string path);
void StartFile(const std::string &path);
bool prefix_file_;
std::string cur_file_;
std::deque<std::string> filename_;
std::shared_ptr<std::istream> stream_;
std::istream stream_;
int n_frames_to_read_;
int n_frames_read_;
int n_frames_cur_;
Expand Down
5 changes: 3 additions & 2 deletions core/include/core/G3Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
class G3Writer : public G3Module {
public:
G3Writer(std::string filename,
std::vector<G3Frame::FrameType> streams={}, bool append=false);
std::vector<G3Frame::FrameType> streams={}, bool append=false,
size_t buffersize=1024*1024);
// Writes to file <filename> all frames with types in <streams>.
// If <streams> is empty (default), writes all frames.

void Process(G3FramePtr frame, std::deque<G3FramePtr> &out);
void Flush();
private:
std::string filename_;
std::shared_ptr<std::ostream> stream_;
std::ostream stream_;
std::vector<G3Frame::FrameType> streams_;

SET_LOGGER("G3Writer");
Expand Down
56 changes: 13 additions & 43 deletions core/include/core/dataio.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
#define _G3_DATAIO_H

#include <string>
#include <memory>
#include <iostream>

/**
* Configure a filtering stream for G3Frame decompression from a local or remote
* file source.
*
* @param stream A reference to the input stream to be configured by this
* function.
* @param path A valid filename on disk, or a TCP socket address. If a
* filename, the compression scheme is determined from the file
* extension. Supported compression schemes are gzip, bzip2 or lzma,
* depending on which compression libraries were found at build time.
Expand All @@ -19,11 +20,11 @@
* read until EOF.
* @param timeout Timeout in seconds for socket connections.
* @param buffersize Advisory buffer size in bytes for aggregating reads
* @return stream The input stream configured by this function.
* @param ext Required file extension (excluding compression suffixes)
*/
std::shared_ptr<std::istream>
g3_istream_from_path(const std::string &path, float timeout=-1.0,
size_t buffersize=1024*1024);
void
g3_istream_from_path(std::istream &stream, const std::string &path,
float timeout=-1.0, size_t buffersize=1024*1024, const std::string &ext=".g3");

/**
* Return the file descriptor handle for socket connections.
Expand All @@ -32,54 +33,23 @@ g3_istream_from_path(const std::string &path, float timeout=-1.0,
* g3_istream_from_path.
* @return fd The socket file descriptor.
*/
int g3_istream_handle(std::shared_ptr<std::istream> &stream);
int g3_istream_handle(std::istream &stream);

/**
* Configure a filtering stream for G3Frame compression to a local file.
*
* @param stream A reference to the output stream to be configured by this
* function.
* @param path A valid filename on disk. The compression scheme is
* determined from the file extension. Supported compression schemes
* are gzip, bzip2 or lzma, depending on which compression libraries
* were found at build time.
* @param append If true, append to an existing file on disk. Otherwise,
* create a new file or overwrite an existing file.
* @param counter If true, add a counter filter to the stream configuration,
* for use by the g3_ostream_count function.
* @return stream The output stream configured by this function.
* @param ext Required file extension (excluding compression suffixes)
*/
std::shared_ptr<std::ostream>
g3_ostream_to_path(const std::string &path, bool append=false, bool counter=false);

/**
* Count the number of bytes written to the output file stream.
*
* @param stream A reference to the output stream, as configured by
* g3_ostream_to_path with the counter argument set to true.
* @return Number of bytes written to disk.
*/
size_t g3_ostream_count(std::shared_ptr<std::ostream> &stream);

/**
* Flush the output file stream.
*
* @param stream A reference to the output stream, as configured by
* g3_ostream_to_path.
*/
void g3_ostream_flush(std::shared_ptr<std::ostream> &stream);

/**
* Check that the input filename is a valid filename on disk.
*
* @throws runtime_error If filename is invalid or missing.
*/
void g3_check_input_path(const std::string &path);

/**
* Check that the output filename is a valid filename on disk.
*
* @throws runtime_error If filename is empty, or its parent directory is
* missing.
*/
void g3_check_output_path(const std::string &path);
void
g3_ostream_to_path(std::ostream &stream, const std::string &path, bool append=false,
size_t buffersize=1024*1024, const std::string &ext=".g3");

#endif

2 changes: 0 additions & 2 deletions core/src/G3Frame.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,10 @@ void G3Frame::blob_encode(struct blob_container &blob)
item_os.flush();
}

template <> void G3Frame::loads(std::shared_ptr<std::istream> &is) { loads(*is); }
template void G3Frame::loads(G3BufferInputStream &);
template void G3Frame::loads(std::istream &);
template void G3Frame::loads(std::istringstream &);

template <> void G3Frame::saves(std::shared_ptr<std::ostream> &os) const { saves(*os); }
template void G3Frame::saves(G3BufferOutputStream &) const;
template void G3Frame::saves(std::ostream &) const;
template void G3Frame::saves(std::ostringstream &) const;
Expand Down
28 changes: 14 additions & 14 deletions core/src/G3MultiFileWriter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ class G3MultiFileWriter : public G3Module {
public:
G3MultiFileWriter(boost::python::object filename,
size_t size_limit,
boost::python::object divide_on = boost::python::object());
boost::python::object divide_on = boost::python::object(),
size_t buffersize=1024*1024);

void Process(G3FramePtr frame, std::deque<G3FramePtr> &out);
std::string CurrentFile() { return current_filename_; }
private:
Expand All @@ -18,26 +20,26 @@ class G3MultiFileWriter : public G3Module {
boost::python::object filename_callback_;
std::string current_filename_;
size_t size_limit_;
size_t buffersize_;

std::vector<G3Frame::FrameType> always_break_on_;
boost::python::object newfile_callback_;

std::shared_ptr<std::ostream> stream_;
std::ostream stream_;
std::vector<G3FramePtr> metadata_cache_;
int seqno;

SET_LOGGER("G3MultiFileWriter");
};

G3MultiFileWriter::G3MultiFileWriter(boost::python::object filename,
size_t size_limit, boost::python::object divide_on)
: size_limit_(size_limit), seqno(0)
size_t size_limit, boost::python::object divide_on, size_t buffersize)
: size_limit_(size_limit), buffersize_(buffersize), stream_(nullptr), seqno(0)
{
boost::python::extract<std::string> fstr(filename);

if (fstr.check()) {
filename_ = fstr();
g3_check_output_path(filename_);

if (snprintf(NULL, 0, filename_.c_str(), 0) < 0)
log_fatal("Cannot format filename. Should be "
Expand Down Expand Up @@ -77,10 +79,10 @@ G3MultiFileWriter::CheckNewFile(G3FramePtr frame)
{
// If we are already saving data, check file size. Otherwise, open
// a new file unconditionally.
if (stream_ != nullptr) {
if (stream_) {
bool start_new_ = false;

if (g3_ostream_count(stream_) > size_limit_)
if ((size_t)stream_.tellp() > size_limit_)
start_new_ = true;

if (newfile_callback_.ptr() != Py_None &&
Expand All @@ -95,7 +97,7 @@ G3MultiFileWriter::CheckNewFile(G3FramePtr frame)
return false;
}

stream_.reset();
stream_.flush();

std::string filename;
if (filename_ != "") {
Expand All @@ -111,12 +113,10 @@ G3MultiFileWriter::CheckNewFile(G3FramePtr frame)
} else {
filename = boost::python::extract<std::string>(
filename_callback_(frame, seqno++))();

g3_check_output_path(filename);
}

current_filename_ = filename;
stream_ = g3_ostream_to_path(filename, false, true);
g3_ostream_to_path(stream_, filename, false, buffersize_);

for (auto i = metadata_cache_.begin(); i != metadata_cache_.end(); i++)
(*i)->saves(stream_);
Expand All @@ -129,7 +129,7 @@ void G3MultiFileWriter::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
bool new_file(false), meta_cached(false);

if (frame->type == G3Frame::EndProcessing) {
stream_.reset();
stream_.flush();
goto done;
}

Expand Down Expand Up @@ -186,8 +186,8 @@ PYBINDINGS("core") {
"python callable as divide_on. This callable will be passed each "
"frame in turn. If it returns True (or something with positive "
"truth-value), a new file will be started at that frame.",
init<object, size_t, optional<object> >((arg("filename"),
arg("size_limit"), arg("divide_on")=object())))
init<object, size_t, optional<object, size_t> >((arg("filename"),
arg("size_limit"), arg("divide_on")=object(), arg("buffersize")=1024*1024)))
.def_readonly("current_file", &G3MultiFileWriter::CurrentFile)
.def_readonly("__g3module__", true)
;
Expand Down
Loading
Loading