1
+ use bytes:: Bytes ;
1
2
use magnus:: {
2
3
value:: { InnerValue , Opaque , ReprValue } ,
3
4
RString , Ruby ,
4
5
} ;
5
- use std:: io;
6
- use std:: io:: ErrorKind ;
6
+ use std:: io:: Write ;
7
+ use std:: sync:: { Arc , Mutex } ;
8
+ use wasmtime_wasi:: p2:: { OutputStream , Pollable , StdoutStream , StreamError , StreamResult } ;
7
9
8
10
/// A buffer that limits the number of bytes that can be written to it.
9
11
/// If the buffer is full, it will truncate the data.
10
- /// Is used in the buffer implementations of stdout and stderr in `WasiCtx ` and `WasiCtxBuilder`.
12
+ /// Is used in the buffer implementations of stdout and stderr in `WasiP1Ctx ` and `WasiCtxBuilder`.
11
13
pub struct OutputLimitedBuffer {
14
+ inner : Arc < Mutex < OutputLimitedBufferInner > > ,
15
+ }
16
+
17
+ impl OutputLimitedBuffer {
18
+ /// Creates a new [OutputLimitedBuffer] with the given underlying buffer
19
+ /// and capacity.
20
+ pub fn new ( buffer : Opaque < RString > , capacity : usize ) -> Self {
21
+ Self {
22
+ inner : Arc :: new ( Mutex :: new ( OutputLimitedBufferInner :: new ( buffer, capacity) ) ) ,
23
+ }
24
+ }
25
+ }
26
+
27
+ impl Clone for OutputLimitedBuffer {
28
+ fn clone ( & self ) -> Self {
29
+ Self {
30
+ inner : Arc :: clone ( & self . inner ) ,
31
+ }
32
+ }
33
+ }
34
+
35
+ impl StdoutStream for OutputLimitedBuffer {
36
+ fn stream ( & self ) -> Box < dyn OutputStream > {
37
+ let cloned = self . clone ( ) ;
38
+ Box :: new ( cloned)
39
+ }
40
+
41
+ fn isatty ( & self ) -> bool {
42
+ false
43
+ }
44
+ }
45
+
46
+ #[ async_trait:: async_trait]
47
+ impl Pollable for OutputLimitedBuffer {
48
+ async fn ready ( & mut self ) { }
49
+ }
50
+
51
+ impl OutputStream for OutputLimitedBuffer {
52
+ fn write ( & mut self , bytes : Bytes ) -> StreamResult < ( ) > {
53
+ let mut stream = self . inner . lock ( ) . expect ( "Should be only writer" ) ;
54
+ stream. write ( & bytes)
55
+ }
56
+
57
+ fn flush ( & mut self ) -> StreamResult < ( ) > {
58
+ Ok ( ( ) )
59
+ }
60
+
61
+ fn check_write ( & mut self ) -> StreamResult < usize > {
62
+ let mut stream = self . inner . lock ( ) . expect ( "Should be only writer" ) ;
63
+ stream. check_write ( )
64
+ }
65
+ }
66
+
67
+ struct OutputLimitedBufferInner {
12
68
buffer : Opaque < RString > ,
13
69
/// The maximum number of bytes that can be written to the output stream buffer.
14
70
capacity : usize ,
15
71
}
16
72
17
- impl OutputLimitedBuffer {
73
+ impl OutputLimitedBufferInner {
18
74
#[ must_use]
19
75
pub fn new ( buffer : Opaque < RString > , capacity : usize ) -> Self {
20
76
Self { buffer, capacity }
21
77
}
22
78
}
23
79
24
- impl io:: Write for OutputLimitedBuffer {
25
- fn write ( & mut self , buf : & [ u8 ] ) -> std:: io:: Result < usize > {
80
+ impl OutputLimitedBufferInner {
81
+ fn check_write ( & mut self ) -> StreamResult < usize > {
82
+ Ok ( usize:: MAX )
83
+ }
84
+
85
+ fn write ( & mut self , buf : & [ u8 ] ) -> StreamResult < ( ) > {
26
86
// Append a buffer to the string and truncate when hitting the capacity.
27
87
// We return the input buffer size regardless of whether we truncated or not to avoid a panic.
28
88
let ruby = Ruby :: get ( ) . unwrap ( ) ;
@@ -32,39 +92,28 @@ impl io::Write for OutputLimitedBuffer {
32
92
// Handling frozen case here is necessary because magnus does not check if a string is frozen before writing to it.
33
93
let is_frozen = inner_buffer. as_value ( ) . is_frozen ( ) ;
34
94
if is_frozen {
35
- return Err ( io:: Error :: new (
36
- ErrorKind :: WriteZero ,
37
- "Cannot write to a frozen buffer." ,
38
- ) ) ;
95
+ return Err ( StreamError :: trap ( "Cannot write to a frozen buffer." ) ) ;
39
96
}
40
97
41
98
if buf. is_empty ( ) {
42
- return Ok ( 0 ) ;
99
+ return Ok ( ( ) ) ;
43
100
}
44
101
45
102
if inner_buffer
46
103
. len ( )
47
104
. checked_add ( buf. len ( ) )
48
105
. is_some_and ( |val| val < self . capacity )
49
106
{
50
- let amount_written = inner_buffer. write ( buf) ?;
51
- if amount_written < buf. len ( ) {
52
- return Ok ( amount_written) ;
53
- }
107
+ inner_buffer
108
+ . write ( buf)
109
+ . map_err ( |e| StreamError :: trap ( & e. to_string ( ) ) ) ?;
54
110
} else {
55
111
let portion = self . capacity - inner_buffer. len ( ) ;
56
- let amount_written = inner_buffer. write ( & buf[ 0 ..portion] ) ?;
57
- if amount_written < portion {
58
- return Ok ( amount_written) ;
59
- }
112
+ inner_buffer
113
+ . write ( & buf[ 0 ..portion] )
114
+ . map_err ( |e| StreamError :: trap ( & e. to_string ( ) ) ) ?;
60
115
} ;
61
116
62
- Ok ( buf. len ( ) )
63
- }
64
-
65
- fn flush ( & mut self ) -> io:: Result < ( ) > {
66
- let ruby = Ruby :: get ( ) . unwrap ( ) ;
67
-
68
- self . buffer . get_inner_with ( & ruby) . flush ( )
117
+ Ok ( ( ) )
69
118
}
70
119
}
0 commit comments