Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package epaxos

import (
"github.com/go-distributed/epaxos/message"
)

// A codec interface that defines what a codec should implement.
// A codec should be able to marshal/unmarshal through the given
// connection.
type Codec interface {
// Init a codec.
Initial() error

// Marshal a message into bytes.
Marshal(msg message.Message) ([]byte, error)

// Unmarshal a message from bytes.
Unmarshal(data []byte) (message.Message, error)

// Stop a codec.
Stop() error

// Destroy a codec, release the resource.
Destroy() error
}
114 changes: 114 additions & 0 deletions codec/gogoprotobufcodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package codec

import (
"bytes"
"fmt"
"io/ioutil"

"github.com/go-distributed/epaxos/message"
"github.com/golang/glog"
)

// The gogoprotobuf codec.
type GoGoProtobufCodec struct{}

// Create a new gogpprotobuf codec.
func NewGoGoProtobufCodec() (*GoGoProtobufCodec, error) {
return &GoGoProtobufCodec{}, nil
}

// Initial the gogoprotobuf codec (no-op for now).
func (gc *GoGoProtobufCodec) Initial() error {
return nil
}

// Stop the gogoprotobuf codec (no-op for now).
func (gc *GoGoProtobufCodec) Stop() error {
return nil
}

// Destroy the gogoprotobuf codec (no-op for now).
func (gc *GoGoProtobufCodec) Destroy() error {
return nil
}

// Marshal a message into a byte slice.
func (gc *GoGoProtobufCodec) Marshal(msg message.Message) ([]byte, error) {
var err error
defer func() {
if err != nil {
glog.Warning("GoGoProtobufCodec: Failed to Marshal: ", err)
}
}()

b, err := msg.MarshalProtobuf()
if err != nil {
return nil, err
}

// Use bytes.Buffer to write efficiently.
var buf bytes.Buffer
if err = buf.WriteByte(byte(msg.Type())); err != nil {
return nil, err
}

n, err := buf.Write(b)
if err != nil || n != len(b) {
return nil, err
}
return buf.Bytes(), nil
}

// Unmarshal a message from a byte slice.
func (c *GoGoProtobufCodec) Unmarshal(data []byte) (message.Message, error) {
var msg message.Message
var err error

defer func() {
if err != nil {
glog.Warning("GoGoProtobufCodec: Failed to Unmarshal: ", err)
}
}()

buf := bytes.NewReader(data)
bt, err := buf.ReadByte()
if err != nil {
return nil, err
}

mtype := message.MsgType(bt)
switch mtype {
case message.ProposeMsg:
msg = new(message.Propose)
case message.PreAcceptMsg:
msg = new(message.PreAccept)
case message.PreAcceptOkMsg:
msg = new(message.PreAcceptOk)
case message.PreAcceptReplyMsg:
msg = new(message.PreAcceptReply)
case message.AcceptMsg:
msg = new(message.Accept)
case message.AcceptReplyMsg:
msg = new(message.AcceptReply)
case message.CommitMsg:
msg = new(message.Commit)
case message.PrepareMsg:
msg = new(message.Prepare)
case message.PrepareReplyMsg:
msg = new(message.PrepareReply)
default:
err := fmt.Errorf("Unknown message type %s\n", message.TypeToString(mtype))
return nil, err
}

// TODO(yifan): Move this from the message package
// to the protobuf package.
b, err := ioutil.ReadAll(buf)
if err != nil {
return nil, err
}
if err := msg.UnmarshalProtobuf(b); err != nil {
return nil, err
}
return msg, nil
}
106 changes: 92 additions & 14 deletions message/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package message

import (
"fmt"

"github.com/go-distributed/epaxos/protobuf"
"github.com/golang/glog"
)

