Skip to content

Commit 702e269

Browse files
committed
[devtool] create stream_data_sink
This diff creates `StreamDataSink`, a data sink that uses an array as a buffer and flushes it to disk when the buffer is full. Differential Revision: [D69936705](https://our.internmc.facebook.com/intern/diff/D69936705/) ghstack-source-id: 267462092 Pull Request resolved: #8604
1 parent 82b137a commit 702e269

File tree

5 files changed

+485
-20
lines changed

5 files changed

+485
-20
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#include <executorch/devtools/etdump/data_sinks/stream_data_sink.h>
#include <executorch/devtools/etdump/utils.h>
#include <fcntl.h> // open()
#include <unistd.h> // For write and close
#include <cerrno> // errno, EINTR
#include <cstring>
14+
15+
using ::executorch::runtime::Error;
16+
using ::executorch::runtime::Result;
17+
18+
namespace executorch {
19+
namespace etdump {
20+
21+
/**
 * Move constructor: takes over the buffer view, the bookkeeping counters and
 * the file descriptor of `other`.
 *
 * After the move `other` no longer refers to the file or to the buffer:
 * its destructor will neither flush nor close anything, and a write() issued
 * on it by mistake cannot touch memory that now belongs to this sink.
 */
StreamDataSink::StreamDataSink(StreamDataSink&& other)
    : buffer_(std::move(other.buffer_)),
      buffer_offset_(other.buffer_offset_),
      file_descriptor_(other.file_descriptor_),
      total_written_bytes_(other.total_written_bytes_),
      alignment_(other.alignment_) {
  // Disarm the source's destructor: the descriptor has a single owner now.
  other.file_descriptor_ = -1;

  // Moving a Span only copies the pointer/length pair, so explicitly detach
  // the source from the buffer and clear its counters.
  other.buffer_ = {static_cast<uint8_t*>(nullptr), static_cast<size_t>(0)};
  other.buffer_offset_ = 0;
  other.total_written_bytes_ = 0;
}
29+
30+
/**
 * Validates the arguments, creates the output file and returns a sink that
 * buffers into `buffer_data_ptr` and spills to that file.
 *
 * @param[in] buffer_data_ptr Start of the caller-owned staging buffer.
 * @param[in] buffer_size Size of the staging buffer in bytes.
 * @param[in] file_path Path of the file to create. The file must not exist
 *     yet (it is opened with O_CREAT | O_EXCL).
 * @param[in] alignment Alignment applied to every blob; must be a power of
 *     two.
 * @retval Error::InvalidArgument `alignment` is not a power of two,
 *     `file_path` is null, or `buffer_data_ptr` is null while `buffer_size`
 *     is non-zero.
 * @retval Error::AccessFailed The file could not be created.
 */
Result<StreamDataSink> StreamDataSink::create(
    void* buffer_data_ptr,
    size_t buffer_size,
    const char* file_path,
    size_t alignment) {
  // Check if alignment is a power of two
  if (alignment == 0 || (alignment & (alignment - 1)) != 0) {
    return Error::InvalidArgument;
  }

  // A null path would be handed to open() and to the "%s" in the log below.
  if (file_path == nullptr) {
    return Error::InvalidArgument;
  }

  // A non-empty buffer must actually point somewhere.
  if (buffer_data_ptr == nullptr && buffer_size > 0) {
    return Error::InvalidArgument;
  }

  // Open the file and get the file descriptor.
  // O_EXCL makes this fail if the file already exists.
  const int file_descriptor =
      open(file_path, O_WRONLY | O_CREAT | O_EXCL, 0644);
  if (file_descriptor < 0) {
    // Snapshot errno once so the message and the code always agree.
    const int open_errno = errno;
    ET_LOG(
        Error,
        "Failed to open %s: %s (%d)",
        file_path,
        strerror(open_errno),
        open_errno);
    return Error::AccessFailed;
  }

  // Return the successfully created StreamDataSink
  return StreamDataSink(
      buffer_data_ptr, buffer_size, file_descriptor, alignment);
}
54+
55+
/**
 * Flushes whatever is still buffered and closes the file.
 *
 * Does nothing when the sink does not own a descriptor (e.g. after it has
 * been moved from). A destructor cannot report errors, so a failed flush is
 * logged rather than silently ignored.
 */
StreamDataSink::~StreamDataSink() {
  if (file_descriptor_ < 0) {
    return;
  }

  const Result<bool> flushed = flush();
  if (!flushed.ok()) {
    ET_LOG(
        Error, "Failed to flush buffered data while destroying StreamDataSink");
  }

  close(file_descriptor_);
  file_descriptor_ = -1;
}
62+
63+
/**
 * Appends `size` bytes from `ptr` to the sink at the next aligned position.
 *
 * The blob is staged in the buffer. If it does not fit behind the data that
 * is already staged, the buffer is flushed to the file first and the blob is
 * placed at the first aligned position of the now-empty buffer.
 *
 * @param[in] ptr Data to store; must not be null when `size` is non-zero.
 * @param[in] size Number of bytes to store.
 * @return The offset of the blob (bytes already flushed plus its position in
 *     the buffer), or for `size == 0` the current number of used bytes.
 * @retval Error::InvalidArgument `ptr` is null and `size` is non-zero.
 * @retval Error::OutOfResources The blob does not fit even in an empty
 *     buffer.
 * @retval other Any error reported by flush().
 */
Result<size_t> StreamDataSink::write(const void* ptr, size_t size) {
  if (size == 0) {
    // No data to write, return current offset
    return buffer_offset_ + total_written_bytes_;
  }
  if (ptr == nullptr) {
    return Error::InvalidArgument;
  }

  const uint8_t* data_ptr = static_cast<const uint8_t*>(ptr);
  uint8_t* const base = buffer_.data();
  const size_t capacity = buffer_.size();

  // First aligned position at or after the data that is already staged.
  uint8_t* aligned_ptr =
      internal::align_pointer(base + buffer_offset_, alignment_);
  size_t aligned_offset = static_cast<size_t>(aligned_ptr - base);

  // Bounds are checked BEFORE anything is written: the aligned position
  // itself may already lie past the end of the buffer. The subtraction form
  // also avoids overflow for very large `size`.
  if (aligned_offset > capacity || size > capacity - aligned_offset) {
    // Not enough room behind the staged data: flush the buffer and try again
    Result<bool> ret = flush();
    if (!ret.ok()) {
      return ret.error();
    }

    aligned_ptr = internal::align_pointer(base, alignment_);
    aligned_offset = static_cast<size_t>(aligned_ptr - base);

    if (aligned_offset > capacity || size > capacity - aligned_offset) {
      return Error::OutOfResources;
    }
  }

  // Zero out the padding between the previous blob (or the start of the
  // buffer after a flush, where buffer_offset_ is 0) and the aligned start.
  std::memset(base + buffer_offset_, 0, aligned_offset - buffer_offset_);

  // Copy data to the aligned position
  std::memcpy(aligned_ptr, data_ptr, size);
  buffer_offset_ = aligned_offset + size;

  return aligned_offset + total_written_bytes_;
}
105+
106+
/**
 * The sink streams into a file, so it has no fixed capacity to report; the
 * call always yields Error::NotSupported.
 */
Result<size_t> StreamDataSink::get_storage_size() const {
  return Result<size_t>(Error::NotSupported);
}
110+
111+
size_t StreamDataSink::get_used_bytes() const {
112+
return total_written_bytes_ + buffer_offset_;
113+
}
114+
115+
/**
 * Writes everything staged in the buffer to the file and empties the buffer.
 *
 * ::write() is allowed to transfer fewer bytes than requested and to be
 * interrupted by a signal; neither is an error, so the call is repeated until
 * the whole staged range has been written.
 *
 * @return true when the buffer is empty on return.
 * @retval Error::Internal The file could not be written. Bytes that did reach
 *     the file are accounted for and removed from the buffer, so retrying
 *     flush() later does not duplicate them.
 */
Result<bool> StreamDataSink::flush() {
  size_t flushed = 0;

  while (flushed < buffer_offset_) {
    const ssize_t written = ::write(
        file_descriptor_, buffer_.data() + flushed, buffer_offset_ - flushed);

    if (written > 0) {
      flushed += static_cast<size_t>(written);
      continue;
    }
    if (written < 0 && errno == EINTR) {
      // Interrupted before any byte was transferred; just retry.
      continue;
    }

    // Hard failure (or no progress). Keep the bookkeeping consistent with
    // what is actually in the file: drop the written prefix from the buffer.
    if (flushed > 0) {
      std::memmove(
          buffer_.data(), buffer_.data() + flushed, buffer_offset_ - flushed);
      total_written_bytes_ += flushed;
      buffer_offset_ -= flushed;
    }
    return Error::Internal;
  }

  total_written_bytes_ += flushed;
  buffer_offset_ = 0;

  return true;
}
128+
129+
} // namespace etdump
130+
} // namespace executorch
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.
10+
11+
#pragma once
12+
13+
#include <executorch/devtools/etdump/data_sinks/data_sink_base.h>
14+
#include <executorch/runtime/core/exec_aten/exec_aten.h>
15+
#include <executorch/runtime/core/span.h>
16+
#include <unistd.h> // For file operations
17+
18+
namespace executorch {
19+
namespace etdump {
20+
/**
21+
* StreamDataSink is a concrete implementation of DataSinkBase that manages
22+
* the storage of data blobs using a buffer and a file descriptor. It writes
23+
* data to a buffer first and flushes it to a file when the buffer is full,
24+
* or when the flush() method is called explicitly. It is useful for storing
25+
* large amounts of data in a file without consuming excessive memory.
26+
*
27+
* Noted that
28+
* - This class is demonstrated purpose only, and not intended to be used or
29+
* existed in production.
30+
* - The buffer is provided and owned by user at construction time. The user is
31+
* responsible for ensuring the validity and ownership of these resources.
32+
*/
33+
class StreamDataSink : public DataSinkBase {
34+
public:
35+
/**
36+
* Creates a StreamDataSink with a given buffer and file path.
37+
*
38+
* @param[in] buffer_data_ptr A pointer to the buffer used for temporary
39+
* storage.
40+
* @param[in] buffer_size The size of the buffer.
41+
* @param[in] file_path The path to the file for writing data.
42+
* @param[in] alignment The alignment requirement for the buffer. Default
43+
* is 64.
44+
* @returns A new StreamDataSink on success.
45+
* @retval Error::InvalidArgument `alignment` is not a power of two.
46+
* @retval Error::AccessFailed Cannot access/create the file using
47+
* `file_path`.
48+
*/
49+
static ::executorch::runtime::Result<StreamDataSink> create(
50+
void* buffer_data_ptr,
51+
size_t buffer_size,
52+
const char* file_path,
53+
size_t alignment = 64);
54+
55+
/**
56+
* Destructor that ensures all buffered data is flushed to the file.
57+
*/
58+
~StreamDataSink() override;
59+
60+
// Delete copy constructor and copy assignment operator
61+
StreamDataSink(const StreamDataSink&) = delete;
62+
StreamDataSink& operator=(const StreamDataSink&) = delete;
63+
64+
StreamDataSink(StreamDataSink&& other);
65+
StreamDataSink& operator=(StreamDataSink&& other) = default;
66+
67+
/**
68+
* Writes data into the debug storage aligned to the given alignment.
69+
*
70+
* @param[in] ptr A pointer to the data to be written into the storage.
71+
* @param[in] size The size of the data in bytes.
72+
* @return A Result object containing either:
73+
* - The offset of the starting location of the data within the
74+
* debug storage, or
75+
* - An error code indicating the failure reason.
76+
*/
77+
::executorch::runtime::Result<size_t> write(const void* ptr, size_t size)
78+
override;
79+
80+
/**
81+
* Return Error::NotSupported in StreamDataSink since file is unbounded
82+
* destination.
83+
*
84+
* @return Error::NotSupported
85+
*/
86+
::executorch::runtime::Result<size_t> get_storage_size() const override;
87+
88+
/**
89+
* Gets the number of bytes currently used in the debug storage.
90+
*
91+
* @return The amount of data currently stored in bytes.
92+
*/
93+
size_t get_used_bytes() const override;
94+
95+
/**
96+
* Flushes all buffered data to the file. No alignment is applied.
97+
*
98+
* @return A Result object containing either:
99+
* - true, indicating the flush operation was successful, or
100+
* - An error code indicating the failure reason.
101+
*/
102+
::executorch::runtime::Result<bool> flush();
103+
104+
private:
105+
/**
106+
* Constructs a StreamDataSink with a given buffer and file descriptor.
107+
*
108+
* @param[in] buffer A span representing the buffer used for temporary
109+
* storage.
110+
* @param[in] file_descriptor A valid file descriptor for writing data.
111+
*/
112+
StreamDataSink(
113+
void* buffer_data_ptr,
114+
size_t buffer_size,
115+
int file_descriptor,
116+
size_t alignment)
117+
: buffer_({static_cast<uint8_t*>(buffer_data_ptr), buffer_size}),
118+
buffer_offset_(0),
119+
file_descriptor_(file_descriptor),
120+
total_written_bytes_(0),
121+
alignment_(alignment) {}
122+
::executorch::runtime::Span<uint8_t> buffer_;
123+
size_t buffer_offset_;
124+
int file_descriptor_;
125+
size_t total_written_bytes_;
126+
size_t alignment_;
127+
};
128+
} // namespace etdump
129+
} // namespace executorch
Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,28 @@
11
load("@fbsource//xplat/executorch/build:runtime_wrapper.bzl", "runtime")
22

33

4+
def define_data_sink_target(data_sink_name, aten_suffix):
    """Declares the cxx_library target for one data sink implementation.

    The target is named `data_sink_name + aten_suffix` and is built from
    `<data_sink_name>.h` and `<data_sink_name>.cpp`.

    Args:
        data_sink_name: Base name shared by the target and its header/source.
        aten_suffix: Suffix appended to the target name and to the names of
            the suffix-specific dependencies.
    """
    target_name = data_sink_name + aten_suffix
    header_file = data_sink_name + ".h"
    source_file = data_sink_name + ".cpp"

    runtime.cxx_library(
        name = target_name,
        exported_headers = [header_file],
        srcs = [source_file],
        deps = ["//executorch/devtools/etdump:utils"],
        exported_deps = [
            "//executorch/runtime/core/exec_aten:lib" + aten_suffix,
            ":data_sink_base" + aten_suffix,
        ],
        visibility = [
            "//executorch/...",
            "@EXECUTORCH_CLIENTS",
        ],
    )
25+
426
def define_common_targets():
527
"""Defines targets that should be shared between fbcode and xplat.
628
@@ -24,23 +46,5 @@ def define_common_targets():
2446
],
2547
)
2648

27-
runtime.cxx_library(
28-
name = "buffer_data_sink" + aten_suffix,
29-
exported_headers = [
30-
"buffer_data_sink.h",
31-
],
32-
srcs = [
33-
"buffer_data_sink.cpp",
34-
],
35-
deps = [
36-
"//executorch/devtools/etdump:utils",
37-
],
38-
exported_deps = [
39-
"//executorch/runtime/core/exec_aten:lib" + aten_suffix,
40-
":data_sink_base" + aten_suffix,
41-
],
42-
visibility = [
43-
"//executorch/...",
44-
"@EXECUTORCH_CLIENTS",
45-
],
46-
)
49+
define_data_sink_target("buffer_data_sink", aten_suffix)
50+
define_data_sink_target("stream_data_sink", aten_suffix)

0 commit comments

Comments
 (0)