1+ package main
2+
3+ import (
4+ "bufio"
5+ "log"
6+ "net/http"
7+ "os"
8+
9+ "github.com/gorilla/websocket"
10+ "github.com/reactivex/rxgo/v2"
11+ )
12+
13+ type client chan <- string // an outgoing message channel
14+
15+ var (
16+ entering = make (chan client )
17+ leaving = make (chan client )
18+ messages = make (chan rxgo.Item ) // all incoming client messages
19+ ObservableMsg = rxgo .FromChannel (messages )
20+ )
21+
22+ func broadcaster () {
23+ clients := make (map [client ]bool ) // all connected clients
24+ MessageBroadcast := ObservableMsg .Observe ()
25+ for {
26+ select {
27+ case msg := <- MessageBroadcast :
28+ // Broadcast incoming message to all
29+ // clients' outgoing message channels.
30+ for cli := range clients {
31+ cli <- msg .V .(string )
32+ }
33+
34+ case cli := <- entering :
35+ clients [cli ] = true
36+
37+ case cli := <- leaving :
38+ delete (clients , cli )
39+ close (cli )
40+ }
41+ }
42+ }
43+
44+ func clientWriter (conn * websocket.Conn , ch <- chan string ) {
45+ for msg := range ch {
46+ conn .WriteMessage (1 , []byte (msg ))
47+ }
48+ }
49+
50+ func wshandle (w http.ResponseWriter , r * http.Request ) {
51+ upgrader := & websocket.Upgrader {CheckOrigin : func (r * http.Request ) bool { return true }}
52+ conn , err := upgrader .Upgrade (w , r , nil )
53+ if err != nil {
54+ log .Println ("upgrade:" , err )
55+ return
56+ }
57+
58+ ch := make (chan string ) // outgoing client messages
59+ go clientWriter (conn , ch )
60+
61+ who := conn .RemoteAddr ().String ()
62+ ch <- "你是 " + who + "\n "
63+ messages <- rxgo .Of (who + " 來到了現場" + "\n " )
64+ entering <- ch
65+
66+ defer func () {
67+ log .Println ("disconnect !!" )
68+ leaving <- ch
69+ messages <- rxgo .Of (who + " 離開了" + "\n " )
70+ conn .Close ()
71+ }()
72+
73+ for {
74+ _ , msg , err := conn .ReadMessage ()
75+ if err != nil {
76+ log .Println ("read:" , err )
77+ break
78+ }
79+ messages <- rxgo .Of (who + " 表示: " + string (msg ))
80+ }
81+ }
82+
83+ func InitObservable () {
84+ // TODO: Please create an Observable to handle the messages
85+ /*
86+ ObservableMsg = ObservableMsg.Filter(...) ... {
87+ }).Map(...) {
88+ ...
89+ })
90+ */
91+ }
92+
93+ func main () {
94+ InitObservable ()
95+ go broadcaster ()
96+ http .HandleFunc ("/wschatroom" , wshandle )
97+
98+ http .Handle ("/" , http .FileServer (http .Dir ("./static" )))
99+
100+ log .Println ("server start at :8090" )
101+ log .Fatal (http .ListenAndServe (":8090" , nil ))
102+ }
0 commit comments