Skip to content

Commit 1c07966

Browse files
qmfrederikbrendandburns
authored andcommitted
Add ChannelIndex enumeration, and some documentation (#142)
1 parent d90289a commit 1c07966

File tree

3 files changed

+160
-0
lines changed

3 files changed

+160
-0
lines changed

src/KubernetesClient/ChannelIndex.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
namespace k8s
2+
{
3+
/// <summary>
4+
/// These values identify the various channels which you can use when interacting with a process running in a container in a Kubernetes
5+
/// pod.
6+
/// </summary>
7+
/// <seealso href="https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/server/remotecommand/websocket.go#L29"/>
8+
public enum ChannelIndex : byte
9+
{
10+
/// <summary>
11+
/// The standard input channel. Use this channel to send input to a process running in a container inside a Kubernetes pod.
12+
/// </summary>
13+
StdIn,
14+
15+
/// <summary>
16+
/// The standard output channel. Use this channel to read standard output generated by a process running in a container in a Kubernetes pod.
17+
/// </summary>
18+
StdOut,
19+
20+
/// <summary>
21+
/// The standard error channel. Use this channel to read the error output generated by a process running in a container in a Kubernetes pod.
22+
/// </summary>
23+
StdErr,
24+
25+
/// <summary>
26+
/// The error channel. This channel is used by Kubernetes to send you error messages, including the exit code of the process.
27+
/// </summary>
28+
Error,
29+
30+
/// <summary>
31+
/// The resize channel. Use this channel to resize the terminal. You need to send a JSON-formatted object over this channel, which
32+
/// has a Width and Height property.
33+
/// </summary>
34+
/// <seealso href="https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/remotecommand/resize.go"/>
35+
Resize
36+
}
37+
}

src/KubernetesClient/MuxedStream.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,27 @@
33

44
namespace k8s
55
{
6+
/// <summary>
7+
/// A <see cref="Stream"/> which reads/writes from a specific channel using a <see cref="StreamDemuxer" />.
8+
/// </summary>
69
public class MuxedStream : Stream
710
{
811
private ByteBuffer inputBuffer;
912
private byte? outputIndex;
1013
private StreamDemuxer muxer;
1114

15+
/// <summary>
16+
/// Initializes a new instance of the <see cref="MuxedStream"/> class.
17+
/// </summary>
18+
/// <param name="muxer">
19+
/// The <see cref="StreamDemuxer"/> to use to read from/write to the underlying stream.
20+
/// </param>
21+
/// <param name="inputBuffer">
22+
/// The <see cref="inputBuffer"/> to read from.
23+
/// </param>
24+
/// <param name="outputIndex">
25+
/// The index of the channel to which to write.
26+
/// </param>
1227
public MuxedStream(StreamDemuxer muxer, ByteBuffer inputBuffer, byte? outputIndex)
1328
{
1429
this.inputBuffer = inputBuffer;
@@ -22,20 +37,26 @@ public MuxedStream(StreamDemuxer muxer, ByteBuffer inputBuffer, byte? outputInde
2237
this.muxer = muxer ?? throw new ArgumentNullException(nameof(muxer));
2338
}
2439

40+
/// <inheritdoc/>
2541
public override bool CanRead => this.inputBuffer != null;
2642

43+
/// <inheritdoc/>
2744
public override bool CanSeek => false;
2845

46+
/// <inheritdoc/>
2947
public override bool CanWrite => this.outputIndex != null;
3048

49+
/// <inheritdoc/>
3150
public override long Length => throw new NotSupportedException();
3251

52+
/// <inheritdoc/>
3353
public override long Position
3454
{
3555
get => throw new NotSupportedException();
3656
set => throw new NotSupportedException();
3757
}
3858

59+
/// <inheritdoc/>
3960
public override void Write(byte[] buffer, int offset, int count)
4061
{
4162
if (this.outputIndex == null)
@@ -48,6 +69,7 @@ public override void Write(byte[] buffer, int offset, int count)
4869
}
4970
}
5071

72+
/// <inheritdoc/>
5173
public override int Read(byte[] buffer, int offset, int count)
5274
{
5375
if (this.inputBuffer == null)
@@ -60,16 +82,19 @@ public override int Read(byte[] buffer, int offset, int count)
6082
}
6183
}
6284

85+
/// <inheritdoc/>
6386
public override void Flush()
6487
{
6588
throw new NotSupportedException();
6689
}
6790

91+
/// <inheritdoc/>
6892
public override long Seek(long offset, SeekOrigin origin)
6993
{
7094
throw new NotSupportedException();
7195
}
7296

97+
/// <inheritdoc/>
7398
public override void SetLength(long value)
7499
{
75100
throw new NotSupportedException();

src/KubernetesClient/StreamDemuxer.cs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,47 @@
99

1010
namespace k8s
1111
{
12+
/// <summary>
13+
/// <para>
14+
/// The <see cref="StreamDemuxer"/> allows you to interact with processes running in a container in a Kubernetes pod. You can start an exec or attach command
15+
/// by calling <see cref="Kubernetes.WebSocketNamespacedPodExecAsync(string, string, IEnumerable{string}, string, bool, bool, bool, bool, Dictionary{string, List{string}}, CancellationToken)"/>
16+
/// or <see cref="Kubernetes.WebSocketNamespacedPodAttachAsync(string, string, string, bool, bool, bool, bool, Dictionary{string, List{string}}, CancellationToken)"/>. These methods
17+
/// will return you a <see cref="WebSocket"/> connection.
18+
/// </para>
19+
/// <para>
20+
/// Kubernetes 'multiplexes' multiple channels over this <see cref="WebSocket"/> connection, such as standard input, standard output and standard error. The <see cref="StreamDemuxer"/>
21+
/// allows you to extract individual <see cref="Stream"/>s from this <see cref="WebSocket"/> class. You can then use these streams to send/receive data from that process.
22+
/// </para>
23+
/// </summary>
1224
public class StreamDemuxer : IDisposable
1325
{
1426
private readonly WebSocket webSocket;
1527
private readonly Dictionary<byte, ByteBuffer> buffers = new Dictionary<byte, ByteBuffer>();
1628
private readonly CancellationTokenSource cts = new CancellationTokenSource();
1729
private Task runLoop;
1830

31+
/// <summary>
32+
/// Initializes a new instance of the <see cref="StreamDemuxer"/> class.
33+
/// </summary>
34+
/// <param name="webSocket">
35+
/// A <see cref="WebSocket"/> which contains a multiplexed stream, such as the <see cref="WebSocket"/> returned by the exec or attach commands.
36+
/// </param>
1937
public StreamDemuxer(WebSocket webSocket)
2038
{
2139
this.webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
2240
}
2341

2442
public event EventHandler ConnectionClosed;
2543

44+
/// <summary>
45+
/// Starts reading the data sent by the server.
46+
/// </summary>
2647
public void Start()
2748
{
2849
this.runLoop = this.RunLoop(this.cts.Token);
2950
}
3051

52+
/// <inheritdoc/>
3153
public void Dispose()
3254
{
3355
try
@@ -45,6 +67,35 @@ public void Dispose()
4567
}
4668
}
4769

70+
/// <summary>
71+
/// Gets a <see cref="Stream"/> which allows you to read to and/or write from a remote channel.
72+
/// </summary>
73+
/// <param name="inputIndex">
74+
/// The index of the channel from which to read.
75+
/// </param>
76+
/// <param name="outputIndex">
77+
/// The index of the channel to which to write.
78+
/// </param>
79+
/// <returns>
80+
/// A <see cref="Stream"/> which allows you to read/write to the requested channels.
81+
/// </returns>
82+
public Stream GetStream(ChannelIndex? inputIndex, ChannelIndex? outputIndex)
83+
{
84+
return GetStream((byte?)inputIndex, (byte?)outputIndex);
85+
}
86+
87+
/// <summary>
88+
/// Gets a <see cref="Stream"/> which allows you to read to and/or write from a remote channel.
89+
/// </summary>
90+
/// <param name="inputIndex">
91+
/// The index of the channel from which to read.
92+
/// </param>
93+
/// <param name="outputIndex">
94+
/// The index of the channel to which to write.
95+
/// </param>
96+
/// <returns>
97+
/// A <see cref="Stream"/> which allows you to read/write to the requested channels.
98+
/// </returns>
4899
public Stream GetStream(byte? inputIndex, byte? outputIndex)
49100
{
50101
if (inputIndex != null && !this.buffers.ContainsKey(inputIndex.Value))
@@ -60,6 +111,53 @@ public Stream GetStream(byte? inputIndex, byte? outputIndex)
60111
return new MuxedStream(this, inputBuffer, outputIndex);
61112
}
62113

114+
/// <summary>
115+
/// Directly writes data to a channel.
116+
/// </summary>
117+
/// <param name="index">
118+
/// The index of the channel to which to write.
119+
/// </param>
120+
/// <param name="buffer">
121+
/// The buffer from which to read data.
122+
/// </param>
123+
/// <param name="offset">
124+
/// The offset at which to start reading.
125+
/// </param>
126+
/// <param name="count">
127+
/// The number of bytes to read.
128+
/// </param>
129+
/// <param name="cancellationToken">
130+
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
131+
/// </param>
132+
/// <returns>
133+
/// A <see cref="Task"/> which represents the asynchronous operation.
134+
/// </returns>
135+
public Task Write(ChannelIndex index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
136+
{
137+
return Write((byte)index, buffer, offset, count, cancellationToken);
138+
}
139+
140+
/// <summary>
141+
/// Directly writes data to a channel.
142+
/// </summary>
143+
/// <param name="index">
144+
/// The index of the channel to which to write.
145+
/// </param>
146+
/// <param name="buffer">
147+
/// The buffer from which to read data.
148+
/// </param>
149+
/// <param name="offset">
150+
/// The offset at which to start reading.
151+
/// </param>
152+
/// <param name="count">
153+
/// The number of bytes to read.
154+
/// </param>
155+
/// <param name="cancellationToken">
156+
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
157+
/// </param>
158+
/// <returns>
159+
/// A <see cref="Task"/> which represents the asynchronous operation.
160+
/// </returns>
63161
public async Task Write(byte index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
64162
{
65163
byte[] writeBuffer = ArrayPool<byte>.Shared.Rent(count + 1);

0 commit comments

Comments
 (0)