Skip to content

Commit 313454d

Browse files
committed
"init"
1 parent a67ceba commit 313454d

File tree

10 files changed

+528
-0
lines changed

10 files changed

+528
-0
lines changed

paddle/fluid/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ add_subdirectory(operators)
55
add_subdirectory(pybind)
66
add_subdirectory(inference)
77
add_subdirectory(string)
8+
add_subdirectory(recordio)

paddle/fluid/recordio/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
cc_library(header SRCS header.cc)
2+
cc_test(header_test SRCS header_test.cc DEPS header)

paddle/fluid/recordio/chunk.h

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <fstream>
18+
#include <memory>
19+
#include <sstream>
20+
#include <string>
21+
#include <utility>
22+
#include <vector>
23+
24+
// Chunk
25+
// a chunk contains the Header and optionally compressed records.
26+
class Chunk {
27+
public:
28+
Chunk() = default;
29+
void Add(const char* record, size_t length);
30+
void Add(const std::string&);
31+
32+
bool Dump(std::ostream& os, Compressor ct);
33+
void Parse(std::istream& iss, int64_t offset);
34+
const std::string Record(int i) { return records_[i]; }
35+
36+
private:
37+
std::vector<std::string> records_;
38+
size_t num_bytes_;
39+
};
40+
41+
// Compresses the packed chunk data with compressor `ct` into `buffer` and
// returns a byte count that Chunk::Dump stores as the header's compressed size.
// NOTE(review): the only call site visible in this file (Chunk::Dump) passes a
// std::string (`oss.str()`), which does not convert implicitly to
// `const std::stringstream&` -- confirm the intended parameter type.
size_t CompressData(const std::stringstream& ss, Compressor ct, char* buffer);
42+
43+
// Decodes the `size` bytes held in `buffer` according to compressor `c` and
// returns the resulting size; for Compressor::kNoCompress that is `size`.
uint32_t DeflateData(char* buffer, uint32_t size, Compressor c);
44+
45+
// implementation
46+
// Appends a copy of `s` as a new record and adds its size to the running
// byte count.
//
// `inline` because this definition lives in a header; without it every
// translation unit including the header would emit its own definition.
inline void Chunk::Add(const std::string& s) {
  num_bytes_ += s.size() * sizeof(char);
  // `s` is a const reference, so it can only be copied; std::move here would
  // silently degrade to a copy anyway.
  records_.push_back(s);
}
52+
53+
// Appends the `length` bytes starting at `record` as a new record by
// forwarding to the std::string overload.
//
// `inline` because this definition lives in a header; without it every
// translation unit including the header would emit its own definition.
inline void Chunk::Add(const char* record, size_t length) {
  Add(std::string(record, length));
}
56+
57+
bool Chunk::Dump(std::ostream& os, Compressor ct) {
58+
if (records_.size() == 0) return false;
59+
60+
// TODO(dzhwinter):
61+
// we pack the string with same size buffer,
62+
// then compress with another buffer.
63+
// Here can be optimized if it is the bottle-neck.
64+
std::ostringstream oss;
65+
for (auto& record : records_) {
66+
unsigned len = record.size();
67+
oss << len;
68+
oss << record;
69+
// os.write(std::to_string(len).c_str(), sizeof(unsigned));
70+
// os.write(record.c_str(), record.size());
71+
}
72+
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
73+
size_t compressed = CompressData(oss.str(), ct, buffer.get());
74+
75+
// TODO(dzhwinter): crc32 checksum
76+
size_t checksum = compressed;
77+
78+
Header hdr(records_.size(), checksum, ct, compressed);
79+
80+
return true;
81+
}
82+
83+
void Chunk::Parse(std::istream& iss, int64_t offset) {
84+
iss.seekg(offset, iss.beg);
85+
Header hdr;
86+
hdr.Parse(iss);
87+
88+
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
89+
iss.read(buffer.get(), static_cast<size_t>(hdr.CompressSize()));
90+
// TODO(dzhwinter): checksum
91+
uint32_t deflated_size =
92+
DeflateData(buffer.get(), hdr.CompressSize(), hdr.CompressType());
93+
std::istringstream deflated(std::string(buffer.get(), deflated_size));
94+
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
95+
uint32_t rs;
96+
deflated >> rs;
97+
std::string record(rs, '\0');
98+
deflated.read(&record[0], rs);
99+
records_.emplace_back(record);
100+
num_bytes_ += record.size();
101+
}
102+
}
103+
104+
// Decodes the `size` bytes held in `buffer` in place according to compressor
// `c` and returns the resulting number of bytes.
//
// Only Compressor::kNoCompress is implemented: the data is left untouched and
// `size` is returned. Every other compressor currently yields 0.
//
// `inline` because this definition lives in a header.
inline uint32_t DeflateData(char* buffer, uint32_t size, Compressor c) {
  (void)buffer;  // only needed once a real decompressor is wired in
  switch (c) {
    case Compressor::kNoCompress:
      return size;
    case Compressor::kSnappy:
      // Not implemented yet. Intended shape:
      //   snappy::Uncompress(buffer, size, &uncompressed);
      //   deflated_size = uncompressed.size();
      //   memcpy(buffer, uncompressed.data(), uncompressed.size());
      return 0;
    case Compressor::kGzip:
      // Not implemented yet.
      return 0;
  }
  // Values outside the named enumerators.
  return 0;
}

