22
33#if USE_HDFS
44#include " HDFSCommon.h"
5+ #include " HDFSErrorWrapper.h"
56#include < Common/Scheduler/ResourceGuard.h>
67#include < IO/Progress.h>
78#include < Common/Throttler.h>
@@ -28,16 +29,16 @@ namespace ErrorCodes
2829 extern const int SEEK_POSITION_OUT_OF_BOUND;
2930 extern const int LOGICAL_ERROR;
3031 extern const int UNKNOWN_FILE_SIZE;
32+ extern const int HDFS_ERROR;
3133}
3234
3335
34- struct ReadBufferFromHDFS ::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileSize
36+ struct ReadBufferFromHDFS ::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileSize, public HDFSErrorWrapper
3537{
3638 String hdfs_uri;
3739 String hdfs_file_path;
3840
3941 hdfsFile fin;
40- HDFSBuilderWrapper builder;
4142 HDFSFSPtr fs;
4243 ReadSettings read_settings;
4344
@@ -54,14 +55,14 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
5455 bool use_external_buffer_,
5556 std::optional<size_t > file_size_)
5657 : BufferWithOwnMemory<SeekableReadBuffer>(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size)
58+ , HDFSErrorWrapper(hdfs_uri_, config_)
5759 , hdfs_uri(hdfs_uri_)
5860 , hdfs_file_path(hdfs_file_path_)
5961 , read_settings(read_settings_)
6062 , read_until_position(read_until_position_)
6163 {
62- builder = createHDFSBuilder (hdfs_uri_, config_);
6364 fs = createHDFSFS (builder.get ());
64- fin = hdfsOpenFile ( fs.get (), hdfs_file_path.c_str (), O_RDONLY, 0 , 0 , 0 );
65+ fin = wrapErr<hdfsFile>(hdfsOpenFile, fs.get (), hdfs_file_path.c_str (), O_RDONLY, 0 , 0 , 0 );
6566
6667 if (fin == nullptr )
6768 throw Exception (ErrorCodes::CANNOT_OPEN_FILE,
@@ -74,7 +75,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
7475 }
7576 else
7677 {
77- auto * file_info = hdfsGetPathInfo ( fs.get (), hdfs_file_path.c_str ());
78+ auto * file_info = wrapErr<hdfsFileInfo *>(hdfsGetPathInfo, fs.get (), hdfs_file_path.c_str ());
7879 if (!file_info)
7980 {
8081 hdfsCloseFile (fs.get (), fin);
@@ -123,7 +124,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
123124 int bytes_read;
124125 try
125126 {
126- bytes_read = hdfsRead ( fs.get (), fin, internal_buffer.begin (), safe_cast<int >(num_bytes_to_read));
127+ bytes_read = wrapErr<tSize>(hdfsRead, fs.get (), fin, internal_buffer.begin (), safe_cast<int >(num_bytes_to_read));
127128 }
128129 catch (...)
129130 {
@@ -160,7 +161,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
160161 if (whence != SEEK_SET)
161162 throw Exception (ErrorCodes::LOGICAL_ERROR, " Only SEEK_SET is supported" );
162163
163- int seek_status = hdfsSeek ( fs.get (), fin, file_offset_);
164+ int seek_status = wrapErr< int >(hdfsSeek, fs.get (), fin, file_offset_);
164165 if (seek_status != 0 )
165166 throw Exception (ErrorCodes::CANNOT_SEEK_THROUGH_FILE, " Fail to seek HDFS file: {}, error: {}" , hdfs_uri, std::string (hdfsGetLastError ()));
166167 file_offset = file_offset_;
@@ -172,6 +173,29 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
172173 {
173174 return file_offset;
174175 }
176+
/// Positional read: asks hdfsPread (through wrapErr) for up to `size` bytes starting at
/// `offset` and stores them in `buffer`.
/// Returns the byte count reported by hdfsPread.
/// Throws Exception with ErrorCodes::HDFS_ERROR when that count is negative.
/// This method does not modify `file_offset`.
177+ size_t pread (char * buffer, size_t size, size_t offset)
178+ {
// The guard is built from the configured resource link and the requested size, and is
// released explicitly right after the HDFS call instead of at scope exit.
// NOTE(review): presumably this accounts the request against an IO scheduler — confirm against ResourceGuard.
179+ ResourceGuard rlock (read_settings.resource_link , size);
// `size` is narrowed with safe_cast<int> before being passed as the read length.
180+ auto bytes_read = wrapErr<tSize>(hdfsPread, fs.get (), fin, buffer, safe_cast<int >(size), offset);
181+ rlock.unlock ();
182+
// A negative count is treated as failure; the message carries the URI, the file path and hdfsGetLastError().
183+ if (bytes_read < 0 )
184+ {
185+ throw Exception (
186+ ErrorCodes::HDFS_ERROR,
187+ " Fail to read from HDFS: {}, file path: {}. Error: {}" ,
188+ hdfs_uri,
189+ hdfs_file_path,
190+ std::string (hdfsGetLastError ()));
191+ }
// Only non-zero reads are reported to the remote throttler, and only when one is configured.
192+ if (bytes_read && read_settings.remote_throttler )
193+ {
194+ read_settings.remote_throttler ->add (
195+ bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
196+ }
// bytes_read is non-negative at this point; it is converted to size_t on return.
197+ return bytes_read;
198+ }
175199};
176200
177201ReadBufferFromHDFS::ReadBufferFromHDFS (
0 commit comments