Skip to content

Commit 3f8b898

Browse files
committed
[devtool] create stream_data_sink
Pull Request resolved: #8604 This diff creates `StreamDataSink`, a data sink that uses an array as a buffer and flushes it to disk when the buffer is full. ghstack-source-id: 268188577 @exported-using-ghexport Differential Revision: [D69936705](https://our.internmc.facebook.com/intern/diff/D69936705/)
1 parent 0e4b6ec commit 3f8b898

File tree

5 files changed

+438
-0
lines changed

5 files changed

+438
-0
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#include <executorch/devtools/etdump/data_sinks/stream_data_sink.h>
10+
#include <executorch/devtools/etdump/utils.h>
11+
#include <fcntl.h> // open()
12+
#include <unistd.h> // For write and close
13+
#include <cstring>
14+
15+
using ::executorch::runtime::Error;
16+
using ::executorch::runtime::Result;
17+
18+
namespace executorch {
19+
namespace etdump {
20+
21+
StreamDataSink::StreamDataSink(StreamDataSink&& other) noexcept
22+
: buffer_(std::move(other.buffer_)),
23+
buffer_offset_(other.buffer_offset_),
24+
file_descriptor_(other.file_descriptor_),
25+
total_written_bytes_(other.total_written_bytes_),
26+
alignment_(other.alignment_) {
27+
other.file_descriptor_ = -1;
28+
}
29+
30+
/**
 * Creates a StreamDataSink that buffers into caller-provided memory and
 * writes to a newly created file.
 *
 * @param[in] buffer_data_ptr Start of the caller-owned staging buffer.
 * @param[in] buffer_size Size of the staging buffer in bytes.
 * @param[in] file_path Path of the output file. The file must not exist yet.
 * @param[in] alignment Alignment applied to every blob; must be a power of
 *     two.
 * @returns A new StreamDataSink on success.
 * @retval Error::InvalidArgument `file_path` is null, `buffer_data_ptr` is
 *     null while `buffer_size` is non-zero, or `alignment` is not a power of
 *     two.
 * @retval Error::AccessFailed The file could not be created (including the
 *     case where it already exists).
 */
Result<StreamDataSink> StreamDataSink::create(
    void* buffer_data_ptr,
    size_t buffer_size,
    const char* file_path,
    size_t alignment) {
  // A null path would be undefined behaviour both in open() and in the "%s"
  // of the error log below, so reject it up front.
  if (file_path == nullptr) {
    return Error::InvalidArgument;
  }

  // A non-empty buffer must point at real memory; write() dereferences it.
  if (buffer_data_ptr == nullptr && buffer_size > 0) {
    return Error::InvalidArgument;
  }

  // Check if alignment is a power of two
  if (alignment == 0 || (alignment & (alignment - 1)) != 0) {
    return Error::InvalidArgument;
  }

  // Open the file and get the file descriptor.
  // O_EXCL makes this fail if the file already exists, so existing data is
  // never overwritten.
  const int file_descriptor =
      open(file_path, O_WRONLY | O_CREAT | O_EXCL, 0644);
  if (file_descriptor < 0) {
    // Snapshot errno first: it is read twice below, and the argument
    // evaluation order relative to strerror() is unspecified.
    const int open_errno = errno;
    ET_LOG(
        Error,
        "Failed to open %s: %s (%d)",
        file_path,
        strerror(open_errno),
        open_errno);
    return Error::AccessFailed;
  }

  // Return the successfully created StreamDataSink
  return StreamDataSink(
      buffer_data_ptr, buffer_size, file_descriptor, alignment);
}
54+
55+
// Destructor: pushes any buffered bytes to the file and releases the
// descriptor. Objects without a valid descriptor (negative value, e.g. after
// being moved from) have nothing to release.
StreamDataSink::~StreamDataSink() {
  if (file_descriptor_ < 0) {
    return;
  }

  // Best effort: the outcome of the final flush is not inspected here.
  flush();
  close(file_descriptor_);
}
62+
63+
/**
 * Copies `size` bytes from `ptr` into the staging buffer at the next aligned
 * position, flushing the buffer to the file first when the blob does not fit
 * in the remaining space.
 *
 * @param[in] ptr Source bytes. Must be non-null when `size` is non-zero.
 * @param[in] size Number of bytes to store.
 * @return The offset of the blob within the sink (bytes already flushed plus
 *     the blob's position in the staging buffer), or
 *     - Error::InvalidArgument if `ptr` is null and `size` is non-zero,
 *     - Error::OutOfResources if the blob cannot fit even in an empty buffer,
 *     - the error reported by flush() if flushing fails.
 */
Result<size_t> StreamDataSink::write(const void* ptr, size_t size) {
  if (size == 0) {
    // No data to write, return current offset
    return buffer_offset_ + total_written_bytes_;
  }

  if (ptr == nullptr) {
    // memcpy from a null source is undefined behaviour.
    return Error::InvalidArgument;
  }

  uint8_t* const base = buffer_.data();
  const size_t capacity = buffer_.size();

  // Aligned start position for this blob.
  // NOTE(review): assumes internal::align_pointer returns a pointer at or
  // after its argument, rounded up to `alignment_` -- confirm in utils.h.
  uint8_t* aligned_ptr =
      internal::align_pointer(base + buffer_offset_, alignment_);
  size_t aligned_offset = static_cast<size_t>(aligned_ptr - base);

  // Capacity check written so that `aligned_offset + size` is never formed
  // (it could wrap around for a very large `size`).
  if (aligned_offset > capacity || size > capacity - aligned_offset) {
    // The blob does not fit behind the buffered data: flush and retry from
    // the start of the buffer.
    Result<bool> ret = flush();
    if (!ret.ok()) {
      return ret.error();
    }

    aligned_ptr = internal::align_pointer(base, alignment_);
    aligned_offset = static_cast<size_t>(aligned_ptr - base);

    if (aligned_offset > capacity || size > capacity - aligned_offset) {
      return Error::OutOfResources;
    }
  }

  // Zero out the padding between data blobs. This happens only after the
  // capacity check has passed, so the padding range is guaranteed to lie
  // inside the buffer. After a flush, buffer_offset_ is 0 and the padding is
  // the gap between the buffer start and the first aligned address.
  std::memset(base + buffer_offset_, 0, aligned_offset - buffer_offset_);

  // Copy data to the aligned position
  std::memcpy(aligned_ptr, ptr, size);
  buffer_offset_ = aligned_offset + size;

  return aligned_offset + total_written_bytes_;
}
105+
106+
size_t StreamDataSink::get_used_bytes() const {
107+
return total_written_bytes_ + buffer_offset_;
108+
}
109+
110+
/**
 * Writes every buffered byte to the file and empties the staging buffer.
 *
 * ::write() may legitimately transfer fewer bytes than requested or be
 * interrupted by a signal, so the call is repeated until the whole buffer has
 * been written or a real error occurs.
 *
 * @return true on success (including when nothing was buffered), or
 *     Error::Internal if the file descriptor rejects the write. On failure
 *     the bytes that did reach the file are accounted for and removed from
 *     the buffer, so a later retry does not write them twice.
 */
Result<bool> StreamDataSink::flush() {
  size_t flushed = 0;
  bool failed = false;

  while (flushed < buffer_offset_) {
    const ssize_t written = ::write(
        file_descriptor_, buffer_.data() + flushed, buffer_offset_ - flushed);
    if (written < 0) {
      if (errno == EINTR) {
        // Interrupted before any byte was transferred: simply retry.
        continue;
      }
      failed = true;
      break;
    }
    if (written == 0) {
      // No progress and no error code; bail out rather than spin forever.
      failed = true;
      break;
    }
    flushed += static_cast<size_t>(written);
  }

  total_written_bytes_ += flushed;

  if (failed) {
    // Keep the still-unwritten tail at the front of the buffer. Offsets
    // handed out earlier stay valid because total_written_bytes_ grew by
    // exactly the amount the buffered data moved.
    const size_t remaining = buffer_offset_ - flushed;
    if (flushed > 0 && remaining > 0) {
      std::memmove(buffer_.data(), buffer_.data() + flushed, remaining);
    }
    buffer_offset_ = remaining;
    return Error::Internal;
  }

  buffer_offset_ = 0;
  return true;
}
123+
124+
} // namespace etdump
125+
} // namespace executorch
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.
10+
11+
#pragma once
12+
13+
#include <executorch/devtools/etdump/data_sinks/data_sink_base.h>
14+
#include <executorch/runtime/core/exec_aten/exec_aten.h>
15+
#include <executorch/runtime/core/span.h>
16+
#include <unistd.h> // For file operations
17+
18+
namespace executorch {
19+
namespace etdump {
20+
/**
21+
* StreamDataSink is a concrete implementation of DataSinkBase that manages
22+
* the storage of data blobs using a buffer and a file descriptor. It writes
23+
* data to a buffer first and flushes it to a file when the buffer is full,
24+
* or when the flush() method is called explicitly. It is useful for storing
25+
* large amounts of data in a file without consuming excessive memory.
26+
*
27+
* Noted that
28+
* - This class is demonstrated purpose only, and not intended to be used or
29+
* existed in production.
30+
* - The buffer is provided and owned by user at construction time. The user is
31+
* responsible for ensuring the validity and ownership of these resources.
32+
*/
33+
class StreamDataSink : public DataSinkBase {
34+
public:
35+
/**
36+
* Creates a StreamDataSink with a given buffer and file path.
37+
*
38+
* @param[in] buffer_data_ptr A pointer to the buffer used for temporary
39+
* storage.
40+
* @param[in] buffer_size The size of the buffer.
41+
* @param[in] file_path The path to the file for writing data.
42+
* @param[in] alignment The alignment requirement for the buffer. Default
43+
* is 64.
44+
* @returns A new StreamDataSink on success.
45+
* @retval Error::InvalidArgument `alignment` is not a power of two.
46+
* @retval Error::AccessFailed Cannot access/create the file using
47+
* `file_path`.
48+
*/
49+
static ::executorch::runtime::Result<StreamDataSink> create(
50+
void* buffer_data_ptr,
51+
size_t buffer_size,
52+
const char* file_path,
53+
size_t alignment = 64);
54+
55+
/**
56+
* Destructor that ensures all buffered data is flushed to the file.
57+
*/
58+
~StreamDataSink() override;
59+
60+
// Delete copy constructor and copy assignment operator
61+
StreamDataSink(const StreamDataSink&) = delete;
62+
StreamDataSink& operator=(const StreamDataSink&) = delete;
63+
64+
StreamDataSink(StreamDataSink&& other) noexcept;
65+
StreamDataSink& operator=(StreamDataSink&& other) = default;
66+
67+
/**
68+
* Writes data into the debug storage aligned to the given alignment.
69+
*
70+
* @param[in] ptr A pointer to the data to be written into the storage.
71+
* @param[in] size The size of the data in bytes.
72+
* @return A Result object containing either:
73+
* - The offset of the starting location of the data within the
74+
* debug storage, or
75+
* - An error code indicating the failure reason.
76+
*/
77+
::executorch::runtime::Result<size_t> write(const void* ptr, size_t size)
78+
override;
79+
80+
/**
81+
* Gets the number of bytes currently used in the debug storage.
82+
*
83+
* @return The amount of data currently stored in bytes.
84+
*/
85+
size_t get_used_bytes() const override;
86+
87+
/**
88+
* Flushes all buffered data to the file. No alignment is applied.
89+
*
90+
* @return A Result object containing either:
91+
* - true, indicating the flush operation was successful, or
92+
* - An error code indicating the failure reason.
93+
*/
94+
::executorch::runtime::Result<bool> flush();
95+
96+
private:
97+
/**
98+
* Constructs a StreamDataSink with a given buffer and file descriptor.
99+
*
100+
* @param[in] buffer A span representing the buffer used for temporary
101+
* storage.
102+
* @param[in] file_descriptor A valid file descriptor for writing data.
103+
*/
104+
StreamDataSink(
105+
void* buffer_data_ptr,
106+
size_t buffer_size,
107+
int file_descriptor,
108+
size_t alignment)
109+
: buffer_({static_cast<uint8_t*>(buffer_data_ptr), buffer_size}),
110+
buffer_offset_(0),
111+
file_descriptor_(file_descriptor),
112+
total_written_bytes_(0),
113+
alignment_(alignment) {}
114+
::executorch::runtime::Span<uint8_t> buffer_;
115+
size_t buffer_offset_;
116+
int file_descriptor_;
117+
size_t total_written_bytes_;
118+
size_t alignment_;
119+
};
120+
} // namespace etdump
121+
} // namespace executorch

devtools/etdump/data_sinks/targets.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@ def define_common_targets():
4747
)
4848

4949
define_data_sink_target("buffer_data_sink", aten_suffix)
50+
define_data_sink_target("stream_data_sink", aten_suffix)

0 commit comments

Comments
 (0)