Skip to content

Commit 2af0ab1

Browse files
committed
Add test for vrpc
1 parent a1ed9fb commit 2af0ab1

File tree

3 files changed

+122
-19
lines changed

3 files changed

+122
-19
lines changed

example/RpcClientServer/RpcClientServer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,15 @@ func rpcserver() {
5454
log.Fatal("listen error:", e)
5555
}
5656

57-
vrpc.ServeRPCConn(server, l, logger)
57+
options := govec.GetDefaultLogOptions()
58+
vrpc.ServeRPCConn(server, l, logger, options)
5859
}
5960

6061
func rpcclient() {
6162
fmt.Println("Starting client")
6263
logger := govec.InitGoVector("client", "clientlogfile", govec.GetDefaultConfig())
63-
client, err := vrpc.RPCDial("tcp", "127.0.0.1:8080", logger)
64+
options := govec.GetDefaultLogOptions()
65+
client, err := vrpc.RPCDial("tcp", "127.0.0.1:8080", logger, options)
6466
if err != nil {
6567
log.Fatal(err)
6668
}

govec/vrpc/vrpc.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,26 @@ import (
1616
//RPCDial connects to a RPC server at the specified network address. The
1717
//logger is provided to be used by the RPCClientCodec for message
1818
//capture.
19-
func RPCDial(network, address string, logger *govec.GoLog) (*rpc.Client, error) {
19+
func RPCDial(network, address string, logger *govec.GoLog, options govec.GoLogOptions) (*rpc.Client, error) {
2020
conn, err := net.Dial(network, address)
2121
if err != nil {
2222
return nil, err
2323
}
24-
return rpc.NewClientWithCodec(newClientCodec(conn, logger)), err
24+
return rpc.NewClientWithCodec(newClientCodec(conn, logger, options)), err
2525
}
2626

2727
//ServerRPCCon is a convenience function that accepts connections for a
2828
//given listener and starts a new goroutine for the server to serve a
2929
//new connection. The logger is provided to be used by the
3030
//RPCServerCodec for message capture.
31-
func ServeRPCConn(server *rpc.Server, l net.Listener, logger *govec.GoLog) {
31+
func ServeRPCConn(server *rpc.Server, l net.Listener, logger *govec.GoLog, options govec.GoLogOptions) {
3232
for {
3333
conn, err := l.Accept()
3434
if err != nil {
3535
log.Fatal(err)
3636
}
3737

38-
go server.ServeCodec(newServerCodec(conn, logger))
38+
go server.ServeCodec(newServerCodec(conn, logger, options))
3939
}
4040
}
4141

@@ -48,6 +48,7 @@ type RPCClientCodec struct {
4848
Enc *gob.Encoder
4949
EncBuf *bufio.Writer
5050
Logger *govec.GoLog
51+
Options govec.GoLogOptions
5152
}
5253

5354
//RPCServerCodec is an extension of the default rpc codec which uses a
@@ -59,17 +60,18 @@ type RPCServerCodec struct {
5960
Enc *gob.Encoder
6061
EncBuf *bufio.Writer
6162
Logger *govec.GoLog
63+
Options govec.GoLogOptions
6264
Closed bool
6365
}
6466

6567
//NewClient returs an rpc.Client insturmented with vector clocks.
66-
func NewClient(conn io.ReadWriteCloser, logger *govec.GoLog) *rpc.Client {
67-
return rpc.NewClientWithCodec(newClientCodec(conn, logger))
68+
func NewClient(conn io.ReadWriteCloser, logger *govec.GoLog, options govec.GoLogOptions) *rpc.Client {
69+
return rpc.NewClientWithCodec(newClientCodec(conn, logger, options))
6870
}
6971

70-
func newClientCodec(conn io.ReadWriteCloser, logger *govec.GoLog) rpc.ClientCodec {
72+
func newClientCodec(conn io.ReadWriteCloser, logger *govec.GoLog, options govec.GoLogOptions) rpc.ClientCodec {
7173
encBuf := bufio.NewWriter(conn)
72-
return &RPCClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf, logger}
74+
return &RPCClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf, logger, options}
7375
}
7476

7577
//WriteRequest marshalls and sends an rpc request, and it's associated
@@ -78,8 +80,8 @@ func (c *RPCClientCodec) WriteRequest(req *rpc.Request, param interface{}) (err
7880
if err = c.Enc.Encode(req); err != nil {
7981
return
8082
}
81-
opts := govec.GetDefaultLogOptions()
82-
buf := c.Logger.PrepareSend("Making RPC call", param, opts)
83+
84+
buf := c.Logger.PrepareSend("Making RPC call", param, c.Options)
8385
if err = c.Enc.Encode(buf); err != nil {
8486
return
8587
}
@@ -99,8 +101,7 @@ func (c *RPCClientCodec) ReadResponseBody(body interface{}) (err error) {
99101
if err = c.Dec.Decode(&buf); err != nil {
100102
return
101103
}
102-
opts := govec.GetDefaultLogOptions()
103-
c.Logger.UnpackReceive("Received RPC Call response from server", buf, body, opts)
104+
c.Logger.UnpackReceive("Received RPC Call response from server", buf, body, c.Options)
104105
return nil
105106
}
106107

@@ -109,14 +110,15 @@ func (c *RPCClientCodec) Close() error {
109110
return c.C.Close()
110111
}
111112

112-
func newServerCodec(conn io.ReadWriteCloser, logger *govec.GoLog) rpc.ServerCodec {
113+
func newServerCodec(conn io.ReadWriteCloser, logger *govec.GoLog, options govec.GoLogOptions) rpc.ServerCodec {
113114
buf := bufio.NewWriter(conn)
114115
srv := &RPCServerCodec{
115116
Rwc: conn,
116117
Dec: gob.NewDecoder(conn),
117118
Enc: gob.NewEncoder(buf),
118119
EncBuf: buf,
119120
Logger: logger,
121+
Options: options,
120122
}
121123
return srv
122124
}
@@ -133,17 +135,15 @@ func (c *RPCServerCodec) ReadRequestBody(body interface{}) (err error) {
133135
if err = c.Dec.Decode(&buf); err != nil {
134136
return
135137
}
136-
opts := govec.GetDefaultLogOptions()
137-
c.Logger.UnpackReceive("Received RPC request", buf, body, opts)
138+
c.Logger.UnpackReceive("Received RPC request", buf, body, c.Options)
138139
return nil
139140
}
140141

141142
//WriteResponse sends an rpc response, and it's associated result back
142143
//to the client
143144
func (c *RPCServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
144145
Encode(c, r)
145-
opts := govec.GetDefaultLogOptions()
146-
buf := c.Logger.PrepareSend("Sending response to RPC request", body, opts)
146+
buf := c.Logger.PrepareSend("Sending response to RPC request", body, c.Options)
147147
Encode(c, buf)
148148
return c.EncBuf.Flush()
149149
}

govec/vrpc/vrpc_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package vrpc
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"log"
7+
"net"
8+
"net/rpc"
9+
"time"
10+
"testing"
11+
12+
"github.com/DistributedClocks/GoVector/govec"
13+
)
14+
15+
var done chan int = make(chan int, 1)
16+
17+
//Args are matematical arguments for the rpc operations
18+
type Args struct {
19+
A, B int
20+
}
21+
22+
//Quotient is the result of a Divide RPC
23+
type Quotient struct {
24+
Quo, Rem int
25+
}
26+
27+
//Arith is an RPC math server type
28+
type Arith int
29+
30+
//Multiply performs multiplication on two integers
31+
func (t *Arith) Multiply(args *Args, reply *int) error {
32+
*reply = args.A * args.B
33+
return nil
34+
}
35+
36+
//Divide divides a by b and returns a quotient with a remainder
37+
func (t *Arith) Divide(args *Args, quo *Quotient) error {
38+
if args.B == 0 {
39+
return errors.New("divide by zero")
40+
}
41+
quo.Quo = args.A / args.B
42+
quo.Rem = args.A % args.B
43+
return nil
44+
}
45+
46+
func rpcserver(logger *govec.GoLog) {
47+
fmt.Println("Starting server")
48+
arith := new(Arith)
49+
server := rpc.NewServer()
50+
server.Register(arith)
51+
l, e := net.Listen("tcp", ":8080")
52+
if e != nil {
53+
log.Fatal("listen error:", e)
54+
}
55+
56+
options := govec.GetDefaultLogOptions()
57+
ServeRPCConn(server, l, logger, options)
58+
}
59+
60+
func rpcclient(logger *govec.GoLog) {
61+
fmt.Println("Starting client")
62+
options := govec.GetDefaultLogOptions()
63+
client, err := RPCDial("tcp", "127.0.0.1:8080", logger, options)
64+
if err != nil {
65+
log.Fatal(err)
66+
}
67+
var result int
68+
err = client.Call("Arith.Multiply", Args{5, 6}, &result)
69+
if err != nil {
70+
log.Fatal(err)
71+
}
72+
73+
var qresult Quotient
74+
err = client.Call("Arith.Divide", Args{4, 2}, &qresult)
75+
if err != nil {
76+
log.Fatal(err)
77+
}
78+
done <- 1
79+
}
80+
81+
func TestRPC(t *testing.T) {
82+
serverlogger := govec.InitGoVector("server", "serverlogfile", govec.GetDefaultConfig())
83+
clientlogger := govec.InitGoVector("client", "clientlogfile", govec.GetDefaultConfig())
84+
go rpcserver(serverlogger)
85+
time.Sleep(time.Millisecond)
86+
go rpcclient(clientlogger)
87+
<-done
88+
server_vc := serverlogger.GetCurrentVC()
89+
server_ticks, _ := server_vc.FindTicks("server")
90+
client_vc := clientlogger.GetCurrentVC()
91+
client_ticks, _ := client_vc.FindTicks("client")
92+
93+
AssertEquals(t, uint64(5), server_ticks, "Server Clock value not incremented")
94+
AssertEquals(t, uint64(5), client_ticks, "Client Clock value not incremented")
95+
}
96+
97+
func AssertEquals(t *testing.T, expected interface{}, actual interface{}, message string) {
98+
if expected != actual {
99+
t.Fatalf(message+"Expected: %s, Actual: %s", expected, actual)
100+
}
101+
}

0 commit comments

Comments
 (0)