Skip to content

Commit 024e230

Browse files
authored
fix: wrap errors using %w to preserve context (#1321)
* fix: wrap errors using %w to preserve context * move the consumer state check
1 parent 31ba011 commit 024e230

File tree

12 files changed

+56
-55
lines changed

12 files changed

+56
-55
lines changed

oauth2/auth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func ExtractUserName(token oauth2.Token) (string, error) {
112112
p := jwt.Parser{}
113113
claims := jwt.MapClaims{}
114114
if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil {
115-
return "", fmt.Errorf("unable to decode the access token: %v", err)
115+
return "", fmt.Errorf("unable to decode the access token: %w", err)
116116
}
117117
username, ok := claims[ClaimNameUserName]
118118
if !ok {

oauth2/cache/cache.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
8080
// load from the store and use the access token if it isn't expired
8181
grant, err := t.store.LoadGrant(t.audience)
8282
if err != nil {
83-
return nil, fmt.Errorf("LoadGrant: %v", err)
83+
return nil, fmt.Errorf("LoadGrant: %w", err)
8484
}
8585
t.token = grant.Token
8686
if t.token != nil && t.validateAccessToken(*t.token) {
@@ -90,13 +90,13 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
9090
// obtain and cache a fresh access token
9191
grant, err = t.refresher.Refresh(grant)
9292
if err != nil {
93-
return nil, fmt.Errorf("RefreshGrant: %v", err)
93+
return nil, fmt.Errorf("RefreshGrant: %w", err)
9494
}
9595
t.token = grant.Token
9696
err = t.store.SaveGrant(t.audience, *grant)
9797
if err != nil {
9898
// TODO log rather than throw
99-
return nil, fmt.Errorf("SaveGrant: %v", err)
99+
return nil, fmt.Errorf("SaveGrant: %w", err)
100100
}
101101

102102
return t.token, nil
@@ -117,14 +117,14 @@ func (t *tokenCache) InvalidateToken() error {
117117
}
118118
grant, err := t.store.LoadGrant(t.audience)
119119
if err != nil {
120-
return fmt.Errorf("LoadGrant: %v", err)
120+
return fmt.Errorf("LoadGrant: %w", err)
121121
}
122122
if grant.Token != nil && grant.Token.AccessToken == previous.AccessToken {
123123
grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
124124
err = t.store.SaveGrant(t.audience, *grant)
125125
if err != nil {
126126
// TODO log rather than throw
127-
return fmt.Errorf("SaveGrant: %v", err)
127+
return fmt.Errorf("SaveGrant: %w", err)
128128
}
129129
}
130130
return nil

oauth2/store/keyring.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package store
2020
import (
2121
"crypto/sha1"
2222
"encoding/json"
23+
"errors"
2324
"fmt"
2425
"sync"
2526

@@ -92,7 +93,7 @@ func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, e
9293

9394
item, err := f.getItem(audience)
9495
if err != nil {
95-
if err == keyring.ErrKeyNotFound {
96+
if errors.Is(err, keyring.ErrKeyNotFound) {
9697
return nil, ErrNoAuthenticationData
9798
}
9899
return nil, err
@@ -119,10 +120,10 @@ func (f *KeyringStore) WhoAmI(audience string) (string, error) {
119120
key := hashKeyringKey(audience)
120121
authItem, err := f.kr.Get(key)
121122
if err != nil {
122-
if err == keyring.ErrKeyNotFound {
123+
if errors.Is(err, keyring.ErrKeyNotFound) {
123124
return "", ErrNoAuthenticationData
124125
}
125-
return "", fmt.Errorf("unable to get information from the keyring: %v", err)
126+
return "", fmt.Errorf("unable to get information from the keyring: %w", err)
126127
}
127128
return authItem.Label, nil
128129
}
@@ -134,13 +135,13 @@ func (f *KeyringStore) Logout() error {
134135
var err error
135136
keys, err := f.kr.Keys()
136137
if err != nil {
137-
return fmt.Errorf("unable to get information from the keyring: %v", err)
138+
return fmt.Errorf("unable to get information from the keyring: %w", err)
138139
}
139140
for _, key := range keys {
140141
err = f.kr.Remove(key)
141142
}
142143
if err != nil {
143-
return fmt.Errorf("unable to update the keyring: %v", err)
144+
return fmt.Errorf("unable to update the keyring: %w", err)
144145
}
145146
return nil
146147
}
@@ -180,7 +181,7 @@ func (f *KeyringStore) setItem(item storedItem) error {
180181
}
181182
err = f.kr.Set(i)
182183
if err != nil {
183-
return fmt.Errorf("unable to update the keyring: %v", err)
184+
return fmt.Errorf("unable to update the keyring: %w", err)
184185
}
185186
return nil
186187
}

oauth2/store/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
// ErrNoAuthenticationData indicates that stored authentication data is not available
2727
var ErrNoAuthenticationData = errors.New("authentication data is not available")
2828

29-
// ErrUnsupportedAuthData ndicates that stored authentication data is unusable
29+
// ErrUnsupportedAuthData indicates that stored authentication data is unusable
3030
var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
3131

3232
// Store is responsible for persisting authorization grants

pulsar/consumer_partition.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,21 +1158,21 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
11581158
// error decrypting the payload
11591159
if err != nil {
11601160
// default crypto failure action
1161-
crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
1161+
cryptoFailureAction := crypto.ConsumerCryptoFailureActionFail
11621162
if pc.options.decryption != nil {
1163-
crypToFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
1163+
cryptoFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
11641164
}
11651165

1166-
switch crypToFailureAction {
1166+
switch cryptoFailureAction {
11671167
case crypto.ConsumerCryptoFailureActionFail:
1168-
pc.log.Errorf("consuming message failed due to decryption err :%v", err)
1168+
pc.log.Errorf("consuming message failed due to decryption err: %v", err)
11691169
pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
11701170
return err
11711171
case crypto.ConsumerCryptoFailureActionDiscard:
11721172
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
1173-
return fmt.Errorf("discarding message on decryption error :%v", err)
1173+
return fmt.Errorf("discarding message on decryption error: %w", err)
11741174
case crypto.ConsumerCryptoFailureActionConsume:
1175-
pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
1175+
pc.log.Warnf("consuming encrypted message due to error in decryption: %v", err)
11761176
messages := []*message{
11771177
{
11781178
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
@@ -1767,16 +1767,16 @@ func (pc *partitionConsumer) runEventsLoop() {
17671767
func (pc *partitionConsumer) internalClose(req *closeRequest) {
17681768
defer close(req.doneCh)
17691769
state := pc.getConsumerState()
1770-
if state != consumerReady {
1771-
// this might be redundant but to ensure nack tracker is closed
1770+
if state == consumerClosed || state == consumerClosing {
1771+
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
17721772
if pc.nackTracker != nil {
17731773
pc.nackTracker.Close()
17741774
}
17751775
return
17761776
}
17771777

1778-
if state == consumerClosed || state == consumerClosing {
1779-
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
1778+
if state != consumerReady {
1779+
// this might be redundant but to ensure nack tracker is closed
17801780
if pc.nackTracker != nil {
17811781
pc.nackTracker.Close()
17821782
}

pulsar/crypto/default_message_crypto.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (d *DefaultMessageCrypto) addPublicKeyCipher(keyName string, keyReader KeyR
9595
d.cipherLock.Lock()
9696
defer d.cipherLock.Unlock()
9797
if keyName == "" || keyReader == nil {
98-
return fmt.Errorf("keyname or keyreader is null")
98+
return fmt.Errorf("keyname or keyreader is nil")
9999
}
100100

101101
// read the public key and its info using keyReader
@@ -212,7 +212,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
212212
func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
213213
payload []byte,
214214
keyReader KeyReader) ([]byte, error) {
215-
// if data key is present, attempt to derypt using the existing key
215+
// if data key is present, attempt to decrypt using the existing key
216216
if d.dataKey != nil {
217217
decryptedData, err := d.getKeyAndDecryptData(msgMetadata, payload)
218218
if err != nil {
@@ -342,20 +342,20 @@ func (d *DefaultMessageCrypto) loadPrivateKey(key []byte) (gocrypto.PrivateKey,
342342

343343
// read the public key into RSA key
344344
func (d *DefaultMessageCrypto) loadPublicKey(key []byte) (gocrypto.PublicKey, error) {
345-
var publickKey gocrypto.PublicKey
345+
var publicKey gocrypto.PublicKey
346346

347347
pubPem, _ := pem.Decode(key)
348348
if pubPem == nil {
349-
return publickKey, fmt.Errorf("failed to decode public key")
349+
return publicKey, fmt.Errorf("failed to decode public key")
350350
}
351351

352352
genericPublicKey, err := x509.ParsePKIXPublicKey(pubPem.Bytes)
353353
if err != nil {
354-
return publickKey, err
354+
return publicKey, err
355355
}
356-
publickKey = genericPublicKey
356+
publicKey = genericPublicKey
357357

358-
return publickKey, nil
358+
return publicKey, nil
359359
}
360360

361361
func generateDataKey() ([]byte, error) {

pulsar/internal/commands.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ func serializeMessage(wb Buffer,
272272
encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
273273
if err != nil {
274274
// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
275-
return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)
275+
return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error: %w", err)
276276
}
277277

278278
cmdSize := uint32(proto.Size(cmdSend))

pulsar/internal/connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,7 @@ func (c *connection) handleTopicMigrated(commandTopicMigrated *pb.CommandTopicMi
947947
resourceID := commandTopicMigrated.GetResourceId()
948948
migratedBrokerServiceURL := c.getMigratedBrokerServiceURL(commandTopicMigrated)
949949
if migratedBrokerServiceURL == "" {
950-
c.log.Warnf("Failed to find the migrated broker url for resource: %s, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
950+
c.log.Warnf("Failed to find the migrated broker url for resource: %d, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
951951
resourceID,
952952
commandTopicMigrated.GetBrokerServiceUrl(),
953953
commandTopicMigrated.GetBrokerServiceUrlTls())

pulsar/internal/crypto/producer_encryptor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,19 @@ func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetad
5555
crypto.NewMessageMetadataSupplier(msgMetadata),
5656
payload)
5757

58-
// error encryping the payload
58+
// error encrypting the payload
5959
if err != nil {
6060
// error occurred in encrypting the payload
6161
// crypto ProducerCryptoFailureAction is set to send
62-
// send unencrypted message
62+
// unencrypted message
6363
if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
6464
e.logger.
6565
WithError(err).
6666
Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send")
6767
return payload, nil
6868
}
6969

70-
return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err)
70+
return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload: %w", err)
7171
}
7272
return encryptedPayload, nil
7373
}

pulsar/primitiveSerDe.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,14 @@ func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64,
100100

101101
func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
102102
if len(buf) < 8 {
103-
return 0, fmt.Errorf("cannot decode binary double: %s", io.ErrShortBuffer)
103+
return 0, fmt.Errorf("cannot decode binary double: %w", io.ErrShortBuffer)
104104
}
105105
return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
106106
}
107107

108108
func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
109109
if len(buf) < 4 {
110-
return 0, fmt.Errorf("cannot decode binary float: %s", io.ErrShortBuffer)
110+
return 0, fmt.Errorf("cannot decode binary float: %w", io.ErrShortBuffer)
111111
}
112112
return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
113113
}

0 commit comments

Comments
 (0)