Skip to content

Commit 7016979

Browse files
committed
"add crc32 encoder"
1 parent 69c7991 commit 7016979

File tree

15 files changed

+2114
-144
lines changed

15 files changed

+2114
-144
lines changed

paddle/fluid/recordio/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ cc_library(header SRCS header.cc)
22
cc_test(header_test SRCS header_test.cc DEPS header)
33
cc_library(io SRCS io.cc DEPS stringpiece)
44
cc_test(io_test SRCS io_test.cc DEPS io)
5+
cc_library(chunk SRCS chunk.cc DEPS snappy)

paddle/fluid/recordio/chunk.cc

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "paddle/fluid/recordio/chunk.h"
16+
17+
#include <cstring>
18+
#include <sstream>
19+
#include <utility>
20+
21+
#include "snappy.h"
22+
23+
#include "paddle/fluid/recordio/crc32.h"
24+
25+
namespace paddle {
26+
namespace recordio {
27+
28+
void Chunk::Add(const char* record, size_t length) {
29+
records_.emplace_after(std::move(s));
30+
num_bytes_ += s.size() * sizeof(char);
31+
}
32+
33+
bool Chunk::Dump(Stream* fo, Compressor ct) {
34+
// NOTE(dzhwinter): don't check records.numBytes instead, because
35+
// empty records are allowed.
36+
if (records_.size() == 0) return false;
37+
38+
// pack the record into consecutive memory for compress
39+
std::ostringstream os;
40+
for (auto& record : records_) {
41+
os.write(record.size(), sizeof(size_t));
42+
os.write(record.data(), static_cast<std::streamsize>(record.size()));
43+
}
44+
45+
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
46+
size_t compressed =
47+
CompressData(os.str().c_str(), num_bytes_, ct, buffer.get());
48+
uint32_t checksum = Crc32(buffer.get(), compressed);
49+
Header hdr(records_.size(), checksum, ct, static_cast<uint32_t>(compressed));
50+
hdr.Write(fo);
51+
fo.Write(buffer.get(), compressed);
52+
return true;
53+
}
54+
55+
void Chunk::Parse(Stream* fi, size_t offset) {
56+
fi->Seek(offset);
57+
Header hdr;
58+
hdr.Parse(fi);
59+
60+
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
61+
fi->Read(buffer.get(), static_cast<size_t>(hdr.CompressSize()));
62+
uint32_t deflated_size =
63+
DeflateData(buffer.get(), hdr.CompressSize(), hdr.CompressType());
64+
std::istringstream deflated(std::string(buffer.get(), deflated_size));
65+
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
66+
uint32_t rs;
67+
deflated >> rs;
68+
std::string record(rs, '\0');
69+
deflated.read(&record[0], rs);
70+
records_.emplace_back(record);
71+
num_bytes_ += record.size();
72+
}
73+
}
74+
75+
size_t CompressData(const char* in,
76+
size_t in_length,
77+
Compressor ct,
78+
char* out) {
79+
size_t compressd_size = 0;
80+
switch (ct) {
81+
case Compressor::kNoCompress:
82+
// do nothing
83+
memcpy(out, in, in_length);
84+
compressd_size = in_length;
85+
break;
86+
case Compressor::kSnappy:
87+
snappy::RawCompress(in, in_length, out, &compressd_size);
88+
break;
89+
}
90+
return compressd_size;
91+
}
92+
93+
void DeflateData(const char* in, size_t in_length, Compressor ct, char* out) {
94+
switch (c) {
95+
case Compressor::kNoCompress:
96+
memcpy(out, in, in_length);
97+
break;
98+
case Compressor::kSnappy:
99+
snappy::RawUncompress(in, in_length, out);
100+
break;
101+
}
102+
}
103+
104+
} // namespace recordio
105+
} // namespace paddle

paddle/fluid/recordio/chunk.h

Lines changed: 20 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -13,109 +13,36 @@
1313
// limitations under the License.
1414

1515
#pragma once
16-
17-
#include <fstream>
18-
#include <memory>
19-
#include <sstream>
16+
#include <forward_list>
2017
#include <string>
21-
#include <utility>
22-
#include <vector>
2318

