Skip to content

Commit 7364348

Browse files
committed
"move from recordio repo to paddle"
1 parent 7016979 commit 7364348

File tree

12 files changed

+231
-50
lines changed

12 files changed

+231
-50
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ include(external/eigen) # download eigen3
144144
include(external/pybind11) # download pybind11
145145
include(external/cares)
146146
include(external/grpc)
147+
include(external/snappy) # download snappy
147148

148149
include(cudnn) # set cudnn libraries, must before configure
149150
include(cupti)

paddle/fluid/recordio/chunk.cc

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ namespace paddle {
2626
namespace recordio {
2727

2828
void Chunk::Add(const char* record, size_t length) {
29-
records_.emplace_after(std::move(s));
29+
records_.emplace_after(std::string(record, length));
3030
num_bytes_ += s.size() * sizeof(char);
3131
}
3232

@@ -42,13 +42,16 @@ bool Chunk::Dump(Stream* fo, Compressor ct) {
4242
os.write(record.data(), static_cast<std::streamsize>(record.size()));
4343
}
4444

45-
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
45+
std::unique_ptr<char[]> buffer(new char[num_bytes_]);
4646
size_t compressed =
4747
CompressData(os.str().c_str(), num_bytes_, ct, buffer.get());
4848
uint32_t checksum = Crc32(buffer.get(), compressed);
4949
Header hdr(records_.size(), checksum, ct, static_cast<uint32_t>(compressed));
5050
hdr.Write(fo);
5151
fo.Write(buffer.get(), compressed);
52+
// clear the content
53+
records_.clear();
54+
num_bytes_ = 0;
5255
return true;
5356
}
5457

@@ -57,14 +60,18 @@ void Chunk::Parse(Stream* fi, size_t offset) {
5760
Header hdr;
5861
hdr.Parse(fi);
5962

60-
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
61-
fi->Read(buffer.get(), static_cast<size_t>(hdr.CompressSize()));
62-
uint32_t deflated_size =
63-
DeflateData(buffer.get(), hdr.CompressSize(), hdr.CompressType());
64-
std::istringstream deflated(std::string(buffer.get(), deflated_size));
63+
size_t size = static_cast<size_t>(hdr.CompressSize());
64+
std::unique_ptr<char[]> buffer(new char[size]);
65+
fi->Read(buffer.get(), size);
66+
size_t deflated_size = 0;
67+
snappy::GetUncompressedLength(buffer.get(), size, &deflated_size);
68+
std::unique_ptr<char[]> deflated_buffer(new char[deflated_size]);
69+
DeflateData(buffer.get(), size, hdr.CompressType(), deflated_buffer.get());
70+
std::istringstream deflated(
71+
std::string(deflated_buffer.get(), deflated_size));
6572
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
66-
uint32_t rs;
67-
deflated >> rs;
73+
size_t rs;
74+
deflated.read(&rs, sizeof(size_t));
6875
std::string record(rs, '\0');
6976
deflated.read(&record[0], rs);
7077
records_.emplace_back(record);

paddle/fluid/recordio/chunk.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ namespace recordio {
2525
// A Chunk contains the Header and optionally compressed records.
2626
class Chunk {
2727
public:
28-
Chunk() {}
28+
Chunk() : num_bytes_(0) {}
2929
void Add(const char* record, size_t size);
3030
// dump the chunk into w, and clears the chunk and makes it ready for
3131
// the next add invocation.

paddle/fluid/recordio/chunk_test.cc

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,36 @@
2020

2121
using namespace paddle::recordio;
2222

23-
TEST(Chunk, SaveLoad) {}
23+
TEST(Chunk, SaveLoad) {
24+
Chunk ch;
25+
ch.Add("12345", 6);
26+
ch.Add("123", 4);
27+
{
28+
Stream* fs = Stream::Open("/tmp/record_11", "w");
29+
ch.Dump(fs, Compressor::kNoCompress);
30+
EXPECT_EQ(ch.NumBytes(), 0);
31+
}
32+
{
33+
Stream* fs = Stream::Open("/tmp/record_11", "r");
34+
ch.Parse(fs, 0);
35+
EXPECT_EQ(ch.NumBytes(), 10);
36+
}
37+
}
38+
39+
TEST(Chunk, Compressor) {
40+
Chunk ch;
41+
ch.Add("12345", 6);
42+
ch.Add("123", 4);
43+
ch.Add("123", 4);
44+
ch.Add("123", 4);
45+
{
46+
Stream* fs = Stream::Open("/tmp/record_12", "w");
47+
ch.Dump(fs, Compressor::kSnappy);
48+
EXPECT_EQ(ch.NumBytes(), 0);
49+
}
50+
{
51+
Stream* fs = Stream::Open("/tmp/record_12", "r");
52+
ch.Parse(fs, 0);
53+
EXPECT_EQ(ch.NumBytes(), 10);
54+
}
55+
}

paddle/fluid/recordio/header.cc

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,19 @@ Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs)
2727
: num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {}
2828

2929
void Header::Parse(Stream* iss) {
30-
iss.Read(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
31-
iss.Read(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
32-
iss.Read(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
33-
iss.Read(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
30+
iss->Read(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
31+
iss->Read(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
32+
iss->Read(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
33+
iss->Read(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
3434
}
3535

3636
void Header::Write(Stream* os) {
37-
os.Write(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
38-
os.Write(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
39-
os.Write(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
40-
os.Write(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
37+
os->Write(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
38+
os->Write(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
39+
os->Write(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
40+
os->Write(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
4141
}
4242

43-
// std::ostream& operator << (std::ostream& os, Header h) {
44-
// os << h.num_records_
45-
// << h.checksum_
46-
// << static_cast<uint32_t>(h.compressor_)
47-
// << h.compress_size_;
48-
// return os;
49-
// }
50-
5143
std::ostream& operator<<(std::ostream& os, Header h) {
5244
os << h.NumRecords() << h.Checksum()
5345
<< static_cast<uint32_t>(h.CompressType()) << h.CompressSize();
@@ -59,3 +51,6 @@ bool operator==(Header l, Header r) {
5951
l.CompressType() == r.CompressType() &&
6052
l.CompressSize() == r.CompressSize();
6153
}
54+
55+
} // namespace recordio
56+
} // namespace paddle

paddle/fluid/recordio/header_test.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ using namespace paddle::recordio;
2323
TEST(Recordio, ChunkHead) {
2424
Header hdr(0, 1, Compressor::kGzip, 3);
2525
Stream* oss = Stream::Open("/tmp/record_1", "w");
26-
hdr.Write(oss);
26+
hdr->Write(oss);
2727

28-
Stream* iss = Stream::Open("/tmp/record_1", "r");
29-
Header hdr2;
30-
hdr2.Parse(iss);
28+
// Stream* iss = Stream::Open("/tmp/record_1", "r");
29+
// Header hdr2;
30+
// hdr2.Parse(iss);
3131

32-
EXPECT_TRUE(hdr == hdr2);
32+
// EXPECT_TRUE(hdr == hdr2);
3333
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "paddle/fluid/recordio/range_scanner.h"
16+
17+
namespace paddle {
18+
namespace recordio {
19+
20+
Index Index::ChunkIndex(int i) { Index idx; }
21+
22+
RangeScanner::RangeScanner(std::istream is, Index idx, int start, int len)
23+
: stream_(is.rdbuf()), index_(idx) {
24+
if (start < 0) {
25+
start = 0;
26+
}
27+
if (len < 0 || start + len >= idx.NumRecords()) {
28+
len = idx.NumRecords() - start;
29+
}
30+
31+
start_ = start;
32+
end_ = start + len;
33+
cur_ = start - 1;
34+
chunk_index_ = -1;
35+
// chunk_->reset(new Chunk());
36+
}
37+
38+
bool RangeScanner::Scan() {}
39+
40+
const std::string RangeScanner::Record() {
41+
// int i = index_.Locate(cur_);
42+
// return chunk_->Record(i);
43+
}
44+
45+
} // namespace recordio
46+
} // namespace paddle

paddle/fluid/recordio/range_scanner.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,23 @@
1414

1515
#pragma once
1616

17-
#include <fstream>
18-
#include <memory>
19-
#include <sstream>
20-
#include <string>
21-
#include <utility>
22-
#include <vector>
17+
#include "paddle/fluid/recordio/io.h"
2318

19+
namespace paddle {
20+
namespace recordio {
21+
22+
// Index consists offsets and sizes of the consequetive chunks in a RecordIO
23+
// file.
24+
//
25+
// Index supports Gob. Every field in the Index needs to be exported
26+
// for the correct encoding and decoding using Gob.
2427
class Index {
2528
public:
2629
int NumRecords() { return num_records_; }
30+
// NumChunks returns the total number of chunks in a RecordIO file.
31+
int NumChunks() { return chunk_lens_.size(); }
32+
// ChunkIndex return the Index of i-th Chunk.
33+
int ChunkIndex(int i);
2734

2835
// Locate returns the index of chunk that contains the given record,
2936
// and the record index within the chunk. It returns (-1, -1) if the
@@ -44,9 +51,13 @@ class Index {
4451
}
4552

4653
private:
54+
// the offset of each chunk in a file.
4755
std::vector<int64_t> chunk_offsets_;
56+
// the length of each chunk in a file.
4857
std::vector<uint32_t> chunk_lens_;
58+
// the numer of all records in a file.
4959
int num_records_;
60+
// the number of records in chunks.
5061
std::vector<int> chunk_records_;
5162
};
5263

@@ -56,14 +67,17 @@ class Index {
5667
// beginning. If len < 0, it scans till the end of file.
5768
class RangeScanner {
5869
public:
59-
RangeScanner(std::istream is, Index idx, int start, int end);
70+
RangeScanner(Stream* fi, Index idx, int start, int end);
6071
bool Scan();
6172
const std::string Record();
6273

6374
private:
64-
std::istream stream_;
75+
Stream* fi;
6576
Index index_;
6677
int start_, end_, cur_;
6778
int chunk_index_;
6879
std::unique_ptr<Chunk> chunk_;
6980
};
81+
82+
} // namespace recordio
83+
} // namespace paddle

paddle/fluid/recordio/scanner.cc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "paddle/fluid/recordio/chunk.h"
16+
17+
#include <glob.h> // glob
18+
19+
namespace paddle {
20+
namespace recordio {
21+
22+
Scanner::Scanner(const char* paths)
23+
: cur_file_(nullptr), path_idx_(0), end_(false) {
24+
glob_t glob_result;
25+
glob(paths, GLOB_TILDE, NULL, &glob_result);
26+
27+
for (size_t i = 0; i < glob_result.gl_pathc; ++i) {
28+
paths_.emplace_back(std::string(glob_result.gl_pathv[i]));
29+
}
30+
globfree(&glob_result);
31+
}
32+
33+
bool Scanner::Scan() {
34+
if (err_ == -1 || end_ == true) {
35+
return false;
36+
}
37+
if (cur_scanner_ == nullptr) {
38+
if (!NextFile()) {
39+
end_ = true;
40+
return false;
41+
}
42+
if (err_ == -1) {
43+
return false;
44+
}
45+
}
46+
if (!cur_scanner_->Scan()) {
47+
if (err_ == -1) {
48+
return false;
49+
}
50+
}
51+
52+
return true;
53+
}
54+
55+
bool Scanner::NextFile() {}
56+
57+
} // namespace recordio
58+
} // namespace paddle

paddle/fluid/recordio/scanner.h

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@
1414

1515
#pragma once
1616

17-
#include <fstream>
18-
#include <memory>
19-
#include <sstream>
20-
#include <string>
21-
#include <utility>
22-
#include <vector>
17+
#include "paddle/fluid/recordio/io.h"
18+
19+
namespace paddle {
20+
namespace recordio {
2321

2422
class RangeScanner;
2523

@@ -30,16 +28,17 @@ class Scanner {
3028
const std::string Record();
3129
bool Scan();
3230
void Close();
33-
34-
private:
3531
bool NextFile();
3632
int Err() { return err_; }
3733

3834
private:
3935
std::vector<std::string> paths_;
40-
FILE* cur_file_;
36+
Stream* cur_file_;
4137
RangeScanner* cur_scanner_;
4238
int path_idx_;
4339
bool end_;
4440
int err_;
4541
};
42+
43+
} // namespace recordio
44+
} // namespace paddle

0 commit comments

Comments
 (0)