diff --git a/go.mod b/go.mod index e1a4c93..0a97f60 100644 --- a/go.mod +++ b/go.mod @@ -10,11 +10,11 @@ require ( require ( github.com/gogo/protobuf v1.3.2 // indirect - github.com/klauspost/compress v1.17.11 // indirect - github.com/nats-io/nats-server/v2 v2.10.22 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/nats-io/nats-server/v2 v2.10.27 // indirect github.com/nats-io/nats-streaming-server v0.25.6 // indirect - github.com/nats-io/nats.go v1.37.0 // indirect - github.com/nats-io/nkeys v0.4.8 // indirect + github.com/nats-io/nats.go v1.39.1 // indirect + github.com/nats-io/nkeys v0.4.10 // indirect github.com/nats-io/nuid v1.0.1 // indirect golang.org/x/crypto v0.35.0 // indirect golang.org/x/sys v0.30.0 // indirect diff --git a/go.sum b/go.sum index 423ef3b..6d5c6f6 100644 --- a/go.sum +++ b/go.sum @@ -16,26 +16,26 @@ github.com/hashicorp/raft v1.6.0 h1:tkIAORZy2GbJ2Trp5eUSggLXDPOJLXC+JJLNMMqtgtM= github.com/hashicorp/raft v1.6.0/go.mod h1:Xil5pDgeGwRWuX4uPUmwa+7Vagg4N804dz6mhNi6S7o= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= -github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= -github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.22 h1:Yt63BGu2c3DdMoBZNcR6pjGQwk/asrKU7VX846ibxDA= -github.com/nats-io/nats-server/v2 v2.10.22/go.mod h1:X/m1ye9NYansUXYFrbcDwUi/blHkrgHh2rgCJaakonk= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +github.com/nats-io/nats-server/v2 v2.10.27 h1:A/i3JqtrP897UHc2/Jia/mqaXkqj9+HGdpz+R0mC+sM= +github.com/nats-io/nats-server/v2 v2.10.27/go.mod h1:SGzoWGU8wUVnMr/HJhEMv4R8U4f7hF4zDygmRxpNsvg= github.com/nats-io/nats-streaming-server v0.25.6 h1:8OBRaIl64u+DFvZYpF50RRzwG/yLcJZL0R7VMc7tp4Y= github.com/nats-io/nats-streaming-server v0.25.6/go.mod h1:LEcu6uGSDBB4O/IBUsDBHYk/O0K7XZza8nMjCIXicLk= github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= -github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk= +github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.4.8 h1:+wee30071y3vCZAYRsnrmIPaOe47A/SkK/UBDPdIV70= -github.com/nats-io/nkeys v0.4.8/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= +github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= @@ -88,8 +88,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= -golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/vendor/github.com/klauspost/compress/flate/fast_encoder.go b/vendor/github.com/klauspost/compress/flate/fast_encoder.go index c8124b5..0e8b163 100644 --- a/vendor/github.com/klauspost/compress/flate/fast_encoder.go +++ b/vendor/github.com/klauspost/compress/flate/fast_encoder.go @@ -6,8 +6,10 @@ package flate import ( - "encoding/binary" "fmt" + "math/bits" + + "github.com/klauspost/compress/internal/le" ) type fastEnc interface { @@ -58,11 +60,11 @@ const ( ) func load3232(b []byte, i int32) uint32 { - return binary.LittleEndian.Uint32(b[i:]) + return le.Load32(b, i) } func load6432(b []byte, i int32) uint64 { - return binary.LittleEndian.Uint64(b[i:]) + return le.Load64(b, i) } type tableEntry struct { @@ -134,8 +136,8 @@ func hashLen(u uint64, length, mls uint8) uint32 { // matchlen will return the match length between offsets and t in src. // The maximum length returned is maxMatchLength - 4. // It is assumed that s > t, that t >=0 and s < len(src). -func (e *fastGen) matchlen(s, t int32, src []byte) int32 { - if debugDecode { +func (e *fastGen) matchlen(s, t int, src []byte) int32 { + if debugDeflate { if t >= s { panic(fmt.Sprint("t >=s:", t, s)) } @@ -149,18 +151,34 @@ func (e *fastGen) matchlen(s, t int32, src []byte) int32 { panic(fmt.Sprint(s, "-", t, "(", s-t, ") > maxMatchLength (", maxMatchOffset, ")")) } } - s1 := int(s) + maxMatchLength - 4 - if s1 > len(src) { - s1 = len(src) + s1 := min(s+maxMatchLength-4, len(src)) + left := s1 - s + n := int32(0) + for left >= 8 { + diff := le.Load64(src, s) ^ le.Load64(src, t) + if diff != 0 { + return n + int32(bits.TrailingZeros64(diff)>>3) + } + s += 8 + t += 8 + n += 8 + left -= 8 } - // Extend the match to be as long as possible. - return int32(matchLen(src[s:s1], src[t:])) + a := src[s:s1] + b := src[t:] + for i := range a { + if a[i] != b[i] { + break + } + n++ + } + return n } // matchlenLong will return the match length between offsets and t in src. // It is assumed that s > t, that t >=0 and s < len(src). -func (e *fastGen) matchlenLong(s, t int32, src []byte) int32 { +func (e *fastGen) matchlenLong(s, t int, src []byte) int32 { if debugDeflate { if t >= s { panic(fmt.Sprint("t >=s:", t, s)) @@ -176,7 +194,28 @@ func (e *fastGen) matchlenLong(s, t int32, src []byte) int32 { } } // Extend the match to be as long as possible. - return int32(matchLen(src[s:], src[t:])) + left := len(src) - s + n := int32(0) + for left >= 8 { + diff := le.Load64(src, s) ^ le.Load64(src, t) + if diff != 0 { + return n + int32(bits.TrailingZeros64(diff)>>3) + } + s += 8 + t += 8 + n += 8 + left -= 8 + } + + a := src[s:] + b := src[t:] + for i := range a { + if a[i] != b[i] { + break + } + n++ + } + return n } // Reset the encoding table. diff --git a/vendor/github.com/klauspost/compress/flate/huffman_bit_writer.go b/vendor/github.com/klauspost/compress/flate/huffman_bit_writer.go index f70594c..afdc8c0 100644 --- a/vendor/github.com/klauspost/compress/flate/huffman_bit_writer.go +++ b/vendor/github.com/klauspost/compress/flate/huffman_bit_writer.go @@ -5,10 +5,11 @@ package flate import ( - "encoding/binary" "fmt" "io" "math" + + "github.com/klauspost/compress/internal/le" ) const ( @@ -438,7 +439,7 @@ func (w *huffmanBitWriter) writeOutBits() { n := w.nbytes // We over-write, but faster... - binary.LittleEndian.PutUint64(w.bytes[n:], bits) + le.Store64(w.bytes[n:], bits) n += 6 if n >= bufferFlushSize { @@ -854,7 +855,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode) bits |= c.code64() << (nbits & 63) nbits += c.len() if nbits >= 48 { - binary.LittleEndian.PutUint64(w.bytes[nbytes:], bits) + le.Store64(w.bytes[nbytes:], bits) //*(*uint64)(unsafe.Pointer(&w.bytes[nbytes])) = bits bits >>= 48 nbits -= 48 @@ -882,7 +883,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode) bits |= c.code64() << (nbits & 63) nbits += c.len() if nbits >= 48 { - binary.LittleEndian.PutUint64(w.bytes[nbytes:], bits) + le.Store64(w.bytes[nbytes:], bits) //*(*uint64)(unsafe.Pointer(&w.bytes[nbytes])) = bits bits >>= 48 nbits -= 48 @@ -905,7 +906,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode) bits |= uint64(extraLength) << (nbits & 63) nbits += extraLengthBits if nbits >= 48 { - binary.LittleEndian.PutUint64(w.bytes[nbytes:], bits) + le.Store64(w.bytes[nbytes:], bits) //*(*uint64)(unsafe.Pointer(&w.bytes[nbytes])) = bits bits >>= 48 nbits -= 48 @@ -931,7 +932,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode) bits |= c.code64() << (nbits & 63) nbits += c.len() if nbits >= 48 { - binary.LittleEndian.PutUint64(w.bytes[nbytes:], bits) + le.Store64(w.bytes[nbytes:], bits) //*(*uint64)(unsafe.Pointer(&w.bytes[nbytes])) = bits bits >>= 48 nbits -= 48 @@ -953,7 +954,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode) bits |= uint64((offset-(offsetComb>>8))&matchOffsetOnlyMask) << (nbits & 63) nbits += uint8(offsetComb) if nbits >= 48 { - binary.LittleEndian.PutUint64(w.bytes[nbytes:], bits) + le.Store64(w.bytes[nbytes:], bits) //*(*uint64)(unsafe.Pointer(&w.bytes[nbytes])) = bits bits >>= 48 nbits -= 48 @@ -1107,7 +1108,7 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) { // We must have at least 48 bits free. if nbits >= 8 { n := nbits >> 3 - binary.LittleEndian.PutUint64(w.bytes[nbytes:], bits) + le.Store64(w.bytes[nbytes:], bits) bits >>= (n * 8) & 63 nbits -= n * 8 nbytes += n @@ -1136,7 +1137,7 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) { // Remaining... for _, t := range input { if nbits >= 48 { - binary.LittleEndian.PutUint64(w.bytes[nbytes:], bits) + le.Store64(w.bytes[nbytes:], bits) //*(*uint64)(unsafe.Pointer(&w.bytes[nbytes])) = bits bits >>= 48 nbits -= 48 diff --git a/vendor/github.com/klauspost/compress/flate/level1.go b/vendor/github.com/klauspost/compress/flate/level1.go index 703b9a8..c3581a3 100644 --- a/vendor/github.com/klauspost/compress/flate/level1.go +++ b/vendor/github.com/klauspost/compress/flate/level1.go @@ -1,9 +1,9 @@ package flate import ( - "encoding/binary" "fmt" - "math/bits" + + "github.com/klauspost/compress/internal/le" ) // fastGen maintains the table for matches, @@ -77,6 +77,7 @@ func (e *fastEncL1) Encode(dst *tokens, src []byte) { nextS := s var candidate tableEntry + var t int32 for { nextHash := hashLen(cv, tableBits, hashBytes) candidate = e.table[nextHash] @@ -88,9 +89,8 @@ func (e *fastEncL1) Encode(dst *tokens, src []byte) { now := load6432(src, nextS) e.table[nextHash] = tableEntry{offset: s + e.cur} nextHash = hashLen(now, tableBits, hashBytes) - - offset := s - (candidate.offset - e.cur) - if offset < maxMatchOffset && uint32(cv) == load3232(src, candidate.offset-e.cur) { + t = candidate.offset - e.cur + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { e.table[nextHash] = tableEntry{offset: nextS + e.cur} break } @@ -103,8 +103,8 @@ func (e *fastEncL1) Encode(dst *tokens, src []byte) { now >>= 8 e.table[nextHash] = tableEntry{offset: s + e.cur} - offset = s - (candidate.offset - e.cur) - if offset < maxMatchOffset && uint32(cv) == load3232(src, candidate.offset-e.cur) { + t = candidate.offset - e.cur + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { e.table[nextHash] = tableEntry{offset: nextS + e.cur} break } @@ -120,36 +120,10 @@ func (e *fastEncL1) Encode(dst *tokens, src []byte) { // literal bytes prior to s. // Extend the 4-byte match as long as possible. - t := candidate.offset - e.cur - var l = int32(4) - if false { - l = e.matchlenLong(s+4, t+4, src) + 4 - } else { - // inlined: - a := src[s+4:] - b := src[t+4:] - for len(a) >= 8 { - if diff := binary.LittleEndian.Uint64(a) ^ binary.LittleEndian.Uint64(b); diff != 0 { - l += int32(bits.TrailingZeros64(diff) >> 3) - break - } - l += 8 - a = a[8:] - b = b[8:] - } - if len(a) < 8 { - b = b[:len(a)] - for i := range a { - if a[i] != b[i] { - break - } - l++ - } - } - } + l := e.matchlenLong(int(s+4), int(t+4), src) + 4 // Extend backwards - for t > 0 && s > nextEmit && src[t-1] == src[s-1] { + for t > 0 && s > nextEmit && le.Load8(src, t-1) == le.Load8(src, s-1) { s-- t-- l++ @@ -221,8 +195,8 @@ func (e *fastEncL1) Encode(dst *tokens, src []byte) { candidate = e.table[currHash] e.table[currHash] = tableEntry{offset: o + 2} - offset := s - (candidate.offset - e.cur) - if offset > maxMatchOffset || uint32(x) != load3232(src, candidate.offset-e.cur) { + t = candidate.offset - e.cur + if s-t > maxMatchOffset || uint32(x) != load3232(src, t) { cv = x >> 8 s++ break diff --git a/vendor/github.com/klauspost/compress/flate/level2.go b/vendor/github.com/klauspost/compress/flate/level2.go index 876dfbe..c8d047f 100644 --- a/vendor/github.com/klauspost/compress/flate/level2.go +++ b/vendor/github.com/klauspost/compress/flate/level2.go @@ -126,7 +126,7 @@ func (e *fastEncL2) Encode(dst *tokens, src []byte) { // Extend the 4-byte match as long as possible. t := candidate.offset - e.cur - l := e.matchlenLong(s+4, t+4, src) + 4 + l := e.matchlenLong(int(s+4), int(t+4), src) + 4 // Extend backwards for t > 0 && s > nextEmit && src[t-1] == src[s-1] { diff --git a/vendor/github.com/klauspost/compress/flate/level3.go b/vendor/github.com/klauspost/compress/flate/level3.go index 7aa2b72..33f9fb1 100644 --- a/vendor/github.com/klauspost/compress/flate/level3.go +++ b/vendor/github.com/klauspost/compress/flate/level3.go @@ -135,7 +135,7 @@ func (e *fastEncL3) Encode(dst *tokens, src []byte) { // Extend the 4-byte match as long as possible. // t := candidate.offset - e.cur - l := e.matchlenLong(s+4, t+4, src) + 4 + l := e.matchlenLong(int(s+4), int(t+4), src) + 4 // Extend backwards for t > 0 && s > nextEmit && src[t-1] == src[s-1] { diff --git a/vendor/github.com/klauspost/compress/flate/level4.go b/vendor/github.com/klauspost/compress/flate/level4.go index 23c08b3..88509e1 100644 --- a/vendor/github.com/klauspost/compress/flate/level4.go +++ b/vendor/github.com/klauspost/compress/flate/level4.go @@ -98,19 +98,19 @@ func (e *fastEncL4) Encode(dst *tokens, src []byte) { e.bTable[nextHashL] = entry t = lCandidate.offset - e.cur - if s-t < maxMatchOffset && uint32(cv) == load3232(src, lCandidate.offset-e.cur) { + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { // We got a long match. Use that. break } t = sCandidate.offset - e.cur - if s-t < maxMatchOffset && uint32(cv) == load3232(src, sCandidate.offset-e.cur) { + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { // Found a 4 match... lCandidate = e.bTable[hash7(next, tableBits)] // If the next long is a candidate, check if we should use that instead... - lOff := nextS - (lCandidate.offset - e.cur) - if lOff < maxMatchOffset && load3232(src, lCandidate.offset-e.cur) == uint32(next) { + lOff := lCandidate.offset - e.cur + if nextS-lOff < maxMatchOffset && load3232(src, lOff) == uint32(next) { l1, l2 := matchLen(src[s+4:], src[t+4:]), matchLen(src[nextS+4:], src[nextS-lOff+4:]) if l2 > l1 { s = nextS @@ -127,7 +127,7 @@ func (e *fastEncL4) Encode(dst *tokens, src []byte) { // them as literal bytes. // Extend the 4-byte match as long as possible. - l := e.matchlenLong(s+4, t+4, src) + 4 + l := e.matchlenLong(int(s+4), int(t+4), src) + 4 // Extend backwards for t > 0 && s > nextEmit && src[t-1] == src[s-1] { diff --git a/vendor/github.com/klauspost/compress/flate/level5.go b/vendor/github.com/klauspost/compress/flate/level5.go index 1f61ec1..6e5c215 100644 --- a/vendor/github.com/klauspost/compress/flate/level5.go +++ b/vendor/github.com/klauspost/compress/flate/level5.go @@ -111,16 +111,16 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) { t = lCandidate.Cur.offset - e.cur if s-t < maxMatchOffset { - if uint32(cv) == load3232(src, lCandidate.Cur.offset-e.cur) { + if uint32(cv) == load3232(src, t) { // Store the next match e.table[nextHashS] = tableEntry{offset: nextS + e.cur} eLong := &e.bTable[nextHashL] eLong.Cur, eLong.Prev = tableEntry{offset: nextS + e.cur}, eLong.Cur t2 := lCandidate.Prev.offset - e.cur - if s-t2 < maxMatchOffset && uint32(cv) == load3232(src, lCandidate.Prev.offset-e.cur) { - l = e.matchlen(s+4, t+4, src) + 4 - ml1 := e.matchlen(s+4, t2+4, src) + 4 + if s-t2 < maxMatchOffset && uint32(cv) == load3232(src, t2) { + l = e.matchlen(int(s+4), int(t+4), src) + 4 + ml1 := e.matchlen(int(s+4), int(t2+4), src) + 4 if ml1 > l { t = t2 l = ml1 @@ -130,7 +130,7 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) { break } t = lCandidate.Prev.offset - e.cur - if s-t < maxMatchOffset && uint32(cv) == load3232(src, lCandidate.Prev.offset-e.cur) { + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { // Store the next match e.table[nextHashS] = tableEntry{offset: nextS + e.cur} eLong := &e.bTable[nextHashL] @@ -140,9 +140,9 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) { } t = sCandidate.offset - e.cur - if s-t < maxMatchOffset && uint32(cv) == load3232(src, sCandidate.offset-e.cur) { + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { // Found a 4 match... - l = e.matchlen(s+4, t+4, src) + 4 + l = e.matchlen(int(s+4), int(t+4), src) + 4 lCandidate = e.bTable[nextHashL] // Store the next match @@ -153,8 +153,8 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) { // If the next long is a candidate, use that... t2 := lCandidate.Cur.offset - e.cur if nextS-t2 < maxMatchOffset { - if load3232(src, lCandidate.Cur.offset-e.cur) == uint32(next) { - ml := e.matchlen(nextS+4, t2+4, src) + 4 + if load3232(src, t2) == uint32(next) { + ml := e.matchlen(int(nextS+4), int(t2+4), src) + 4 if ml > l { t = t2 s = nextS @@ -164,8 +164,8 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) { } // If the previous long is a candidate, use that... t2 = lCandidate.Prev.offset - e.cur - if nextS-t2 < maxMatchOffset && load3232(src, lCandidate.Prev.offset-e.cur) == uint32(next) { - ml := e.matchlen(nextS+4, t2+4, src) + 4 + if nextS-t2 < maxMatchOffset && load3232(src, t2) == uint32(next) { + ml := e.matchlen(int(nextS+4), int(t2+4), src) + 4 if ml > l { t = t2 s = nextS @@ -185,9 +185,9 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) { if l == 0 { // Extend the 4-byte match as long as possible. - l = e.matchlenLong(s+4, t+4, src) + 4 + l = e.matchlenLong(int(s+4), int(t+4), src) + 4 } else if l == maxMatchLength { - l += e.matchlenLong(s+l, t+l, src) + l += e.matchlenLong(int(s+l), int(t+l), src) } // Try to locate a better match by checking the end of best match... @@ -203,7 +203,7 @@ func (e *fastEncL5) Encode(dst *tokens, src []byte) { s2 := s + skipBeginning off := s2 - t2 if t2 >= 0 && off < maxMatchOffset && off > 0 { - if l2 := e.matchlenLong(s2, t2, src); l2 > l { + if l2 := e.matchlenLong(int(s2), int(t2), src); l2 > l { t = t2 l = l2 s = s2 @@ -423,14 +423,14 @@ func (e *fastEncL5Window) Encode(dst *tokens, src []byte) { t = lCandidate.Cur.offset - e.cur if s-t < maxMatchOffset { - if uint32(cv) == load3232(src, lCandidate.Cur.offset-e.cur) { + if uint32(cv) == load3232(src, t) { // Store the next match e.table[nextHashS] = tableEntry{offset: nextS + e.cur} eLong := &e.bTable[nextHashL] eLong.Cur, eLong.Prev = tableEntry{offset: nextS + e.cur}, eLong.Cur t2 := lCandidate.Prev.offset - e.cur - if s-t2 < maxMatchOffset && uint32(cv) == load3232(src, lCandidate.Prev.offset-e.cur) { + if s-t2 < maxMatchOffset && uint32(cv) == load3232(src, t2) { l = e.matchlen(s+4, t+4, src) + 4 ml1 := e.matchlen(s+4, t2+4, src) + 4 if ml1 > l { @@ -442,7 +442,7 @@ func (e *fastEncL5Window) Encode(dst *tokens, src []byte) { break } t = lCandidate.Prev.offset - e.cur - if s-t < maxMatchOffset && uint32(cv) == load3232(src, lCandidate.Prev.offset-e.cur) { + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { // Store the next match e.table[nextHashS] = tableEntry{offset: nextS + e.cur} eLong := &e.bTable[nextHashL] @@ -452,7 +452,7 @@ func (e *fastEncL5Window) Encode(dst *tokens, src []byte) { } t = sCandidate.offset - e.cur - if s-t < maxMatchOffset && uint32(cv) == load3232(src, sCandidate.offset-e.cur) { + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { // Found a 4 match... l = e.matchlen(s+4, t+4, src) + 4 lCandidate = e.bTable[nextHashL] @@ -465,7 +465,7 @@ func (e *fastEncL5Window) Encode(dst *tokens, src []byte) { // If the next long is a candidate, use that... t2 := lCandidate.Cur.offset - e.cur if nextS-t2 < maxMatchOffset { - if load3232(src, lCandidate.Cur.offset-e.cur) == uint32(next) { + if load3232(src, t2) == uint32(next) { ml := e.matchlen(nextS+4, t2+4, src) + 4 if ml > l { t = t2 @@ -476,7 +476,7 @@ func (e *fastEncL5Window) Encode(dst *tokens, src []byte) { } // If the previous long is a candidate, use that... t2 = lCandidate.Prev.offset - e.cur - if nextS-t2 < maxMatchOffset && load3232(src, lCandidate.Prev.offset-e.cur) == uint32(next) { + if nextS-t2 < maxMatchOffset && load3232(src, t2) == uint32(next) { ml := e.matchlen(nextS+4, t2+4, src) + 4 if ml > l { t = t2 diff --git a/vendor/github.com/klauspost/compress/flate/level6.go b/vendor/github.com/klauspost/compress/flate/level6.go index f1e9d98..96f5bb4 100644 --- a/vendor/github.com/klauspost/compress/flate/level6.go +++ b/vendor/github.com/klauspost/compress/flate/level6.go @@ -113,7 +113,7 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { t = lCandidate.Cur.offset - e.cur if s-t < maxMatchOffset { - if uint32(cv) == load3232(src, lCandidate.Cur.offset-e.cur) { + if uint32(cv) == load3232(src, t) { // Long candidate matches at least 4 bytes. // Store the next match @@ -123,9 +123,9 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { // Check the previous long candidate as well. t2 := lCandidate.Prev.offset - e.cur - if s-t2 < maxMatchOffset && uint32(cv) == load3232(src, lCandidate.Prev.offset-e.cur) { - l = e.matchlen(s+4, t+4, src) + 4 - ml1 := e.matchlen(s+4, t2+4, src) + 4 + if s-t2 < maxMatchOffset && uint32(cv) == load3232(src, t2) { + l = e.matchlen(int(s+4), int(t+4), src) + 4 + ml1 := e.matchlen(int(s+4), int(t2+4), src) + 4 if ml1 > l { t = t2 l = ml1 @@ -136,7 +136,7 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { } // Current value did not match, but check if previous long value does. t = lCandidate.Prev.offset - e.cur - if s-t < maxMatchOffset && uint32(cv) == load3232(src, lCandidate.Prev.offset-e.cur) { + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { // Store the next match e.table[nextHashS] = tableEntry{offset: nextS + e.cur} eLong := &e.bTable[nextHashL] @@ -146,9 +146,9 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { } t = sCandidate.offset - e.cur - if s-t < maxMatchOffset && uint32(cv) == load3232(src, sCandidate.offset-e.cur) { + if s-t < maxMatchOffset && uint32(cv) == load3232(src, t) { // Found a 4 match... - l = e.matchlen(s+4, t+4, src) + 4 + l = e.matchlen(int(s+4), int(t+4), src) + 4 // Look up next long candidate (at nextS) lCandidate = e.bTable[nextHashL] @@ -162,7 +162,7 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { const repOff = 1 t2 := s - repeat + repOff if load3232(src, t2) == uint32(cv>>(8*repOff)) { - ml := e.matchlen(s+4+repOff, t2+4, src) + 4 + ml := e.matchlen(int(s+4+repOff), int(t2+4), src) + 4 if ml > l { t = t2 l = ml @@ -175,8 +175,8 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { // If the next long is a candidate, use that... t2 = lCandidate.Cur.offset - e.cur if nextS-t2 < maxMatchOffset { - if load3232(src, lCandidate.Cur.offset-e.cur) == uint32(next) { - ml := e.matchlen(nextS+4, t2+4, src) + 4 + if load3232(src, t2) == uint32(next) { + ml := e.matchlen(int(nextS+4), int(t2+4), src) + 4 if ml > l { t = t2 s = nextS @@ -186,8 +186,8 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { } // If the previous long is a candidate, use that... t2 = lCandidate.Prev.offset - e.cur - if nextS-t2 < maxMatchOffset && load3232(src, lCandidate.Prev.offset-e.cur) == uint32(next) { - ml := e.matchlen(nextS+4, t2+4, src) + 4 + if nextS-t2 < maxMatchOffset && load3232(src, t2) == uint32(next) { + ml := e.matchlen(int(nextS+4), int(t2+4), src) + 4 if ml > l { t = t2 s = nextS @@ -207,9 +207,9 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { // Extend the 4-byte match as long as possible. if l == 0 { - l = e.matchlenLong(s+4, t+4, src) + 4 + l = e.matchlenLong(int(s+4), int(t+4), src) + 4 } else if l == maxMatchLength { - l += e.matchlenLong(s+l, t+l, src) + l += e.matchlenLong(int(s+l), int(t+l), src) } // Try to locate a better match by checking the end-of-match... @@ -227,7 +227,7 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { off := s2 - t2 if off < maxMatchOffset { if off > 0 && t2 >= 0 { - if l2 := e.matchlenLong(s2, t2, src); l2 > l { + if l2 := e.matchlenLong(int(s2), int(t2), src); l2 > l { t = t2 l = l2 s = s2 @@ -237,7 +237,7 @@ func (e *fastEncL6) Encode(dst *tokens, src []byte) { t2 = eLong.Prev.offset - e.cur - l + skipBeginning off := s2 - t2 if off > 0 && off < maxMatchOffset && t2 >= 0 { - if l2 := e.matchlenLong(s2, t2, src); l2 > l { + if l2 := e.matchlenLong(int(s2), int(t2), src); l2 > l { t = t2 l = l2 s = s2 diff --git a/vendor/github.com/klauspost/compress/flate/matchlen_amd64.go b/vendor/github.com/klauspost/compress/flate/matchlen_amd64.go deleted file mode 100644 index 4bd3885..0000000 --- a/vendor/github.com/klauspost/compress/flate/matchlen_amd64.go +++ /dev/null @@ -1,16 +0,0 @@ -//go:build amd64 && !appengine && !noasm && gc -// +build amd64,!appengine,!noasm,gc - -// Copyright 2019+ Klaus Post. All rights reserved. -// License information can be found in the LICENSE file. - -package flate - -// matchLen returns how many bytes match in a and b -// -// It assumes that: -// -// len(a) <= len(b) and len(a) > 0 -// -//go:noescape -func matchLen(a []byte, b []byte) int diff --git a/vendor/github.com/klauspost/compress/flate/matchlen_amd64.s b/vendor/github.com/klauspost/compress/flate/matchlen_amd64.s deleted file mode 100644 index 0782b86..0000000 --- a/vendor/github.com/klauspost/compress/flate/matchlen_amd64.s +++ /dev/null @@ -1,66 +0,0 @@ -// Copied from S2 implementation. - -//go:build !appengine && !noasm && gc && !noasm - -#include "textflag.h" - -// func matchLen(a []byte, b []byte) int -TEXT ·matchLen(SB), NOSPLIT, $0-56 - MOVQ a_base+0(FP), AX - MOVQ b_base+24(FP), CX - MOVQ a_len+8(FP), DX - - // matchLen - XORL SI, SI - CMPL DX, $0x08 - JB matchlen_match4_standalone - -matchlen_loopback_standalone: - MOVQ (AX)(SI*1), BX - XORQ (CX)(SI*1), BX - JZ matchlen_loop_standalone - -#ifdef GOAMD64_v3 - TZCNTQ BX, BX -#else - BSFQ BX, BX -#endif - SHRL $0x03, BX - LEAL (SI)(BX*1), SI - JMP gen_match_len_end - -matchlen_loop_standalone: - LEAL -8(DX), DX - LEAL 8(SI), SI - CMPL DX, $0x08 - JAE matchlen_loopback_standalone - -matchlen_match4_standalone: - CMPL DX, $0x04 - JB matchlen_match2_standalone - MOVL (AX)(SI*1), BX - CMPL (CX)(SI*1), BX - JNE matchlen_match2_standalone - LEAL -4(DX), DX - LEAL 4(SI), SI - -matchlen_match2_standalone: - CMPL DX, $0x02 - JB matchlen_match1_standalone - MOVW (AX)(SI*1), BX - CMPW (CX)(SI*1), BX - JNE matchlen_match1_standalone - LEAL -2(DX), DX - LEAL 2(SI), SI - -matchlen_match1_standalone: - CMPL DX, $0x01 - JB gen_match_len_end - MOVB (AX)(SI*1), BL - CMPB (CX)(SI*1), BL - JNE gen_match_len_end - INCL SI - -gen_match_len_end: - MOVQ SI, ret+48(FP) - RET diff --git a/vendor/github.com/klauspost/compress/flate/matchlen_generic.go b/vendor/github.com/klauspost/compress/flate/matchlen_generic.go index ad5cd81..6149384 100644 --- a/vendor/github.com/klauspost/compress/flate/matchlen_generic.go +++ b/vendor/github.com/klauspost/compress/flate/matchlen_generic.go @@ -1,27 +1,29 @@ -//go:build !amd64 || appengine || !gc || noasm -// +build !amd64 appengine !gc noasm - // Copyright 2019+ Klaus Post. All rights reserved. // License information can be found in the LICENSE file. package flate import ( - "encoding/binary" "math/bits" + + "github.com/klauspost/compress/internal/le" ) // matchLen returns the maximum common prefix length of a and b. // a must be the shortest of the two. func matchLen(a, b []byte) (n int) { - for ; len(a) >= 8 && len(b) >= 8; a, b = a[8:], b[8:] { - diff := binary.LittleEndian.Uint64(a) ^ binary.LittleEndian.Uint64(b) + left := len(a) + for left >= 8 { + diff := le.Load64(a, n) ^ le.Load64(b, n) if diff != 0 { return n + bits.TrailingZeros64(diff)>>3 } n += 8 + left -= 8 } + a = a[n:] + b = b[n:] for i := range a { if a[i] != b[i] { break @@ -29,5 +31,4 @@ func matchLen(a, b []byte) (n int) { n++ } return n - } diff --git a/vendor/github.com/klauspost/compress/flate/stateless.go b/vendor/github.com/klauspost/compress/flate/stateless.go index f3d4139..13b9b10 100644 --- a/vendor/github.com/klauspost/compress/flate/stateless.go +++ b/vendor/github.com/klauspost/compress/flate/stateless.go @@ -4,6 +4,8 @@ import ( "io" "math" "sync" + + "github.com/klauspost/compress/internal/le" ) const ( @@ -152,18 +154,11 @@ func hashSL(u uint32) uint32 { } func load3216(b []byte, i int16) uint32 { - // Help the compiler eliminate bounds checks on the read so it can be done in a single read. - b = b[i:] - b = b[:4] - return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24 + return le.Load32(b, i) } func load6416(b []byte, i int16) uint64 { - // Help the compiler eliminate bounds checks on the read so it can be done in a single read. - b = b[i:] - b = b[:8] - return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | - uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + return le.Load64(b, i) } func statelessEnc(dst *tokens, src []byte, startAt int16) { diff --git a/vendor/github.com/klauspost/compress/internal/le/le.go b/vendor/github.com/klauspost/compress/internal/le/le.go new file mode 100644 index 0000000..e54909e --- /dev/null +++ b/vendor/github.com/klauspost/compress/internal/le/le.go @@ -0,0 +1,5 @@ +package le + +type Indexer interface { + int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 +} diff --git a/vendor/github.com/klauspost/compress/internal/le/unsafe_disabled.go b/vendor/github.com/klauspost/compress/internal/le/unsafe_disabled.go new file mode 100644 index 0000000..0cfb5c0 --- /dev/null +++ b/vendor/github.com/klauspost/compress/internal/le/unsafe_disabled.go @@ -0,0 +1,42 @@ +//go:build !(amd64 || arm64 || ppc64le || riscv64) || nounsafe || purego || appengine + +package le + +import ( + "encoding/binary" +) + +// Load8 will load from b at index i. +func Load8[I Indexer](b []byte, i I) byte { + return b[i] +} + +// Load16 will load from b at index i. +func Load16[I Indexer](b []byte, i I) uint16 { + return binary.LittleEndian.Uint16(b[i:]) +} + +// Load32 will load from b at index i. +func Load32[I Indexer](b []byte, i I) uint32 { + return binary.LittleEndian.Uint32(b[i:]) +} + +// Load64 will load from b at index i. +func Load64[I Indexer](b []byte, i I) uint64 { + return binary.LittleEndian.Uint64(b[i:]) +} + +// Store16 will store v at b. +func Store16(b []byte, v uint16) { + binary.LittleEndian.PutUint16(b, v) +} + +// Store32 will store v at b. +func Store32(b []byte, v uint32) { + binary.LittleEndian.PutUint32(b, v) +} + +// Store64 will store v at b. +func Store64(b []byte, v uint64) { + binary.LittleEndian.PutUint64(b, v) +} diff --git a/vendor/github.com/klauspost/compress/internal/le/unsafe_enabled.go b/vendor/github.com/klauspost/compress/internal/le/unsafe_enabled.go new file mode 100644 index 0000000..ada45cd --- /dev/null +++ b/vendor/github.com/klauspost/compress/internal/le/unsafe_enabled.go @@ -0,0 +1,55 @@ +// We enable 64 bit LE platforms: + +//go:build (amd64 || arm64 || ppc64le || riscv64) && !nounsafe && !purego && !appengine + +package le + +import ( + "unsafe" +) + +// Load8 will load from b at index i. +func Load8[I Indexer](b []byte, i I) byte { + //return binary.LittleEndian.Uint16(b[i:]) + //return *(*uint16)(unsafe.Pointer(&b[i])) + return *(*byte)(unsafe.Add(unsafe.Pointer(unsafe.SliceData(b)), i)) +} + +// Load16 will load from b at index i. +func Load16[I Indexer](b []byte, i I) uint16 { + //return binary.LittleEndian.Uint16(b[i:]) + //return *(*uint16)(unsafe.Pointer(&b[i])) + return *(*uint16)(unsafe.Add(unsafe.Pointer(unsafe.SliceData(b)), i)) +} + +// Load32 will load from b at index i. +func Load32[I Indexer](b []byte, i I) uint32 { + //return binary.LittleEndian.Uint32(b[i:]) + //return *(*uint32)(unsafe.Pointer(&b[i])) + return *(*uint32)(unsafe.Add(unsafe.Pointer(unsafe.SliceData(b)), i)) +} + +// Load64 will load from b at index i. +func Load64[I Indexer](b []byte, i I) uint64 { + //return binary.LittleEndian.Uint64(b[i:]) + //return *(*uint64)(unsafe.Pointer(&b[i])) + return *(*uint64)(unsafe.Add(unsafe.Pointer(unsafe.SliceData(b)), i)) +} + +// Store16 will store v at b. +func Store16(b []byte, v uint16) { + //binary.LittleEndian.PutUint16(b, v) + *(*uint16)(unsafe.Pointer(unsafe.SliceData(b))) = v +} + +// Store32 will store v at b. +func Store32(b []byte, v uint32) { + //binary.LittleEndian.PutUint32(b, v) + *(*uint32)(unsafe.Pointer(unsafe.SliceData(b))) = v +} + +// Store64 will store v at b. +func Store64(b []byte, v uint64) { + //binary.LittleEndian.PutUint64(b, v) + *(*uint64)(unsafe.Pointer(unsafe.SliceData(b))) = v +} diff --git a/vendor/github.com/nats-io/nats.go/.travis.yml b/vendor/github.com/nats-io/nats.go/.travis.yml deleted file mode 100644 index 9a6b4a8..0000000 --- a/vendor/github.com/nats-io/nats.go/.travis.yml +++ /dev/null @@ -1,36 +0,0 @@ -language: go -go: -- "1.22.x" -- "1.21.x" -go_import_path: github.com/nats-io/nats.go -install: -- go get -t ./... -- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin -- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then - go install github.com/mattn/goveralls@latest; - go install github.com/wadey/gocovmerge@latest; - go install honnef.co/go/tools/cmd/staticcheck@latest; - go install github.com/client9/misspell/cmd/misspell@latest; - fi -before_script: -- $(exit $(go fmt ./... | wc -l)) -- go vet -modfile=go_test.mod ./... -- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then - find . -type f -name "*.go" | xargs misspell -error -locale US; - GOFLAGS="-mod=mod -modfile=go_test.mod" staticcheck ./...; - fi -- golangci-lint run ./jetstream/... -script: -- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off -- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi -after_success: -- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi - -jobs: - include: - - name: "Go: 1.22.x (nats-server@main)" - go: "1.22.x" - before_script: - - go get -modfile go_test.mod github.com/nats-io/nats-server/v2@main - allow_failures: - - name: "Go: 1.22.x (nats-server@main)" diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 0d8ccca..6961b58 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -7,8 +7,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io [License-Image]: https://img.shields.io/badge/License-Apache2-blue.svg [ReportCard-Url]: https://goreportcard.com/report/github.com/nats-io/nats.go [ReportCard-Image]: https://goreportcard.com/badge/github.com/nats-io/nats.go -[Build-Status-Url]: https://travis-ci.com/github/nats-io/nats.go -[Build-Status-Image]: https://travis-ci.com/nats-io/nats.go.svg?branch=main +[Build-Status-Url]: https://github.com/nats-io/nats.go/actions +[Build-Status-Image]: https://github.com/nats-io/nats.go/actions/workflows/ci.yaml/badge.svg?branch=main [GoDoc-Url]: https://pkg.go.dev/github.com/nats-io/nats.go [GoDoc-Image]: https://img.shields.io/badge/GoDoc-reference-007d9c [Coverage-Url]: https://coveralls.io/r/nats-io/nats.go?branch=main @@ -19,25 +19,14 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io ## Installation ```bash -# Go client -go get github.com/nats-io/nats.go/ +# To get the latest released Go client: +go get github.com/nats-io/nats.go@latest -# Server -go get github.com/nats-io/nats-server -``` - -When using or transitioning to Go modules support: - -```bash -# Go client latest or explicit version -go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.37.0 - -# For latest NATS Server, add /v2 at the end -go get github.com/nats-io/nats-server/v2 +# To get a specific version: +go get github.com/nats-io/nats.go@v1.39.1 -# NATS Server v1 is installed otherwise -# go get github.com/nats-io/nats-server +# Note that the latest major version for NATS Server is v2: +go get github.com/nats-io/nats-server/v2@latest ``` ## Basic Usage diff --git a/vendor/github.com/nats-io/nats.go/context.go b/vendor/github.com/nats-io/nats.go/context.go index c19673c..382335e 100644 --- a/vendor/github.com/nats-io/nats.go/context.go +++ b/vendor/github.com/nats-io/nats.go/context.go @@ -88,7 +88,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat inbox := nc.NewInbox() ch := make(chan *Msg, RequestChanLen) - s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil) + s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, nil, true, nil) if err != nil { return nil, err } diff --git a/vendor/github.com/nats-io/nats.go/dependencies.tpl b/vendor/github.com/nats-io/nats.go/dependencies.tpl new file mode 100644 index 0000000..41cd374 --- /dev/null +++ b/vendor/github.com/nats-io/nats.go/dependencies.tpl @@ -0,0 +1,9 @@ +# External Dependencies + +This file lists the dependencies used in this repository. + +{{/* compress has actually a BSD 3-Clause license, but the License file in the repo confuses go-license tooling, hence the manual exception */}} +| Dependency | License | +|--------------------------------------------------|-----------------------------------------| +{{ range . }}| {{ .Name }} | {{ if eq .Name "github.com/klauspost/compress/flate" }}BSD 3-Clause{{ else }}{{ .LicenseName }}{{ end }} | +{{ end }} diff --git a/vendor/github.com/nats-io/nats.go/enc.go b/vendor/github.com/nats-io/nats.go/enc.go index 78bcc21..34a3fae 100644 --- a/vendor/github.com/nats-io/nats.go/enc.go +++ b/vendor/github.com/nats-io/nats.go/enc.go @@ -258,7 +258,7 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio cbValue.Call(oV) } - return c.Conn.subscribe(subject, queue, natsCB, nil, false, nil) + return c.Conn.subscribe(subject, queue, natsCB, nil, nil, false, nil) } // FlushTimeout allows a Flush operation to have an associated timeout. diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index 20e1ab7..32c1003 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -1,25 +1,23 @@ module github.com/nats-io/nats.go -go 1.21 - -toolchain go1.22.5 +go 1.22.0 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.17.9 + github.com/klauspost/compress v1.17.11 github.com/nats-io/jwt v1.2.2 - github.com/nats-io/nats-server/v2 v2.10.17 - github.com/nats-io/nkeys v0.4.7 + github.com/nats-io/nats-server/v2 v2.10.24 + github.com/nats-io/nkeys v0.4.9 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.3.0 - golang.org/x/text v0.16.0 + golang.org/x/text v0.21.0 google.golang.org/protobuf v1.23.0 ) require ( - github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.7 // indirect - golang.org/x/crypto v0.24.0 // indirect - golang.org/x/sys v0.21.0 // indirect - golang.org/x/time v0.5.0 // indirect + github.com/minio/highwayhash v1.0.3 // indirect + github.com/nats-io/jwt/v2 v2.7.3 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/time v0.8.0 // indirect ) diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index df0ef6d..f6223f2 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -11,19 +11,19 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c= -github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.17 h1:PTVObNBD3TZSNUDgzFb1qQsQX4mOgFmOuG9vhT+KBUY= -github.com/nats-io/nats-server/v2 v2.10.17/go.mod h1:5OUyc4zg42s/p2i92zbbqXvUNsbF0ivdTLKshVMn2YQ= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4= +github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= -github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -34,19 +34,19 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index c8b6268..fe246aa 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -120,6 +120,7 @@ type JetStream interface { // PullSubscribe creates a Subscription that can fetch messages. // See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be // set to an empty string. + // When using PullSubscribe, the messages are fetched using Fetch() and FetchBatch() methods. PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) } @@ -479,6 +480,9 @@ type pubOpts struct { // stallWait is the max wait of a async pub ack. stallWait time.Duration + + // internal option to re-use existing paf in case of retry. + pafRetry *pubAckFuture } // pubAckResponse is the ack response from the JetStream API when publishing a message. @@ -544,7 +548,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { o.ttl = js.opts.wait } if o.stallWait > 0 { - return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish") + return nil, errors.New("nats: stall wait cannot be set to sync publish") } if o.id != _EMPTY_ { @@ -633,13 +637,17 @@ type PubAckFuture interface { } type pubAckFuture struct { - js *js - msg *Msg - pa *PubAck - st time.Time - err error - errCh chan error - doneCh chan *PubAck + js *js + msg *Msg + pa *PubAck + st time.Time + err error + errCh chan error + doneCh chan *PubAck + retries int + maxRetries int + retryWait time.Duration + reply string } func (paf *pubAckFuture) Ok() <-chan *PubAck { @@ -848,20 +856,30 @@ func (js *js) handleAsyncReply(m *Msg) { js.mu.Unlock() return } - // Remove - delete(js.pafs, id) - // Check on anyone stalled and waiting. - if js.stc != nil && len(js.pafs) < js.opts.maxpa { - close(js.stc) - js.stc = nil + closeStc := func() { + // Check on anyone stalled and waiting. + if js.stc != nil && len(js.pafs) < js.opts.maxpa { + close(js.stc) + js.stc = nil + } } - // Check on anyone one waiting on done status. - if js.dch != nil && len(js.pafs) == 0 { - dch := js.dch - js.dch = nil - // Defer here so error is processed and can be checked. - defer close(dch) + + closeDchFn := func() func() { + var dch chan struct{} + // Check on anyone one waiting on done status. + if js.dch != nil && len(js.pafs) == 0 { + dch = js.dch + js.dch = nil + } + // Return function to close done channel which + // should be deferred so that error is processed and + // can be checked. + return func() { + if dch != nil { + close(dch) + } + } } doErr := func(err error) { @@ -878,10 +896,39 @@ func (js *js) handleAsyncReply(m *Msg) { // Process no responders etc. if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { + if paf.retries < paf.maxRetries { + paf.retries++ + time.AfterFunc(paf.retryWait, func() { + js.mu.Lock() + paf := js.getPAF(id) + js.mu.Unlock() + if paf == nil { + return + } + _, err := js.PublishMsgAsync(paf.msg, pubOptFn(func(po *pubOpts) error { + po.pafRetry = paf + return nil + })) + if err != nil { + js.mu.Lock() + doErr(err) + } + }) + js.mu.Unlock() + return + } + delete(js.pafs, id) + closeStc() + defer closeDchFn()() doErr(ErrNoResponders) return } + //remove + delete(js.pafs, id) + closeStc() + defer closeDchFn()() + var pa pubAckResponse if err := json.Unmarshal(m.Data, &pa); err != nil { doErr(ErrInvalidJSAck) @@ -948,6 +995,10 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { } } + if o.rnum < 0 { + return nil, fmt.Errorf("%w: retry attempts cannot be negative", ErrInvalidArg) + } + // Timeouts and contexts do not make sense for these. if o.ttl != 0 || o.ctx != nil { return nil, ErrContextAndTimeout @@ -975,30 +1026,42 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { } // Reply - if m.Reply != _EMPTY_ { + paf := o.pafRetry + if paf == nil && m.Reply != _EMPTY_ { return nil, errors.New("nats: reply subject should be empty") } - reply := m.Reply - m.Reply = js.newAsyncReply() - defer func() { m.Reply = reply }() + var id string + var reply string - if m.Reply == _EMPTY_ { - return nil, errors.New("nats: error creating async reply handler") - } + // register new paf if not retrying + if paf == nil { + reply = js.newAsyncReply() + + if reply == _EMPTY_ { + return nil, errors.New("nats: error creating async reply handler") + } - id := m.Reply[js.replyPrefixLen:] - paf := &pubAckFuture{msg: m, st: time.Now()} - numPending, maxPending := js.registerPAF(id, paf) + id = reply[js.replyPrefixLen:] + paf = &pubAckFuture{msg: m, st: time.Now(), maxRetries: o.rnum, retryWait: o.rwait, reply: reply} + numPending, maxPending := js.registerPAF(id, paf) - if maxPending > 0 && numPending >= maxPending { - select { - case <-js.asyncStall(): - case <-time.After(stallWait): - js.clearPAF(id) - return nil, errors.New("nats: stalled with too many outstanding async published messages") + if maxPending > 0 && numPending > maxPending { + select { + case <-js.asyncStall(): + case <-time.After(stallWait): + js.clearPAF(id) + return nil, errors.New("nats: stalled with too many outstanding async published messages") + } } + } else { + reply = paf.reply + id = reply[js.replyPrefixLen:] } - if err := js.nc.PublishMsg(m); err != nil { + hdr, err := m.headerBytes() + if err != nil { + return nil, err + } + if err := js.nc.publish(m.Subject, reply, hdr, m.Data); err != nil { js.clearPAF(id) return nil, err } @@ -1081,7 +1144,7 @@ func RetryAttempts(num int) PubOpt { func StallWait(ttl time.Duration) PubOpt { return pubOptFn(func(opts *pubOpts) error { if ttl <= 0 { - return fmt.Errorf("nats: stall wait should be more than 0") + return errors.New("nats: stall wait should be more than 0") } opts.stallWait = ttl return nil @@ -1439,11 +1502,11 @@ func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode boo // Prevent an user from attempting to create a queue subscription on // a JS consumer that was not created with a deliver group. if queue != _EMPTY_ { - return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group") + return _EMPTY_, errors.New("cannot create a queue subscription for a consumer without a deliver group") } else if info.PushBound { // Need to reject a non queue subscription to a non queue consumer // if the consumer is already bound. - return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription") + return _EMPTY_, errors.New("consumer is already bound to a subscription") } } else { // If the JS consumer has a deliver group, we need to fail a non queue @@ -1465,7 +1528,7 @@ func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode boo func checkConfig(s, u *ConsumerConfig) error { makeErr := func(fieldName string, usrVal, srvVal any) error { - return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal) + return fmt.Errorf("nats: configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal) } if u.Durable != _EMPTY_ && u.Durable != s.Durable { @@ -1545,7 +1608,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // If no stream name is specified, the subject cannot be empty. if subj == _EMPTY_ && o.stream == _EMPTY_ { - return nil, fmt.Errorf("nats: subject required") + return nil, errors.New("nats: subject required") } // Note that these may change based on the consumer info response we may get. @@ -1567,7 +1630,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // would subscribe to and server would send on. if o.cfg.Heartbeat > 0 || o.cfg.FlowControl { // Not making this a public ErrXXX in case we allow in the future. - return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control") + return nil, errors.New("nats: queue subscription doesn't support idle heartbeat nor flow control") } // If this is a queue subscription and no consumer nor durable name was specified, @@ -1605,31 +1668,31 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if o.ordered { // Make sure we are not durable. if isDurable { - return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer") + return nil, errors.New("nats: durable can not be set for an ordered consumer") } // Check ack policy. if o.cfg.AckPolicy != ackPolicyNotSet { - return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer") + return nil, errors.New("nats: ack policy can not be set for an ordered consumer") } // Check max deliver. if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 { - return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer") + return nil, errors.New("nats: max deliver can not be set for an ordered consumer") } // No deliver subject, we pick our own. if o.cfg.DeliverSubject != _EMPTY_ { - return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer") + return nil, errors.New("nats: deliver subject can not be set for an ordered consumer") } // Queue groups not allowed. if queue != _EMPTY_ { - return nil, fmt.Errorf("nats: queues not be set for an ordered consumer") + return nil, errors.New("nats: queues not be set for an ordered consumer") } // Check for bound consumers. if consumer != _EMPTY_ { - return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer") + return nil, errors.New("nats: can not bind existing consumer for an ordered consumer") } // Check for pull mode. if isPullMode { - return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer") + return nil, errors.New("nats: can not use pull mode for an ordered consumer") } // Setup how we need it to be here. o.cfg.FlowControl = true @@ -1777,7 +1840,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, ocb := cb cb = func(m *Msg) { ocb(m); m.Ack() } } - sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi) + sub, err := nc.subscribe(deliver, queue, cb, ch, nil, isSync, jsi) if err != nil { return nil, err } @@ -1848,7 +1911,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, jsi.hbi = info.Config.Heartbeat // Recreate the subscription here. - sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) + sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, nil, isSync, jsi) if err != nil { return nil, err } @@ -2363,7 +2426,7 @@ func Description(description string) SubOpt { func Durable(consumer string) SubOpt { return subOptFn(func(opts *subOpts) error { if opts.cfg.Durable != _EMPTY_ { - return fmt.Errorf("nats: option Durable set more than once") + return errors.New("nats: option Durable set more than once") } if opts.consumer != _EMPTY_ && opts.consumer != consumer { return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer) @@ -3052,20 +3115,27 @@ type MessageBatch interface { } type messageBatch struct { + sync.Mutex msgs chan *Msg err error done chan struct{} } func (mb *messageBatch) Messages() <-chan *Msg { + mb.Lock() + defer mb.Unlock() return mb.msgs } func (mb *messageBatch) Error() error { + mb.Lock() + defer mb.Unlock() return mb.err } func (mb *messageBatch) Done() <-chan struct{} { + mb.Lock() + defer mb.Unlock() return mb.done } @@ -3240,12 +3310,11 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e } var hbTimer *time.Timer var hbErr error - hbLock := sync.Mutex{} if o.hb > 0 { hbTimer = time.AfterFunc(2*o.hb, func() { - hbLock.Lock() + result.Lock() hbErr = ErrNoHeartbeat - hbLock.Unlock() + result.Unlock() cancel() }) } @@ -3276,21 +3345,25 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e break } if usrMsg { + result.Lock() result.msgs <- msg + result.Unlock() requestMsgs++ } } if err != nil { - hbLock.Lock() + result.Lock() if hbErr != nil { result.err = hbErr } else { result.err = o.checkCtxErr(err) } - hbLock.Unlock() + result.Unlock() } close(result.msgs) + result.Lock() result.done <- struct{}{} + result.Unlock() }() return result, nil } @@ -3888,7 +3961,7 @@ func (alg StoreCompression) MarshalJSON() ([]byte, error) { case NoCompression: str = "none" default: - return nil, fmt.Errorf("unknown compression algorithm") + return nil, errors.New("unknown compression algorithm") } return json.Marshal(str) } @@ -3904,7 +3977,7 @@ func (alg *StoreCompression) UnmarshalJSON(b []byte) error { case "none": *alg = NoCompression default: - return fmt.Errorf("unknown compression algorithm") + return errors.New("unknown compression algorithm") } return nil } diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index 6826647..2ae19c7 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -1330,11 +1330,11 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error // Check for headers that give us the required information to // reconstruct the message. if len(r.Header) == 0 { - return nil, fmt.Errorf("nats: response should have headers") + return nil, errors.New("nats: response should have headers") } stream := r.Header.Get(JSStream) if stream == _EMPTY_ { - return nil, fmt.Errorf("nats: missing stream header") + return nil, errors.New("nats: missing stream header") } // Mirrors can now answer direct gets, so removing check for name equality. @@ -1342,7 +1342,7 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error seqStr := r.Header.Get(JSSequence) if seqStr == _EMPTY_ { - return nil, fmt.Errorf("nats: missing sequence header") + return nil, errors.New("nats: missing sequence header") } seq, err := strconv.ParseUint(seqStr, 10, 64) if err != nil { @@ -1350,7 +1350,7 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error } timeStr := r.Header.Get(JSTimeStamp) if timeStr == _EMPTY_ { - return nil, fmt.Errorf("nats: missing timestamp header") + return nil, errors.New("nats: missing timestamp header") } // Temporary code: the server in main branch is sending with format // "2006-01-02 15:04:05.999999999 +0000 UTC", but will be changed @@ -1365,7 +1365,7 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error } subj := r.Header.Get(JSSubject) if subj == _EMPTY_ { - return nil, fmt.Errorf("nats: missing subject header") + return nil, errors.New("nats: missing subject header") } return &RawStreamMsg{ Subject: subj, diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index 4e7a3fd..bcb283f 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -54,6 +54,7 @@ type KeyValue interface { // Create will add the key/value pair iff it does not exist. Create(key string, value []byte) (revision uint64, err error) // Update will update the value iff the latest revision matches. + // Update also resets the TTL associated with the key (if any). Update(key string, value []byte, last uint64) (revision uint64, err error) // Delete will place a delete marker and leave all revisions. Delete(key string, opts ...DeleteOpt) error @@ -64,6 +65,9 @@ type KeyValue interface { Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) // WatchAll will invoke the callback for all updates. WatchAll(opts ...WatchOpt) (KeyWatcher, error) + // WatchFiltered will watch for any updates to keys that match the keys + // argument. It can be configured with the same options as Watch. + WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error) // Keys will return all keys. // Deprecated: Use ListKeys instead to avoid memory issues. Keys(opts ...WatchOpt) ([]string, error) @@ -963,11 +967,11 @@ func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) { return kv.Watch(AllKeys, opts...) } -// Watch will fire the callback when a key that matches the keys pattern is updated. -// keys needs to be a valid NATS subject. -func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { - if !searchKeyValid(keys) { - return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject") +func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error) { + for _, key := range keys { + if !searchKeyValid(key) { + return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "key cannot be empty and must be a valid NATS subject") + } } var o watchOpts for _, opt := range opts { @@ -979,10 +983,20 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { } // Could be a pattern so don't check for validity as we normally do. - var b strings.Builder - b.WriteString(kv.pre) - b.WriteString(keys) - keys = b.String() + for i, key := range keys { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + keys[i] = b.String() + } + + // if no keys are provided, watch all keys + if len(keys) == 0 { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(AllKeys) + keys = []string{b.String()} + } // We will block below on placing items on the chan. That is by design. w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx} @@ -1055,7 +1069,14 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { // update() callback. w.mu.Lock() defer w.mu.Unlock() - sub, err := kv.js.Subscribe(keys, update, subOpts...) + var sub *Subscription + var err error + if len(keys) == 1 { + sub, err = kv.js.Subscribe(keys[0], update, subOpts...) + } else { + subOpts = append(subOpts, ConsumerFilterSubjects(keys...)) + sub, err = kv.js.Subscribe("", update, subOpts...) + } if err != nil { return nil, err } @@ -1082,6 +1103,12 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { return w, nil } +// Watch will fire the callback when a key that matches the keys pattern is updated. +// keys needs to be a valid NATS subject. +func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { + return kv.WatchFiltered([]string{keys}, opts...) +} + // Bucket returns the current bucket name (JetStream stream). func (kv *kvs) Bucket() string { return kv.name diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 10fa0f5..0d13581 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -47,7 +47,7 @@ import ( // Default Constants const ( - Version = "1.37.0" + Version = "1.39.1" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -86,6 +86,9 @@ const ( // MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit MAX_CONNECTIONS_ERR = "maximum connections exceeded" + + // MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit + MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded" ) // Errors @@ -106,6 +109,7 @@ var ( ErrAuthorization = errors.New("nats: authorization violation") ErrAuthExpired = errors.New("nats: authentication expired") ErrAuthRevoked = errors.New("nats: authentication revoked") + ErrPermissionViolation = errors.New("nats: permissions violation") ErrAccountAuthExpired = errors.New("nats: account authentication expired") ErrNoServers = errors.New("nats: no servers available for connection") ErrJsonParse = errors.New("nats: connect message, json parse error") @@ -131,6 +135,7 @@ var ( ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) ErrTokenAlreadySet = errors.New("nats: token and token handler both set") + ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass") ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") ErrMsgNoReply = errors.New("nats: message does not have a reply") ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") @@ -140,6 +145,7 @@ var ( ErrNoResponders = errors.New("nats: no responders available for request") ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") ErrConnectionNotTLS = errors.New("nats: connection is not tls") + ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded") ) // GetDefaultOptions returns default configuration options for the client. @@ -230,6 +236,9 @@ type SignatureHandler func([]byte) ([]byte, error) // AuthTokenHandler is used to generate a new token. type AuthTokenHandler func() string +// UserInfoCB is used to pass the username and password when establishing connection. +type UserInfoCB func() (string, string) + // ReconnectDelayHandler is used to get from the user the desired // delay the library should pause before attempting to reconnect // again. Note that this is invoked after the library tried the @@ -443,6 +452,9 @@ type Options struct { // Password sets the password to be used when connecting to a server. Password string + // UserInfo sets the callback handler that will fetch the username and password. + UserInfo UserInfoCB + // Token sets the token to be used when connecting to a server. Token string @@ -499,6 +511,11 @@ type Options struct { // SkipHostLookup skips the DNS lookup for the server hostname. SkipHostLookup bool + + // PermissionErrOnSubscribe - if set to true, the client will return ErrPermissionViolation + // from SubscribeSync if the server returns a permissions error for a subscription. + // Defaults to false. + PermissionErrOnSubscribe bool } const ( @@ -607,17 +624,19 @@ type Subscription struct { // For holding information about a JetStream consumer. jsi *jsSub - delivered uint64 - max uint64 - conn *Conn - mcb MsgHandler - mch chan *Msg - closed bool - sc bool - connClosed bool - draining bool - status SubStatus - statListeners map[chan SubStatus][]SubStatus + delivered uint64 + max uint64 + conn *Conn + mcb MsgHandler + mch chan *Msg + errCh chan (error) + closed bool + sc bool + connClosed bool + draining bool + status SubStatus + statListeners map[chan SubStatus][]SubStatus + permissionsErr error // Type of Subscription typ SubscriptionType @@ -1166,6 +1185,13 @@ func UserInfo(user, password string) Option { } } +func UserInfoHandler(cb UserInfoCB) Option { + return func(o *Options) error { + o.UserInfo = cb + return nil + } +} + // Token is an Option to set the token to use // when a token is not included directly in the URLs // and when a token handler is not provided. @@ -1359,7 +1385,7 @@ func ProxyPath(path string) Option { func CustomInboxPrefix(p string) Option { return func(o *Options) error { if p == "" || strings.Contains(p, ">") || strings.Contains(p, "*") || strings.HasSuffix(p, ".") { - return fmt.Errorf("nats: invalid custom prefix") + return errors.New("nats: invalid custom prefix") } o.InboxPrefix = p return nil @@ -1383,6 +1409,13 @@ func SkipHostLookup() Option { } } +func PermissionErrOnSubscribe(enabled bool) Option { + return func(o *Options) error { + o.PermissionErrOnSubscribe = enabled + return nil + } +} + // TLSHandshakeFirst is an Option to perform the TLS handshake first, that is // before receiving the INFO protocol. This requires the server to also be // configured with such option, otherwise the connection will fail. @@ -1814,7 +1847,7 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error { if len(nc.srvPool) == 0 { nc.ws = isWS } else if isWS && !nc.ws || !isWS && nc.ws { - return fmt.Errorf("mixing of websocket and non websocket URLs is not allowed") + return errors.New("mixing of websocket and non websocket URLs is not allowed") } var tlsName string @@ -2563,6 +2596,13 @@ func (nc *Conn) connectProto() (string, error) { pass = o.Password token = o.Token nkey = o.Nkey + + if nc.Opts.UserInfo != nil { + if user != _EMPTY_ || pass != _EMPTY_ { + return _EMPTY_, ErrUserInfoAlreadySet + } + user, pass = nc.Opts.UserInfo() + } } // Look for user jwt. @@ -2952,11 +2992,11 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) { // processOpErr handles errors from reading or parsing the protocol. // The lock should not be held entering this function. -func (nc *Conn) processOpErr(err error) { +func (nc *Conn) processOpErr(err error) bool { nc.mu.Lock() + defer nc.mu.Unlock() if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() { - nc.mu.Unlock() - return + return false } if nc.Opts.AllowReconnect && nc.status == CONNECTED { @@ -2976,14 +3016,12 @@ func (nc *Conn) processOpErr(err error) { nc.clearPendingFlushCalls() go nc.doReconnect(err, false) - nc.mu.Unlock() - return + return false } nc.changeConnStatus(DISCONNECTED) nc.err = err - nc.mu.Unlock() - nc.close(CLOSED, true, nil) + return true } // dispatch is responsible for calling any async callbacks @@ -3080,7 +3118,9 @@ func (nc *Conn) readLoop() { err = nc.parse(buf) } if err != nil { - nc.processOpErr(err) + if shouldClose := nc.processOpErr(err); shouldClose { + nc.close(CLOSED, true, nil) + } break } } @@ -3410,15 +3450,41 @@ slowConsumer: } } -// processPermissionsViolation is called when the server signals a subject -// permissions violation on either publish or subscribe. -func (nc *Conn) processPermissionsViolation(err string) { +var permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`) +var permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`) + +// processTransientError is called when the server signals a non terminal error +// which does not close the connection or trigger a reconnect. +// This will trigger the async error callback if set. +// These errors include the following: +// - permissions violation on publish or subscribe +// - maximum subscriptions exceeded +func (nc *Conn) processTransientError(err error) { nc.mu.Lock() - // create error here so we can pass it as a closure to the async cb dispatcher. - e := errors.New("nats: " + err) - nc.err = e + nc.err = err + if errors.Is(err, ErrPermissionViolation) { + matches := permissionsRe.FindStringSubmatch(err.Error()) + if len(matches) >= 2 { + queueMatches := permissionsQueueRe.FindStringSubmatch(err.Error()) + var q string + if len(queueMatches) >= 2 { + q = queueMatches[1] + } + subject := matches[1] + for _, sub := range nc.subs { + if sub.Subject == subject && sub.Queue == q && sub.permissionsErr == nil { + sub.mu.Lock() + if sub.errCh != nil { + sub.errCh <- err + } + sub.permissionsErr = err + sub.mu.Unlock() + } + } + } + } if nc.Opts.AsyncErrorCB != nil { - nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) }) + nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) }) } nc.mu.Unlock() } @@ -3650,15 +3716,17 @@ func (nc *Conn) processErr(ie string) { // convert to lower case. e := strings.ToLower(ne) - close := false + var close bool // FIXME(dlc) - process Slow Consumer signals special. if e == STALE_CONNECTION { - nc.processOpErr(ErrStaleConnection) + close = nc.processOpErr(ErrStaleConnection) } else if e == MAX_CONNECTIONS_ERR { - nc.processOpErr(ErrMaxConnectionsExceeded) + close = nc.processOpErr(ErrMaxConnectionsExceeded) } else if strings.HasPrefix(e, PERMISSIONS_ERR) { - nc.processPermissionsViolation(ne) + nc.processTransientError(fmt.Errorf("%w: %s", ErrPermissionViolation, ne)) + } else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) { + nc.processTransientError(ErrMaxSubscriptionsExceeded) } else if authErr := checkAuthError(e); authErr != nil { nc.mu.Lock() close = nc.processAuthError(authErr) @@ -3999,10 +4067,6 @@ func (nc *Conn) respHandler(m *Msg) { // Helper to setup and send new request style requests. Return the chan to receive the response. func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) { nc.mu.Lock() - // Do setup for the new style if needed. - if nc.respMap == nil { - nc.initNewResp() - } // Create new literal Inbox and map to a chan msg. mch := make(chan *Msg, RequestChanLen) respInbox := nc.newRespInbox() @@ -4013,7 +4077,7 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms // Create the response subscription we will use for all new style responses. // This will be on an _INBOX with an additional terminal token. The subscription // will be on a wildcard. - s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, false, nil) + s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, nil, false, nil) if err != nil { nc.mu.Unlock() return nil, token, err @@ -4111,7 +4175,7 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) inbox := nc.NewInbox() ch := make(chan *Msg, RequestChanLen) - s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil) + s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, nil, true, nil) if err != nil { return nil, err } @@ -4217,14 +4281,14 @@ func (nc *Conn) respToken(respInbox string) string { // since it can't match more than one token. // Messages will be delivered to the associated MsgHandler. func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) { - return nc.subscribe(subj, _EMPTY_, cb, nil, false, nil) + return nc.subscribe(subj, _EMPTY_, cb, nil, nil, false, nil) } // ChanSubscribe will express interest in the given subject and place // all messages received on the channel. // You should not close the channel until sub.Unsubscribe() has been called. func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) { - return nc.subscribe(subj, _EMPTY_, nil, ch, false, nil) + return nc.subscribe(subj, _EMPTY_, nil, ch, nil, false, nil) } // ChanQueueSubscribe will express interest in the given subject. @@ -4234,7 +4298,7 @@ func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) // You should not close the channel until sub.Unsubscribe() has been called. // Note: This is the same than QueueSubscribeSyncWithChan. func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) { - return nc.subscribe(subj, group, nil, ch, false, nil) + return nc.subscribe(subj, group, nil, ch, nil, false, nil) } // SubscribeSync will express interest on the given subject. Messages will @@ -4244,7 +4308,11 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) { return nil, ErrInvalidConnection } mch := make(chan *Msg, nc.Opts.SubChanLen) - return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil) + var errCh chan error + if nc.Opts.PermissionErrOnSubscribe { + errCh = make(chan error, 100) + } + return nc.subscribe(subj, _EMPTY_, nil, mch, errCh, true, nil) } // QueueSubscribe creates an asynchronous queue subscriber on the given subject. @@ -4252,7 +4320,7 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) { // only one member of the group will be selected to receive any given // message asynchronously. func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) { - return nc.subscribe(subj, queue, cb, nil, false, nil) + return nc.subscribe(subj, queue, cb, nil, nil, false, nil) } // QueueSubscribeSync creates a synchronous queue subscriber on the given @@ -4261,7 +4329,11 @@ func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription // given message synchronously using Subscription.NextMsg(). func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) { mch := make(chan *Msg, nc.Opts.SubChanLen) - return nc.subscribe(subj, queue, nil, mch, true, nil) + var errCh chan error + if nc.Opts.PermissionErrOnSubscribe { + errCh = make(chan error, 100) + } + return nc.subscribe(subj, queue, nil, mch, errCh, true, nil) } // QueueSubscribeSyncWithChan will express interest in the given subject. @@ -4271,7 +4343,7 @@ func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) { // You should not close the channel until sub.Unsubscribe() has been called. // Note: This is the same than ChanQueueSubscribe. func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) { - return nc.subscribe(subj, queue, nil, ch, false, nil) + return nc.subscribe(subj, queue, nil, ch, nil, false, nil) } // badSubject will do quick test on whether a subject is acceptable. @@ -4295,16 +4367,16 @@ func badQueue(qname string) bool { } // subscribe is the internal subscribe function that indicates interest in a subject. -func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) { +func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) { if nc == nil { return nil, ErrInvalidConnection } nc.mu.Lock() defer nc.mu.Unlock() - return nc.subscribeLocked(subj, queue, cb, ch, isSync, js) + return nc.subscribeLocked(subj, queue, cb, ch, errCh, isSync, js) } -func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) { +func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) { if nc == nil { return nil, ErrInvalidConnection } @@ -4355,6 +4427,7 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, } else { // Sync Subscription sub.typ = SyncSubscription sub.mch = ch + sub.errCh = errCh } nc.subsMu.Lock() @@ -4612,14 +4685,14 @@ func (s *Subscription) Unsubscribe() error { // checkDrained will watch for a subscription to be fully drained // and then remove it. func (nc *Conn) checkDrained(sub *Subscription) { + if nc == nil || sub == nil { + return + } defer func() { sub.mu.Lock() defer sub.mu.Unlock() sub.draining = false }() - if nc == nil || sub == nil { - return - } // This allows us to know that whatever we have in the client pending // is correct and the server will not send additional information. @@ -4799,16 +4872,92 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { t := globalTimerPool.Get(timeout) defer globalTimerPool.Put(t) + if s.errCh != nil { + select { + case msg, ok = <-mch: + if !ok { + return nil, s.getNextMsgErr() + } + if err := s.processNextMsgDelivered(msg); err != nil { + return nil, err + } + case err := <-s.errCh: + return nil, err + case <-t.C: + return nil, ErrTimeout + } + } else { + select { + case msg, ok = <-mch: + if !ok { + return nil, s.getNextMsgErr() + } + if err := s.processNextMsgDelivered(msg); err != nil { + return nil, err + } + case <-t.C: + return nil, ErrTimeout + } + } + + return msg, nil +} + +// nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not +// time out. It is only used internally for non-timeout subscription iterator. +func (s *Subscription) nextMsgNoTimeout() (*Msg, error) { + if s == nil { + return nil, ErrBadSubscription + } + + s.mu.Lock() + err := s.validateNextMsgState(false) + if err != nil { + s.mu.Unlock() + return nil, err + } + + // snapshot + mch := s.mch + s.mu.Unlock() + + var ok bool + var msg *Msg + + // If something is available right away, let's optimize that case. select { case msg, ok = <-mch: + if !ok { + return nil, s.getNextMsgErr() + } + if err := s.processNextMsgDelivered(msg); err != nil { + return nil, err + } else { + return msg, nil + } + default: + } + + if s.errCh != nil { + select { + case msg, ok = <-mch: + if !ok { + return nil, s.getNextMsgErr() + } + if err := s.processNextMsgDelivered(msg); err != nil { + return nil, err + } + case err := <-s.errCh: + return nil, err + } + } else { + msg, ok = <-mch if !ok { return nil, s.getNextMsgErr() } if err := s.processNextMsgDelivered(msg); err != nil { return nil, err } - case <-t.C: - return nil, ErrTimeout } return msg, nil @@ -4831,6 +4980,12 @@ func (s *Subscription) validateNextMsgState(pullSubInternal bool) error { if s.mcb != nil { return ErrSyncSubRequired } + // if this subscription previously had a permissions error + // and no reconnect has been attempted, return the permissions error + // since the subscription does not exist on the server + if s.conn.Opts.PermissionErrOnSubscribe && s.permissionsErr != nil { + return s.permissionsErr + } if s.sc { s.changeSubStatus(SubscriptionActive) s.sc = false @@ -5107,7 +5262,9 @@ func (nc *Conn) processPingTimer() { nc.pout++ if nc.pout > nc.Opts.MaxPingsOut { nc.mu.Unlock() - nc.processOpErr(ErrStaleConnection) + if shouldClose := nc.processOpErr(ErrStaleConnection); shouldClose { + nc.close(CLOSED, true, nil) + } return } @@ -5204,6 +5361,9 @@ func (nc *Conn) resendSubscriptions() { for _, s := range subs { adjustedMax := uint64(0) s.mu.Lock() + // when resending subscriptions, the permissions error should be cleared + // since the user may have fixed the permissions issue + s.permissionsErr = nil if s.max > 0 { if s.delivered < s.max { adjustedMax = s.max - s.delivered @@ -5242,9 +5402,6 @@ func (nc *Conn) clearPendingFlushCalls() { // This will clear any pending Request calls. // Lock is assumed to be held by the caller. func (nc *Conn) clearPendingRequestCalls() { - if nc.respMap == nil { - return - } for key, ch := range nc.respMap { if ch != nil { close(ch) @@ -5792,7 +5949,7 @@ func NkeyOptionFromSeed(seedFile string) (Option, error) { return nil, err } if !nkeys.IsValidPublicUserKey(pub) { - return nil, fmt.Errorf("nats: Not a valid nkey user seed") + return nil, errors.New("nats: Not a valid nkey user seed") } sigCB := func(nonce []byte) ([]byte, error) { return sigHandler(nonce, seedFile) diff --git a/vendor/github.com/nats-io/nats.go/nats_iter.go b/vendor/github.com/nats-io/nats.go/nats_iter.go new file mode 100644 index 0000000..05234d5 --- /dev/null +++ b/vendor/github.com/nats-io/nats.go/nats_iter.go @@ -0,0 +1,72 @@ +// Copyright 2012-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.23 + +package nats + +import ( + "errors" + "iter" + "time" +) + +// Msgs returns an iter.Seq2[*Msg, error] that can be used to iterate over +// messages. It can only be used with a subscription that has been created with +// SubscribeSync or QueueSubscribeSync, otherwise it will return an error on the +// first iteration. +// +// The iterator will block until a message is available. The +// subscription will not be closed when the iterator is done. +func (sub *Subscription) Msgs() iter.Seq2[*Msg, error] { + return func(yield func(*Msg, error) bool) { + for { + msg, err := sub.nextMsgNoTimeout() + if err != nil { + yield(nil, err) + return + } + if !yield(msg, nil) { + return + } + + } + } +} + +// MsgsTimeout returns an iter.Seq2[*Msg, error] that can be used to iterate +// over messages. It can only be used with a subscription that has been created +// with SubscribeSync or QueueSubscribeSync, otherwise it will return an error +// on the first iteration. +// +// The iterator will block until a message is available or the timeout is +// reached. If the timeout is reached, the iterator will return nats.ErrTimeout +// but it will not be closed. +func (sub *Subscription) MsgsTimeout(timeout time.Duration) iter.Seq2[*Msg, error] { + return func(yield func(*Msg, error) bool) { + for { + msg, err := sub.NextMsg(timeout) + if err != nil { + if !yield(nil, err) { + return + } + if !errors.Is(err, ErrTimeout) { + return + } + } + if !yield(msg, nil) { + return + } + } + } +} diff --git a/vendor/github.com/nats-io/nats.go/netchan.go b/vendor/github.com/nats-io/nats.go/netchan.go index 3722d9f..35d9214 100644 --- a/vendor/github.com/nats-io/nats.go/netchan.go +++ b/vendor/github.com/nats-io/nats.go/netchan.go @@ -113,5 +113,5 @@ func (c *EncodedConn) bindRecvChan(subject, queue string, channel any) (*Subscri chVal.Send(oPtr) } - return c.Conn.subscribe(subject, queue, cb, nil, false, nil) + return c.Conn.subscribe(subject, queue, cb, nil, nil, false, nil) } diff --git a/vendor/github.com/nats-io/nats.go/rand.go b/vendor/github.com/nats-io/nats.go/rand.go deleted file mode 100644 index 0cdee0a..0000000 --- a/vendor/github.com/nats-io/nats.go/rand.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2023 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !go1.20 -// +build !go1.20 - -// A Go client for the NATS messaging system (https://nats.io). -package nats - -import ( - "math/rand" - "time" -) - -func init() { - // This is not needed since Go 1.20 because now rand.Seed always happens - // by default (uses runtime.fastrand64 instead as source). - rand.Seed(time.Now().UnixNano()) -} diff --git a/vendor/github.com/nats-io/nats.go/testing_internal.go b/vendor/github.com/nats-io/nats.go/testing_internal.go index 1839702..78ca3b1 100644 --- a/vendor/github.com/nats-io/nats.go/testing_internal.go +++ b/vendor/github.com/nats-io/nats.go/testing_internal.go @@ -12,7 +12,6 @@ // limitations under the License. //go:build internal_testing -// +build internal_testing // Functions in this file are only available when building nats.go with the // internal_testing build tag. They are used by the nats.go test suite. diff --git a/vendor/github.com/nats-io/nats.go/util/tls.go b/vendor/github.com/nats-io/nats.go/util/tls.go index af9f51f..ae16c3d 100644 --- a/vendor/github.com/nats-io/nats.go/util/tls.go +++ b/vendor/github.com/nats-io/nats.go/util/tls.go @@ -11,9 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.8 -// +build go1.8 - package util import "crypto/tls" diff --git a/vendor/github.com/nats-io/nats.go/util/tls_go17.go b/vendor/github.com/nats-io/nats.go/util/tls_go17.go deleted file mode 100644 index 44d46b4..0000000 --- a/vendor/github.com/nats-io/nats.go/util/tls_go17.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2016-2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build go1.7 && !go1.8 -// +build go1.7,!go1.8 - -package util - -import ( - "crypto/tls" -) - -// CloneTLSConfig returns a copy of c. Only the exported fields are copied. -// This is temporary, until this is provided by the language. -// https://go-review.googlesource.com/#/c/28075/ -func CloneTLSConfig(c *tls.Config) *tls.Config { - return &tls.Config{ - Rand: c.Rand, - Time: c.Time, - Certificates: c.Certificates, - NameToCertificate: c.NameToCertificate, - GetCertificate: c.GetCertificate, - RootCAs: c.RootCAs, - NextProtos: c.NextProtos, - ServerName: c.ServerName, - ClientAuth: c.ClientAuth, - ClientCAs: c.ClientCAs, - InsecureSkipVerify: c.InsecureSkipVerify, - CipherSuites: c.CipherSuites, - PreferServerCipherSuites: c.PreferServerCipherSuites, - SessionTicketsDisabled: c.SessionTicketsDisabled, - SessionTicketKey: c.SessionTicketKey, - ClientSessionCache: c.ClientSessionCache, - MinVersion: c.MinVersion, - MaxVersion: c.MaxVersion, - CurvePreferences: c.CurvePreferences, - DynamicRecordSizingDisabled: c.DynamicRecordSizingDisabled, - Renegotiation: c.Renegotiation, - } -} diff --git a/vendor/github.com/nats-io/nats.go/ws.go b/vendor/github.com/nats-io/nats.go/ws.go index 2c2d421..46d8300 100644 --- a/vendor/github.com/nats-io/nats.go/ws.go +++ b/vendor/github.com/nats-io/nats.go/ws.go @@ -76,14 +76,15 @@ var wsGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11") var compressFinalBlock = []byte{0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff} type websocketReader struct { - r io.Reader - pending [][]byte - ib []byte - ff bool - fc bool - nl bool - dc *wsDecompressor - nc *Conn + r io.Reader + pending [][]byte + compress bool + ib []byte + ff bool + fc bool + nl bool + dc *wsDecompressor + nc *Conn } type wsDecompressor struct { @@ -237,8 +238,8 @@ func (r *websocketReader) Read(p []byte) (int, error) { case wsPingMessage, wsPongMessage, wsCloseMessage: if rem > wsMaxControlPayloadSize { return 0, fmt.Errorf( - fmt.Sprintf("control frame length bigger than maximum allowed of %v bytes", - wsMaxControlPayloadSize)) + "control frame length bigger than maximum allowed of %v bytes", + wsMaxControlPayloadSize) } if compressed { return 0, errors.New("control frame should not be compressed") @@ -312,6 +313,8 @@ func (r *websocketReader) Read(p []byte) (int, error) { } r.fc = false } + } else if r.compress { + b = bytes.Clone(b) } // Add to the pending list if dealing with uncompressed frames or // after we have received the full compressed message and decompressed it. @@ -622,7 +625,7 @@ func (nc *Conn) wsInitHandshake(u *url.URL) error { !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") || resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) { - err = fmt.Errorf("invalid websocket connection") + err = errors.New("invalid websocket connection") } // Check compression extension... if err == nil && compress { @@ -634,7 +637,7 @@ func (nc *Conn) wsInitHandshake(u *url.URL) error { if !srvCompress { compress = false } else if !noCtxTakeover { - err = fmt.Errorf("compression negotiation error") + err = errors.New("compression negotiation error") } } if resp != nil { @@ -647,6 +650,7 @@ func (nc *Conn) wsInitHandshake(u *url.URL) error { wsr := wsNewReader(nc.br.r) wsr.nc = nc + wsr.compress = compress // We have to slurp whatever is in the bufio reader and copy to br.r if n := br.Buffered(); n != 0 { wsr.ib, _ = br.Peek(n) diff --git a/vendor/modules.txt b/vendor/modules.txt index 2a73668..be0a6cc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -3,20 +3,21 @@ github.com/gogo/protobuf/gogoproto github.com/gogo/protobuf/proto github.com/gogo/protobuf/protoc-gen-gogo/descriptor -# github.com/klauspost/compress v1.17.11 -## explicit; go 1.21 +# github.com/klauspost/compress v1.18.0 +## explicit; go 1.22 github.com/klauspost/compress/flate -# github.com/nats-io/nats-server/v2 v2.10.22 -## explicit; go 1.21.0 +github.com/klauspost/compress/internal/le +# github.com/nats-io/nats-server/v2 v2.10.27 +## explicit; go 1.23.0 # github.com/nats-io/nats-streaming-server v0.25.6 ## explicit; go 1.20 -# github.com/nats-io/nats.go v1.37.0 -## explicit; go 1.20 +# github.com/nats-io/nats.go v1.39.1 +## explicit; go 1.22.0 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin github.com/nats-io/nats.go/internal/parser github.com/nats-io/nats.go/util -# github.com/nats-io/nkeys v0.4.8 +# github.com/nats-io/nkeys v0.4.10 ## explicit; go 1.20 github.com/nats-io/nkeys # github.com/nats-io/nuid v1.0.1