24-
// Chunk
25-
// a chunk contains the Header and optionally compressed records.
19+
#include "paddle/fluid/recordio/header.h"
20+
#include "paddle/fluid/recordio/io.h"
21+
22+
namespace paddle {
23+
namespace recordio {
24+
25+
// A Chunk contains the Header and optionally compressed records.
2626
class Chunk {
2727
public:
28-
Chunk() = default;
29-
void Add(const char* record, size_t length);
30-
void Add(const std::string&);
31-
32-
bool Dump(std::ostream& os, Compressor ct);
33-
void Parse(std::istream& iss, int64_t offset);
34-
const std::string Record(int i) { return records_[i]; }
28+
Chunk() {}
29+
void Add(const char* record, size_t size);
30+
// dump the chunk into w, and clears the chunk and makes it ready for
31+
// the next add invocation.
32+
bool Dump(Stream* fo, Compressor ct);
33+
void Parse(Stream* fi, size_t offset);
3534
size_t NumBytes() { return num_bytes_; }
3635

3736
private:
38-
std::vector<std::string> records_;
37+
std::forward_list<std::string> records_;
3938
// sum of record lengths in bytes.
4039
size_t num_bytes_;
40+
DISABLE_COPY_AND_ASSIGN(Chunk);
4141
};
4242

43-
size_t CompressData(const std::stringstream& ss, Compressor ct, char* buffer);
44-
45-
uint32_t DeflateData(char* buffer, uint32_t size, Compressor c);
46-
47-
// implementation
48-
void Chunk::Add(const std::string& s) {
49-
num_bytes_ += s.size() * sizeof(char);
50-
records_.emplace_back(std::move(s));
51-
// records_.resize(records_.size()+1);
52-
// records_[records_.size()-1] = s;
53-
}
54-
55-
void Chunk::Add(const char* record, size_t length) {
56-
Add(std::string(record, length));
57-
}
58-
59-
bool Chunk::Dump(std::ostream& os, Compressor ct) {
60-
if (records_.size() == 0) return false;
61-
62-
// TODO(dzhwinter):
63-
// we pack the string with same size buffer,
64-
// then compress with another buffer.
65-
// Here can be optimized if it is the bottle-neck.
66-
std::ostringstream oss;
67-
for (auto& record : records_) {
68-
unsigned len = record.size();
69-
oss << len;
70-
oss << record;
71-
// os.write(std::to_string(len).c_str(), sizeof(unsigned));
72-
// os.write(record.c_str(), record.size());
73-
}
74-
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
75-
size_t compressed = CompressData(oss.str(), ct, buffer.get());
76-
77-
// TODO(dzhwinter): crc32 checksum
78-
size_t checksum = compressed;
79-
80-
Header hdr(records_.size(), checksum, ct, compressed);
81-
82-
return true;
83-
}
84-
85-
void Chunk::Parse(std::istream& iss, int64_t offset) {
86-
iss.seekg(offset, iss.beg);
87-
Header hdr;
88-
hdr.Parse(iss);
43+
size_t CompressData(const char* in, size_t in_length, Compressor ct, char* out);
8944

90-
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
91-
iss.read(buffer.get(), static_cast<size_t>(hdr.CompressSize()));
92-
// TODO(dzhwinter): checksum
93-
uint32_t deflated_size =
94-
DeflateData(buffer.get(), hdr.CompressSize(), hdr.CompressType());
95-
std::istringstream deflated(std::string(buffer.get(), deflated_size));
96-
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
97-
uint32_t rs;
98-
deflated >> rs;
99-
std::string record(rs, '\0');
100-
deflated.read(&record[0], rs);
101-
records_.emplace_back(record);
102-
num_bytes_ += record.size();
103-
}
104-
}
45+
void DeflateData(const char* in, size_t in_length, Compressor ct, char* out);
10546

106-
uint32_t DeflateData(char* buffer, uint32_t size, Compressor c) {
107-
uint32_t deflated_size = 0;
108-
std::string uncompressed;
109-
switch (c) {
110-
case Compressor::kNoCompress:
111-
deflated_size = size;
112-
break;
113-
case Compressor::kSnappy:
114-
// snappy::Uncompress(buffer, size, &uncompressed);
115-
// deflated_size = uncompressed.size();
116-
// memcpy(buffer, uncompressed.data(), uncompressed.size() *
117-
// sizeof(char));
118-
break;
119-
}
120-
return deflated_size;
121-
}
47+
} // namespace recordio
48+
} // namespace paddle

paddle/fluid/recordio/chunk_test.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "paddle/fluid/recordio/chunk.h"
16+
17+
#include <sstream>
18+
19+
#include "gtest/gtest.h"
20+
21+
using namespace paddle::recordio;
22+
23+
TEST(Chunk, SaveLoad) {}

paddle/fluid/recordio/crc32.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// A wrapper on crc library https://github.com/d-bahr/CRCpp
16+
#include <cstdint>
17+
18+
#include "paddle/fluid/recordio/detail/crc.h"
19+
20+
namespace paddle {
21+
namespace recordio {
22+
23+
// usage
24+
// char data[] = "hello,world";
25+
// crc = Crc32(data, 12);
26+
// Assert_EQ(crc, 68a85159);
27+
28+
uint32_t Crc32(const char* data, size_t size) {
29+
return CRC::Calculate(data, size, CRC::CRC_32())
30+
}
31+
32+
} // namespace recordio
33+
} // namespace paddle

0 commit comments

Comments
 (0)