diff --git a/pss/ARCHITECTURE.md b/pss/ARCHITECTURE.md index 279e895ab7..aa7dffd107 100644 --- a/pss/ARCHITECTURE.md +++ b/pss/ARCHITECTURE.md @@ -2,7 +2,7 @@ Pss provides devp2p functionality for swarm nodes without the need for a direct tcp connection between them. -Messages are encapsulated in a devp2p message structure `PssMsg`. These capsules are forwarded from node to node using ordinary tcp devp2p until they reach their destination: The node or nodes who can successfully decrypt the message. +Messages are encapsulated in a devp2p message structure `message.Message`. These capsules are forwarded from node to node using ordinary tcp devp2p until they reach their destination: The node or nodes who can successfully decrypt the message. | Layer | Contents | |-----------|-----------------| diff --git a/pss/api.go b/pss/api.go index b5a0145d5e..9b89e2edc2 100644 --- a/pss/api.go +++ b/pss/api.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/pss/message" ) // Wrapper for receiving pss messages when using the pss API @@ -50,7 +51,7 @@ func NewAPI(ps *Pss) *API { // // All incoming messages to the node matching this topic will be encapsulated in the APIMsg // struct and sent to the subscriber -func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool) (*rpc.Subscription, error) { +func (pssapi *API) Receive(ctx context.Context, topic message.Topic, raw bool, prox bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, fmt.Errorf("Subscribe not supported") @@ -90,7 +91,7 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool return psssub, nil } -func (pssapi *API) GetAddress(topic Topic, asymmetric bool, key string) (PssAddress, error) { +func (pssapi *API) GetAddress(topic message.Topic, asymmetric bool, key string) (PssAddress, error) { var addr PssAddress if asymmetric { peer, ok := pssapi.Pss.pubKeyPool[key][topic] @@ -117,13 +118,13 @@ func (pssapi *API) BaseAddr() (PssAddress, error) { // Retrieves the node's public key in hex form func (pssapi *API) GetPublicKey() (keybytes hexutil.Bytes) { key := pssapi.Pss.PublicKey() - keybytes = pssapi.Pss.Crypto.FromECDSAPub(key) + keybytes = pssapi.Pss.Crypto.SerializePublicKey(key) return keybytes } // Set Public key to associate with a particular Pss peer -func (pssapi *API) SetPeerPublicKey(pubkey hexutil.Bytes, topic Topic, addr PssAddress) error { - pk, err := pssapi.Pss.Crypto.UnmarshalPubkey(pubkey) +func (pssapi *API) SetPeerPublicKey(pubkey hexutil.Bytes, topic message.Topic, addr PssAddress) error { + pk, err := pssapi.Pss.Crypto.UnmarshalPublicKey(pubkey) if err != nil { return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkey) } @@ -139,50 +140,50 @@ func (pssapi *API) GetSymmetricKey(symkeyid string) (hexutil.Bytes, error) { return hexutil.Bytes(symkey), err } -func (pssapi *API) GetSymmetricAddressHint(topic Topic, symkeyid string) (PssAddress, error) { +func (pssapi *API) GetSymmetricAddressHint(topic message.Topic, symkeyid string) (PssAddress, error) { return pssapi.Pss.symKeyPool[symkeyid][topic].address, nil } -func (pssapi *API) GetAsymmetricAddressHint(topic Topic, pubkeyid string) (PssAddress, error) { +func (pssapi *API) GetAsymmetricAddressHint(topic message.Topic, pubkeyid string) (PssAddress, error) { return pssapi.Pss.pubKeyPool[pubkeyid][topic].address, nil } -func (pssapi *API) StringToTopic(topicstring string) (Topic, error) { - topicbytes := BytesToTopic([]byte(topicstring)) +func (pssapi *API) StringToTopic(topicstring string) (message.Topic, error) { + topicbytes := message.NewTopic([]byte(topicstring)) if topicbytes == rawTopic { return rawTopic, errors.New("Topic string hashes to 0x00000000 and cannot be used") } return topicbytes, nil } -func (pssapi *API) SendAsym(pubkeyhex string, topic Topic, msg hexutil.Bytes) error { +func (pssapi *API) SendAsym(pubkeyhex string, topic message.Topic, msg hexutil.Bytes) error { if err := validateMsg(msg); err != nil { return err } return pssapi.Pss.SendAsym(pubkeyhex, topic, msg[:]) } -func (pssapi *API) SendSym(symkeyhex string, topic Topic, msg hexutil.Bytes) error { +func (pssapi *API) SendSym(symkeyhex string, topic message.Topic, msg hexutil.Bytes) error { if err := validateMsg(msg); err != nil { return err } return pssapi.Pss.SendSym(symkeyhex, topic, msg[:]) } -func (pssapi *API) SendRaw(addr hexutil.Bytes, topic Topic, msg hexutil.Bytes) error { +func (pssapi *API) SendRaw(addr hexutil.Bytes, topic message.Topic, msg hexutil.Bytes) error { if err := validateMsg(msg); err != nil { return err } return pssapi.Pss.SendRaw(PssAddress(addr), topic, msg[:]) } -func (pssapi *API) GetPeerTopics(pubkeyhex string) ([]Topic, error) { +func (pssapi *API) GetPeerTopics(pubkeyhex string) ([]message.Topic, error) { topics, _, err := pssapi.Pss.GetPublickeyPeers(pubkeyhex) return topics, err } -func (pssapi *API) GetPeerAddress(pubkeyhex string, topic Topic) (PssAddress, error) { +func (pssapi *API) GetPeerAddress(pubkeyhex string, topic message.Topic) (PssAddress, error) { return pssapi.Pss.getPeerAddress(pubkeyhex, topic) } diff --git a/pss/client/client.go b/pss/client/client.go index e1eefcb0b9..0a348bdfb3 100644 --- a/pss/client/client.go +++ b/pss/client/client.go @@ -33,6 +33,7 @@ import ( "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/pss" + "github.com/ethersphere/swarm/pss/message" ) const ( @@ -46,8 +47,8 @@ type Client struct { BaseAddrHex string // peers - peerPool map[pss.Topic]map[string]*pssRPCRW - protos map[pss.Topic]*p2p.Protocol + peerPool map[message.Topic]map[string]*pssRPCRW + protos map[message.Topic]*p2p.Protocol // rpc connections rpc *rpc.Client @@ -71,7 +72,7 @@ type pssRPCRW struct { closed bool } -func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) { +func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj message.Topic) (*pssRPCRW, error) { topic := topicobj.String() err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:])) if err != nil { @@ -218,8 +219,8 @@ func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) { func newClient() (client *Client) { client = &Client{ quitC: make(chan struct{}), - peerPool: make(map[pss.Topic]map[string]*pssRPCRW), - protos: make(map[pss.Topic]*p2p.Protocol), + peerPool: make(map[message.Topic]map[string]*pssRPCRW), + protos: make(map[message.Topic]*p2p.Protocol), } return } @@ -232,7 +233,7 @@ func newClient() (client *Client) { // when an incoming message is received from a peer that is not yet known to the client, // this peer object is instantiated, and the protocol is run on it. func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error { - topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version))) + topicobj := message.NewTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version))) topichex := topicobj.String() msgC := make(chan pss.APIMsg) c.peerPool[topicobj] = make(map[string]*pssRPCRW) diff --git a/pss/client/client_test.go b/pss/client/client_test.go index f53c516f1c..62cf51b604 100644 --- a/pss/client/client_test.go +++ b/pss/client/client_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/hexutil" + ethCrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" @@ -45,7 +46,6 @@ type protoCtrl struct { } var ( - cryptoUtils pss.CryptoUtils // custom logging psslogmain log.Logger pssprotocols map[string]*protoCtrl @@ -62,8 +62,6 @@ func init() { psslogmain = log.New("psslog", "*") - cryptoUtils = pss.NewCryptoUtils() - pssprotocols = make(map[string]*protoCtrl) } @@ -231,13 +229,7 @@ func newServices() adapters.Services { } return adapters.Services{ "pss": func(ctx *adapters.ServiceContext) (node.Service, error) { - ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctxlocal) - if err != nil { - return nil, err - } - privkey, err := cryptoUtils.GetPrivateKey(keys) + privkey, err := ethCrypto.GenerateKey() if err != nil { return nil, err } diff --git a/pss/crypto.go b/pss/crypto.go deleted file mode 100644 index 4951249f52..0000000000 --- a/pss/crypto.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2019 The Swarm Authors -// This file is part of the Swarm library. -// -// The Swarm library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The Swarm library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the Swarm library. If not, see . -package pss - -import ( - "context" - "crypto/ecdsa" - - ethCrypto "github.com/ethereum/go-ethereum/crypto" - whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" -) - -var cryptoBackend defaultCryptoBackend - -type CryptoBackend interface { - GetSymKey(id string) ([]byte, error) - GenerateSymKey() (string, error) - AddSymKeyDirect(bytes []byte) (string, error) - FromECDSAPub(pub *ecdsa.PublicKey) []byte - UnmarshalPubkey(pub []byte) (*ecdsa.PublicKey, error) - CompressPubkey(pubkey *ecdsa.PublicKey) []byte -} - -//Used only in tests -type CryptoUtils interface { - GenerateKey() (*ecdsa.PrivateKey, error) - NewKeyPair(ctx context.Context) (string, error) - GetPrivateKey(id string) (*ecdsa.PrivateKey, error) -} - -type defaultCryptoBackend struct { - whisper *whisper.Whisper - wapi *whisper.PublicWhisperAPI -} - -func NewCryptoBackend() CryptoBackend { - w := whisper.New(&whisper.DefaultConfig) - cryptoBackend = defaultCryptoBackend{ - whisper: w, - wapi: whisper.NewPublicWhisperAPI(w), - } - return &cryptoBackend -} - -func NewCryptoUtils() CryptoUtils { - if cryptoBackend.whisper == nil { - NewCryptoBackend() - } - return &cryptoBackend -} - -func (crypto *defaultCryptoBackend) GetSymKey(id string) ([]byte, error) { - return crypto.whisper.GetSymKey(id) -} - -func (crypto *defaultCryptoBackend) GenerateSymKey() (string, error) { - return crypto.whisper.GenerateSymKey() -} - -func (crypto *defaultCryptoBackend) AddSymKeyDirect(bytes []byte) (string, error) { - return crypto.whisper.AddSymKeyDirect(bytes) -} - -// FromECDSA exports a public key into a binary dump. -func (crypto *defaultCryptoBackend) FromECDSAPub(pub *ecdsa.PublicKey) []byte { - return ethCrypto.FromECDSAPub(pub) -} - -// CompressPubkey encodes a public key to the 33-byte compressed format. -func (crypto *defaultCryptoBackend) CompressPubkey(pubkey *ecdsa.PublicKey) []byte { - return ethCrypto.CompressPubkey(pubkey) -} - -// UnmarshalPubkey converts bytes to a secp256k1 public key. -func (crypto *defaultCryptoBackend) UnmarshalPubkey(pub []byte) (*ecdsa.PublicKey, error) { - return ethCrypto.UnmarshalPubkey(pub) -} - -// CryptoUtils - -func (crypto *defaultCryptoBackend) GenerateKey() (*ecdsa.PrivateKey, error) { - return ethCrypto.GenerateKey() -} - -func (crypto *defaultCryptoBackend) NewKeyPair(ctx context.Context) (string, error) { - return crypto.wapi.NewKeyPair(ctx) -} - -func (crypto *defaultCryptoBackend) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { - return crypto.whisper.GetPrivateKey(id) -} diff --git a/pss/crypto/crypto.go b/pss/crypto/crypto.go new file mode 100644 index 0000000000..81982d7f2e --- /dev/null +++ b/pss/crypto/crypto.go @@ -0,0 +1,608 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . +package crypto + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/ecdsa" + crand "crypto/rand" + "encoding/binary" + "errors" + "fmt" + mrand "math/rand" + "strconv" + "sync" + + "github.com/ethereum/go-ethereum/common" + ethCrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" + "github.com/ethersphere/swarm/log" +) + +var createPadding = true + +const ( + aesNonceLength = 12 // in bytes; for more info please see cipher.gcmStandardNonceSize & aesgcm.NonceSize() + keyIDSize = 32 // in bytes + + flagsLength = 1 + payloadSizeFieldMaxSize = 4 + signatureLength = 65 // in bytes + padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol + SizeMask = byte(3) // mask used to extract the size of payload size field from the flags + signatureFlag = byte(4) + aesKeyLength = 32 // in bytes + + defaultPaddingByteSize = 16 +) + +// Config params to wrap and encrypt a message. +// For asymmetric encryption Receiver is needed. +// For symmetric, SymmetricKey is needed. Sender is not mandatory but used to sign the message in both schemes. +type WrapParams struct { + Sender *ecdsa.PrivateKey // Private key of sender used for signature + Receiver *ecdsa.PublicKey // Public key of receiver for encryption + SymmetricKey []byte // Symmetric key for encryption +} + +// Config params to unwrap and decrypt a message. +// For asymmetric encryption Receiver is needed. +// For symmetric, SymmetricKey is needed. Sender is not mandatory but used to sign the message in both schemes. +type UnwrapParams struct { + Sender *ecdsa.PublicKey // Public key of sender used for signature validation + Receiver *ecdsa.PrivateKey // Private key of receiver for decryption + SymmetricKey []byte // Symmetric key for decryption +} + +// Contains a successfully decrypted message prior to parsing and validating +type ReceivedMessage interface { + GetPayload() ([]byte, error) + GetSender() *ecdsa.PublicKey +} + +// Crypto contains methods from Message and KeyStore +type Crypto interface { + Message + KeyStore +} + +// Message contains methods for wrapping(encrypting) and unwrapping(decrypting) messages +type Message interface { + Wrap(plaintext []byte, params *WrapParams) (data []byte, err error) + UnWrap(ciphertext []byte, unwrapParams *UnwrapParams) (ReceivedMessage, error) +} + +// KeyStore contains key manipulation methods +type KeyStore interface { + + // Symmetric key management + GetSymmetricKey(id string) ([]byte, error) + GenerateSymmetricKey() (string, error) + AddSymmetricKey(bytes []byte) (string, error) + + // Key serialization + SerializePublicKey(pub *ecdsa.PublicKey) []byte + UnmarshalPublicKey(pub []byte) (*ecdsa.PublicKey, error) + CompressPublicKey(pub *ecdsa.PublicKey) []byte +} + +var ( + errInvalidPubkey = errors.New("invalid public key provided for asymmetric encryption") + errInvalidSymkey = errors.New("invalid key provided for symmetric encryption") + errMissingSaltOrInvalidPayload = errors.New("missing salt or invalid payload in symmetric message") + errSecureRandomData = errors.New("failed to generate secure random data") + errNoKey = errors.New("unable to encrypt the message: neither symmetric nor asymmetric key provided") + + // Validation and Parse errors + errEmptyMessage = errors.New("empty message") + errEmptySignature = errors.New("empty expected signature") + errIncorrectSignature = errors.New("incorrect signature") + errIncorrectSize = errors.New("incorrect payload size") +) + +type defaultCryptoBackend struct { + symKeys map[string][]byte // Symmetric key storage + keyMu sync.RWMutex // Mutex associated with key storage +} + +// receivedMessage represents a data packet to be received +// and successfully decrypted. +type receivedMessage struct { + Payload []byte // Parsed and validated message content + Raw []byte // Unparsed but decrypted message content + Signature []byte // Signature from the sender + Salt []byte // Protect against plaintext attacks + Padding []byte // Padding applied + + Sender *ecdsa.PublicKey // Source public key used for signing the message + + validateOnce sync.Once + validateError error + + crypto *defaultCryptoBackend +} + +// Returns the sender public key of the message +func (msg *receivedMessage) GetSender() *ecdsa.PublicKey { + msg.validateOnce.Do( + func() { + msg.validateError = msg.validateAndParse() + }) + return msg.Sender +} + +// GetPayload validate and parse the payload of the message. +func (msg *receivedMessage) GetPayload() ([]byte, error) { + msg.validateOnce.Do( + func() { + msg.validateError = msg.validateAndParse() + }) + return msg.Payload, msg.validateError +} + +// validateAndParse checks that the format and the signature are correct. It also set Payload as the parsed message +func (msg *receivedMessage) validateAndParse() error { + end := len(msg.Raw) + if end == 0 { + return errEmptyMessage + } + if isMessageSigned(msg.Raw[0]) { + end -= signatureLength + if end <= 1 { + return errEmptySignature + } + msg.Signature = msg.Raw[end : end+signatureLength] + sz := len(msg.Raw) - signatureLength + pub, err := msg.crypto.sigToPub(msg.Raw[:sz], msg.Signature) + if err != nil { + log.Error("failed to recover public key from signature", "err", err) + return errIncorrectSignature + } else { + msg.Sender = pub + } + } + + beg := 1 + payloadSize := 0 + sizeOfPayloadSizeField := int(msg.Raw[0] & SizeMask) // number of bytes indicating the size of payload + if sizeOfPayloadSizeField != 0 { + log.Warn("Size of payload field", "size", sizeOfPayloadSizeField) + payloadSize = int(bytesToUintLittleEndian(msg.Raw[beg : beg+sizeOfPayloadSizeField])) + if payloadSize+1 > end { + return errIncorrectSize + } + beg += sizeOfPayloadSizeField + msg.Payload = msg.Raw[beg : beg+payloadSize] + } + + beg += payloadSize + msg.Padding = msg.Raw[beg:end] + return nil +} + +func newReceivedMessage(decrypted []byte, salt []byte, crypto *defaultCryptoBackend) *receivedMessage { + return &receivedMessage{ + Raw: decrypted, + Salt: salt, + crypto: crypto, + validateOnce: sync.Once{}, + } +} + +// Return the default implementation of Crypto +func New() *defaultCryptoBackend { + return newDefaultCryptoBackend() + +} + +func newDefaultCryptoBackend() *defaultCryptoBackend { + return &defaultCryptoBackend{ + symKeys: make(map[string][]byte), + } +} + +// == Message Crypto == + +// Wrap creates a message adding signature, padding and other control fields and then it is encrypted using params +func (crypto *defaultCryptoBackend) Wrap(plaintext []byte, params *WrapParams) (data []byte, err error) { + var padding []byte + if createPadding { + padding, err = generateSecureRandomData(defaultPaddingByteSize) + if err != nil { + return + } + } else { + padding = make([]byte, 0) + } + // Message structure is flags+PayloadSize+Payload+padding+signature + rawBytes := make([]byte, 1, + flagsLength+payloadSizeFieldMaxSize+len(plaintext)+len(padding)+signatureLength+padSizeLimit) + // set flags byte + rawBytes[0] = 0 // set all the flags to zero + // add payloadSizeField + rawBytes = crypto.addPayloadSizeField(rawBytes, plaintext) + // add payload + rawBytes = append(rawBytes, plaintext...) + // add padding + rawBytes = append(rawBytes, padding...) + // sign + if params.Sender != nil { + if rawBytes, err = crypto.sign(rawBytes, params.Sender); err != nil { + return + } + } + // encrypt + if params.Receiver != nil { + rawBytes, err = crypto.encryptAsymmetric(rawBytes, params.Receiver) + } else if params.SymmetricKey != nil { + rawBytes, err = crypto.encryptSymmetric(rawBytes, params.SymmetricKey) + } else { + err = errNoKey + } + if err != nil { + return + } + + data = rawBytes + return +} + +// Unwrap decrypts and compose a received message ready to be parsed and validated. +// It selects symmetric/asymmetric crypto depending on unwrapParams +func (crypto *defaultCryptoBackend) UnWrap(ciphertext []byte, unwrapParams *UnwrapParams) (ReceivedMessage, error) { + if unwrapParams.SymmetricKey != nil { + return crypto.unWrapSymmetric(ciphertext, unwrapParams.SymmetricKey) + } else if unwrapParams.Receiver != nil { + return crypto.unWrapAsymmetric(ciphertext, unwrapParams.Receiver) + } else { + return nil, errNoKey + } +} + +func (crypto *defaultCryptoBackend) unWrapSymmetric(ciphertext, key []byte) (ReceivedMessage, error) { + decrypted, salt, err := crypto.decryptSymmetric(ciphertext, key) + if err != nil { + return nil, err + } + msg := newReceivedMessage(decrypted, salt, crypto) + return msg, err +} + +func (crypto *defaultCryptoBackend) unWrapAsymmetric(ciphertext []byte, key *ecdsa.PrivateKey) (ReceivedMessage, error) { + plaintext, err := crypto.decryptAsymmetric(ciphertext, key) + switch err { + case nil: + message := newReceivedMessage(plaintext, nil, crypto) + return message, nil + case ecies.ErrInvalidPublicKey: // addressed to somebody else + return nil, err + default: + return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err) + } +} + +func (crypto *defaultCryptoBackend) addPayloadSizeField(rawBytes []byte, payload []byte) []byte { + fieldSize := getSizeOfPayloadSizeField(payload) + field := make([]byte, 4) + binary.LittleEndian.PutUint32(field, uint32(len(payload))) + field = field[:fieldSize] + rawBytes = append(rawBytes, field...) + rawBytes[0] |= byte(fieldSize) + return rawBytes +} + +// pad appends the padding specified in params. +func (crypto *defaultCryptoBackend) pad(rawBytes, payload []byte, signed bool) ([]byte, error) { + rawSize := flagsLength + getSizeOfPayloadSizeField(payload) + len(payload) + if signed { + rawSize += signatureLength + } + odd := rawSize % padSizeLimit + paddingSize := padSizeLimit - odd + pad := make([]byte, paddingSize) + _, err := crand.Read(pad) + if err != nil { + return nil, err + } + + if len(pad) != paddingSize || containsOnlyZeros(pad) { + return nil, errors.New("failed to generate random padding of size " + strconv.Itoa(paddingSize)) + } + rawBytes = append(rawBytes, pad...) + return rawBytes, nil +} + +// sign calculates and sets the cryptographic signature for the message, +// also setting the sign flag. +func (crypto *defaultCryptoBackend) sign(rawBytes []byte, key *ecdsa.PrivateKey) ([]byte, error) { + if isMessageSigned(rawBytes[0]) { + // this should not happen, but no reason to panic + log.Error("failed to sign the message: already signed") + return rawBytes, nil + } + rawBytes[0] |= signatureFlag // it is important to set this flag before signing + + hash := ethCrypto.Keccak256(rawBytes) + signature, err := crypto.signHash(hash, key) + + if err != nil { + rawBytes[0] &= (0xFF ^ signatureFlag) // clear the flag + return nil, err + } + rawBytes = append(rawBytes, signature...) + return rawBytes, nil +} + +func isMessageSigned(flags byte) bool { + return (flags & signatureFlag) != 0 +} + +// === Key store functions === + +// GetSymmetricKey retrieves symmetric key by id from the store +func (crypto *defaultCryptoBackend) GetSymmetricKey(id string) ([]byte, error) { + crypto.keyMu.RLock() + defer crypto.keyMu.RUnlock() + if crypto.symKeys[id] != nil { + return crypto.symKeys[id], nil + } + return nil, fmt.Errorf("non-existent key ID") +} + +// GenerateSymmetricKey creates a new symmetric, stores it and return its id +func (crypto *defaultCryptoBackend) GenerateSymmetricKey() (string, error) { + key, err := generateSecureRandomData(aesKeyLength) + if err != nil { + return "", err + } else if !validateDataIntegrity(key, aesKeyLength) { + return "", fmt.Errorf("error in GenerateSymmetricKey: crypto/rand failed to generate random data") + } + + id, err := generateRandomKeyID() + if err != nil { + return "", fmt.Errorf("failed to generate ID: %s", err) + } + + crypto.keyMu.Lock() + defer crypto.keyMu.Unlock() + + if crypto.symKeys[id] != nil { + return "", fmt.Errorf("failed to generate unique ID") + } + crypto.symKeys[id] = key + return id, nil +} + +// Add a symmetric key to the store generating an id and returning it +func (crypto *defaultCryptoBackend) AddSymmetricKey(key []byte) (string, error) { + if len(key) != aesKeyLength { + return "", fmt.Errorf("wrong key size: %d", len(key)) + } + + id, err := generateRandomKeyID() + if err != nil { + return "", fmt.Errorf("failed to generate ID: %s", err) + } + + crypto.keyMu.Lock() + defer crypto.keyMu.Unlock() + + if crypto.symKeys[id] != nil { + return "", fmt.Errorf("failed to generate unique ID") + } + crypto.symKeys[id] = key + return id, nil +} + +// === Key conversion === + +// FromECDSA exports a public key into a binary dump. +func (crypto *defaultCryptoBackend) SerializePublicKey(pub *ecdsa.PublicKey) []byte { + return ethCrypto.FromECDSAPub(pub) +} + +// === Key serialization === + +// UnmarshalPublicKey converts bytes to a secp256k1 public key. +func (crypto *defaultCryptoBackend) UnmarshalPublicKey(pub []byte) (*ecdsa.PublicKey, error) { + return ethCrypto.UnmarshalPubkey(pub) +} + +// CompressPublicKey encodes a public key to the 33-byte compressed format. +func (crypto *defaultCryptoBackend) CompressPublicKey(pubkey *ecdsa.PublicKey) []byte { + return ethCrypto.CompressPubkey(pubkey) +} + +// == private methods == + +// === Encrypt-Decrypt === + +// decryptSymmetric decrypts a message with a topic key, using AES-GCM-256. +// nonce size should be 12 bytes (see cipher.gcmStandardNonceSize). +func (crypto *defaultCryptoBackend) decryptSymmetric(rawBytes []byte, key []byte) (decrypted []byte, salt []byte, err error) { + // symmetric messages are expected to contain the 12-byte nonce at the end of the payload + if len(rawBytes) < aesNonceLength { + return nil, nil, errMissingSaltOrInvalidPayload + } + salt = rawBytes[len(rawBytes)-aesNonceLength:] + + block, err := aes.NewCipher(key) + if err != nil { + return nil, nil, err + } + aesgcm, err := cipher.NewGCM(block) + if err != nil { + return nil, nil, err + } + decrypted, err = aesgcm.Open(nil, salt, rawBytes[:len(rawBytes)-aesNonceLength], nil) + if err != nil { + return nil, nil, err + } + return +} + +// encryptAsymmetric encrypts a message with a public key. +func (crypto *defaultCryptoBackend) encryptAsymmetric(rawBytes []byte, key *ecdsa.PublicKey) ([]byte, error) { + if !validatePublicKey(key) { + return nil, errInvalidPubkey + } + encrypted, err := ecies.Encrypt(crand.Reader, crypto.fromECDSAtoECIESPublic(key), rawBytes, nil, nil) + if err == nil { + return encrypted, nil + } + return nil, err +} + +func (crypto *defaultCryptoBackend) decryptAsymmetric(rawBytes []byte, key *ecdsa.PrivateKey) ([]byte, error) { + return ecies.ImportECDSA(key).Decrypt(rawBytes, nil, nil) +} + +func (crypto *defaultCryptoBackend) encryptSymmetric(rawBytes []byte, key []byte) ([]byte, error) { + if !validateDataIntegrity(key, aesKeyLength) { + return nil, errInvalidSymkey + } + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + aesgcm, err := cipher.NewGCM(block) + if err != nil { + return nil, err + } + salt, err := generateSecureRandomData(aesNonceLength) // never use more than 2^32 random nonces with a given key + if err != nil { + return nil, err + } + encrypted := aesgcm.Seal(nil, salt, rawBytes, nil) + encBytes := append(encrypted, salt...) + return encBytes, nil +} + +// signHash calculates an ECDSA signature. +func (crypto *defaultCryptoBackend) signHash(hash []byte, prv *ecdsa.PrivateKey) (sig []byte, err error) { + return ethCrypto.Sign(hash, prv) +} + +// sigToPub obtains public key from the signed message and the signature +func (crypto *defaultCryptoBackend) sigToPub(signed, sig []byte) (*ecdsa.PublicKey, error) { + defer func() { recover() }() // in case of invalid signature + hash := ethCrypto.Keccak256(signed) + return ethCrypto.SigToPub(hash, sig) +} + +// fromECDSAtoECIESPublic converts a ecdsa public key to the format needed by the crypti/ecies package +func (crypto *defaultCryptoBackend) fromECDSAtoECIESPublic(key *ecdsa.PublicKey) *ecies.PublicKey { + return ecies.ImportECDSAPublic(key) +} + +// generateRandomKeyID generates a random string, which is then returned to be used as a key id +func generateRandomKeyID() (id string, err error) { + buf, err := generateSecureRandomData(keyIDSize) + if err != nil { + return "", err + } + if !validateDataIntegrity(buf, keyIDSize) { + return "", fmt.Errorf("error in generateRandomKeyID: crypto/rand failed to generate random data") + } + id = common.Bytes2Hex(buf) + return id, err +} + +// generateSecureRandomData generates random data where extra security is required. +// The purpose of this function is to prevent some bugs in software or in hardware +// from delivering not-very-random data. This is especially useful for AES nonce, +// where true randomness does not really matter, but it is very important to have +// a unique nonce for every message. +func generateSecureRandomData(length int) ([]byte, error) { + x := make([]byte, length) + y := make([]byte, length) + res := make([]byte, length) + + _, err := crand.Read(x) + if err != nil { + return nil, err + } else if !validateDataIntegrity(x, length) { + return nil, errSecureRandomData + } + _, err = mrand.Read(y) + if err != nil { + return nil, err + } else if !validateDataIntegrity(y, length) { + return nil, errSecureRandomData + } + for i := 0; i < length; i++ { + res[i] = x[i] ^ y[i] + } + if !validateDataIntegrity(res, length) { + return nil, errSecureRandomData + } + return res, nil +} + +// validateDataIntegrity returns false if the data have the wrong size or contains all zeros, +// which is the simplest and the most common bug. +func validateDataIntegrity(k []byte, expectedSize int) bool { + if len(k) != expectedSize { + return false + } + if expectedSize > 3 && containsOnlyZeros(k) { + return false + } + return true +} + +// validatePrivateKey checks the format of the given private key. +func validatePrivateKey(k *ecdsa.PrivateKey) bool { + if k == nil || k.D == nil || k.D.Sign() == 0 { + return false + } + return validatePublicKey(&k.PublicKey) +} + +// ValidatePublicKey checks the format of the given public key. +func validatePublicKey(k *ecdsa.PublicKey) bool { + return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0 +} + +// containsOnlyZeros checks if the data contain only zeros. +func containsOnlyZeros(data []byte) bool { + for _, b := range data { + if b != 0 { + return false + } + } + return true +} + +// bytesToUintLittleEndian converts the slice to 64-bit unsigned integer. +func bytesToUintLittleEndian(b []byte) (res uint64) { + mul := uint64(1) + for i := 0; i < len(b); i++ { + res += uint64(b[i]) * mul + mul *= 256 + } + return res +} + +// getSizeOfPayloadSizeField returns the number of bytes necessary to encode the size of payload +func getSizeOfPayloadSizeField(payload []byte) int { + s := 1 + for i := len(payload); i >= 256; i /= 256 { + s++ + } + return s +} diff --git a/pss/doc.go b/pss/doc.go index 462c82aaad..d24b72d830 100644 --- a/pss/doc.go +++ b/pss/doc.go @@ -16,7 +16,7 @@ // Pss provides devp2p functionality for swarm nodes without the need for a direct tcp connection between them. // -// Messages are encapsulated in a devp2p message structure `PssMsg`. These capsules are forwarded from node to node using ordinary tcp devp2p until it reaches its destination: The node or nodes who can successfully decrypt the message. +// Messages are encapsulated in a devp2p message structure `message.Message`. These capsules are forwarded from node to node using ordinary tcp devp2p until it reaches its destination: The node or nodes who can successfully decrypt the message. // // Routing of messages is done using swarm's own kademlia routing. Optionally routing can be turned off, forcing the message to be sent to all peers, similar to the behavior of the whisper protocol. // @@ -50,11 +50,11 @@ // // Under the hood, pss implements its own MsgReadWriter, which bridges MsgReadWriter.WriteMsg with Pss.SendRaw, and deftly adds an InjectMsg method which pipes incoming messages to appear on the MsgReadWriter.ReadMsg channel. // -// An incoming connection is nothing more than an actual PssMsg appearing with a certain Topic. If a Handler har been registered to that Topic, the message will be passed to it. This constitutes a "new" connection if: +// An incoming connection is nothing more than an actual message.Message appearing with a certain Topic. If a Handler har been registered to that Topic, the message will be passed to it. This constitutes a "new" connection if: // // - The pss node never called AddPeer with this combination of remote peer address and topic, and // -// - The pss node never received a PssMsg from this remote peer with this specific Topic before. +// - The pss node never received a message.Message from this remote peer with this specific Topic before. // // If it is a "new" connection, the protocol will be "run" on the remote peer, in the same manner as if it was pre-emptively added. // diff --git a/pss/forwarding_test.go b/pss/forwarding_test.go index c79da8e9b9..d936f02259 100644 --- a/pss/forwarding_test.go +++ b/pss/forwarding_test.go @@ -6,11 +6,13 @@ import ( "testing" "time" + ethCrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/pot" + "github.com/ethersphere/swarm/pss/message" ) type testCase struct { @@ -24,8 +26,6 @@ type testCase struct { errors string } -var crypto CryptoUtils = NewCryptoUtils() - // the purpose of this test is to see that pss.forward() function correctly // selects the peers for message forwarding, depending on the message address // and kademlia constellation. @@ -248,7 +248,7 @@ func testForwardMsg(t *testing.T, ps *Pss, c *testCase) { resultMap := make(map[pot.Address]int) defer func() { sendFunc = sendMsg }() - sendFunc = func(_ *Pss, sp *network.Peer, _ *PssMsg) bool { + sendFunc = func(_ *Pss, sp *network.Peer, _ *message.Message) bool { if tries < nFails { tries++ return false @@ -324,7 +324,7 @@ func addPeers(kad *network.Kademlia, addresses []pot.Address) { } func createPss(t *testing.T, kad *network.Kademlia) *Pss { - privKey, err := crypto.GenerateKey() + privKey, err := ethCrypto.GenerateKey() pssp := NewParams().WithPrivateKey(privKey) ps, err := New(kad, pssp) if err != nil { @@ -347,13 +347,11 @@ func newTestDiscoveryPeer(addr pot.Address, kad *network.Kademlia) *network.Peer return network.NewPeer(bp, kad) } -func newTestMsg(addr []byte) *PssMsg { - msg := newPssMsg(&msgParams{}) +func newTestMsg(addr []byte) *message.Message { + msg := message.New(message.Flags{}) msg.To = addr[:] msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) - msg.Payload = &envelope{ - Topic: [4]byte{}, - Data: []byte("i have nothing to hide"), - } + msg.Topic = [4]byte{} + msg.Payload = []byte("i have nothing to hide") return msg } diff --git a/pss/handshake.go b/pss/handshake.go index da28f7cf4a..da3bd61cf5 100644 --- a/pss/handshake.go +++ b/pss/handshake.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/pss/message" ) const ( @@ -54,7 +55,7 @@ type handshakeMsg struct { Limit uint16 Keys [][]byte Request uint8 - Topic Topic + Topic message.Topic } // internal representation of an individual symmetric key @@ -112,8 +113,8 @@ type HandshakeController struct { symKeySendLimit uint16 symKeyCapacity uint8 symKeyIndex map[string]*handshakeKey - handshakes map[string]map[Topic]*handshake - deregisterFuncs map[Topic]func() + handshakes map[string]map[message.Topic]*handshake + deregisterFuncs map[message.Topic]func() } // Attach HandshakeController to pss node @@ -128,8 +129,8 @@ func SetHandshakeController(pss *Pss, params *HandshakeParams) error { symKeySendLimit: params.SymKeySendLimit, symKeyCapacity: params.SymKeyCapacity, symKeyIndex: make(map[string]*handshakeKey), - handshakes: make(map[string]map[Topic]*handshake), - deregisterFuncs: make(map[Topic]func()), + handshakes: make(map[string]map[message.Topic]*handshake), + deregisterFuncs: make(map[message.Topic]func()), } api := &HandshakeAPI{ namespace: "pss", @@ -147,7 +148,7 @@ func SetHandshakeController(pss *Pss, params *HandshakeParams) error { // Return all unexpired symmetric keys from store by // peer (public key), topic and specified direction -func (ctl *HandshakeController) validKeys(pubkeyid string, topic *Topic, in bool) (validkeys []*string) { +func (ctl *HandshakeController) validKeys(pubkeyid string, topic *message.Topic, in bool) (validkeys []*string) { ctl.lock.Lock() defer ctl.lock.Unlock() now := time.Now() @@ -177,11 +178,11 @@ func (ctl *HandshakeController) validKeys(pubkeyid string, topic *Topic, in bool // Add all given symmetric keys with validity limits to store by // peer (public key), topic and specified direction -func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *Topic, in bool, symkeyids []string, limit uint16) { +func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *message.Topic, in bool, symkeyids []string, limit uint16) { ctl.lock.Lock() defer ctl.lock.Unlock() if _, ok := ctl.handshakes[pubkeyid]; !ok { - ctl.handshakes[pubkeyid] = make(map[Topic]*handshake) + ctl.handshakes[pubkeyid] = make(map[message.Topic]*handshake) } if ctl.handshakes[pubkeyid][*topic] == nil { @@ -214,14 +215,14 @@ func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *Topic, in boo } } -func (ctl *HandshakeController) releaseKey(symkeyid string, topic *Topic) bool { +func (ctl *HandshakeController) releaseKey(symkeyid string, topic *message.Topic) bool { ctl.lock.Lock() defer ctl.lock.Unlock() return ctl.releaseKeyNoLock(symkeyid, topic) } // Expire a symmetric key, making it eligible for garbage collection -func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *Topic) bool { +func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *message.Topic) bool { if ctl.symKeyIndex[symkeyid] == nil { log.Debug("no symkey", "symkeyid", symkeyid) return false @@ -236,7 +237,7 @@ func (ctl *HandshakeController) releaseKeyNoLock(symkeyid string, topic *Topic) // Expired means: // - expiry timestamp is set, and grace period is exceeded // - message validity limit is reached -func (ctl *HandshakeController) cleanHandshake(pubkeyid string, topic *Topic, in bool, out bool) int { +func (ctl *HandshakeController) cleanHandshake(pubkeyid string, topic *message.Topic, in bool, out bool) int { ctl.lock.Lock() defer ctl.lock.Unlock() var deletecount int @@ -290,7 +291,7 @@ func (ctl *HandshakeController) getSymKey(symkeyid string) *handshakeKey { return ctl.symKeyIndex[symkeyid] } -// Passed as a PssMsg handler for the topic handshake is activated on +// Passed as a message.Message handler for the topic handshake is activated on // Handles incoming key exchange messages and // counts message usage by symmetric key (expiry limit control) // Only returns error if key handler fails @@ -324,7 +325,7 @@ func (ctl *HandshakeController) registerSymKeyUse(symkeyid string) error { } symKey.count++ - receiver := common.ToHex(ctl.pss.Crypto.FromECDSAPub(ctl.pss.PublicKey())) + receiver := common.ToHex(ctl.pss.Crypto.SerializePublicKey(ctl.pss.PublicKey())) log.Trace("increment symkey recv use", "symsymkeyid", symkeyid, "count", symKey.count, "limit", symKey.limit, "receiver", receiver) return nil @@ -378,7 +379,7 @@ func (ctl *HandshakeController) handleKeys(pubkeyid string, keymsg *handshakeMsg // If number of valid outgoing keys is less than the ideal/max // amount, a request is sent for the amount of keys to make up // the difference -func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount uint8) ([]string, error) { +func (ctl *HandshakeController) sendKey(pubkeyid string, topic *message.Topic, keycount uint8) ([]string, error) { var requestcount uint8 to := PssAddress{} @@ -392,7 +393,7 @@ func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount recvkeyids := make([]string, keycount) ctl.lock.Lock() if _, ok := ctl.handshakes[pubkeyid]; !ok { - ctl.handshakes[pubkeyid] = make(map[Topic]*handshake) + ctl.handshakes[pubkeyid] = make(map[message.Topic]*handshake) } ctl.lock.Unlock() @@ -479,7 +480,7 @@ type HandshakeAPI struct { // // Fails if the incoming symmetric key store is already full (and `flush` is false), // or if the underlying key dispatcher fails -func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flush bool) (keys []string, err error) { +func (api *HandshakeAPI) Handshake(pubkeyid string, topic message.Topic, sync bool, flush bool) (keys []string, err error) { var hsc chan []string var keycount uint8 if flush { @@ -512,13 +513,13 @@ func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flus } // Activate handshake functionality on a topic -func (api *HandshakeAPI) AddHandshake(topic Topic) error { +func (api *HandshakeAPI) AddHandshake(topic message.Topic) error { api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, NewHandler(api.ctrl.handler)) return nil } // Deactivate handshake functionality on a topic -func (api *HandshakeAPI) RemoveHandshake(topic *Topic) error { +func (api *HandshakeAPI) RemoveHandshake(topic *message.Topic) error { if _, ok := api.ctrl.deregisterFuncs[*topic]; ok { api.ctrl.deregisterFuncs[*topic]() } @@ -531,7 +532,7 @@ func (api *HandshakeAPI) RemoveHandshake(topic *Topic) error { // The `in` and `out` parameters indicate for which direction(s) // symmetric keys will be returned. // If both are false, no keys (and no error) will be returned. -func (api *HandshakeAPI) GetHandshakeKeys(pubkeyid string, topic Topic, in bool, out bool) (keys []string, err error) { +func (api *HandshakeAPI) GetHandshakeKeys(pubkeyid string, topic message.Topic, in bool, out bool) (keys []string, err error) { if in { for _, inkey := range api.ctrl.validKeys(pubkeyid, &topic, true) { keys = append(keys, *inkey) @@ -570,7 +571,7 @@ func (api *HandshakeAPI) GetHandshakePublicKey(symkeyid string) (string, error) // If `flush` is set, garbage collection will be performed before returning. // // Returns true on successful removal, false otherwise -func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic Topic, symkeyid string, flush bool) (removed bool, err error) { +func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic message.Topic, symkeyid string, flush bool) (removed bool, err error) { removed = api.ctrl.releaseKey(symkeyid, &topic) if removed && flush { api.ctrl.cleanHandshake(pubkeyid, &topic, true, true) @@ -582,7 +583,7 @@ func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic Topic, symke // // Overloads the pss.SendSym() API call, adding symmetric key usage count // for message expiry control -func (api *HandshakeAPI) SendSym(symkeyid string, topic Topic, msg hexutil.Bytes) (err error) { +func (api *HandshakeAPI) SendSym(symkeyid string, topic message.Topic, msg hexutil.Bytes) (err error) { err = api.ctrl.pss.SendSym(symkeyid, topic, msg[:]) if otherErr := api.ctrl.registerSymKeyUse(symkeyid); otherErr != nil { return otherErr diff --git a/pss/keystore.go b/pss/keystore.go index f32fbf2116..b8b07f3096 100644 --- a/pss/keystore.go +++ b/pss/keystore.go @@ -25,22 +25,24 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/pss/crypto" + "github.com/ethersphere/swarm/pss/message" ) type KeyStore struct { - Crypto CryptoBackend // key and encryption crypto + Crypto crypto.Crypto // key and encryption crypto mx sync.RWMutex - pubKeyPool map[string]map[Topic]*peer // mapping of hex public keys to peer address by topic. - symKeyPool map[string]map[Topic]*peer // mapping of symkeyids to peer address by topic. - symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack - symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array + pubKeyPool map[string]map[message.Topic]*peer // mapping of hex public keys to peer address by topic. + symKeyPool map[string]map[message.Topic]*peer // mapping of symkeyids to peer address by topic. + symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack + symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array } func loadKeyStore() *KeyStore { return &KeyStore{ - Crypto: NewCryptoBackend(), - pubKeyPool: make(map[string]map[Topic]*peer), - symKeyPool: make(map[string]map[Topic]*peer), + Crypto: crypto.New(), + pubKeyPool: make(map[string]map[message.Topic]*peer), + symKeyPool: make(map[string]map[message.Topic]*peer), symKeyDecryptCache: make([]*string, defaultSymKeyCacheCapacity), } } @@ -61,14 +63,14 @@ func (ks *KeyStore) isPubKeyStored(key string) bool { return ok } -func (ks *KeyStore) getPeerSym(symkeyid string, topic Topic) (*peer, bool) { +func (ks *KeyStore) getPeerSym(symkeyid string, topic message.Topic) (*peer, bool) { ks.mx.RLock() defer ks.mx.RUnlock() psp, ok := ks.symKeyPool[symkeyid][topic] return psp, ok } -func (ks *KeyStore) getPeerPub(pubkeyid string, topic Topic) (*peer, bool) { +func (ks *KeyStore) getPeerPub(pubkeyid string, topic message.Topic) (*peer, bool) { ks.mx.RLock() defer ks.mx.RUnlock() psp, ok := ks.pubKeyPool[pubkeyid][topic] @@ -78,11 +80,11 @@ func (ks *KeyStore) getPeerPub(pubkeyid string, topic Topic) (*peer, bool) { // Links a peer ECDSA public key to a topic. // This is required for asymmetric message exchange on the given topic. // The value in `address` will be used as a routing hint for the public key / topic association. -func (ks *KeyStore) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error { +func (ks *KeyStore) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic message.Topic, address PssAddress) error { if err := validateAddress(address); err != nil { return err } - pubkeybytes := ks.Crypto.FromECDSAPub(pubkey) + pubkeybytes := ks.Crypto.SerializePublicKey(pubkey) if len(pubkeybytes) == 0 { return fmt.Errorf("invalid public key: %v", pubkey) } @@ -92,7 +94,7 @@ func (ks *KeyStore) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, addre } ks.mx.Lock() if _, ok := ks.pubKeyPool[pubkeyid]; !ok { - ks.pubKeyPool[pubkeyid] = make(map[Topic]*peer) + ks.pubKeyPool[pubkeyid] = make(map[message.Topic]*peer) } ks.pubKeyPool[pubkeyid][topic] = psp ks.mx.Unlock() @@ -102,14 +104,14 @@ func (ks *KeyStore) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, addre // adds a symmetric key to the pss key pool, and optionally adds the key to the // collection of keys used to attempt symmetric decryption of incoming messages -func (ks *KeyStore) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) { +func (ks *KeyStore) addSymmetricKeyToPool(keyid string, topic message.Topic, address PssAddress, addtocache bool, protected bool) { psp := &peer{ address: address, protected: protected, } ks.mx.Lock() if _, ok := ks.symKeyPool[keyid]; !ok { - ks.symKeyPool[keyid] = make(map[Topic]*peer) + ks.symKeyPool[keyid] = make(map[message.Topic]*peer) } ks.symKeyPool[keyid][topic] = psp ks.mx.Unlock() @@ -120,7 +122,7 @@ func (ks *KeyStore) addSymmetricKeyToPool(keyid string, topic Topic, address Pss } // Returns all recorded topic and address combination for a specific public key -func (ks *KeyStore) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) { +func (ks *KeyStore) GetPublickeyPeers(keyid string) (topic []message.Topic, address []PssAddress, err error) { ks.mx.RLock() defer ks.mx.RUnlock() for t, peer := range ks.pubKeyPool[keyid] { @@ -130,7 +132,7 @@ func (ks *KeyStore) GetPublickeyPeers(keyid string) (topic []Topic, address []Ps return topic, address, nil } -func (ks *KeyStore) getPeerAddress(keyid string, topic Topic) (PssAddress, error) { +func (ks *KeyStore) getPeerAddress(keyid string, topic message.Topic) (PssAddress, error) { ks.mx.RLock() defer ks.mx.RUnlock() if peers, ok := ks.pubKeyPool[keyid]; ok { @@ -142,63 +144,71 @@ func (ks *KeyStore) getPeerAddress(keyid string, topic Topic) (PssAddress, error } // Attempt to decrypt, validate and unpack a symmetrically encrypted message. -// If successful, returns the unpacked receivedMessage struct -// encapsulating the decrypted message, and the crypto backend id +// If successful, returns the payload of the message and the id // of the symmetric key used to decrypt the message. -// It fails if decryption of the message fails or if the message is corrupted. -func (ks *KeyStore) processSym(envelope *envelope) (*receivedMessage, string, PssAddress, error) { +// It fails if decryption of the message fails or if the message is corrupted/not valid. +func (ks *KeyStore) processSym(pssMsg *message.Message) ([]byte, string, PssAddress, error) { metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1) for i := ks.symKeyDecryptCacheCursor; i > ks.symKeyDecryptCacheCursor-cap(ks.symKeyDecryptCache) && i > 0; i-- { symkeyid := ks.symKeyDecryptCache[i%cap(ks.symKeyDecryptCache)] - symkey, err := ks.Crypto.GetSymKey(*symkeyid) + symkey, err := ks.Crypto.GetSymmetricKey(*symkeyid) if err != nil { continue } - recvmsg, err := envelope.openSymmetric(symkey) + unwrapParams := &crypto.UnwrapParams{ + SymmetricKey: symkey, + } + recvmsg, err := ks.Crypto.UnWrap(pssMsg.Payload, unwrapParams) if err != nil { continue } - if !recvmsg.validateAndParse() { - return nil, "", nil, errors.New("symmetrically encrypted message has invalid signature or is corrupt") + payload, validateError := recvmsg.GetPayload() + if validateError != nil { + return nil, "", nil, validateError } + var from PssAddress ks.mx.RLock() - if ks.symKeyPool[*symkeyid][envelope.Topic] != nil { - from = ks.symKeyPool[*symkeyid][envelope.Topic].address + if ks.symKeyPool[*symkeyid][pssMsg.Topic] != nil { + from = ks.symKeyPool[*symkeyid][pssMsg.Topic].address } ks.mx.RUnlock() ks.symKeyDecryptCacheCursor++ ks.symKeyDecryptCache[ks.symKeyDecryptCacheCursor%cap(ks.symKeyDecryptCache)] = symkeyid - return recvmsg, *symkeyid, from, nil + return payload, *symkeyid, from, nil } return nil, "", nil, errors.New("could not decrypt message") } // Attempt to decrypt, validate and unpack an asymmetrically encrypted message. -// If successful, returns the unpacked receivedMessage struct -// encapsulating the decrypted message, and the byte representation of +// If successful, returns the payload of the message and the hex representation of // the public key used to decrypt the message. // It fails if decryption of message fails, or if the message is corrupted. -func (p *Pss) processAsym(envelope *envelope) (*receivedMessage, string, PssAddress, error) { +func (p *Pss) processAsym(pssMsg *message.Message) ([]byte, string, PssAddress, error) { metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1) - recvmsg, err := envelope.openAsymmetric(p.privateKey) + unwrapParams := &crypto.UnwrapParams{ + Receiver: p.privateKey, + } + recvmsg, err := p.Crypto.UnWrap(pssMsg.Payload, unwrapParams) if err != nil { return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err) } - // check signature (if signed), strip padding - if !recvmsg.validateAndParse() { - return nil, "", nil, errors.New("invalid message") + + payload, validateError := recvmsg.GetPayload() + if validateError != nil { + return nil, "", nil, validateError } - pubkeyid := common.ToHex(p.Crypto.FromECDSAPub(recvmsg.Src)) + + pubkeyid := common.ToHex(p.Crypto.SerializePublicKey(recvmsg.GetSender())) var from PssAddress p.mx.RLock() - if p.pubKeyPool[pubkeyid][envelope.Topic] != nil { - from = p.pubKeyPool[pubkeyid][envelope.Topic].address + if p.pubKeyPool[pubkeyid][pssMsg.Topic] != nil { + from = p.pubKeyPool[pubkeyid][pssMsg.Topic].address } p.mx.RUnlock() - return recvmsg, pubkeyid, from, nil + return payload, pubkeyid, from, nil } // Symkey garbage collection @@ -209,7 +219,7 @@ func (p *Pss) cleanKeys() (count int) { p.mx.Lock() defer p.mx.Unlock() for keyid, peertopics := range p.symKeyPool { - var expiredtopics []Topic + var expiredtopics []message.Topic for topic, psp := range peertopics { if psp.protected { continue @@ -236,8 +246,8 @@ func (p *Pss) cleanKeys() (count int) { } // Automatically generate a new symkey for a topic and address hint -func (ks *KeyStore) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) { - keyid, err := ks.Crypto.GenerateSymKey() +func (ks *KeyStore) GenerateSymmetricKey(topic message.Topic, address PssAddress, addToCache bool) (string, error) { + keyid, err := ks.Crypto.GenerateSymmetricKey() if err == nil { ks.addSymmetricKeyToPool(keyid, topic, address, addToCache, false) } @@ -247,7 +257,7 @@ func (ks *KeyStore) GenerateSymmetricKey(topic Topic, address PssAddress, addToC // Returns a symmetric key byte sequence stored in the crypto backend by its unique id. // Passes on the error value from the crypto backend. func (ks *KeyStore) GetSymmetricKey(symkeyid string) ([]byte, error) { - return ks.Crypto.GetSymKey(symkeyid) + return ks.Crypto.GetSymmetricKey(symkeyid) } // Links a peer symmetric key (arbitrary byte sequence) to a topic. @@ -261,15 +271,15 @@ func (ks *KeyStore) GetSymmetricKey(symkeyid string) ([]byte, error) { // // Returns a string id that can be used to retrieve the key bytes // from the crypto backend (see pss.GetSymmetricKey()) -func (ks *KeyStore) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) { +func (ks *KeyStore) SetSymmetricKey(key []byte, topic message.Topic, address PssAddress, addtocache bool) (string, error) { if err := validateAddress(address); err != nil { return "", err } return ks.setSymmetricKey(key, topic, address, addtocache, true) } -func (ks *KeyStore) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) { - keyid, err := ks.Crypto.AddSymKeyDirect(key) +func (ks *KeyStore) setSymmetricKey(key []byte, topic message.Topic, address PssAddress, addtocache bool, protected bool) (string, error) { + keyid, err := ks.Crypto.AddSymmetricKey(key) if err == nil { ks.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected) } diff --git a/pss/message.go b/pss/message.go deleted file mode 100644 index 10a5d2dc7b..0000000000 --- a/pss/message.go +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright 2019 The Swarm Authors -// This file is part of the Swarm library. -// -// The Swarm library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The Swarm library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the Swarm library. If not, see . - -// Contains all types and code related to messages and envelopes. -// Currently backed by whisperv6 -package pss - -import ( - "crypto/ecdsa" - - whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" -) - -func toTopic(b []byte) (t Topic) { - return Topic(whisper.BytesToTopic(b)) -} - -type envelope struct { - Topic Topic - Data []byte - Expiry uint32 -} - -// == envelope == - -// newSentEnvelope creates and initializes a non-signed, non-encrypted Whisper message -// and then wrap it and encrypt it. It performs what it used to be two function calls: -// msg, e1 := NewSentMessage and env, e := msg.Wrap -func newSentEnvelope(params *messageParams) (*envelope, error) { - whisperParams := toWhisperParams(params) - - message, e := whisper.NewSentMessage(whisperParams) - if e != nil { - return nil, e - } - - whisperEnvelope, e := message.Wrap(whisperParams) - if e != nil { - return nil, e - } - - return toPssEnvelope(whisperEnvelope), nil -} - -// openSymmetric tries to decrypt an envelope, potentially encrypted with a particular key. -func (e *envelope) openSymmetric(key []byte) (*receivedMessage, error) { - whisperEnvelope := toWhisperEnvelope(e) - whisperMsg, err := whisperEnvelope.OpenSymmetric(key) - if err != nil { - return nil, err - } - msg := toReceivedMessage(whisperMsg) - return msg, nil -} - -// openAsymmetric tries to decrypt an envelope, potentially encrypted with a particular key. -func (e *envelope) openAsymmetric(key *ecdsa.PrivateKey) (*receivedMessage, error) { - whisperEnvelope := toWhisperEnvelope(e) - whisperMsg, err := whisperEnvelope.OpenAsymmetric(key) - if err != nil { - return nil, err - } - - msg := toReceivedMessage(whisperMsg) - return msg, nil -} - -// == received message == - -// receivedMessage represents a data packet to be received -// and successfully decrypted. -type receivedMessage struct { - Payload []byte - Raw []byte - Signature []byte - Salt []byte - Padding []byte - - Src *ecdsa.PublicKey - Dst *ecdsa.PublicKey -} - -// validateAndParse checks the message validity and extracts the fields in case of success. -func (msg *receivedMessage) validateAndParse() bool { - whisperRecvMsg := &whisper.ReceivedMessage{ - Raw: msg.Raw, - } - - success := whisperRecvMsg.ValidateAndParse() - if success { - msg.Signature = whisperRecvMsg.Signature - msg.Src = whisperRecvMsg.Src - msg.Payload = whisperRecvMsg.Payload - msg.Padding = whisperRecvMsg.Padding - } - return success -} - -type messageParams struct { - Src *ecdsa.PrivateKey - Dst *ecdsa.PublicKey - KeySym []byte - Topic Topic - Payload []byte - Padding []byte -} - -// == Conversion functions to/from whisper == - -func toReceivedMessage(whisperMsg *whisper.ReceivedMessage) *receivedMessage { - return &receivedMessage{ - Payload: whisperMsg.Payload, - Raw: whisperMsg.Raw, - Signature: whisperMsg.Signature, - Salt: whisperMsg.Salt, - Src: whisperMsg.Src, - Dst: whisperMsg.Dst, - } -} - -func toWhisperEnvelope(e *envelope) *whisper.Envelope { - whisperEnvelope := &whisper.Envelope{ - Expiry: e.Expiry, - TTL: defaultWhisperTTL, - Topic: whisper.TopicType(e.Topic), - Data: e.Data, - Nonce: 0, - } - return whisperEnvelope -} - -func toPssEnvelope(whisperEnvelope *whisper.Envelope) *envelope { - return &envelope{ - Topic: Topic(whisperEnvelope.Topic), - Data: whisperEnvelope.Data, - Expiry: whisperEnvelope.Expiry, - } -} - -func toWhisperParams(params *messageParams) *whisper.MessageParams { - whisperParams := &whisper.MessageParams{ - TTL: defaultWhisperTTL, - Src: params.Src, - Dst: params.Dst, - KeySym: params.KeySym, - Topic: whisper.TopicType(params.Topic), - WorkTime: defaultWhisperWorkTime, - PoW: defaultWhisperPoW, - Payload: params.Payload, - Padding: params.Padding, - } - return whisperParams -} diff --git a/pss/message/flags.go b/pss/message/flags.go new file mode 100644 index 0000000000..3d02cc1eb8 --- /dev/null +++ b/pss/message/flags.go @@ -0,0 +1,48 @@ +package message + +import ( + "errors" + "io" + + "github.com/ethereum/go-ethereum/rlp" +) + +// Flags represents the possible PSS message flags +type Flags struct { + Raw bool // message is flagged as raw or with external encryption + Symmetric bool // message is symmetrically encrypted +} + +const flagsLength = 1 +const flagSymmetric = 1 << 0 +const flagRaw = 1 << 1 + +// ErrIncorrectFlagsFieldLength is returned when the incoming flags field length is incorrect +var ErrIncorrectFlagsFieldLength = errors.New("Incorrect flags field length in message") + +// DecodeRLP implements the rlp.Decoder interface +func (f *Flags) DecodeRLP(s *rlp.Stream) error { + flagsBytes, err := s.Bytes() + if err != nil { + return err + } + if len(flagsBytes) != flagsLength { + return ErrIncorrectFlagsFieldLength + } + f.Symmetric = flagsBytes[0]&flagSymmetric != 0 + f.Raw = flagsBytes[0]&flagRaw != 0 + return nil +} + +// EncodeRLP implements the rlp.Encoder interface +func (f *Flags) EncodeRLP(w io.Writer) error { + var flags byte + if f.Raw { + flags |= flagRaw + } + if f.Symmetric { + flags |= flagSymmetric + } + + return rlp.Encode(w, []byte{flags}) +} diff --git a/pss/message/flags_test.go b/pss/message/flags_test.go new file mode 100644 index 0000000000..625f562ea2 --- /dev/null +++ b/pss/message/flags_test.go @@ -0,0 +1,61 @@ +package message_test + +import ( + "encoding/hex" + "fmt" + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethersphere/swarm/pss/message" +) + +var bools = []bool{true, false} +var flagsFixture = map[string]string{ + "r=false; s=false": "00", + "r=false; s=true": "01", + "r=true; s=false": "02", + "r=true; s=true": "03", +} + +func TestFlags(t *testing.T) { + + for _, r := range bools { + for _, s := range bools { + f := message.Flags{ + Symmetric: s, + Raw: r, + } + // Test encoding: + bytes, err := rlp.EncodeToBytes(&f) + if err != nil { + t.Fatal(err) + } + expected := flagsFixture[fmt.Sprintf("r=%t; s=%t", r, s)] + actual := hex.EncodeToString(bytes) + if expected != actual { + t.Fatalf("Expected RLP encoding of the flags to be %s, got %s", expected, actual) + } + + // Test decoding: + + var f2 message.Flags + err = rlp.DecodeBytes(bytes, &f2) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(f, f2) { + t.Fatalf("Expected RLP decoding to return the same object. Got %v", f2) + } + } + } + +} + +func TestFlagsErrors(t *testing.T) { + var f2 message.Flags + err := rlp.DecodeBytes([]byte{0x82, 0xFF, 0xFF}, &f2) + if err != message.ErrIncorrectFlagsFieldLength { + t.Fatalf("Expected an message.ErrIncorrectFlagsFieldLength error. Got %v", err) + } +} diff --git a/pss/message/message.go b/pss/message/message.go new file mode 100644 index 0000000000..dff44f0316 --- /dev/null +++ b/pss/message/message.go @@ -0,0 +1,46 @@ +package message + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common" + "golang.org/x/crypto/sha3" +) + +// Message encapsulates messages transported over pss. +type Message struct { + To []byte + Flags Flags + Expire uint32 + Topic Topic + Payload []byte +} + +const digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash) + +// Digest holds the digest of a message used for caching +type Digest [digestLength]byte + +// New creates a new PSS message +func New(flags Flags) *Message { + return &Message{ + Flags: flags, + } +} + +// Digest computes a message digest for use as a cache key +func (msg *Message) Digest() Digest { + hasher := sha3.NewLegacyKeccak256() + hasher.Write(msg.To) + hasher.Write(msg.Topic[:]) + hasher.Write(msg.Payload) + key := hasher.Sum(nil) + d := Digest{} + copy(d[:], key[:digestLength]) + return d +} + +// String representation of a PSS message +func (msg *Message) String() string { + return fmt.Sprintf("PssMsg: Recipient: %s, Topic: %v", common.ToHex(msg.To), msg.Topic.String()) +} diff --git a/pss/message/message_test.go b/pss/message/message_test.go new file mode 100644 index 0000000000..3179299e3e --- /dev/null +++ b/pss/message/message_test.go @@ -0,0 +1,90 @@ +package message_test + +import ( + "encoding/hex" + "math/rand" + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethersphere/swarm/pss/message" +) + +type messageFixture struct { + digest string + rlp string + stringer string +} + +var messageFixtures = []messageFixture{{"4b34781cfa28a5ad653855567273675eabb8535461e57e4f4bfc81504d0a828d", "dd94fa12f92afbe00f8508d0e83bab9cf8cebf42e25e03808491273d4980", "PssMsg: Recipient: 0xfa12f92afbe00f8508d0e83bab9cf8cebf42e25e, Topic: 0x91273d49"}, + {"7f076bc036335b5d587d48c985d1b6ef8cd7015d6e484d0c7a72faddaa2aceaa", "e794210fc7bb818639ac48a4c6afa2f1581a8b9525e2000184ba78973d8aa84f7f80296fda3fd8df", "PssMsg: Recipient: 0x210fc7bb818639ac48a4c6afa2f1581a8b9525e2, Topic: 0xba78973d"}, + {"a3cb8298779bef44c33461f072c54391a39c09b7a726e55d60384d7484760559", "f194e2aadcd868ce028477f86e430140149b0300a9a5020284a6b46dd094f4b754a41bd4d5d11330e2924ff403c95bb84fa5", "PssMsg: Recipient: 0xe2aadcd868ce028477f86e430140149b0300a9a5, Topic: 0xa6b46dd0"}, + {"a82a894a753dffad41330dc1abbc85e5bc1791c393eba682eaf3cee56e6b0d9a", "f83b9460f9e0fa212bac5db82b22cee5272ee19a067256000384f013aa4b9e2fb3c9afcd593f3c5d3a96fecc1b7672562cc1b8828888269264bb976ed2", "PssMsg: Recipient: 0x60f9e0fa212bac5db82b22cee5272ee19a067256, Topic: 0xf013aa4b"}, + {"8ba6836253a10cf02e5031695ab39917e816b9677d53b4e4b2af5e439b05d362", "f845941dd4751f899d743d0780c9644375aae21132781803048426f57386a834dab59240ba3bcec68fd648a62ba94062413e5b5f89c0441b5809fff0a51dd1084e8f06fce30971", "PssMsg: Recipient: 0x1dd4751f899d743d0780c9644375aae211327818, Topic: 0x26f57386"}, +} + +func RandomArray(i, length int) []byte { + source := rand.NewSource(int64(i)) + r := rand.New(source) + b := make([]byte, length) + for n := 0; n < length; n++ { + b[n] = byte(r.Intn(256)) + } + return b +} +func TestMessage(t *testing.T) { + + // generate some test messages deterministically + for i, topicString := range someTopics { + flags := message.Flags{ + Raw: i&0x1 == 0, + Symmetric: i&0x3 == 0, + } + + msg := message.New(flags) + msg.To = RandomArray(i, common.AddressLength) + msg.Expire = uint32(i) + msg.Topic = message.NewTopic([]byte(topicString)) + msg.Payload = RandomArray(i*9361, i*10) + + // test digest function: + digest := msg.Digest() + + actual := hex.EncodeToString(digest[:]) + expected := messageFixtures[i].digest + if expected != actual { + t.Fatalf("Expected digest to be %s, got %s", expected, actual) + } + + // test stringer: + expected = messageFixtures[i].stringer + actual = msg.String() + if expected != actual { + t.Fatalf("Expected stringer to return %s, got %s", expected, actual) + } + + // Test RLP encoding: + bytes, err := rlp.EncodeToBytes(&msg) + if err != nil { + t.Fatal(err) + } + + expected = messageFixtures[i].rlp + actual = hex.EncodeToString(bytes) + if expected != actual { + t.Fatalf("Expected RLP serialization to return %s, got %s", expected, actual) + } + + // Test decoding: + var msg2 message.Message + err = rlp.DecodeBytes(bytes, &msg2) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(msg, &msg2) { + t.Fatalf("Expected RLP decoding return %v, got %v", msg, &msg2) + } + } +} diff --git a/pss/message/topic.go b/pss/message/topic.go new file mode 100644 index 0000000000..9e27046202 --- /dev/null +++ b/pss/message/topic.go @@ -0,0 +1,53 @@ +package message + +import ( + "encoding/json" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethersphere/swarm/storage" +) + +// TopicLength sets the length of the message topic +const TopicLength = 4 + +// Topic is the PSS encapsulation of the Whisper topic type +type Topic [TopicLength]byte + +// NewTopic hashes an arbitrary length byte slice and truncates it to the length of a topic, using only the first bytes of the digest +func NewTopic(b []byte) Topic { + topicHashFunc := storage.MakeHashFunc("SHA256")() + topicHashFunc.Write(b) + return toTopic(topicHashFunc.Sum(nil)) +} + +// toTopic converts from the byte array representation of a topic +// into the Topic type. +func toTopic(b []byte) (t Topic) { + sz := TopicLength + if x := len(b); x < TopicLength { + sz = x + } + for i := 0; i < sz; i++ { + t[i] = b[i] + } + return t +} + +func (t *Topic) String() string { + return hexutil.Encode(t[:]) +} + +// MarshalJSON implements the json.Marshaler interface +func (t Topic) MarshalJSON() (b []byte, err error) { + return json.Marshal(t.String()) +} + +// UnmarshalJSON implements the json.Marshaler interface +func (t *Topic) UnmarshalJSON(input []byte) error { + topicbytes, err := hexutil.Decode(string(input[1 : len(input)-1])) + if err != nil { + return err + } + copy(t[:], topicbytes) + return nil +} diff --git a/pss/message/topic_test.go b/pss/message/topic_test.go new file mode 100644 index 0000000000..75231a1e46 --- /dev/null +++ b/pss/message/topic_test.go @@ -0,0 +1,45 @@ +package message_test + +import ( + "encoding/json" + "reflect" + "testing" + + "github.com/ethersphere/swarm/pss/message" +) + +// test that topic conversion functions give predictable results +var someTopics = []string{"These", "are", "some", "topics", "A topic can be very long as well, longer than TopicLength"} +var fixtureTopicHash = []string{`"0x91273d49"`, `"0xba78973d"`, `"0xa6b46dd0"`, `"0xf013aa4b"`, `"0x26f57386"`} +var fixtureTopicStringer = []string{"0x91273d49", "0xba78973d", "0xa6b46dd0", "0xf013aa4b", "0x26f57386"} + +func TestTopic(t *testing.T) { + + for i, topicString := range someTopics { + topic := message.NewTopic([]byte(topicString)) + + // Test marshalling and unmarshalling the topic as JSON: + jsonBytes, err := json.Marshal(topic) + if err != nil { + t.Fatal(err) + } + expected := fixtureTopicHash[i] + actual := string(jsonBytes) + if expected != actual { + t.Fatalf("Expected JSON serialization to return %s, got %s", expected, actual) + } + + var topic2 message.Topic + err = json.Unmarshal(jsonBytes, &topic2) + if !reflect.DeepEqual(topic, topic2) { + t.Fatalf("Expected JSON decoding to return the same object, got %v", topic2) + } + + // test Stringer: + expected = fixtureTopicStringer[i] + actual = topic.String() + if expected != actual { + t.Fatalf("Expected topic stringer to return %s, got %s", expected, actual) + } + } +} diff --git a/pss/notify/notify.go b/pss/notify/notify.go index fa97fb2d24..1fbb51fe42 100644 --- a/pss/notify/notify.go +++ b/pss/notify/notify.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/pss" + "github.com/ethersphere/swarm/pss/message" ) const ( @@ -34,7 +35,7 @@ const ( var ( // control topic is used before symmetric key issuance completes - controlTopic = pss.Topic{0x00, 0x00, 0x00, 0x01} + controlTopic = message.Topic{0x00, 0x00, 0x00, 0x01} ) // when code is MsgCodeStart, Payload is address @@ -80,8 +81,8 @@ type sendBin struct { // only subscription address bins that match the address of a notification client have entries. type notifier struct { bins map[string]*sendBin - topic pss.Topic // identifies the resource for pss receiver - threshold int // amount of address bytes used in bins + topic message.Topic // identifies the resource for pss receiver + threshold int // amount of address bytes used in bins updateC <-chan []byte quitC chan struct{} } @@ -138,7 +139,7 @@ func (c *Controller) Subscribe(name string, pubkey *ecdsa.PublicKey, address pss defer c.mu.Unlock() msg := NewMsg(MsgCodeStart, name, c.pss.BaseAddr()) c.pss.SetPeerPublicKey(pubkey, controlTopic, address) - pubkeyId := hexutil.Encode(c.pss.Crypto.FromECDSAPub(pubkey)) + pubkeyId := hexutil.Encode(c.pss.Crypto.SerializePublicKey(pubkey)) smsg, err := rlp.EncodeToBytes(msg) if err != nil { return err @@ -191,7 +192,7 @@ func (c *Controller) NewNotifier(name string, threshold int, updateC <-chan []by quitC := make(chan struct{}) c.notifiers[name] = ¬ifier{ bins: make(map[string]*sendBin), - topic: pss.BytesToTopic([]byte(name)), + topic: message.NewTopic([]byte(name)), threshold: threshold, updateC: updateC, quitC: quitC, @@ -289,7 +290,7 @@ func (c *Controller) handleStartMsg(msg *Msg, keyid string) (err error) { if err != nil { return err } - pubkey, err := c.pss.Crypto.UnmarshalPubkey(keyidbytes) + pubkey, err := c.pss.Crypto.UnmarshalPublicKey(keyidbytes) if err != nil { return err } @@ -330,7 +331,7 @@ func (c *Controller) handleStartMsg(msg *Msg, keyid string) (err error) { func (c *Controller) handleNotifyWithKeyMsg(msg *Msg) error { symkey := msg.Payload[len(msg.Payload)-symKeyLength:] - topic := pss.BytesToTopic(msg.Name) + topic := message.NewTopic(msg.Name) // \TODO keep track of and add actual address updaterAddr := pss.PssAddress([]byte{}) diff --git a/pss/notify/notify_test.go b/pss/notify/notify_test.go index 30bdd43bc6..25b7f4113a 100644 --- a/pss/notify/notify_test.go +++ b/pss/notify/notify_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/hexutil" - + ethCrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" @@ -16,21 +16,21 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/pss" + "github.com/ethersphere/swarm/pss/crypto" + "github.com/ethersphere/swarm/pss/message" "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/testutil" ) var ( - psses map[string]*pss.Pss - cryptoUtils pss.CryptoUtils - crypto pss.CryptoBackend + psses map[string]*pss.Pss + crypt crypto.Crypto ) func init() { testutil.Init() - cryptoUtils = pss.NewCryptoUtils() - crypto = pss.NewCryptoBackend() + crypt = crypto.New() psses = make(map[string]*pss.Pss) } @@ -107,7 +107,7 @@ func TestStart(t *testing.T) { } rsrcName := "foo.eth" - rsrcTopic := pss.BytesToTopic([]byte(rsrcName)) + rsrcTopic := message.NewTopic([]byte(rsrcName)) // wait for kademlia table to populate time.Sleep(time.Second) @@ -131,7 +131,7 @@ func TestStart(t *testing.T) { if err != nil { t.Fatal(err) } - pubkey, err := crypto.UnmarshalPubkey(pubkeybytes) + pubkey, err := crypt.UnmarshalPublicKey(pubkeybytes) if err != nil { t.Fatal(err) } @@ -221,13 +221,7 @@ func newServices(allowRaw bool) adapters.Services { } return adapters.Services{ "pss": func(ctx *adapters.ServiceContext) (node.Service, error) { - ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctxlocal) - if err != nil { - return nil, err - } - privkey, err := cryptoUtils.GetPrivateKey(keys) + privkey, err := ethCrypto.GenerateKey() if err != nil { return nil, err } @@ -239,7 +233,7 @@ func newServices(allowRaw bool) adapters.Services { if err != nil { return nil, err } - psses[hexutil.Encode(crypto.FromECDSAPub(&privkey.PublicKey))] = ps + psses[hexutil.Encode(crypt.SerializePublicKey(&privkey.PublicKey))] = ps return ps, nil }, "bzz": func(ctx *adapters.ServiceContext) (node.Service, error) { diff --git a/pss/protocol.go b/pss/protocol.go index c1483dc181..aed4d7a5a3 100644 --- a/pss/protocol.go +++ b/pss/protocol.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/p2p/protocols" + "github.com/ethersphere/swarm/pss/message" ) const ( @@ -76,8 +77,8 @@ type PssReadWriter struct { LastActive time.Time rw chan p2p.Msg spec *protocols.Spec - topic *Topic - sendFunc func(string, Topic, []byte) error + topic *message.Topic + sendFunc func(string, message.Topic, []byte) error key string closed bool } @@ -119,7 +120,7 @@ func (prw *PssReadWriter) injectMsg(msg p2p.Msg) error { type Protocol struct { *Pss proto *p2p.Protocol - topic *Topic + topic *message.Topic spec *protocols.Spec pubKeyRWPool map[string]p2p.MsgReadWriter symKeyRWPool map[string]p2p.MsgReadWriter @@ -134,7 +135,7 @@ type Protocol struct { // only one is specified, the protocol will not be valid // for the other, and will make the message handler // return errors -func RegisterProtocol(ps *Pss, topic *Topic, spec *protocols.Spec, targetprotocol *p2p.Protocol, options *ProtocolParams) (*Protocol, error) { +func RegisterProtocol(ps *Pss, topic *message.Topic, spec *protocols.Spec, targetprotocol *p2p.Protocol, options *ProtocolParams) (*Protocol, error) { if !options.Asymmetric && !options.Symmetric { return nil, fmt.Errorf("specify at least one of asymmetric or symmetric messaging mode") } @@ -198,12 +199,12 @@ func (p *Protocol) Handle(msg []byte, peer *p2p.Peer, asymmetric bool, keyid str } // check if (peer) symmetric key is currently registered with this topic -func (p *Protocol) isActiveSymKey(key string, topic Topic) bool { +func (p *Protocol) isActiveSymKey(key string, topic message.Topic) bool { return p.symKeyRWPool[key] != nil } // check if (peer) asymmetric key is currently registered with this topic -func (p *Protocol) isActiveAsymKey(key string, topic Topic) bool { +func (p *Protocol) isActiveAsymKey(key string, topic message.Topic) bool { return p.pubKeyRWPool[key] != nil } @@ -227,7 +228,7 @@ func ToP2pMsg(msg []byte) (p2p.Msg, error) { // `key` and `asymmetric` specifies what encryption key // to link the peer to. // The key must exist in the pss store prior to adding the peer. -func (p *Protocol) AddPeer(peer *p2p.Peer, topic Topic, asymmetric bool, key string) (p2p.MsgReadWriter, error) { +func (p *Protocol) AddPeer(peer *p2p.Peer, topic message.Topic, asymmetric bool, key string) (p2p.MsgReadWriter, error) { rw := &PssReadWriter{ Pss: p.Pss, rw: make(chan p2p.Msg), @@ -278,6 +279,6 @@ func (p *Protocol) RemovePeer(asymmetric bool, key string) { } // Uniform translation of protocol specifiers to topic -func ProtocolTopic(spec *protocols.Spec) Topic { - return BytesToTopic([]byte(fmt.Sprintf("%s:%d", spec.Name, spec.Version))) +func ProtocolTopic(spec *protocols.Spec) message.Topic { + return message.NewTopic([]byte(fmt.Sprintf("%s:%d", spec.Name, spec.Version))) } diff --git a/pss/prox_test.go b/pss/prox_test.go index 65482976fb..1d49f2df1e 100644 --- a/pss/prox_test.go +++ b/pss/prox_test.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + ethCrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" @@ -20,6 +21,7 @@ import ( "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/network/simulation" "github.com/ethersphere/swarm/pot" + "github.com/ethersphere/swarm/pss/message" "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/testutil" ) @@ -55,7 +57,7 @@ type testData struct { var ( pof = pot.DefaultPof(256) // generate messages and index them - topic = BytesToTopic([]byte{0xf3, 0x9e, 0x06, 0x82}) + topic = message.NewTopic([]byte{0xf3, 0x9e, 0x06, 0x82}) ) func (td *testData) pushNotification(val handlerNotification) { @@ -234,7 +236,7 @@ func TestProxNetworkLong(t *testing.T) { func testProxNetwork(t *testing.T, nodeCount int, msgCount int, timeout time.Duration) { td := newTestData() - handlerContextFuncs := make(map[Topic]handlerContextFunc) + handlerContextFuncs := make(map[message.Topic]handlerContextFunc) handlerContextFuncs[topic] = nodeMsgHandler services := newProxServices(td, true, handlerContextFuncs, td.kademlias) td.sim = simulation.NewInProc(services) @@ -380,7 +382,7 @@ func nodeMsgHandler(td *testData, config *adapters.NodeConfig) *handler { // an adaptation of the same services setup as in pss_test.go // replaces pss_test.go when those tests are rewritten to the new swarm/network/simulation package -func newProxServices(td *testData, allowRaw bool, handlerContextFuncs map[Topic]handlerContextFunc, kademlias map[enode.ID]*network.Kademlia) map[string]simulation.ServiceFunc { +func newProxServices(td *testData, allowRaw bool, handlerContextFuncs map[message.Topic]handlerContextFunc, kademlias map[enode.ID]*network.Kademlia) map[string]simulation.ServiceFunc { stateStore := state.NewInmemoryStore() kademlia := func(id enode.ID, bzzkey []byte) *network.Kademlia { if k, ok := kademlias[id]; ok { @@ -426,11 +428,8 @@ func newProxServices(td *testData, allowRaw bool, handlerContextFuncs map[Topic] // execadapter does not exec init() initTest() - // create keys in cryptoUtils and set up the pss object - ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctxlocal) - privkey, err := cryptoUtils.GetPrivateKey(keys) + // create keys and set up the pss object + privkey, err := ethCrypto.GenerateKey() pssp := NewParams().WithPrivateKey(privkey) pssp.AllowRaw = allowRaw bzzPrivateKey, err := simulation.BzzPrivateKeyFromConfig(ctx.Config) diff --git a/pss/pss.go b/pss/pss.go index d8177216e7..ed8287d8a0 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -20,10 +20,9 @@ import ( "bytes" "context" "crypto/ecdsa" - "crypto/rand" + "encoding/hex" "errors" "fmt" - "hash" "sync" "time" @@ -36,18 +35,16 @@ import ( "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/pot" + "github.com/ethersphere/swarm/pss/crypto" + "github.com/ethersphere/swarm/pss/message" "github.com/ethersphere/swarm/storage" "golang.org/x/crypto/sha3" ) const ( - defaultPaddingByteSize = 16 defaultMsgTTL = time.Second * 120 defaultDigestCacheTTL = time.Second * 10 defaultSymKeyCacheCapacity = 512 - digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash) - defaultWhisperWorkTime = 3 - defaultWhisperPoW = 0.0000000001 defaultMaxMsgSize = 1024 * 1024 defaultCleanInterval = time.Second * 60 * 10 defaultOutboxCapacity = 100000 @@ -65,7 +62,7 @@ var spec = &protocols.Spec{ Version: protocolVersion, MaxMsgSize: defaultMaxMsgSize, Messages: []interface{}{ - PssMsg{}, + message.Message{}, }, } @@ -120,10 +117,10 @@ type outbox struct { slots chan int process chan int quitC chan struct{} - forward func(msg *PssMsg) error + forward func(msg *message.Message) error } -func newOutbox(capacity int, quitC chan struct{}, forward func(msg *PssMsg) error) outbox { +func newOutbox(capacity int, quitC chan struct{}, forward func(msg *message.Message) error) outbox { outbox := outbox{ queue: make([]*outboxMsg, capacity), slots: make(chan int, capacity), @@ -212,19 +209,18 @@ type Pss struct { peers map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer peersMu sync.RWMutex - fwdCache map[digest]cacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg - fwdCacheMu sync.RWMutex - cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented) - msgTTL time.Duration - paddingByteSize int - capstring string - outbox outbox + fwdCache map[message.Digest]cacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg + fwdCacheMu sync.RWMutex + cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented) + msgTTL time.Duration + capstring string + outbox outbox // message handling - handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() + handlers map[message.Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() handlersMu sync.RWMutex hashPool sync.Pool - topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers + topicHandlerCaps map[message.Topic]*handlerCaps // caches capabilities of each topic's handlers topicHandlerCapsMu sync.RWMutex // process @@ -232,7 +228,7 @@ type Pss struct { } func (p *Pss) String() string { - return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(p.Crypto.FromECDSAPub(&p.privateKey.PublicKey))) + return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), hex.EncodeToString(p.Crypto.SerializePublicKey(&p.privateKey.PublicKey))) } // Creates a new Pss instance. @@ -254,15 +250,14 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { privateKey: params.privateKey, quitC: make(chan struct{}), - peers: make(map[string]*protocols.Peer), - fwdCache: make(map[digest]cacheEntry), - cacheTTL: params.CacheTTL, - msgTTL: params.MsgTTL, - paddingByteSize: defaultPaddingByteSize, - capstring: c.String(), + peers: make(map[string]*protocols.Peer), + fwdCache: make(map[message.Digest]cacheEntry), + cacheTTL: params.CacheTTL, + msgTTL: params.MsgTTL, + capstring: c.String(), - handlers: make(map[Topic]map[*handler]bool), - topicHandlerCaps: make(map[Topic]*handlerCaps), + handlers: make(map[message.Topic]map[*handler]bool), + topicHandlerCaps: make(map[message.Topic]*handlerCaps), hashPool: sync.Pool{ New: func() interface{} { @@ -306,7 +301,7 @@ func (p *Pss) Start(srv *p2p.Server) error { go p.outbox.processOutbox() log.Info("Started Pss") - log.Info("Loaded EC keys", "pubkey", common.ToHex(p.Crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(p.Crypto.CompressPubkey(p.PublicKey()))) + log.Info("Loaded EC keys", "pubkey", hex.EncodeToString(p.Crypto.SerializePublicKey(p.PublicKey())), "secp256", hex.EncodeToString(p.Crypto.CompressPublicKey(p.PublicKey()))) return nil } @@ -387,14 +382,14 @@ func (p *Pss) PublicKey() *ecdsa.PublicKey { // SECTION: Message handling ///////////////////////////////////////////////////////////////////// -func (p *Pss) getTopicHandlerCaps(topic Topic) (hc *handlerCaps, found bool) { +func (p *Pss) getTopicHandlerCaps(topic message.Topic) (hc *handlerCaps, found bool) { p.topicHandlerCapsMu.RLock() defer p.topicHandlerCapsMu.RUnlock() hc, found = p.topicHandlerCaps[topic] return } -func (p *Pss) setTopicHandlerCaps(topic Topic, hc *handlerCaps) { +func (p *Pss) setTopicHandlerCaps(topic message.Topic, hc *handlerCaps) { p.topicHandlerCapsMu.Lock() defer p.topicHandlerCapsMu.Unlock() p.topicHandlerCaps[topic] = hc @@ -409,7 +404,7 @@ func (p *Pss) setTopicHandlerCaps(topic Topic, hc *handlerCaps) { // // Returns a deregister function which needs to be called to // deregister the handler, -func (p *Pss) Register(topic *Topic, hndlr *handler) func() { +func (p *Pss) Register(topic *message.Topic, hndlr *handler) func() { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] @@ -438,7 +433,7 @@ func (p *Pss) Register(topic *Topic, hndlr *handler) func() { return func() { p.deregister(topic, hndlr) } } -func (p *Pss) deregister(topic *Topic, hndlr *handler) { +func (p *Pss) deregister(topic *message.Topic, hndlr *handler) { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] @@ -467,27 +462,27 @@ func (p *Pss) deregister(topic *Topic, hndlr *handler) { func (p *Pss) handle(ctx context.Context, msg interface{}) error { defer metrics.GetOrRegisterResettingTimer("pss.handle", nil).UpdateSince(time.Now()) - pssmsg, ok := msg.(*PssMsg) + pssmsg, ok := msg.(*message.Message) if !ok { - return fmt.Errorf("invalid message type. Expected *PssMsg, got %T", msg) + return fmt.Errorf("invalid message type. Expected *message.Message, got %T", msg) } - log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:])) + log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Topic[:])) if int64(pssmsg.Expire) < time.Now().Unix() { metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1) - log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To)) + log.Warn("pss filtered expired message", "from", hex.EncodeToString(p.Kademlia.BaseAddr()), "to", hex.EncodeToString(pssmsg.To)) return nil } if p.checkFwdCache(pssmsg) { - log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To))) + log.Trace("pss relay block-cache match (process)", "from", hex.EncodeToString(p.Kademlia.BaseAddr()), "to", (hex.EncodeToString(pssmsg.To))) return nil } p.addFwdCache(pssmsg) - psstopic := pssmsg.Payload.Topic + psstopic := pssmsg.Topic // raw is simplest handler contingency to check, so check that first var isRaw bool - if pssmsg.isRaw() { + if pssmsg.Flags.Raw { if capabilities, ok := p.getTopicHandlerCaps(psstopic); ok { if !capabilities.raw { log.Debug("No handler for raw message", "topic", psstopic) @@ -507,11 +502,11 @@ func (p *Pss) handle(ctx context.Context, msg interface{}) error { } isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx) if !isRecipient { - log.Trace("pss msg forwarding ===>", "pss", common.ToHex(p.BaseAddr()), "prox", isProx) + log.Trace("pss msg forwarding ===>", "pss", hex.EncodeToString(p.BaseAddr()), "prox", isProx) return p.enqueue(pssmsg) } - log.Trace("pss msg processing <===", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:])) + log.Trace("pss msg processing <===", "pss", hex.EncodeToString(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Topic[:])) if err := p.process(pssmsg, isRaw, isProx); err != nil { qerr := p.enqueue(pssmsg) if qerr != nil { @@ -524,35 +519,32 @@ func (p *Pss) handle(ctx context.Context, msg interface{}) error { // Entry point to processing a message for which the current node can be the intended recipient. // Attempts symmetric and asymmetric decryption with stored keys. // Dispatches message to all handlers matching the message topic -func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { +func (p *Pss) process(pssmsg *message.Message, raw bool, prox bool) error { defer metrics.GetOrRegisterResettingTimer("pss.process", nil).UpdateSince(time.Now()) var err error - var recvmsg *receivedMessage var payload []byte var from PssAddress var asymmetric bool var keyid string - var keyFunc func(envelope *envelope) (*receivedMessage, string, PssAddress, error) + var keyFunc func(pssMsg *message.Message) ([]byte, string, PssAddress, error) - envelope := pssmsg.Payload - psstopic := envelope.Topic + psstopic := pssmsg.Topic if raw { - payload = pssmsg.Payload.Data + payload = pssmsg.Payload } else { - if pssmsg.isSym() { + if pssmsg.Flags.Symmetric { keyFunc = p.processSym } else { asymmetric = true keyFunc = p.processAsym } - recvmsg, keyid, from, err = keyFunc(envelope) + payload, keyid, from, err = keyFunc(pssmsg) if err != nil { - return errors.New("Decryption failed") + return errors.New("decryption failed") } - payload = recvmsg.Payload } if len(pssmsg.To) < addressLength || prox { @@ -563,7 +555,7 @@ func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { } // copy all registered handlers for respective topic in order to avoid data race or deadlock -func (p *Pss) getHandlers(topic Topic) (ret []*handler) { +func (p *Pss) getHandlers(topic message.Topic) (ret []*handler) { p.handlersMu.RLock() defer p.handlersMu.RUnlock() for k := range p.handlers[topic] { @@ -572,7 +564,7 @@ func (p *Pss) getHandlers(topic Topic) (ret []*handler) { return ret } -func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) { +func (p *Pss) executeHandlers(topic message.Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) { defer metrics.GetOrRegisterResettingTimer("pss.execute-handlers", nil).UpdateSince(time.Now()) handlers := p.getHandlers(topic) @@ -594,12 +586,12 @@ func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw } // will return false if using partial address -func (p *Pss) isSelfRecipient(msg *PssMsg) bool { +func (p *Pss) isSelfRecipient(msg *message.Message) bool { return bytes.Equal(msg.To, p.Kademlia.BaseAddr()) } // test match of leftmost bytes in given message to node's Kademlia address -func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { +func (p *Pss) isSelfPossibleRecipient(msg *message.Message, prox bool) bool { local := p.Kademlia.BaseAddr() // if a partial address matches we are possible recipient regardless of prox @@ -622,7 +614,7 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { // SECTION: Message sending ///////////////////////////////////////////////////////////////////// -func (p *Pss) enqueue(msg *PssMsg) error { +func (p *Pss) enqueue(msg *message.Message) error { defer metrics.GetOrRegisterResettingTimer("pss.enqueue", nil).UpdateSince(time.Now()) outboxmsg := newOutboxMsg(msg) @@ -633,25 +625,22 @@ func (p *Pss) enqueue(msg *PssMsg) error { // Send a raw message (any encryption is responsibility of calling client) // // Will fail if raw messages are disallowed -func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { +func (p *Pss) SendRaw(address PssAddress, topic message.Topic, msg []byte) error { defer metrics.GetOrRegisterResettingTimer("pss.send.raw", nil).UpdateSince(time.Now()) if err := validateAddress(address); err != nil { return err } - pssMsgParams := &msgParams{ - raw: true, - } - payload := &envelope{ - Data: msg, - Topic: topic, + pssMsgParams := message.Flags{ + Raw: true, } - pssMsg := newPssMsg(pssMsgParams) + pssMsg := message.New(pssMsgParams) pssMsg.To = address pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) - pssMsg.Payload = payload + pssMsg.Payload = msg + pssMsg.Topic = topic p.addFwdCache(pssMsg) @@ -661,7 +650,7 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { // Send a message using symmetric encryption // // Fails if the key id does not match any of the stored symmetric keys -func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error { +func (p *Pss) SendSym(symkeyid string, topic message.Topic, msg []byte) error { symkey, err := p.GetSymmetricKey(symkeyid) if err != nil { return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err) @@ -676,8 +665,8 @@ func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error { // Send a message using asymmetric encryption // // Fails if the key id does not match any in of the stored public keys -func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error { - if _, err := p.Crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil { +func (p *Pss) SendAsym(pubkeyid string, topic message.Topic, msg []byte) error { + if _, err := p.Crypto.UnmarshalPublicKey(common.FromHex(pubkeyid)); err != nil { return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid) } psp, ok := p.getPeerPub(pubkeyid, topic) @@ -691,49 +680,40 @@ func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error { // It generates an envelope for the specified recipient and topic, // and wraps the message payload in it. // TODO: Implement proper message padding -func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error { +func (p *Pss) send(to []byte, topic message.Topic, msg []byte, asymmetric bool, key []byte) error { metrics.GetOrRegisterCounter("pss.send", nil).Inc(1) if key == nil || bytes.Equal(key, []byte{}) { return fmt.Errorf("Zero length key passed to pss send") } - padding := make([]byte, p.paddingByteSize) - c, err := rand.Read(padding) - if err != nil { - return err - } else if c < p.paddingByteSize { - return fmt.Errorf("invalid padding length: %d", c) - } - mparams := &messageParams{ - Src: p.privateKey, - Topic: topic, - Payload: msg, - Padding: padding, + wrapParams := &crypto.WrapParams{ + Sender: p.privateKey, } if asymmetric { - pk, err := p.Crypto.UnmarshalPubkey(key) + pk, err := p.Crypto.UnmarshalPublicKey(key) if err != nil { return fmt.Errorf("Cannot unmarshal pubkey: %x", key) } - mparams.Dst = pk + wrapParams.Receiver = pk } else { - mparams.KeySym = key + wrapParams.SymmetricKey = key } // set up outgoing message container, which does encryption and envelope wrapping - envelope, err := newSentEnvelope(mparams) + envelope, err := p.Crypto.Wrap(msg, wrapParams) if err != nil { return fmt.Errorf("failed to perform message encapsulation and encryption: %v", err) } - log.Trace("pssmsg wrap done", "env", envelope, "mparams payload", common.ToHex(mparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key)) + log.Trace("pssmsg wrap done", "env", envelope, "mparams payload", hex.EncodeToString(msg), "to", hex.EncodeToString(to), "asym", asymmetric, "key", hex.EncodeToString(key)) // prepare for devp2p transport - pssMsgParams := &msgParams{ - sym: !asymmetric, + pssMsgParams := message.Flags{ + Symmetric: !asymmetric, } - pssMsg := newPssMsg(pssMsgParams) + pssMsg := message.New(pssMsgParams) pssMsg.To = to pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = envelope + pssMsg.Topic = topic return p.enqueue(pssMsg) } @@ -743,7 +723,7 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by var sendFunc = sendMsg // tries to send a message, returns true if successful -func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { +func sendMsg(p *Pss, sp *network.Peer, msg *message.Message) bool { var isPssEnabled bool info := sp.Info() for _, capability := range info.Caps { @@ -783,7 +763,7 @@ func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message //// forwarding fails, the node should try to forward it to the next best peer, until the message is //// successfully forwarded to at least one peer. -func (p *Pss) forward(msg *PssMsg) error { +func (p *Pss) forward(msg *message.Message) error { metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) sent := 0 // number of successful sends to := make([]byte, addressLength) @@ -861,7 +841,7 @@ func label(b []byte) string { } // add a message to the cache -func (p *Pss) addFwdCache(msg *PssMsg) error { +func (p *Pss) addFwdCache(msg *message.Message) error { defer metrics.GetOrRegisterResettingTimer("pss.addfwdcache", nil).UpdateSince(time.Now()) var entry cacheEntry @@ -870,7 +850,7 @@ func (p *Pss) addFwdCache(msg *PssMsg) error { p.fwdCacheMu.Lock() defer p.fwdCacheMu.Unlock() - digest := p.msgDigest(msg) + digest := msg.Digest() if entry, ok = p.fwdCache[digest]; !ok { entry = cacheEntry{} } @@ -880,11 +860,11 @@ func (p *Pss) addFwdCache(msg *PssMsg) error { } // check if message is in the cache -func (p *Pss) checkFwdCache(msg *PssMsg) bool { +func (p *Pss) checkFwdCache(msg *message.Message) bool { p.fwdCacheMu.Lock() defer p.fwdCacheMu.Unlock() - digest := p.msgDigest(msg) + digest := msg.Digest() entry, ok := p.fwdCache[digest] if ok { if entry.expiresAt.After(time.Now()) { @@ -897,22 +877,6 @@ func (p *Pss) checkFwdCache(msg *PssMsg) bool { return false } -// Digest of message -func (p *Pss) msgDigest(msg *PssMsg) digest { - return p.digestBytes(msg.serialize()) -} - -func (p *Pss) digestBytes(msg []byte) digest { - hasher := p.hashPool.Get().(hash.Hash) - defer p.hashPool.Put(hasher) - hasher.Reset() - hasher.Write(msg) - d := digest{} - key := hasher.Sum(nil) - copy(d[:], key[:digestLength]) - return d -} - func validateAddress(addr PssAddress) error { if len(addr) > addressLength { return errors.New("address too long") diff --git a/pss/pss_test.go b/pss/pss_test.go index 2ca5e997b3..c73428404d 100644 --- a/pss/pss_test.go +++ b/pss/pss_test.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + ethCrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" @@ -43,13 +44,14 @@ import ( "github.com/ethersphere/swarm/network/simulation" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/pot" + "github.com/ethersphere/swarm/pss/crypto" + "github.com/ethersphere/swarm/pss/message" "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/testutil" ) var ( initOnce = sync.Once{} - cryptoUtils CryptoUtils psslogmain log.Logger pssprotocols map[string]*protoCtrl useHandshake bool @@ -69,22 +71,21 @@ func initTest() { func() { psslogmain = log.New("psslog", "*") - cryptoUtils = NewCryptoUtils() - pssprotocols = make(map[string]*protoCtrl) }, ) } -// test that topic conversion functions give predictable results -func TestTopic(t *testing.T) { +// test that API topic conversion functions give predictable results + +func TestAPITopic(t *testing.T) { api := &API{} topicstr := strings.Join([]string{PingProtocol.Name, strconv.Itoa(int(PingProtocol.Version))}, ":") - // bytestotopic is the authoritative topic conversion source - topicobj := BytesToTopic([]byte(topicstr)) + // message.NewTopic is the authoritative topic conversion source + topicobj := message.NewTopic([]byte(topicstr)) // string to topic and bytes to topic must match topicapiobj, _ := api.StringToTopic(topicstr) @@ -101,55 +102,13 @@ func TestTopic(t *testing.T) { if topichex != pingtopichex { t.Fatalf("protocol topic conversion mismatch; %s != %s", topichex, pingtopichex) } - - // json marshal of topic - topicjsonout, err := topicobj.MarshalJSON() - if err != nil { - t.Fatal(err) - } - if string(topicjsonout)[1:len(topicjsonout)-1] != topichex { - t.Fatalf("topic json marshal mismatch; %s != \"%s\"", topicjsonout, topichex) - } - - // json unmarshal of topic - var topicjsonin Topic - topicjsonin.UnmarshalJSON(topicjsonout) - if topicjsonin != topicobj { - t.Fatalf("topic json unmarshal mismatch: %x != %x", topicjsonin, topicobj) - } -} - -// test bit packing of message control flags -func TestMsgParams(t *testing.T) { - var ctrl byte - ctrl |= pssControlRaw - p := newMsgParamsFromBytes([]byte{ctrl}) - m := newPssMsg(p) - if !m.isRaw() || m.isSym() { - t.Fatal("expected raw=true and sym=false") - } - ctrl |= pssControlSym - p = newMsgParamsFromBytes([]byte{ctrl}) - m = newPssMsg(p) - if !m.isRaw() || !m.isSym() { - t.Fatal("expected raw=true and sym=true") - } - ctrl &= 0xff &^ pssControlRaw - p = newMsgParamsFromBytes([]byte{ctrl}) - m = newPssMsg(p) - if m.isRaw() || !m.isSym() { - t.Fatal("expected raw=false and sym=true") - } } // test if we can insert into cache, match items with cache and cache expiry func TestCache(t *testing.T) { var err error to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f") - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctx) - privkey, err := cryptoUtils.GetPrivateKey(keys) + privkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatal(err) } @@ -159,39 +118,38 @@ func TestCache(t *testing.T) { data := []byte("foo") datatwo := []byte("bar") datathree := []byte("baz") - mparams := &messageParams{ - Src: privkey, - Dst: &privkey.PublicKey, - Topic: PingTopic, - Payload: data, + wparams := &crypto.WrapParams{ + Sender: privkey, + Receiver: &privkey.PublicKey, } - env, err := newSentEnvelope(mparams) - msg := &PssMsg{ + env, err := ps.Crypto.Wrap(data, wparams) + msg := &message.Message{ Payload: env, To: to, + Topic: PingTopic, } - mparams.Payload = datatwo - envtwo, err := newSentEnvelope(mparams) - msgtwo := &PssMsg{ + envtwo, err := ps.Crypto.Wrap(datatwo, wparams) + msgtwo := &message.Message{ Payload: envtwo, To: to, + Topic: PingTopic, } - mparams.Payload = datathree - envthree, err := newSentEnvelope(mparams) - msgthree := &PssMsg{ + envthree, err := ps.Crypto.Wrap(datathree, wparams) + msgthree := &message.Message{ Payload: envthree, To: to, + Topic: PingTopic, } - digestone := ps.msgDigest(msg) + digestone := msg.Digest() if err != nil { t.Fatalf("could not store cache msgone: %v", err) } - digesttwo := ps.msgDigest(msgtwo) + digesttwo := msgtwo.Digest() if err != nil { t.Fatalf("could not store cache msgtwo: %v", err) } - digestthree := ps.msgDigest(msgthree) + digestthree := msgthree.Digest() if err != nil { t.Fatalf("could not store cache msgthree: %v", err) } @@ -241,20 +199,17 @@ func TestAddressMatch(t *testing.T) { remoteaddr := []byte("feedbeef") kadparams := network.NewKadParams() kad := network.NewKademlia(localaddr, kadparams) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctx) + privkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatalf("Could not generate private key: %v", err) } - privkey, err := cryptoUtils.GetPrivateKey(keys) pssp := NewParams().WithPrivateKey(privkey) ps, err := New(kad, pssp) if err != nil { t.Fatal(err.Error()) } - pssmsg := &PssMsg{ + pssmsg := &message.Message{ To: remoteaddr, } @@ -301,7 +256,7 @@ func TestAddressMatchProx(t *testing.T) { peerCount := nnPeerCount + 2 // set up pss - privKey, err := cryptoUtils.GenerateKey() + privKey, err := ethCrypto.GenerateKey() pssp := NewParams().WithPrivateKey(privKey) ps, err := New(kad, pssp) // enqueue method now is blocking, so we need always somebody processing the outbox @@ -369,7 +324,7 @@ func TestAddressMatchProx(t *testing.T) { // first the unit test on the method that calculates possible receipient using prox for i, distance := range remoteDistances { - pssMsg := newPssMsg(&msgParams{}) + pssMsg := message.New(message.Flags{}) pssMsg.To = make([]byte, len(localAddr)) copy(pssMsg.To, localAddr) var byteIdx = distance / 8 @@ -392,7 +347,7 @@ func TestAddressMatchProx(t *testing.T) { } // register it marking prox capability - topic := BytesToTopic([]byte{0x2a}) + topic := message.NewTopic([]byte{0x2a}) hndlrProxDereg := ps.Register(&topic, &handler{ f: rawHandlerFunc, caps: &handlerCaps{ @@ -409,13 +364,11 @@ func TestAddressMatchProx(t *testing.T) { var data [32]byte rand.Read(data[:]) - pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg := message.New(message.Flags{Raw: true}) pssMsg.To = remoteAddr pssMsg.Expire = uint32(time.Now().Unix() + 4200) - pssMsg.Payload = &envelope{ - Topic: topic, - Data: data[:], - } + pssMsg.Payload = data[:] + pssMsg.Topic = topic log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) ps.handle(context.TODO(), pssMsg) @@ -440,13 +393,11 @@ func TestAddressMatchProx(t *testing.T) { var data [32]byte rand.Read(data[:]) - pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg := message.New(message.Flags{Raw: true}) pssMsg.To = remoteAddr pssMsg.Expire = uint32(time.Now().Unix() + 4200) - pssMsg.Payload = &envelope{ - Topic: topic, - Data: data[:], - } + pssMsg.Payload = data[:] + pssMsg.Topic = topic log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) ps.handle(context.TODO(), pssMsg) @@ -464,13 +415,11 @@ func TestAddressMatchProx(t *testing.T) { remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) remoteAddr := remotePotAddr.Bytes() - pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg := message.New(message.Flags{Raw: true}) pssMsg.To = remoteAddr pssMsg.Expire = uint32(time.Now().Unix() + 4200) - pssMsg.Payload = &envelope{ - Topic: topic, - Data: []byte(remotePotAddr.String()), - } + pssMsg.Payload = []byte(remotePotAddr.String()) + pssMsg.Topic = topic log.Trace("noprox addrs", "local", localAddr, "remote", remoteAddr) ps.handle(context.TODO(), pssMsg) @@ -483,7 +432,7 @@ func TestAddressMatchProx(t *testing.T) { func TestMessageOutbox(t *testing.T) { // setup - privkey, err := cryptoUtils.GenerateKey() + privkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatal(err.Error()) } @@ -494,7 +443,7 @@ func TestMessageOutbox(t *testing.T) { outboxCapacity := 2 successC := make(chan struct{}) - forward := func(msg *PssMsg) error { + forward := func(msg *message.Message) error { successC <- struct{}{} return nil } @@ -519,10 +468,10 @@ func TestMessageOutbox(t *testing.T) { t.Fatal("timeout waiting for success forward") } - failed := make([]*PssMsg, 0) + failed := make([]*message.Message, 0) failedC := make(chan struct{}) continueC := make(chan struct{}) - failedForward := func(msg *PssMsg) error { + failedForward := func(msg *message.Message) error { failed = append(failed, msg) failedC <- struct{}{} <-continueC @@ -558,7 +507,7 @@ func TestMessageOutbox(t *testing.T) { func TestOutboxFull(t *testing.T) { // setup - privkey, err := crypto.GenerateKey() + privkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatal(err.Error()) } @@ -570,7 +519,7 @@ func TestOutboxFull(t *testing.T) { outboxCapacity := 2 procChan := make(chan struct{}) - succesForward := func(msg *PssMsg) error { + succesForward := func(msg *message.Message) error { <-procChan log.Info("Message processed") return nil @@ -605,23 +554,11 @@ func TestOutboxFull(t *testing.T) { // set and generate pubkeys and symkeys func TestKeys(t *testing.T) { // make our key and init pss with it - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - ourkeys, err := cryptoUtils.NewKeyPair(ctx) - if err != nil { - t.Fatalf("create 'our' key fail") - } - ctx, cancel2 := context.WithTimeout(context.Background(), time.Second) - defer cancel2() - theirkeys, err := cryptoUtils.NewKeyPair(ctx) - if err != nil { - t.Fatalf("create 'their' key fail") - } - ourprivkey, err := cryptoUtils.GetPrivateKey(ourkeys) + ourprivkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatalf("failed to retrieve 'our' private key") } - theirprivkey, err := cryptoUtils.GetPrivateKey(theirkeys) + theirprivkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatalf("failed to retrieve 'their' private key") } @@ -632,7 +569,7 @@ func TestKeys(t *testing.T) { addr := make(PssAddress, 32) copy(addr, network.RandomAddr().Over()) outkey := network.RandomAddr().Over() - topicobj := BytesToTopic([]byte("foo:42")) + topicobj := message.NewTopic([]byte("foo:42")) ps.SetPeerPublicKey(&theirprivkey.PublicKey, topicobj, addr) outkeyid, err := ps.SetSymmetricKey(outkey, topicobj, addr, false) if err != nil { @@ -646,11 +583,11 @@ func TestKeys(t *testing.T) { } // get the key back from crypto backend, check that it's still the same - outkeyback, err := ps.Crypto.GetSymKey(outkeyid) + outkeyback, err := ps.Crypto.GetSymmetricKey(outkeyid) if err != nil { t.Fatalf(err.Error()) } - inkey, err := ps.Crypto.GetSymKey(inkeyid) + inkey, err := ps.Crypto.GetSymmetricKey(inkeyid) if err != nil { t.Fatalf(err.Error()) } @@ -671,7 +608,7 @@ func TestKeys(t *testing.T) { // check that we can retrieve previously added public key entires per topic and peer func TestGetPublickeyEntries(t *testing.T) { - privkey, err := cryptoUtils.GenerateKey() + privkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatal(err) } @@ -679,16 +616,16 @@ func TestGetPublickeyEntries(t *testing.T) { defer ps.Stop() peeraddr := network.RandomAddr().Over() - topicaddr := make(map[Topic]PssAddress) - topicaddr[Topic{0x13}] = peeraddr - topicaddr[Topic{0x2a}] = peeraddr[:16] - topicaddr[Topic{0x02, 0x9a}] = []byte{} + topicaddr := make(map[message.Topic]PssAddress) + topicaddr[message.Topic{0x13}] = peeraddr + topicaddr[message.Topic{0x2a}] = peeraddr[:16] + topicaddr[message.Topic{0x02, 0x9a}] = []byte{} - remoteprivkey, err := cryptoUtils.GenerateKey() + remoteprivkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatal(err) } - remotepubkeybytes := ps.Crypto.FromECDSAPub(&remoteprivkey.PublicKey) + remotepubkeybytes := ps.Crypto.SerializePublicKey(&remoteprivkey.PublicKey) remotepubkeyhex := common.ToHex(remotepubkeybytes) pssapi := NewAPI(ps) @@ -732,7 +669,7 @@ OUTER: func TestPeerCapabilityMismatch(t *testing.T) { // create privkey for forwarder node - privkey, err := cryptoUtils.GenerateKey() + privkey, err := ethCrypto.GenerateKey() if err != nil { t.Fatal(err) } @@ -774,10 +711,10 @@ func TestPeerCapabilityMismatch(t *testing.T) { kad.On(nopsspeer) // create pss - pssmsg := &PssMsg{ + pssmsg := &message.Message{ To: []byte{}, Expire: uint32(time.Now().Add(time.Second).Unix()), - Payload: &envelope{}, + Payload: nil, } ps := newTestPss(privkey, kad, nil) defer ps.Stop() @@ -792,7 +729,7 @@ func TestPeerCapabilityMismatch(t *testing.T) { func TestRawAllow(t *testing.T) { // set up pss like so many times before - privKey, err := cryptoUtils.GenerateKey() + privKey, err := ethCrypto.GenerateKey() if err != nil { t.Fatal(err) } @@ -800,7 +737,7 @@ func TestRawAllow(t *testing.T) { kad := network.NewKademlia((baseAddr).Over(), network.NewKadParams()) ps := newTestPss(privKey, kad, nil) defer ps.Stop() - topic := BytesToTopic([]byte{0x2a}) + topic := message.NewTopic([]byte{0x2a}) // create handler innards that increments every time a message hits it var receives int @@ -817,14 +754,13 @@ func TestRawAllow(t *testing.T) { ps.Register(&topic, hndlrNoRaw) // test it with a raw message, should be poo-poo - pssMsg := newPssMsg(&msgParams{ - raw: true, + pssMsg := message.New(message.Flags{ + Raw: true, }) pssMsg.To = baseAddr.OAddr pssMsg.Expire = uint32(time.Now().Unix() + 4200) - pssMsg.Payload = &envelope{ - Topic: topic, - } + pssMsg.Topic = topic + pssMsg.Payload = nil ps.handle(context.TODO(), pssMsg) if receives > 0 { t.Fatalf("Expected handler not to be executed with raw cap off") @@ -840,7 +776,7 @@ func TestRawAllow(t *testing.T) { deregRawHandler := ps.Register(&topic, hndlrRaw) // should work now - pssMsg.Payload.Data = []byte("Raw Deal") + pssMsg.Payload = []byte("Raw Deal") ps.handle(context.TODO(), pssMsg) if receives == 0 { t.Fatalf("Expected handler to be executed with raw cap on") @@ -851,7 +787,7 @@ func TestRawAllow(t *testing.T) { deregRawHandler() // check that raw messages fail again - pssMsg.Payload.Data = []byte("Raw Trump") + pssMsg.Payload = []byte("Raw Trump") ps.handle(context.TODO(), pssMsg) if receives != prevReceives { t.Fatalf("Expected handler not to be executed when raw handler is retracted") @@ -1516,22 +1452,19 @@ func benchmarkSymKeySend(b *testing.B) { if err != nil { b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctx) - privkey, err := cryptoUtils.GetPrivateKey(keys) + privkey, err := ethCrypto.GenerateKey() ps := newTestPss(privkey, nil, nil) defer ps.Stop() msg := make([]byte, msgsize) rand.Read(msg) - topic := BytesToTopic([]byte("foo")) + topic := message.NewTopic([]byte("foo")) to := make(PssAddress, 32) copy(to[:], network.RandomAddr().Over()) symkeyid, err := ps.GenerateSymmetricKey(topic, to, true) if err != nil { b.Fatalf("could not generate symkey: %v", err) } - symkey, err := ps.Crypto.GetSymKey(symkeyid) + symkey, err := ps.Crypto.GetSymmetricKey(symkeyid) if err != nil { b.Fatalf("could not retrieve symkey: %v", err) } @@ -1561,21 +1494,18 @@ func benchmarkAsymKeySend(b *testing.B) { if err != nil { b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctx) - privkey, err := cryptoUtils.GetPrivateKey(keys) + privkey, err := ethCrypto.GenerateKey() ps := newTestPss(privkey, nil, nil) defer ps.Stop() msg := make([]byte, msgsize) rand.Read(msg) - topic := BytesToTopic([]byte("foo")) + topic := message.NewTopic([]byte("foo")) to := make(PssAddress, 32) copy(to[:], network.RandomAddr().Over()) ps.SetPeerPublicKey(&privkey.PublicKey, topic, to) b.ResetTimer() for i := 0; i < b.N; i++ { - ps.SendAsym(common.ToHex(ps.Crypto.FromECDSAPub(&privkey.PublicKey)), topic, msg) + ps.SendAsym(common.ToHex(ps.Crypto.SerializePublicKey(&privkey.PublicKey)), topic, msg) } } func BenchmarkSymkeyBruteforceChangeaddr(b *testing.B) { @@ -1607,19 +1537,16 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err) } } - pssmsgs := make([]*PssMsg, 0, keycount) + pssmsgs := make([]*message.Message, 0, keycount) var keyid string - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctx) - privkey, err := cryptoUtils.GetPrivateKey(keys) + privkey, err := ethCrypto.GenerateKey() if cachesize > 0 { ps = newTestPss(privkey, nil, &Params{SymKeyCacheCapacity: int(cachesize)}) } else { ps = newTestPss(privkey, nil, nil) } defer ps.Stop() - topic := BytesToTopic([]byte("foo")) + topic := message.NewTopic([]byte("foo")) for i := 0; i < int(keycount); i++ { to := make(PssAddress, 32) copy(to[:], network.RandomAddr().Over()) @@ -1627,26 +1554,24 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { if err != nil { b.Fatalf("cant generate symkey #%d: %v", i, err) } - symkey, err := ps.Crypto.GetSymKey(keyid) + symkey, err := ps.Crypto.GetSymmetricKey(keyid) if err != nil { b.Fatalf("could not retrieve symkey %s: %v", keyid, err) } - mparams := &messageParams{ - KeySym: symkey, - Topic: topic, - Payload: []byte("xyzzy"), - Padding: []byte("1234567890abcdef"), + wparams := &crypto.WrapParams{ + SymmetricKey: symkey, } - env, err := newSentEnvelope(mparams) + payload, err := ps.Crypto.Wrap([]byte("xyzzy"), wparams) if err != nil { b.Fatalf("could not generate envelope: %v", err) } ps.Register(&topic, &handler{ f: noopHandlerFunc, }) - pssmsgs = append(pssmsgs, &PssMsg{ + pssmsgs = append(pssmsgs, &message.Message{ To: to, - Payload: env, + Topic: topic, + Payload: payload, }) } b.ResetTimer() @@ -1687,17 +1612,14 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { } } addr := make([]PssAddress, keycount) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctx) - privkey, err := cryptoUtils.GetPrivateKey(keys) + privkey, err := ethCrypto.GenerateKey() if cachesize > 0 { ps = newTestPss(privkey, nil, &Params{SymKeyCacheCapacity: int(cachesize)}) } else { ps = newTestPss(privkey, nil, nil) } defer ps.Stop() - topic := BytesToTopic([]byte("foo")) + topic := message.NewTopic([]byte("foo")) for i := 0; i < int(keycount); i++ { copy(addr[i], network.RandomAddr().Over()) keyid, err = ps.GenerateSymmetricKey(topic, addr[i], true) @@ -1706,26 +1628,24 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { } } - symkey, err := ps.Crypto.GetSymKey(keyid) + symkey, err := ps.Crypto.GetSymmetricKey(keyid) if err != nil { b.Fatalf("could not retrieve symkey %s: %v", keyid, err) } - mparams := &messageParams{ - KeySym: symkey, - Topic: topic, - Payload: []byte("xyzzy"), - Padding: []byte("1234567890abcdef"), + wparams := &crypto.WrapParams{ + SymmetricKey: symkey, } - env, err := newSentEnvelope(mparams) + payload, err := ps.Crypto.Wrap([]byte("xyzzy"), wparams) if err != nil { b.Fatalf("could not generate envelope: %v", err) } ps.Register(&topic, &handler{ f: noopHandlerFunc, }) - pssmsg := &PssMsg{ + pssmsg := &message.Message{ To: addr[len(addr)-1][:], - Payload: env, + Topic: topic, + Payload: payload, } for i := 0; i < b.N; i++ { if err := ps.process(pssmsg, false, false); err != nil { @@ -1734,16 +1654,14 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { } } -func testRandomMessage() *PssMsg { +func testRandomMessage() *message.Message { addr := make([]byte, 32) addr[0] = 0x01 - msg := newPssMsg(&msgParams{}) + msg := message.New(message.Flags{}) msg.To = addr msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) - msg.Payload = &envelope{ - Topic: [4]byte{}, - Data: []byte{0x66, 0x6f, 0x6f}, - } + msg.Topic = [4]byte{} + msg.Payload = []byte{0x66, 0x6f, 0x6f} return msg } @@ -1818,10 +1736,7 @@ func newServices(allowRaw bool) map[string]simulation.ServiceFunc { // execadapter does not exec init() initTest() - ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := cryptoUtils.NewKeyPair(ctxlocal) - privkey, err := cryptoUtils.GetPrivateKey(keys) + privkey, err := ethCrypto.GenerateKey() pssp := NewParams().WithPrivateKey(privkey) pssp.AllowRaw = allowRaw bzzPrivateKey, err := simulation.BzzPrivateKeyFromConfig(ctx.Config) @@ -1913,7 +1828,7 @@ func NewAPITest(ps *Pss) *APITest { return &APITest{Pss: ps} } -func (apitest *APITest) SetSymKeys(pubkeyid string, recvsymkey []byte, sendsymkey []byte, limit uint16, topic Topic, to hexutil.Bytes) ([2]string, error) { +func (apitest *APITest) SetSymKeys(pubkeyid string, recvsymkey []byte, sendsymkey []byte, limit uint16, topic message.Topic, to hexutil.Bytes) ([2]string, error) { recvsymkeyid, err := apitest.SetSymmetricKey(recvsymkey, topic, PssAddress(to), true) if err != nil { diff --git a/pss/types.go b/pss/types.go index 6241d10a54..964b227ac6 100644 --- a/pss/types.go +++ b/pss/types.go @@ -18,55 +18,17 @@ package pss import ( "encoding/json" - "fmt" - "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethersphere/swarm/storage" -) - -const ( - defaultWhisperTTL = 6000 -) - -const ( - pssControlSym = 1 - pssControlRaw = 1 << 1 - TopicLength = 4 // in bytes Taken from Whisper + "github.com/ethersphere/swarm/pss/message" ) var ( - topicHashMutex = sync.Mutex{} - topicHashFunc = storage.MakeHashFunc("SHA256")() - rawTopic = Topic{} + rawTopic = message.Topic{} ) -// Topic is the PSS encapsulation of the Whisper topic type -type Topic [TopicLength]byte - -func (t *Topic) String() string { - return hexutil.Encode(t[:]) -} - -// MarshalJSON implements the json.Marshaler interface -func (t Topic) MarshalJSON() (b []byte, err error) { - return json.Marshal(t.String()) -} - -// MarshalJSON implements the json.Marshaler interface -func (t *Topic) UnmarshalJSON(input []byte) error { - topicbytes, err := hexutil.Decode(string(input[1 : len(input)-1])) - if err != nil { - return err - } - copy(t[:], topicbytes) - return nil -} - // PssAddress is an alias for []byte. It represents a variable length address type PssAddress []byte @@ -87,91 +49,19 @@ func (a *PssAddress) UnmarshalJSON(input []byte) error { return nil } -// holds the digest of a message used for caching -type digest [digestLength]byte - -// conceals bitwise operations on the control flags byte -type msgParams struct { - raw bool - sym bool -} - -func newMsgParamsFromBytes(paramBytes []byte) *msgParams { - if len(paramBytes) != 1 { - return nil - } - return &msgParams{ - raw: paramBytes[0]&pssControlRaw > 0, - sym: paramBytes[0]&pssControlSym > 0, - } -} - -func (m *msgParams) Bytes() (paramBytes []byte) { - var b byte - if m.raw { - b |= pssControlRaw - } - if m.sym { - b |= pssControlSym - } - paramBytes = append(paramBytes, b) - return paramBytes -} - type outboxMsg struct { - msg *PssMsg + msg *message.Message startedAt time.Time } -func newOutboxMsg(msg *PssMsg) *outboxMsg { +func newOutboxMsg(msg *message.Message) *outboxMsg { return &outboxMsg{ msg: msg, startedAt: time.Now(), } } -// PssMsg encapsulates messages transported over pss. -type PssMsg struct { - To []byte - Control []byte - Expire uint32 - Payload *envelope -} - -func newPssMsg(param *msgParams) *PssMsg { - return &PssMsg{ - Control: param.Bytes(), - } -} - -// message is flagged as raw / external encryption -func (msg *PssMsg) isRaw() bool { - return msg.Control[0]&pssControlRaw > 0 -} - -// message is flagged as symmetrically encrypted -func (msg *PssMsg) isSym() bool { - return msg.Control[0]&pssControlSym > 0 -} - -// serializes the message for use in cache -func (msg *PssMsg) serialize() []byte { - rlpdata, _ := rlp.EncodeToBytes(struct { - To []byte - Payload *envelope - }{ - To: msg.To, - Payload: msg.Payload, - }) - return rlpdata -} - -// String representation of PssMsg -func (msg *PssMsg) String() string { - return fmt.Sprintf("PssMsg: Recipient: %x", common.ToHex(msg.To)) -} - -// Signature for a message handler function for a PssMsg +// Signature for a message handler function for a Message // Implementations of this type are passed to Pss.Register together with a topic, type HandlerFunc func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error @@ -219,12 +109,3 @@ func (store *stateStore) Load(key string) ([]byte, error) { func (store *stateStore) Save(key string, v []byte) error { return nil } - -// BytesToTopic hashes an arbitrary length byte slice and truncates it to the length of a topic, using only the first bytes of the digest -func BytesToTopic(b []byte) Topic { - topicHashMutex.Lock() - defer topicHashMutex.Unlock() - topicHashFunc.Reset() - topicHashFunc.Write(b) - return toTopic(topicHashFunc.Sum(nil)) -} diff --git a/swarm.go b/swarm.go index dc8952063e..690ec2c3fb 100644 --- a/swarm.go +++ b/swarm.go @@ -52,6 +52,7 @@ import ( "github.com/ethersphere/swarm/network/stream/v2" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/pss" + pssmessage "github.com/ethersphere/swarm/pss/message" "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/storage" "github.com/ethersphere/swarm/storage/feed" @@ -567,7 +568,7 @@ func (s *Swarm) APIs() []rpc.API { } // RegisterPssProtocol adds a devp2p protocol to the swarm node's Pss instance -func (s *Swarm) RegisterPssProtocol(topic *pss.Topic, spec *protocols.Spec, targetprotocol *p2p.Protocol, options *pss.ProtocolParams) (*pss.Protocol, error) { +func (s *Swarm) RegisterPssProtocol(topic *pssmessage.Topic, spec *protocols.Spec, targetprotocol *p2p.Protocol, options *pss.ProtocolParams) (*pss.Protocol, error) { return pss.RegisterProtocol(s.ps, topic, spec, targetprotocol, options) }