type Accept struct {
Expand All @@ -11,59 +14,134 @@ type Accept struct {
Deps Dependencies
Ballot *Ballot
From uint8
pb protobuf.Accept // for protobuf
}

type AcceptReply struct {
ReplicaId uint8
InstanceId uint64
Ballot *Ballot
From uint8
pb protobuf.AcceptReply // for protobuf
}

func (a *Accept) Sender() uint8 {
return a.From
}

func (a *Accept) Type() uint8 {
func (a *Accept) Type() MsgType {
return AcceptMsg
}

func (a *Accept) Content() interface{} {
return a
}

func (p *Accept) Replica() uint8 {
return p.ReplicaId
func (a *Accept) Replica() uint8 {
return a.ReplicaId
}

func (a *Accept) Instance() uint64 {
return a.InstanceId
}

func (a *Accept) String() string {
return fmt.Sprintf("Accept, Instance[%v][%v], Ballot[%v]",
a.ReplicaId, a.InstanceId, a.Ballot.String())
}

func (p *Accept) Instance() uint64 {
return p.InstanceId
func (a *Accept) MarshalProtobuf() ([]byte, error) {
replicaID := uint32(a.ReplicaId)
instanceID := uint64(a.InstanceId)
from := uint32(a.From)

a.pb.ReplicaID = &replicaID
a.pb.InstanceID = &instanceID
a.pb.Cmds = a.Cmds.ToBytesSlice()
a.pb.Deps = a.Deps
a.pb.Ballot = a.Ballot.ToProtobuf()
a.pb.From = &from

data, err := a.pb.Marshal()
if err != nil {
glog.Warning("Accept: MarshalProtobuf() error: ", err)
return nil, err
}
return data, nil
}

func (p *Accept) String() string {
return fmt.Sprintf("Accept, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String())
func (a *Accept) UnmarshalProtobuf(data []byte) error {
if err := a.pb.Unmarshal(data); err != nil {
glog.Warning("Accept: UnmarshalProtobuf() error: ", err)
return err
}

a.ReplicaId = uint8(a.pb.GetReplicaID())
a.InstanceId = uint64(a.pb.GetInstanceID())
a.Cmds.FromBytesSlice(a.pb.GetCmds())
a.Deps = a.pb.GetDeps()
if a.Ballot == nil {
a.Ballot = new(Ballot)
}
a.Ballot.FromProtobuf(a.pb.GetBallot())
a.From = uint8(a.pb.GetFrom())
return nil
}

func (a *AcceptReply) Sender() uint8 {
return a.From
}

func (a *AcceptReply) Type() uint8 {
func (a *AcceptReply) Type() MsgType {
return AcceptReplyMsg
}

func (a *AcceptReply) Content() interface{} {
return a
}

func (p *AcceptReply) Replica() uint8 {
return p.ReplicaId
func (a *AcceptReply) Replica() uint8 {
return a.ReplicaId
}

func (p *AcceptReply) Instance() uint64 {
return p.InstanceId
func (a *AcceptReply) Instance() uint64 {
return a.InstanceId
}

func (p *AcceptReply) String() string {
return fmt.Sprintf("AcceptReply, Instance[%v][%v], Ballot[%v]", p.ReplicaId, p.InstanceId, p.Ballot.String())
func (a *AcceptReply) String() string {
return fmt.Sprintf("AcceptReply, Instance[%v][%v], Ballot[%v]", a.ReplicaId, a.InstanceId, a.Ballot.String())
}

func (a *AcceptReply) MarshalProtobuf() ([]byte, error) {
replicaID := uint32(a.ReplicaId)
instanceID := uint64(a.InstanceId)
from := uint32(a.From)

a.pb.ReplicaID = &replicaID
a.pb.InstanceID = &instanceID
a.pb.Ballot = a.Ballot.ToProtobuf()
a.pb.From = &from

data, err := a.pb.Marshal()
if err != nil {
glog.Warning("AcceptReply: MarshalProtobuf() error: ", err)
return nil, err
}
return data, nil
}

func (a *AcceptReply) UnmarshalProtobuf(data []byte) error {
if err := a.pb.Unmarshal(data); err != nil {
glog.Warning("AcceptReply: UnmarshalProtobuf() error: ", err)
return err
}

a.ReplicaId = uint8(a.pb.GetReplicaID())
a.InstanceId = uint64(a.pb.GetInstanceID())
if a.Ballot == nil {
a.Ballot = new(Ballot)
}
a.Ballot.FromProtobuf(a.pb.GetBallot())
a.From = uint8(a.pb.GetFrom())
return nil
}
35 changes: 26 additions & 9 deletions message/ballot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package message

import (
"fmt"

"github.com/go-distributed/epaxos/protobuf"
)

const (
Expand All @@ -25,9 +27,9 @@ type Ballot struct {

func NewBallot(epoch uint32, number uint64, replicId uint8) *Ballot {
return &Ballot{
epoch,
number,
replicId,
Epoch: epoch,
Number: number,
ReplicaId: replicId,
}
}

Expand Down Expand Up @@ -96,9 +98,9 @@ func (b *Ballot) SetReplicaId(rId uint8) {

func (b *Ballot) IncNumClone() *Ballot {
return &Ballot{
b.Epoch,
b.Number + 1,
b.ReplicaId,
Epoch: b.Epoch,
Number: b.Number + 1,
ReplicaId: b.ReplicaId,
}
}

Expand All @@ -111,12 +113,27 @@ func (b *Ballot) Clone() *Ballot {
panic("")
}
return &Ballot{
b.Epoch,
b.Number,
b.ReplicaId,
Epoch: b.Epoch,
Number: b.Number,
ReplicaId: b.ReplicaId,
}
}

func (b *Ballot) String() string {
return fmt.Sprintf("%v.%v.%v", b.Epoch, b.Number, b.ReplicaId)
}

func (b *Ballot) ToProtobuf() *protobuf.Ballot {
epoch, number, replicaID := b.Epoch, b.Number, uint32(b.ReplicaId)
return &protobuf.Ballot{
Epoch: &epoch,
Number: &number,
ReplicaID: &replicaID,
}
}

func (b *Ballot) FromProtobuf(pballot *protobuf.Ballot) {
b.Epoch = uint32(*pballot.Epoch)
b.Number = uint64(*pballot.Number)
b.ReplicaId = uint8(*pballot.ReplicaID)
}
17 changes: 17 additions & 0 deletions message/ballot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,20 @@ func TestBallotSetNumber(t *testing.T) {
b.SetNumber(4)
assert.Equal(t, b.GetNumber(), uint64(4))
}

func TestToProtobuf(t *testing.T) {
b := NewBallot(1, 2, 3)
pb := b.ToProtobuf()
assert.Equal(t, *pb.Epoch, uint32(1))
assert.Equal(t, *pb.Number, uint64(2))
assert.Equal(t, *pb.ReplicaID, uint8(3))
}

func TestFromProtobuf(t *testing.T) {
b := NewBallot(1, 2, 3)
pb := b.ToProtobuf()

var bb Ballot
bb.FromProtobuf(pb)
assert.Equal(t, *b, bb)
}
Loading