33package main
44
55import (
6+ "bufio"
67 "bytes"
78 "context"
89 "fmt"
@@ -15,8 +16,10 @@ import (
1516 "syscall/js"
1617 "time"
1718
19+ "github.com/coder/coder/v2/coderd/util/ptr"
1820 "github.com/coder/wush/overlay"
1921 "github.com/coder/wush/tsserver"
22+ "github.com/pion/webrtc/v4"
2023 "golang.org/x/crypto/ssh"
2124 "golang.org/x/xerrors"
2225 "tailscale.com/ipn/store"
@@ -46,10 +49,6 @@ func main() {
4649 promiseConstructor := js .Global ().Get ("Promise" )
4750 return promiseConstructor .New (handler )
4851 }))
49- js .Global ().Set ("exitWush" , js .FuncOf (func (this js.Value , args []js.Value ) any {
50- // close(ch)
51- return nil
52- }))
5352
5453 // Keep the main function running
5554 <- make (chan struct {}, 0 )
@@ -66,7 +65,12 @@ func newWush(cfg js.Value) map[string]any {
6665 panic (err )
6766 }
6867
69- ov := overlay .NewWasmOverlay (log .Printf , dm , cfg .Get ("onNewPeer" ))
68+ ov := overlay .NewWasmOverlay (log .Printf , dm ,
69+ cfg .Get ("onNewPeer" ),
70+ cfg .Get ("onWebrtcOffer" ),
71+ cfg .Get ("onWebrtcAnswer" ),
72+ cfg .Get ("onWebrtcCandidate" ),
73+ )
7074
7175 err = ov .PickDERPHome (ctx )
7276 if err != nil {
@@ -116,9 +120,10 @@ func newWush(cfg js.Value) map[string]any {
116120 }
117121
118122 return map [string ]any {
119- "derp_id" : ov .DerpRegionID ,
120- "derp_name" : ov .DerpMap .Regions [int (ov .DerpRegionID )].RegionName ,
121- "auth_key" : ov .ClientAuth ().AuthKey (),
123+ "derp_id" : ov .DerpRegionID ,
124+ "derp_name" : ov .DerpMap .Regions [int (ov .DerpRegionID )].RegionName ,
125+ "derp_latency" : ov .DerpLatency .Milliseconds (),
126+ "auth_key" : ov .ClientAuth ().AuthKey (),
122127 }
123128 }),
124129 "stop" : js .FuncOf (func (this js.Value , args []js.Value ) any {
@@ -131,14 +136,14 @@ func newWush(cfg js.Value) map[string]any {
131136 return nil
132137 }),
133138 "ssh" : js .FuncOf (func (this js.Value , args []js.Value ) any {
134- if len (args ) != 1 {
135- log .Printf ("Usage: ssh({} )" )
139+ if len (args ) != 2 {
140+ log .Printf ("Usage: ssh(peer, config )" )
136141 return nil
137142 }
138143
139144 sess := & sshSession {
140145 ts : ts ,
141- cfg : args [0 ],
146+ cfg : args [1 ],
142147 }
143148
144149 go sess .Run ()
@@ -160,9 +165,9 @@ func newWush(cfg js.Value) map[string]any {
160165 reject := promiseArgs [1 ]
161166
162167 go func () {
163- if len (args ) != 1 {
168+ if len (args ) != 2 {
164169 errorConstructor := js .Global ().Get ("Error" )
165- errorObject := errorConstructor .New ("Usage: connect(authKey)" )
170+ errorObject := errorConstructor .New ("Usage: connect(authKey, offer )" )
166171 reject .Invoke (errorObject )
167172 return
168173 }
@@ -172,7 +177,18 @@ func newWush(cfg js.Value) map[string]any {
172177 authKey = args [0 ].String ()
173178 } else {
174179 errorConstructor := js .Global ().Get ("Error" )
175- errorObject := errorConstructor .New ("Usage: connect(authKey)" )
180+ errorObject := errorConstructor .New ("Usage: connect(authKey, offer)" )
181+ reject .Invoke (errorObject )
182+ return
183+ }
184+
185+ var offer webrtc.SessionDescription
186+ if jsOffer := args [1 ]; jsOffer .Type () == js .TypeObject {
187+ offer .SDP = jsOffer .Get ("sdp" ).String ()
188+ offer .Type = webrtc .NewSDPType (jsOffer .Get ("type" ).String ())
189+ } else {
190+ errorConstructor := js .Global ().Get ("Error" )
191+ errorObject := errorConstructor .New ("Usage: connect(authKey, offer)" )
176192 reject .Invoke (errorObject )
177193 return
178194 }
@@ -187,11 +203,11 @@ func newWush(cfg js.Value) map[string]any {
187203 }
188204
189205 ctx , cancel := context .WithCancel (context .Background ())
190- peer , err := ov .Connect (ctx , ca )
206+ peer , err := ov .Connect (ctx , ca , offer )
191207 if err != nil {
192208 cancel ()
193209 errorConstructor := js .Global ().Get ("Error" )
194- errorObject := errorConstructor .New (fmt .Errorf ("parse authkey : %w" , err ).Error ())
210+ errorObject := errorConstructor .New (fmt .Errorf ("connect to peer : %w" , err ).Error ())
195211 reject .Invoke (errorObject )
196212 return
197213 }
@@ -200,6 +216,7 @@ func newWush(cfg js.Value) map[string]any {
200216 "id" : js .ValueOf (peer .ID ),
201217 "name" : js .ValueOf (peer .Name ),
202218 "ip" : js .ValueOf (peer .IP .String ()),
219+ "type" : js .ValueOf (peer .Type ),
203220 "cancel" : js .FuncOf (func (this js.Value , args []js.Value ) any {
204221 cancel ()
205222 return nil
@@ -220,61 +237,34 @@ func newWush(cfg js.Value) map[string]any {
220237
221238 if len (args ) != 5 {
222239 errorConstructor := js .Global ().Get ("Error" )
223- errorObject := errorConstructor .New ("Usage: transfer(peer, file )" )
240+ errorObject := errorConstructor .New ("Usage: transfer(peer, fileName, sizeBytes, stream, onProgress )" )
224241 reject .Invoke (errorObject )
225242 return nil
226243 }
227244
228245 peer := args [0 ]
229246 ip := peer .Get ("ip" ).String ()
230247 fileName := args [1 ].String ()
231- sizeBytes := args [2 ].Int ()
248+ sizeBytes := int64 ( args [2 ].Int () )
232249 stream := args [3 ]
233- streamHelper := args [4 ]
234-
235- pr , pw := io .Pipe ()
236-
237- goCallback := js .FuncOf (func (this js.Value , args []js.Value ) interface {} {
238- promiseConstructor := js .Global ().Get ("Promise" )
239- return promiseConstructor .New (js .FuncOf (func (this js.Value , promiseArgs []js.Value ) any {
240- resolve := promiseArgs [0 ]
241- _ = promiseArgs [1 ]
242- go func () {
243- if len (args ) == 0 || args [0 ].IsNull () || args [0 ].IsUndefined () {
244- pw .Close ()
245- resolve .Invoke ()
246- return
247- }
248-
249- fmt .Println ("in go callback" )
250- // Convert the JavaScript Uint8Array to a Go byte slice
251- uint8Array := args [0 ]
252- fmt .Println ("type is" , uint8Array .Type ().String ())
253- length := uint8Array .Get ("length" ).Int ()
254- buf := make ([]byte , length )
255- js .CopyBytesToGo (buf , uint8Array )
256-
257- fmt .Println ("sending data to channel" )
258- // Send the data to the channel
259- if _ , err := pw .Write (buf ); err != nil {
260- pw .CloseWithError (err )
261- }
262- fmt .Println ("callback finished" )
263-
264- // Resolve the promise
265- resolve .Invoke ()
266- }()
267- return nil
268- }))
269- })
250+ onProgress := args [4 ]
270251
271252 go func () {
272- defer goCallback .Release ()
273-
274- streamHelper .Invoke (stream , goCallback )
275-
276- hc := ts .HTTPClient ()
277- req , err := http .NewRequest (http .MethodPost , fmt .Sprintf ("http://%s:4444/%s" , ip , fileName ), pr )
253+ startTime := time .Now ()
254+ reader := & jsStreamReader {
255+ reader : stream .Call ("getReader" ),
256+ onProgress : onProgress ,
257+ totalSize : sizeBytes ,
258+ }
259+ bufferSize := 1024 * 1024
260+ hc := & http.Client {
261+ Transport : & http.Transport {
262+ DialContext : ts .Dial ,
263+ ReadBufferSize : bufferSize ,
264+ WriteBufferSize : bufferSize ,
265+ },
266+ }
267+ req , err := http .NewRequest (http .MethodPost , fmt .Sprintf ("http://%s:4444/%s" , ip , fileName ), bufio .NewReaderSize (reader , bufferSize ))
278268 if err != nil {
279269 errorConstructor := js .Global ().Get ("Error" )
280270 errorObject := errorConstructor .New (err .Error ())
@@ -283,6 +273,7 @@ func newWush(cfg js.Value) map[string]any {
283273 }
284274 req .ContentLength = int64 (sizeBytes )
285275
276+ fmt .Printf ("Starting transfer of %d bytes\n " , sizeBytes )
286277 res , err := hc .Do (req )
287278 if err != nil {
288279 errorConstructor := js .Global ().Get ("Error" )
@@ -295,7 +286,10 @@ func newWush(cfg js.Value) map[string]any {
295286 bod := bytes .NewBuffer (nil )
296287 _ , _ = io .Copy (bod , res .Body )
297288
298- fmt .Println (bod .String ())
289+ duration := time .Since (startTime )
290+ speed := float64 (sizeBytes ) / duration .Seconds () / 1024 / 1024 // MB/s
291+ fmt .Printf ("Transfer completed in %v. Speed: %.2f MB/s\n " , duration , speed )
292+
299293 resolve .Invoke ()
300294 }()
301295
@@ -305,6 +299,36 @@ func newWush(cfg js.Value) map[string]any {
305299 promiseConstructor := js .Global ().Get ("Promise" )
306300 return promiseConstructor .New (handler )
307301 }),
302+
303+ "sendWebrtcCandidate" : js .FuncOf (func (this js.Value , args []js.Value ) any {
304+ peer := args [0 ].String ()
305+ candidate := args [1 ]
306+
307+ ov .SendWebrtcCandidate (peer , webrtc.ICECandidateInit {
308+ Candidate : candidate .Get ("candidate" ).String (),
309+ SDPMLineIndex : ptr .Ref (uint16 (candidate .Get ("sdpMLineIndex" ).Int ())),
310+ SDPMid : ptr .Ref (candidate .Get ("sdpMid" ).String ()),
311+ UsernameFragment : ptr .Ref (candidate .Get ("sdpMid" ).String ()),
312+ })
313+
314+ return nil
315+ }),
316+
317+ "parseAuthKey" : js .FuncOf (func (this js.Value , args []js.Value ) any {
318+ authKey := args [0 ].String ()
319+
320+ var ca overlay.ClientAuth
321+ _ = ca .Parse (authKey )
322+ typ := "cli"
323+ if ca .Web {
324+ typ = "web"
325+ }
326+
327+ return map [string ]any {
328+ "id" : js .ValueOf (ca .ReceiverPublicKey .String ()),
329+ "type" : js .ValueOf (typ ),
330+ }
331+ }),
308332 }
309333}
310334
@@ -359,7 +383,7 @@ func (s *sshSession) Run() {
359383 ctx , cancel := context .WithTimeout (context .Background (), time .Duration (timeoutSeconds * float64 (time .Second )))
360384 defer cancel ()
361385 reportProgress (fmt .Sprintf ("Connecting..." ))
362- c , err := s .ts .Dial (ctx , "tcp" , net .JoinHostPort ("100.64.0.0 " , "3" ))
386+ c , err := s .ts .Dial (ctx , "tcp" , net .JoinHostPort ("fd7a:115c:a1e0::1 " , "3" ))
363387 if err != nil {
364388 writeError ("Dial" , err )
365389 return
@@ -538,8 +562,8 @@ func cpH(onIncomingFile js.Value, downloadFile js.Value) http.HandlerFunc {
538562
539563 // Read the entire stream and pass it to JavaScript
540564 for {
541- // Read up to 16KB at a time
542- buf := make ([]byte , 16384 )
565+ // Read up to 1MB at a time
566+ buf := make ([]byte , 1024 * 1024 )
543567 n , err := r .Body .Read (buf )
544568 if err != nil && err != io .EOF {
545569 // Tell the controller we have an error
@@ -582,3 +606,73 @@ func cpH(onIncomingFile js.Value, downloadFile js.Value) http.HandlerFunc {
582606 downloadFile .Invoke (peer , fiName , r .ContentLength , readableStream )
583607 }
584608}
609+
610+ // jsStreamReader implements io.Reader for JavaScript streams
611+ type jsStreamReader struct {
612+ reader js.Value
613+ onProgress js.Value
614+ bytesRead int64
615+ totalSize int64
616+ buffer bytes.Buffer
617+ }
618+
619+ func (r * jsStreamReader ) Read (p []byte ) (n int , err error ) {
620+ if r .bytesRead >= r .totalSize {
621+ return 0 , io .EOF
622+ }
623+
624+ fmt .Printf ("Read %d bytes\n " , len (p ))
625+
626+ // If we have buffered data, use it first
627+ if r .buffer .Len () > 0 {
628+ n , _ = r .buffer .Read (p )
629+ r .bytesRead += int64 (n )
630+
631+ if r .onProgress .Truthy () {
632+ r .onProgress .Invoke (r .bytesRead )
633+ }
634+ return n , nil
635+ }
636+
637+ // Only read from stream if buffer is empty
638+ promise := r .reader .Call ("read" )
639+ result := await (promise )
640+
641+ if result .Get ("done" ).Bool () {
642+ if r .bytesRead < r .totalSize {
643+ return 0 , fmt .Errorf ("stream ended prematurely at %d/%d bytes" , r .bytesRead , r .totalSize )
644+ }
645+ return 0 , io .EOF
646+ }
647+
648+ // Get the chunk from JavaScript and write it to our buffer
649+ value := result .Get ("value" )
650+ chunk := make ([]byte , value .Length ())
651+ js .CopyBytesToGo (chunk , value )
652+ r .buffer .Write (chunk )
653+
654+ // Now read what we can into p
655+ n , _ = r .buffer .Read (p )
656+ r .bytesRead += int64 (n )
657+
658+ if r .onProgress .Truthy () {
659+ r .onProgress .Invoke (r .bytesRead )
660+ }
661+
662+ return n , nil
663+ }
664+
665+ // Helper function to await a JavaScript promise
666+ func await (promise js.Value ) js.Value {
667+ done := make (chan js.Value )
668+ promise .Call ("then" , js .FuncOf (func (_ js.Value , args []js.Value ) interface {} {
669+ done <- args [0 ]
670+ return nil
671+ }))
672+ return <- done
673+ }
674+
675+ func (r * jsStreamReader ) Close () error {
676+ r .reader .Call ("releaseLock" )
677+ return nil
678+ }
0 commit comments