paddle/fluid/recordio/header.cc

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "paddle/fluid/recordio/header.h"
16+
17+
namespace paddle {
18+
namespace recordio {
19+
20+
// Builds an empty header: zero records, zero checksum, no compression and a
// zero compressed size.
Header::Header() : Header(0, 0, Compressor::kNoCompress, 0) {}
25+
26+
Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs)
27+
: num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {}
28+
29+
void Header::Parse(std::istream& iss) {
30+
iss.read(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
31+
iss.read(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
32+
iss.read(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
33+
iss.read(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
34+
}
35+
36+
// Writes the four header fields (record count, checksum, compressor,
// compressed size) to `os`, in that order, as raw 32-bit values.
//
// The compressor is converted to uint32_t explicitly rather than having its
// object representation reinterpreted, so exactly 4 bytes are emitted
// regardless of how the enum is laid out in memory.
void Header::Write(std::ostream& os) {
  const uint32_t fields[4] = {num_records_, checksum_,
                              static_cast<uint32_t>(compressor_),
                              compress_size_};
  os.write(reinterpret_cast<const char*>(fields), sizeof(fields));
}
42+
43+
// std::ostream& operator << (std::ostream& os, Header h) {
44+
// os << h.num_records_
45+
// << h.checksum_
46+
// << static_cast<uint32_t>(h.compressor_)
47+
// << h.compress_size_;
48+
// return os;
49+
// }
50+
51+
std::ostream& operator<<(std::ostream& os, Header h) {
52+
os << h.NumRecords() << h.Checksum()
53+
<< static_cast<uint32_t>(h.CompressType()) << h.CompressSize();
54+
return os;
55+
}
56+
57+
// bool operator==(Header l, Header r) {
58+
// return num_records_ == rhs.NumRecords() &&
59+
// checksum_ == rhs.Checksum() &&
60+
// compressor_ == rhs.CompressType() &&
61+
// compress_size_ == rhs.CompressSize();
62+
// }
63+
64+
// Two headers are equal when record count, checksum, compressor and
// compressed size all match.
bool operator==(Header l, Header r) {
  if (l.NumRecords() != r.NumRecords()) return false;
  if (l.Checksum() != r.Checksum()) return false;
  if (l.CompressType() != r.CompressType()) return false;
  return l.CompressSize() == r.CompressSize();
}
69+
70+
// size_t CompressData(const std::string& os, Compressor ct, char* buffer) {
71+
// size_t compress_size = 0;
72+
73+
// // std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
74+
// // std::string compressed;
75+
// compress_size =os.size();
76+
// memcpy(buffer, os.c_str(), compress_size);
77+
// return compress_size;
78+
// }
79+
80+
} // namespace recordio
81+
} // namespace paddle

paddle/fluid/recordio/header.h

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <sstream>
18+
19+
namespace paddle {
20+
namespace recordio {
21+
22+
// Default ChunkSize
23+
// 32 MiB, expressed in bytes.
constexpr size_t kDefaultMaxChunkSize = 32 * 1024 * 1024;
24+
// MagicNumber for memory checking
25+
// NOTE(review): not referenced by any code visible in this commit view --
// confirm where it is meant to be written and checked.
constexpr uint32_t kMagicNumber = 0x01020304;
26+
27+
// Compression scheme applied to a chunk's payload.
//
// The underlying type is fixed to uint32_t because Header serializes this
// value as exactly sizeof(uint32_t) bytes.
enum class Compressor : uint32_t {
  // NoCompression means writing raw chunk data into files.
  // With other choices, chunks are compressed before being written.
  kNoCompress = 0,
  // Snappy had been the default compression algorithm widely
  // used in Google. It compromises between speed and
  // compression ratio.
  kSnappy = 1,
  // Gzip is a well-known compression algorithm. It is
  // recommended only if you are looking for compression ratio.
  kGzip = 2,
};
39+
40+
// Header is the metadata of Chunk
41+
class Header {
42+
public:
43+
Header();
44+
Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs);
45+
46+
void Write(std::ostream& os);
47+
void Parse(std::istream& iss);
48+
49+
uint32_t NumRecords() const { return num_records_; }
50+
uint32_t Checksum() const { return checksum_; }
51+
Compressor CompressType() const { return compressor_; }
52+
uint32_t CompressSize() const { return compress_size_; }
53+
54+
private:
55+
uint32_t num_records_;
56+
uint32_t checksum_;
57+
Compressor compressor_;
58+
uint32_t compress_size_;
59+
};
60+
61+
// Make Header loggable.
62+
// Streams the header fields as formatted text (not the binary layout that
// Header::Write produces).
std::ostream& operator<<(std::ostream& os, Header h);
63+
// Field-wise equality: record count, checksum, compressor and compressed size.
bool operator==(Header l, Header r);
64+
65+
} // namespace recordio
66+
} // namespace paddle

paddle/fluid/recordio/header_test.cc

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "paddle/fluid/recordio/header.h"
16+
17+
#include <sstream>
18+
19+
#include "gtest/gtest.h"
20+
21+
using namespace recordio;
22+
23+
// Round-trips a header through Write -> Parse -> Write and checks that
// nothing is lost.
TEST(Recordio, ChunkHead) {
  Header hdr(0, 1, Compressor::kGzip, 3);
  std::ostringstream oss;
  hdr.Write(oss);
  // Four raw 32-bit fields.
  EXPECT_EQ(oss.str().size(), 4 * sizeof(uint32_t));

  std::istringstream iss(oss.str());
  Header hdr2;
  hdr2.Parse(iss);
  EXPECT_TRUE(hdr == hdr2);

  std::ostringstream oss2;
  hdr2.Write(oss2);
  // Compare as std::string: the serialized header is binary and begins with
  // zero bytes, so a C-string comparison would stop at the first byte and
  // pass vacuously.
  EXPECT_EQ(oss2.str(), oss.str());
}
36+
37+
// Compares the output of Header::Write with the output of operator<<.
// NOTE(review): Write emits raw 32-bit fields while operator<< emits
// formatted text, and EXPECT_STREQ compares NUL-terminated C strings, so any
// binary output is cut off at its first zero byte. Confirm what this test is
// meant to assert and compare std::string values (EXPECT_EQ) if a byte-wise
// comparison is intended.
TEST(Recordio, Stream) {
38+
Header hdr(0, 1, static_cast<Compressor>(2), 3);
39+
std::ostringstream oss1;
40+
hdr.Write(oss1);
41+
42+
std::ostringstream oss2;
43+
oss2 << hdr;
44+
EXPECT_STREQ(oss2.str().c_str(), oss1.str().c_str());
45+
}

paddle/fluid/recordio/range_scanner.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <fstream>
18+
#include <memory>
19+
#include <sstream>
20+
#include <string>
21+
#include <utility>
22+
#include <vector>
23+
24+
// Index maps a global record number to the chunk that holds it, based on the
// number of records stored in each chunk.
class Index {
 public:
  // Total number of records; zero for a freshly constructed index.
  int NumRecords() const { return num_records_; }

  // Locate returns the index of chunk that contains the given record,
  // and the record index within the chunk. It returns (-1, -1) if the
  // record is out of range (including a negative `record_idx`).
  void Locate(int record_idx, std::pair<int, int>* out) const {
    if (record_idx >= 0) {
      const size_t target = static_cast<size_t>(record_idx);
      size_t chunk_begin = 0;  // global index of the chunk's first record
      for (size_t i = 0; i < chunk_lens_.size(); ++i) {
        const size_t chunk_end = chunk_begin + chunk_lens_[i];
        if (target < chunk_end) {
          out->first = static_cast<int>(i);
          out->second = static_cast<int>(target - chunk_begin);
          return;
        }
        chunk_begin = chunk_end;
      }
    }
    out->first = -1;
    out->second = -1;
  }

 private:
  std::vector<int64_t> chunk_offsets_;
  std::vector<uint32_t> chunk_lens_;
  // Initialized explicitly: the implicit default constructor would otherwise
  // leave it indeterminate and NumRecords() would read garbage.
  int num_records_ = 0;
  std::vector<int> chunk_records_;
};
52+
53+
// RangeScanner
54+
// creates a scanner that sequentially reads records in the
55+
// range [start, start+len). If start < 0, it scans from the
56+
// beginning. If len < 0, it scans till the end of file.
57+
// Declares a scanner over a range of records, holding a stream, an Index, the
// range bounds, a cursor, the current chunk index and the current Chunk.
// NOTE(review): `Chunk` is not declared by anything this header includes in
// the visible source -- confirm the missing include.
class RangeScanner {
58+
public:
59+
// NOTE(review): std::istream is not copyable, so a by-value `is` parameter
// cannot be initialized from a caller's stream; presumably a reference or
// pointer was intended -- confirm against the constructor definition.
RangeScanner(std::istream is, Index idx, int start, int end);
60+
bool Scan();
61+
const std::string Record();
62+
63+
private:
64+
// NOTE(review): stored by value; see the note on the constructor.
std::istream stream_;
65+
Index index_;
66+
int start_, end_, cur_;
67+
int chunk_index_;
68+
std::unique_ptr<Chunk> chunk_;
69+
};

0 commit comments

Comments
 (0)