Skip to content

Commit 9dc6958

Browse files
committed
Make recordio simple
1 parent fe18341 commit 9dc6958

25 files changed

+202
-2711
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ include(external/pybind11) # download pybind11
145145
include(external/cares)
146146
include(external/grpc)
147147
include(external/snappy) # download snappy
148+
include(external/snappystream)
148149

149150
include(cudnn) # set cudnn libraries, must before configure
150151
include(cupti)

cmake/external/snappystream.cmake

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
16+
IF(MOBILE_INFERENCE)
17+
return()
18+
ENDIF()
19+
20+
include (ExternalProject)
21+
22+
# NOTE: snappystream (built on top of snappy) is needed when linking with recordio
23+
24+
SET(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream)
25+
SET(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream)
26+
SET(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include/" CACHE PATH "snappy stream include directory." FORCE)
27+
28+
ExternalProject_Add(
29+
extern_snappystream
30+
GIT_REPOSITORY "https://github.com/hoxnox/snappystream.git"
31+
GIT_TAG "0.2.8"
32+
PREFIX ${SNAPPYSTREAM_SOURCES_DIR}
33+
UPDATE_COMMAND ""
34+
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
35+
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
36+
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
37+
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
38+
-DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR}
39+
-DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib
40+
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
41+
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
42+
-DSNAPPY_ROOT=${SNAPPY_INSTALL_DIR}
43+
${EXTERNAL_OPTIONAL_ARGS}
44+
CMAKE_CACHE_ARGS
45+
-DCMAKE_INSTALL_PREFIX:PATH=${SNAPPYSTREAM_INSTALL_DIR}
46+
-DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPYSTREAM_INSTALL_DIR}/lib
47+
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
48+
BUILD_COMMAND make -j8
49+
INSTALL_COMMAND make install
50+
DEPENDS snappy
51+
)
52+
53+
add_library(snappystream STATIC IMPORTED GLOBAL)
54+
set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION
55+
"${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a")
56+
57+
include_directories(${SNAPPYSTREAM_INCLUDE_DIR})
58+
add_dependencies(snappystream extern_snappystream)

paddle/fluid/recordio/CMakeLists.txt

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
# internal library.
2-
cc_library(io SRCS io.cc DEPS stringpiece)
3-
cc_test(io_test SRCS io_test.cc DEPS io)
4-
cc_library(header SRCS header.cc DEPS io)
2+
cc_library(header SRCS header.cc)
53
cc_test(header_test SRCS header_test.cc DEPS header)
6-
cc_library(chunk SRCS chunk.cc DEPS snappy)
4+
cc_library(chunk SRCS chunk.cc DEPS snappystream snappy header zlib)
75
cc_test(chunk_test SRCS chunk_test.cc DEPS chunk)
8-
cc_library(range_scanner SRCS range_scanner.cc DEPS io chunk)
9-
cc_test(range_scanner_test SRCS range_scanner_test.cc DEPS range_scanner)
10-
cc_library(scanner SRCS scanner.cc DEPS range_scanner)
11-
cc_test(scanner_test SRCS scanner_test.cc DEPS scanner)
12-
# exported library.
13-
cc_library(recordio SRCS recordio.cc DEPS scanner chunk header)
14-
cc_test(recordio_test SRCS recordio_test.cc DEPS scanner)
6+
cc_library(recordio DEPS chunk header)

paddle/fluid/recordio/chunk.cc

Lines changed: 89 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -14,97 +14,119 @@
1414

1515
#include "paddle/fluid/recordio/chunk.h"
1616

17-
#include <cstring>
17+
#include <memory>
1818
#include <sstream>
19-
#include <utility>
20-
21-
#include "snappy.h"
22-
23-
#include "paddle/fluid/recordio/crc32.h"
19+
#include "paddle/fluid/platform/enforce.h"
20+
#include "snappystream.hpp"
21+
#include "zlib.h"
2422

