Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit 9a05dd8

Browse files
authored
Merge pull request #1667 from OpenBazaar/mg-resend-order-msg-spike-2
Re-send missed ORDER messages on processing failure
2 parents 8a436d4 + e4f5ee4 commit 9a05dd8

18 files changed

+1068
-743
lines changed

Dockerfile.dev

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ RUN go get -u github.com/gogo/protobuf/proto \
3030
github.com/derekparker/delve/cmd/dlv \
3131
github.com/tools/godep
3232

33+
RUN cd /go/src/github.com/golang/protobuf && \
34+
git checkout tags/v1.2.0 -b v1.2.0 && \
35+
cd protoc-gen-go && \
36+
go install
37+
3338
RUN curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b $GOPATH/bin v1.15.0
3439

3540
WORKDIR /go/src/github.com/OpenBazaar/openbazaar-go

api/jsonapi.go

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4251,47 +4251,26 @@ func (i *jsonAPIHandler) POSTResendOrderMessage(w http.ResponseWriter, r *http.R
42514251
return
42524252
}
42534253

4254-
if args.OrderID == "" {
4255-
ErrorResponse(w, http.StatusBadRequest, core.ErrOrderNotFound.Error())
4254+
if args.MessageType == "" {
4255+
ErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("missing messageType argument"))
42564256
return
42574257
}
4258-
4259-
var msgType int32
4260-
var ok bool
4261-
4262-
if msgType, ok = pb.Message_MessageType_value[args.MessageType]; !ok {
4263-
ErrorResponse(w, http.StatusBadRequest, "invalid order message type")
4258+
if args.OrderID == "" {
4259+
ErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("missing orderID argument"))
42644260
return
42654261
}
42664262

4267-
msg, peerID, err := i.node.Datastore.Messages().
4268-
GetByOrderIDType(args.OrderID, pb.Message_MessageType(msgType))
4269-
if err != nil || msg == nil || msg.Msg.GetPayload() == nil {
4270-
ErrorResponse(w, http.StatusBadRequest, "order message not found")
4263+
msgInt, ok := pb.Message_MessageType_value[strings.ToUpper(args.MessageType)]
4264+
if !ok {
4265+
ErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("unknown messageType (%s)", args.MessageType))
42714266
return
42724267
}
42734268

4274-
p, err := peer.IDB58Decode(peerID)
4275-
if err != nil {
4276-
ErrorResponse(w, http.StatusBadRequest, "invalid peer id")
4269+
if err := i.node.ResendCachedOrderMessage(args.OrderID, pb.Message_MessageType(msgInt)); err != nil {
4270+
ErrorResponse(w, http.StatusInternalServerError, err.Error())
42774271
return
42784272
}
42794273

4280-
ctx, cancel := context.WithCancel(context.Background())
4281-
defer cancel()
4282-
4283-
err = i.node.Service.SendMessage(ctx, p, &msg.Msg)
4284-
if err != nil {
4285-
// If send message failed, try sending offline message
4286-
log.Warningf("resending message failed: %v", err)
4287-
err = i.node.SendOfflineMessage(p, nil, &msg.Msg)
4288-
if err != nil {
4289-
log.Errorf("resending offline message failed: %v", err)
4290-
ErrorResponse(w, http.StatusBadRequest, "order message not sent")
4291-
return
4292-
}
4293-
}
4294-
42954274
SanitizedResponse(w, `{}`)
42964275
}
42974276

api/jsonapi_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,19 @@ func TestNotificationsAreReturnedInExpectedOrder(t *testing.T) {
847847
}, dbSetup, dbTeardown)
848848
}
849849

850+
func TestResendOrderMessage(t *testing.T) {
851+
runAPITests(t, apiTests{
852+
// supports missing messageType
853+
{"POST", "/ob/resendordermessage", `{"orderID":"123"}`, http.StatusBadRequest, errorResponseJSON(fmt.Errorf("missing messageType argument"))},
854+
// supports missing order ID
855+
{"POST", "/ob/resendordermessage", `{"messageType":"nonexistant"}`, http.StatusBadRequest, errorResponseJSON(fmt.Errorf("missing orderID argument"))},
856+
// supports nonexistant message types
857+
{"POST", "/ob/resendordermessage", `{"orderID":"123","messageType":"nonexistant"}`, http.StatusBadRequest, errorResponseJSON(fmt.Errorf("unknown messageType (nonexistant)"))},
858+
// supports downcased message types, expected not to find order ID
859+
{"POST", "/ob/resendordermessage", `{"orderID":"123","messageType":"order"}`, http.StatusInternalServerError, errorResponseJSON(fmt.Errorf("unable to find message for order ID (123) and message type (ORDER)"))},
860+
})
861+
}
862+
850863
// TODO: Make NewDisputeCaseRecord return a valid fixture for this valid case to work
851864
//func TestCloseDisputeReturnsOK(t *testing.T) {
852865
//dbSetup := func(testRepo *test.Repository) error {

