Skip to content

Commit fe18341

Browse files
committed
"seperate internal library and exported library"
1 parent 7364348 commit fe18341

File tree

9 files changed

+153
-51
lines changed

9 files changed

+153
-51
lines changed

paddle/fluid/recordio/CMakeLists.txt

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1-
cc_library(header SRCS header.cc)
2-
cc_test(header_test SRCS header_test.cc DEPS header)
1+
# internal library.
32
cc_library(io SRCS io.cc DEPS stringpiece)
43
cc_test(io_test SRCS io_test.cc DEPS io)
4+
cc_library(header SRCS header.cc DEPS io)
5+
cc_test(header_test SRCS header_test.cc DEPS header)
56
cc_library(chunk SRCS chunk.cc DEPS snappy)
7+
cc_test(chunk_test SRCS chunk_test.cc DEPS chunk)
8+
cc_library(range_scanner SRCS range_scanner.cc DEPS io chunk)
9+
cc_test(range_scanner_test SRCS range_scanner_test.cc DEPS range_scanner)
10+
cc_library(scanner SRCS scanner.cc DEPS range_scanner)
11+
cc_test(scanner_test SRCS scanner_test.cc DEPS scanner)
12+
# exported library.
13+
cc_library(recordio SRCS recordio.cc DEPS scanner chunk header)
14+
cc_test(recordio_test SRCS recordio_test.cc DEPS scanner)

