11#include < G3Logging.h>
22#include < dataio.h>
3- #include " compression .h"
3+ #include " streams .h"
44
55#include < filesystem>
66
77#include < sys/types.h>
88#include < sys/socket.h>
99#include < netinet/in.h>
1010#include < netdb.h>
11- #include < unistd.h>
1211#include < stdlib.h>
13- #include < fstream>
14- #include < streambuf>
1512
1613
1714static int
@@ -119,76 +116,13 @@ connect_remote(const std::string &path, float timeout)
119116 return fd;
120117}
121118
122- class RemoteInputStreamBuffer : public std ::streambuf {
123- public:
124- RemoteInputStreamBuffer (const std::string &path, float timeout, size_t size)
125- : buffer_(size), bytes_(0 ) {
126- fd_ = connect_remote (path, timeout);
127- setg (buffer_.data (), buffer_.data (), buffer_.data ());
128- }
129-
130- ~RemoteInputStreamBuffer () {
131- close (fd_);
132- }
133-
134- int fd () { return fd_; }
135-
136- protected:
137- int_type underflow () {
138- if (gptr () < egptr ())
139- return traits_type::to_int_type (*gptr ());
140-
141- ssize_t n = read (fd_, buffer_.data (), buffer_.size ());
142- if (n <= 0 )
143- return traits_type::eof ();
144- setg (buffer_.data (), buffer_.data (), buffer_.data () + n);
145- return traits_type::to_int_type (*gptr ());
146- }
147-
148- std::streamsize xsgetn (char * s, std::streamsize n) {
149- std::streamsize n_read = 0 ;
150- while (n_read < n) {
151- if (gptr () == egptr ()) {
152- if (underflow () == traits_type::eof ())
153- break ;
154- }
155-
156- std::streamsize remaining = n - n_read;
157- std::streamsize available = egptr () - gptr ();
158- std::streamsize to_read = std::min (remaining, available);
159-
160- std::memcpy (s + n_read, gptr (), to_read);
161- gbump (to_read);
162- n_read += to_read;
163- }
164- bytes_ += n_read;
165- return n_read;
166- }
167-
168- std::streampos seekoff (std::streamoff off, std::ios_base::seekdir way,
169- std::ios_base::openmode mode) {
170- // short-circuit for tellg
171- if ((mode & std::ios_base::in) && off == 0 && way == std::ios_base::cur)
172- return bytes_;
173- log_fatal (" Seek not implemented for remote stream" );
174- }
175-
176- std::streampos seekpos (std::streampos pos, std::ios_base::openmode mode) {
177- log_fatal (" Seek not implemented for remote stream" );
178- }
179-
180- private:
181- int fd_;
182- std::vector<char > buffer_;
183- size_t bytes_;
184- };
185-
186119
187120enum Codec {
188121 NONE = 0 ,
189122 GZ = 1 ,
190123 BZIP2 = 2 ,
191124 LZMA = 3 ,
125+ REMOTE = 4 ,
192126};
193127
194128static bool
@@ -234,6 +168,9 @@ get_codec(const std::string &path, const std::string &ext=".g3")
234168static Codec
235169check_input_path (const std::string &path, const std::string &ext)
236170{
171+ if (path.find (" tcp://" ) == 0 )
172+ return REMOTE;
173+
237174 std::filesystem::path fpath (path);
238175 if (!std::filesystem::exists (fpath) ||
239176 !std::filesystem::is_regular_file (fpath))
@@ -260,107 +197,6 @@ check_output_path(const std::string &path, const std::string &ext)
260197 return get_codec (path, ext);
261198}
262199
263- class InputFileStreamCounter : public std ::streambuf {
264- public:
265- InputFileStreamCounter (const std::string& path, size_t size)
266- : buffer_(size), bytes_(0 ) {
267- file_.open (path, std::ios::binary);
268- if (!file_.is_open ())
269- log_fatal (" Error opening file %s" , path.c_str ());
270- file_.rdbuf ()->pubsetbuf (buffer_.data (), buffer_.size ());
271- }
272-
273- protected:
274- int_type underflow () {
275- return file_.rdbuf ()->sgetc ();
276- }
277-
278- std::streamsize xsgetn (char * s, std::streamsize n) {
279- std::streamsize nget = file_.rdbuf ()->sgetn (s, n);
280- if (nget > 0 )
281- bytes_ += nget;
282- return nget;
283- }
284-
285- std::streampos seekoff (std::streamoff off, std::ios_base::seekdir way,
286- std::ios_base::openmode mode) {
287- if (!(mode & std::ios_base::in))
288- log_fatal (" Seek not implemented for output stream" );
289- // short-circuit for tellg
290- if (off == 0 && way == std::ios_base::cur)
291- return bytes_;
292- std::streampos n = file_.rdbuf ()->pubseekoff (off, way, mode);;
293- if (n != std::streampos (std::streamoff (-1 )))
294- bytes_ = n;
295- return n;
296- }
297-
298- std::streampos seekpos (std::streampos pos, std::ios_base::openmode mode) {
299- if (!(mode & std::ios_base::in))
300- log_fatal (" Seek not implemented for output stream" );
301- std::streampos n = file_.rdbuf ()->pubseekpos (pos, mode);;
302- if (n != std::streampos (std::streamoff (-1 )))
303- bytes_ = n;
304- return n;
305- }
306-
307- private:
308- std::ifstream file_;
309- std::vector<char > buffer_;
310- size_t bytes_;
311- };
312-
313- class OutputFileStreamCounter : public std ::streambuf {
314- public:
315- OutputFileStreamCounter (const std::string& path, size_t size, bool append)
316- : buffer_(size), bytes_(0 ) {
317- std::ios_base::openmode mode = std::ios::binary;
318- if (append)
319- mode |= std::ios::app;
320- file_.open (path, mode);
321- if (!file_.is_open ())
322- log_fatal (" Error opening file %s" , path.c_str ());
323- file_.rdbuf ()->pubsetbuf (buffer_.data (), buffer_.size ());
324- }
325-
326- protected:
327- int_type overflow (int_type c) {
328- if (file_.rdbuf ()->sputc (c) != traits_type::eof ()) {
329- bytes_++;
330- return c;
331- }
332- return traits_type::eof ();
333- }
334-
335- std::streamsize xsputn (const char * s, std::streamsize n) {
336- std::streamsize nput = file_.rdbuf ()->sputn (s, n);
337- if (nput > 0 )
338- bytes_ += nput;
339- return nput;
340- }
341-
342- int sync () {
343- return file_.rdbuf ()->pubsync ();
344- }
345-
346- std::streampos seekoff (std::streamoff off, std::ios_base::seekdir way,
347- std::ios_base::openmode mode) {
348- // short-circuit for tellp
349- if ((mode & std::ios_base::out) && off == 0 && way == std::ios_base::cur)
350- return bytes_;
351- log_fatal (" Seek not implemented for output stream" );
352- }
353-
354- std::streampos seekpos (std::streampos pos, std::ios_base::openmode mode) {
355- log_fatal (" Seek not implemented for output stream" );
356- }
357-
358- private:
359- std::ofstream file_;
360- std::vector<char > buffer_;
361- size_t bytes_;
362- };
363-
364200
365201static void
366202reset_stream (std::ios &stream)
@@ -395,14 +231,14 @@ g3_istream_from_path(std::istream &stream, const std::string &path, float timeou
395231{
396232 reset_stream (stream);
397233
398- // Figure out what kind of ultimate data source this is
399- if (path.find (" tcp://" ) == 0 ) {
400- stream.rdbuf (new RemoteInputStreamBuffer (path, timeout, buffersize));
401- return ;
402- }
234+ int fd = -1 ;
403235
404- // Simple file case
236+ // Figure out what kind of ultimate data source this is
405237 switch (check_input_path (path, ext)) {
238+ case REMOTE:
239+ fd = connect_remote (path, timeout);
240+ stream.rdbuf (new RemoteInputStreamBuffer (fd, buffersize));
241+ break ;
406242#ifdef ZLIB_FOUND
407243 case GZ:
408244 stream.rdbuf (new GZipDecoder (path, buffersize));
0 commit comments