core/disputes.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,9 +305,12 @@ func (n *OpenBazaarNode) ProcessDisputeOpen(rc *pb.RicardianContract, peerID str
305305
DisputerHandle = contract.BuyerOrder.BuyerID.Handle
306306
DisputeeID = contract.VendorListings[0].VendorID.PeerID
307307
DisputeeHandle = contract.VendorListings[0].VendorID.Handle
308-
// Load out version of the contract from the db
308+
// Load our version of the contract from the db
309309
myContract, state, _, records, _, _, err := n.Datastore.Sales().GetByOrderId(orderID)
310310
if err != nil {
311+
if err := n.SendProcessingError(DisputerID, orderID, pb.Message_DISPUTE_OPEN, nil); err != nil {
312+
log.Errorf("failed sending ORDER_PROCESSING_FAILURE to peer (%s): %s", DisputerID, err)
313+
}
311314
return net.OutOfOrderMessage
312315
}
313316
// Check this order is currently in a state which can be disputed
@@ -365,6 +368,9 @@ func (n *OpenBazaarNode) ProcessDisputeOpen(rc *pb.RicardianContract, peerID str
365368
return err
366369
}
367370
if state == pb.OrderState_AWAITING_PAYMENT || state == pb.OrderState_AWAITING_FULFILLMENT || state == pb.OrderState_PARTIALLY_FULFILLED || state == pb.OrderState_PENDING {
371+
if err := n.SendProcessingError(DisputerID, orderID, pb.Message_DISPUTE_OPEN, myContract); err != nil {
372+
log.Errorf("failed sending ORDER_PROCESSING_FAILURE to peer (%s): %s", DisputerID, err)
373+
}
368374
return net.OutOfOrderMessage
369375
}
370376
// Check this order is currently in a state which can be disputed

core/errors.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
package core
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
78
"strconv"
9+
10+
peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
11+
12+
"github.com/OpenBazaar/openbazaar-go/pb"
13+
"github.com/golang/protobuf/ptypes"
814
)
915

1016
var (
@@ -121,3 +127,35 @@ func (e ErrMarketPriceListingIllegalField) Error() string {
121127
func illegalFieldString(objectType string, field string) string {
122128
return fmt.Sprintf("Illegal %s field: %s", objectType, field)
123129
}
130+
131+
// SendProcessingError will encapsulate the failing state in a message to be sent back to pid
132+
// When pid receives the OrderProcessingError, it will analyze the contract and send the messages
133+
// that this node is missing to resynchronize the order
134+
func (n *OpenBazaarNode) SendProcessingError(pid, oid string, attemptedMessage pb.Message_MessageType, latestContract *pb.RicardianContract) error {
135+
log.Debugf("sending ORDER_PROCESSING_ERROR to peer (%s)", pid)
136+
var (
137+
procErrMsg = &pb.OrderProcessingFailure{
138+
OrderID: oid,
139+
AttemptedMessageType: attemptedMessage,
140+
Contract: latestContract,
141+
}
142+
procErrBytes, mErr = ptypes.MarshalAny(procErrMsg)
143+
errMsg = &pb.Message{
144+
MessageType: pb.Message_ORDER_PROCESSING_FAILURE,
145+
Payload: procErrBytes,
146+
}
147+
p, pErr = peer.IDB58Decode(pid)
148+
)
149+
if mErr != nil {
150+
log.Errorf("failed marshaling OrderProcessingFailure message for order (%s): %s", oid, mErr)
151+
return mErr
152+
}
153+
if pErr != nil {
154+
log.Errorf("failed decoding peer ID (%s): %s", pid, pErr)
155+
return pErr
156+
}
157+
ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
158+
defer cancel()
159+
160+
return n.Service.SendMessage(ctx, p, errMsg)
161+
}

core/net.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,36 @@ func (n *OpenBazaarNode) Unfollow(peerID string) error {
263263
return nil
264264
}
265265

266+
// ResendCachedOrderMessage will retrieve the ORDER message from the datastore and resend it to the peerID
267+
// for which it was originally intended
268+
func (n *OpenBazaarNode) ResendCachedOrderMessage(orderID string, msgType pb.Message_MessageType) error {
269+
if _, ok := pb.Message_MessageType_name[int32(msgType)]; !ok {
270+
return fmt.Errorf("invalid order message type (%d)", int(msgType))
271+
}
272+
273+
msg, peerID, err := n.Datastore.Messages().GetByOrderIDType(orderID, msgType)
274+
if err != nil || msg == nil || msg.Msg.GetPayload() == nil {
275+
return fmt.Errorf("unable to find message for order ID (%s) and message type (%s)", orderID, msgType.String())
276+
}
277+
278+
p, err := peer.IDB58Decode(peerID)
279+
if err != nil {
280+
return fmt.Errorf("unable to decode invalid peer ID for order (%s) and message type (%s)", orderID, msgType.String())
281+
}
282+
283+
ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
284+
defer cancel()
285+
286+
if err = n.Service.SendMessage(ctx, p, &msg.Msg); err != nil {
287+
go func() {
288+
if err := n.SendOfflineMessage(p, nil, &msg.Msg); err != nil {
289+
log.Errorf("error resending offline message for order id (%s) and message type (%+v): %s", orderID, msgType, err.Error())
290+
}
291+
}()
292+
}
293+
return nil
294+
}
295+
266296
// SendOrder - send order created msg to peer
267297
func (n *OpenBazaarNode) SendOrder(peerID string, contract *pb.RicardianContract) (resp *pb.Message, err error) {
268298
p, err := peer.IDB58Decode(peerID)

net/retriever/retriever_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ func TestEnsureNoOmissionsInMessageProcessingOrder(t *testing.T) {
1717

1818
// Add deliberate omissions to this list
1919
blackList := map[pb.Message_MessageType]struct{}{
20-
pb.Message_PING: {},
21-
pb.Message_OFFLINE_RELAY: {},
22-
pb.Message_STORE: {},
23-
pb.Message_BLOCK: {},
24-
pb.Message_ERROR: {},
20+
pb.Message_PING: {},
21+
pb.Message_OFFLINE_RELAY: {},
22+
pb.Message_STORE: {},
23+
pb.Message_BLOCK: {},
24+
pb.Message_ORDER_PROCESSING_FAILURE: {},
25+
pb.Message_ERROR: {},
2526
}
2627

2728
// Inclusion check

0 commit comments

Comments
 (0)