paddle/fluid/recordio/chunk.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ class Chunk {
3232
bool Dump(Stream* fo, Compressor ct);
3333
void Parse(Stream* fi, size_t offset);
3434
size_t NumBytes() { return num_bytes_; }
35+
// Record returns a copy of the i-th record held by this chunk.
// std::forward_list provides no operator[], so the list is walked from the
// front; each call costs O(i). No bounds check is performed: the caller must
// pass an index that is non-negative and smaller than the number of records.
const std::string Record(int i) {
  auto it = records_.begin();
  for (int k = 0; k < i; ++k) {
    ++it;
  }
  return *it;
}
3536

3637
private:
37-
std::forward_list<std::string> records_;
38+
std::forward_list<const std::string> records_;
3839
// sum of record lengths in bytes.
3940
size_t num_bytes_;
4041
DISABLE_COPY_AND_ASSIGN(Chunk);

paddle/fluid/recordio/header_test.cc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,18 @@ using namespace paddle::recordio;
2222

2323
TEST(Recordio, ChunkHead) {
2424
Header hdr(0, 1, Compressor::kGzip, 3);
25-
Stream* oss = Stream::Open("/tmp/record_1", "w");
26-
hdr->Write(oss);
25+
{
26+
Stream* oss = Stream::Open("/tmp/record_1", "w");
27+
hdr.Write(oss);
28+
delete oss;
29+
}
2730

28-
// Stream* iss = Stream::Open("/tmp/record_1", "r");
29-
// Header hdr2;
30-
// hdr2.Parse(iss);
31+
Header hdr2;
32+
{
33+
Stream* iss = Stream::Open("/tmp/record_1", "r");
34+
hdr2.Parse(iss);
35+
delete iss;
36+
}
3137

32-
// EXPECT_TRUE(hdr == hdr2);
38+
EXPECT_TRUE(hdr == hdr2);
3339
}

paddle/fluid/recordio/range_scanner.cc

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,37 @@
1717
namespace paddle {
1818
namespace recordio {
1919

20+
void Index::LoadIndex(FileStream* fi) {
21+
int64_t offset = 0;
22+
while (!fi->Eof()) {
23+
Header hdr;
24+
hdr.Parse(fi);
25+
chunk_offsets_.push_back(offset);
26+
chunk_lens_.push_back(hdr.NumRecords());
27+
chunk_records_.push_back(hdr.NumRecords());
28+
num_records_ += hdr.NumRecords();
29+
offset += hdr.CompressSize();
30+
}
31+
}
32+
2033
// ChunkIndex is still a stub: it builds an empty Index and returns it.
// The explicit return avoids flowing off the end of a non-void function,
// which is undefined behaviour.
// NOTE(review): range_scanner.h declares `int ChunkIndex(int i)`; the two
// signatures need to be reconciled before this can link.
// TODO: populate the result with the offsets/lengths of the i-th chunk.
Index Index::ChunkIndex(int i) {
  (void)i;  // unused until the stub is implemented
  Index idx;
  return idx;
}
2134

22-
RangeScanner::RangeScanner(std::istream is, Index idx, int start, int len)
23-
: stream_(is.rdbuf()), index_(idx) {
35+
// Locate maps a global record index to (chunk index, record index within
// that chunk). It returns (-1, -1) when record_idx is negative or not smaller
// than the total number of records covered by chunk_lens_.
std::pair<int, int> Index::Locate(int record_idx) {
  if (record_idx < 0) {
    return std::pair<int, int>(-1, -1);
  }
  int sum = 0;  // number of records in chunks [0, i]
  for (size_t i = 0; i < chunk_lens_.size(); ++i) {
    const int len = static_cast<int>(chunk_lens_[i]);
    sum += len;
    if (record_idx < sum) {
      // Stop at the FIRST chunk whose cumulative count exceeds record_idx;
      // continuing the loop would let every later chunk overwrite the answer.
      // (sum - len) is the global index of this chunk's first record.
      return std::pair<int, int>(static_cast<int>(i), record_idx - (sum - len));
    }
  }
  return std::pair<int, int>(-1, -1);
}
48+
49+
RangeScanner::RangeScanner(Stream* fi, Index idx, int start, int len)
50+
: stream_(fi), index_(idx) {
2451
if (start < 0) {
2552
start = 0;
2653
}
@@ -30,16 +57,28 @@ RangeScanner::RangeScanner(std::istream is, Index idx, int start, int len)
3057

3158
start_ = start;
3259
end_ = start + len;
33-
cur_ = start - 1;
60+
cur_ = start - 1; // The initial status required by Scan
3461
chunk_index_ = -1;
35-
// chunk_->reset(new Chunk());
62+
chunk_.reset(new Chunk);
3663
}
3764

38-
bool RangeScanner::Scan() {}
65+
bool RangeScanner::Scan() {
66+
++cur_;
67+
if (cur_ >= end_) {
68+
return false;
69+
} else {
70+
auto cursor = index_.Locate(cur_);
71+
if (chunk_index_ != cursor.first) {
72+
chunk_index_ = cursor.first;
73+
chunk_->Parse(fi, index_.ChunkOffsets[chunk_index_]);
74+
}
75+
}
76+
return true;
77+
}
3978

4079
const std::string RangeScanner::Record() {
41-
// int i = index_.Locate(cur_);
42-
// return chunk_->Record(i);
80+
auto cursor = index_.Locate(cur_);
81+
return chunk_->Record(cursor.second);
4382
}
4483

4584
} // namespace recordio

paddle/fluid/recordio/range_scanner.h

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
#pragma once
1616

17+
#include <utility>
18+
19+
#include "paddle/fluid/recordio/chunk.h"
1720
#include "paddle/fluid/recordio/io.h"
1821

1922
namespace paddle {
@@ -26,29 +29,22 @@ namespace recordio {
2629
// for the correct encoding and decoding using Gob.
2730
class Index {
2831
public:
32+
Index() : num_records_(0) {}
33+
// LoadIndex scans the file and parses chunkOffsets, chunkLens, and len.
34+
void LoadIndex(Stream* fi);
35+
// NumRecords returns the total number of all records in a RecordIO file.
2936
int NumRecords() { return num_records_; }
3037
// NumChunks returns the total number of chunks in a RecordIO file.
3138
int NumChunks() { return chunk_lens_.size(); }
3239
// ChunkIndex returns the Index of the i-th Chunk.
3340
int ChunkIndex(int i);
3441

42+
int64_t ChunkOffsets(int i) { return chunk_offsets_[i]; }
43+
3544
// Locate returns the index of chunk that contains the given record,
3645
// and the record index within the chunk. It returns (-1, -1) if the
3746
// record is out of range.
38-
void Locate(int record_idx, std::pair<int, int>* out) {
39-
size_t sum = 0;
40-
for (size_t i = 0; i < chunk_lens_.size(); ++i) {
41-
sum += chunk_lens_[i];
42-
if (static_cast<size_t>(record_idx) < sum) {
43-
out->first = i;
44-
out->second = record_idx - sum + chunk_lens_[i];
45-
return;
46-
}
47-
}
48-
// out->swap(std::make_pair<int,int>(-1, -1));
49-
out->first = -1;
50-
out->second = -1;
51-
}
47+
std::pair<int, int> Locate(int record_idx);
5248

5349
private:
5450
// the offset of each chunk in a file.
@@ -62,12 +58,14 @@ class Index {
6258
};
6359

6460
// RangeScanner
65-
// creates a scanner that sequencially reads records in the
66-
// range [start, start+len). If start < 0, it scans from the
67-
// beginning. If len < 0, it scans till the end of file.
6861
class RangeScanner {
6962
public:
63+
// creates a scanner that sequentially reads records in the
64+
// range [start, start+len). If start < 0, it scans from the
65+
// beginning. If len < 0, it scans till the end of file.
7066
// The last argument is a record count, not an end position: the definition
// in range_scanner.cc names it `len` and computes the end as start + len.
RangeScanner(Stream* fi, Index idx, int start, int len);
67+
// Scan moves the cursor forward for one record and loads the chunk
68+
// containing the record if not yet.
7169
bool Scan();
7270
const std::string Record();
7371

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "paddle/fluid/recordio/range_scanner.h"
16+
17+
#include "gtest/gtest.h"
18+
19+
using namespace paddle::recordio;
20+
21+
// Placeholder test: for now it only opens a stream for writing.
// The stream is released explicitly, as header_test.cc does for the streams
// it obtains from Stream::Open, so the test does not leak it.
// TODO: write records, build an Index and scan them back with RangeScanner.
TEST(RangeScanner, Recordio) {
  Stream* fo = Stream::Open("/tmp/record_range", "w");
  delete fo;
}

paddle/fluid/recordio/filesys.h renamed to paddle/fluid/recordio/recordio.cc

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#pragma once
15+
#include "paddle/fluid/recordio/io.h"
16+
#include "paddle/fluid/string/piece.h"
1617

17-
#include <fcntl.h>
18-
#include <stdio.h>
19-
#include <unistd.h>
20-
21-
class DefaultFileSys {
22-
public:
23-
private:
24-
};
18+
namespace paddle {
19+
namespace recordio {} // namespace recordio
20+
} // namespace paddle

paddle/fluid/recordio/recordio.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
#pragma once
15+
16+
#include "paddle/fluid/recordio/chunk.h"
17+
#include "paddle/fluid/recordio/header.h"
18+
#include "paddle/fluid/recordio/io.h"
19+
#include "paddle/fluid/recordio/scanner.h"
20+
#include "paddle/fluid/recordio/writer.h"

paddle/fluid/recordio/scanner.cc

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,28 +31,38 @@ Scanner::Scanner(const char* paths)
3131
}
3232

3333
bool Scanner::Scan() {
34-
if (err_ == -1 || end_ == true) {
34+
if (end_ == true) {
3535
return false;
3636
}
3737
if (cur_scanner_ == nullptr) {
3838
if (!NextFile()) {
3939
end_ = true;
4040
return false;
4141
}
42-
if (err_ == -1) {
43-
return false;
44-
}
4542
}
4643
if (!cur_scanner_->Scan()) {
47-
if (err_ == -1) {
48-
return false;
49-
}
44+
end_ = true;
45+
cur_file_ = nullptr;
46+
return false;
5047
}
51-
5248
return true;
5349
}
5450

55-
bool Scanner::NextFile() {}
51+
bool Scanner::NextFile() {
52+
if (path_idx_ >= paths_.size()) {
53+
return false;
54+
}
55+
std::string path = paths_[path_idx_];
56+
++path_idx_;
57+
cur_file_ = Stream::Open(path);
58+
if (cur_file_ == nullptr) {
59+
return false;
60+
}
61+
Index idx;
62+
idx.LoadIndex(cur_file_);
63+
cur_scanner_ = RangeScanner(cur_file_, idx, 0, -1);
64+
return true;
65+
}
5666

5767
} // namespace recordio
5868
} // namespace paddle

0 commit comments

Comments
 (0)