2523
namespace paddle {
2624
namespace recordio {
25+
constexpr size_t kMaxBufSize = 1024;
2726

28-
void Chunk::Add(const char* record, size_t length) {
29-
records_.emplace_after(std::string(record, length));
30-
num_bytes_ += s.size() * sizeof(char);
27+
template <typename Callback>
28+
static void ReadStreamByBuf(std::istream& in, int limit, Callback callback) {
29+
char buf[kMaxBufSize];
30+
std::streamsize actual_size;
31+
size_t counter = 0;
32+
do {
33+
auto actual_max =
34+
limit > 0 ? std::min(limit - counter, kMaxBufSize) : kMaxBufSize;
35+
actual_size = in.readsome(buf, actual_max);
36+
if (actual_size == 0) {
37+
break;
38+
}
39+
callback(buf, actual_size);
40+
if (limit > 0) {
41+
counter += actual_size;
42+
}
43+
} while (actual_size == kMaxBufSize);
3144
}
3245

33-
bool Chunk::Dump(Stream* fo, Compressor ct) {
46+
static void PipeStream(std::istream& in, std::ostream& os) {
47+
ReadStreamByBuf(
48+
in, -1, [&os](const char* buf, size_t len) { os.write(buf, len); });
49+
}
50+
static uint32_t Crc32Stream(std::istream& in, int limit = -1) {
51+
auto crc = crc32(0, nullptr, 0);
52+
ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
53+
crc = crc32(crc, reinterpret_cast<const Bytef*>(buf), len);
54+
});
55+
return crc;
56+
}
57+
58+
bool Chunk::Write(std::ostream& os, Compressor ct) const {
3459
// NOTE(dzhwinter): check whether records_ is empty rather than num_bytes_,
3560
// because zero-length records are allowed.
36-
if (records_.size() == 0) return false;
61+
if (records_.empty()) {
62+
return false;
63+
}
64+
std::stringstream sout;
65+
std::unique_ptr<std::ostream> compressed_stream;
66+
switch (ct) {
67+
case Compressor::kNoCompress:
68+
break;
69+
case Compressor::kSnappy:
70+
compressed_stream.reset(new snappy::oSnappyStream(sout));
71+
break;
72+
default:
73+
PADDLE_THROW("Not implemented");
74+
}
75+
76+
std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout;
3777

38-
// pack the record into consecutive memory for compress
39-
std::ostringstream os;
4078
for (auto& record : records_) {
41-
os.write(record.size(), sizeof(size_t));
42-
os.write(record.data(), static_cast<std::streamsize>(record.size()));
79+
size_t sz = record.size();
80+
buf_stream.write(reinterpret_cast<const char*>(&sz), sizeof(uint32_t))
81+
.write(record.data(), record.size());
4382
}
4483

45-
std::unique_ptr<char[]> buffer(new char[num_bytes_]);
46-
size_t compressed =
47-
CompressData(os.str().c_str(), num_bytes_, ct, buffer.get());
48-
uint32_t checksum = Crc32(buffer.get(), compressed);
49-
Header hdr(records_.size(), checksum, ct, static_cast<uint32_t>(compressed));
50-
hdr.Write(fo);
51-
fo.Write(buffer.get(), compressed);
52-
// clear the content
53-
records_.clear();
54-
num_bytes_ = 0;
84+
if (compressed_stream) {
85+
compressed_stream.reset();
86+
}
87+
88+
auto end_pos = sout.tellg();
89+
sout.seekg(0, std::ios::beg);
90+
uint32_t len = static_cast<uint32_t>(end_pos - sout.tellg());
91+
uint32_t crc = Crc32Stream(sout);
92+
sout.seekg(0, std::ios::beg);
93+
94+
Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
95+
hdr.Write(os);
96+
PipeStream(sout, os);
5597
return true;
5698
}
5799

58-
void Chunk::Parse(Stream* fi, size_t offset) {
59-
fi->Seek(offset);
100+
void Chunk::Parse(std::istream& sin) {
60101
Header hdr;
61-
hdr.Parse(fi);
62-
63-
size_t size = static_cast<size_t>(hdr.CompressSize());
64-
std::unique_ptr<char[]> buffer(new char[size]);
65-
fi->Read(buffer.get(), size);
66-
size_t deflated_size = 0;
67-
snappy::GetUncompressedLength(buffer.get(), size, &deflated_size);
68-
std::unique_ptr<char[]> deflated_buffer(new char[deflated_size]);
69-
DeflateData(buffer.get(), size, hdr.CompressType(), deflated_buffer.get());
70-
std::istringstream deflated(
71-
std::string(deflated_buffer.get(), deflated_size));
72-
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
73-
size_t rs;
74-
deflated.read(&rs, sizeof(size_t));
75-
std::string record(rs, '\0');
76-
deflated.read(&record[0], rs);
77-
records_.emplace_back(record);
78-
num_bytes_ += record.size();
79-
}
80-
}
102+
hdr.Parse(sin);
103+
auto beg_pos = sin.tellg();
104+
auto crc = Crc32Stream(sin, hdr.CompressSize());
105+
PADDLE_ENFORCE_EQ(hdr.Checksum(), crc);
81106

82-
size_t CompressData(const char* in,
83-
size_t in_length,
84-
Compressor ct,
85-
char* out) {
86-
size_t compressd_size = 0;
87-
switch (ct) {
107+
Clear();
108+
109+
sin.seekg(beg_pos, std::ios::beg);
110+
std::unique_ptr<std::istream> compressed_stream;
111+
switch (hdr.CompressType()) {
88112
case Compressor::kNoCompress:
89-
// do nothing
90-
memcpy(out, in, in_length);
91-
compressd_size = in_length;
92113
break;
93114
case Compressor::kSnappy:
94-
snappy::RawCompress(in, in_length, out, &compressd_size);
115+
compressed_stream.reset(new snappy::iSnappyStream(sin));
95116
break;
117+
default:
118+
PADDLE_THROW("Not implemented");
96119
}
97-
return compressd_size;
98-
}
99120

100-
void DeflateData(const char* in, size_t in_length, Compressor ct, char* out) {
101-
switch (c) {
102-
case Compressor::kNoCompress:
103-
memcpy(out, in, in_length);
104-
break;
105-
case Compressor::kSnappy:
106-
snappy::RawUncompress(in, in_length, out);
107-
break;
121+
std::istream& stream = compressed_stream ? *compressed_stream : sin;
122+
123+
for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
124+
uint32_t rec_len;
125+
stream.read(reinterpret_cast<char*>(&rec_len), sizeof(uint32_t));
126+
std::string buf;
127+
buf.resize(rec_len);
128+
stream.read(&buf[0], rec_len);
129+
Add(buf);
108130
}
109131
}
110132

paddle/fluid/recordio/chunk.h

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
// limitations under the License.
1414

1515
#pragma once
16-
#include <forward_list>
1716
#include <string>
17+
#include <vector>
1818

19+
#include "paddle/fluid/platform/macros.h"
1920
#include "paddle/fluid/recordio/header.h"
20-
#include "paddle/fluid/recordio/io.h"
2121

2222
namespace paddle {
2323
namespace recordio {
@@ -26,16 +26,23 @@ namespace recordio {
2626
class Chunk {
2727
public:
2828
Chunk() : num_bytes_(0) {}
29-
void Add(const char* record, size_t size);
29+
void Add(std::string buf) {
30+
records_.push_back(buf);
31+
num_bytes_ += buf.size();
32+
}
3033
// Writes the chunk into fo. Write is const and leaves the chunk unchanged;
3134
// call Clear() to empty the chunk and make it ready for the next Add call.
32-
bool Dump(Stream* fo, Compressor ct);
33-
void Parse(Stream* fi, size_t offset);
35+
bool Write(std::ostream& fo, Compressor ct) const;
36+
void Clear() {
37+
records_.clear();
38+
num_bytes_ = 0;
39+
}
40+
void Parse(std::istream& sin);
3441
size_t NumBytes() { return num_bytes_; }
35-
const std::string Record(int i) { return records_[i]; }
42+
const std::string& Record(int i) const { return records_[i]; }
3643

3744
private:
38-
std::forward_list<const std::string> records_;
45+
std::vector<std::string> records_;
3946
// sum of record lengths in bytes.
4047
size_t num_bytes_;
4148
DISABLE_COPY_AND_ASSIGN(Chunk);

paddle/fluid/recordio/chunk_test.cc

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,34 +22,28 @@ using namespace paddle::recordio;
2222

2323
TEST(Chunk, SaveLoad) {
2424
Chunk ch;
25-
ch.Add("12345", 6);
26-
ch.Add("123", 4);
27-
{
28-
Stream* fs = Stream::Open("/tmp/record_11", "w");
29-
ch.Dump(fs, Compressor::kNoCompress);
30-
EXPECT_EQ(ch.NumBytes(), 0);
31-
}
32-
{
33-
Stream* fs = Stream::Open("/tmp/record_11", "r");
34-
ch.Parse(fs, 0);
35-
EXPECT_EQ(ch.NumBytes(), 10);
36-
}
25+
ch.Add(std::string("12345", 6));
26+
ch.Add(std::string("123", 4));
27+
std::stringstream ss;
28+
ch.Write(ss, Compressor::kNoCompress);
29+
ch.Clear();
30+
ch.Parse(ss);
31+
ASSERT_EQ(ch.NumBytes(), 10U);
3732
}
3833

3934
TEST(Chunk, Compressor) {
4035
Chunk ch;
41-
ch.Add("12345", 6);
42-
ch.Add("123", 4);
43-
ch.Add("123", 4);
44-
ch.Add("123", 4);
45-
{
46-
Stream* fs = Stream::Open("/tmp/record_12", "w");
47-
ch.Dump(fs, Compressor::kSnappy);
48-
EXPECT_EQ(ch.NumBytes(), 0);
49-
}
50-
{
51-
Stream* fs = Stream::Open("/tmp/record_12", "r");
52-
ch.Parse(fs, 0);
53-
EXPECT_EQ(ch.NumBytes(), 10);
54-
}
36+
ch.Add(std::string("12345", 6));
37+
ch.Add(std::string("123", 4));
38+
ch.Add(std::string("123", 4));
39+
ch.Add(std::string("123", 4));
40+
std::stringstream ss;
41+
ch.Write(ss, Compressor::kSnappy);
42+
std::stringstream ss2;
43+
ch.Write(ss2, Compressor::kNoCompress);
44+
ASSERT_LE(ss.tellp(), ss2.tellp()); // Compress should contain less data;
45+
46+
ch.Clear();
47+
ch.Parse(ss);
48+
ASSERT_EQ(ch.NumBytes(), 18);
5549
}

0 commit comments

Comments
 (0)