@@ -3,6 +3,7 @@ package kafka
3
3
import (
4
4
"bytes"
5
5
"context"
6
+ "errors"
6
7
"fmt"
7
8
"io"
8
9
"math/rand"
@@ -640,10 +641,13 @@ func testConnReadBatchWithMaxWait(t *testing.T, conn *Conn) {
640
641
conn .Seek (0 , SeekAbsolute )
641
642
conn .SetDeadline (time .Now ().Add (50 * time .Millisecond ))
642
643
batch = conn .ReadBatchWith (cfg )
644
+ var netErr net.Error
643
645
if err := batch .Err (); err == nil {
644
646
t .Fatal ("should have timed out, but got no error" )
645
- } else if netErr , ok := err .(net.Error ); ! ok || ! netErr .Timeout () {
646
- t .Fatalf ("should have timed out, but got: %v" , err )
647
+ } else if errors .As (err , & netErr ) {
648
+ if ! netErr .Timeout () {
649
+ t .Fatalf ("should have timed out, but got: %v" , err )
650
+ }
647
651
}
648
652
}
649
653
@@ -761,7 +765,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
761
765
762
766
func testConnJoinGroupInvalidGroupID (t * testing.T , conn * Conn ) {
763
767
_ , err := conn .joinGroup (joinGroupRequestV1 {})
764
- if err != InvalidGroupId && err != NotCoordinatorForGroup {
768
+ if ! errors . Is ( err , InvalidGroupId ) && ! errors . Is ( err , NotCoordinatorForGroup ) {
765
769
t .Fatalf ("expected %v or %v; got %v" , InvalidGroupId , NotCoordinatorForGroup , err )
766
770
}
767
771
}
@@ -773,7 +777,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) {
773
777
_ , err := conn .joinGroup (joinGroupRequestV1 {
774
778
GroupID : groupID ,
775
779
})
776
- if err != InvalidSessionTimeout && err != NotCoordinatorForGroup {
780
+ if ! errors . Is ( err , InvalidSessionTimeout ) && ! errors . Is ( err , NotCoordinatorForGroup ) {
777
781
t .Fatalf ("expected %v or %v; got %v" , InvalidSessionTimeout , NotCoordinatorForGroup , err )
778
782
}
779
783
}
@@ -786,7 +790,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) {
786
790
GroupID : groupID ,
787
791
SessionTimeout : int32 (3 * time .Second / time .Millisecond ),
788
792
})
789
- if err != InvalidSessionTimeout && err != NotCoordinatorForGroup {
793
+ if ! errors . Is ( err , InvalidSessionTimeout ) && ! errors . Is ( err , NotCoordinatorForGroup ) {
790
794
t .Fatalf ("expected %v or %v; got %v" , InvalidSessionTimeout , NotCoordinatorForGroup , err )
791
795
}
792
796
}
@@ -798,7 +802,7 @@ func testConnHeartbeatErr(t *testing.T, conn *Conn) {
798
802
_ , err := conn .syncGroup (syncGroupRequestV0 {
799
803
GroupID : groupID ,
800
804
})
801
- if err != UnknownMemberId && err != NotCoordinatorForGroup {
805
+ if ! errors . Is ( err , UnknownMemberId ) && ! errors . Is ( err , NotCoordinatorForGroup ) {
802
806
t .Fatalf ("expected %v or %v; got %v" , UnknownMemberId , NotCoordinatorForGroup , err )
803
807
}
804
808
}
@@ -810,7 +814,7 @@ func testConnLeaveGroupErr(t *testing.T, conn *Conn) {
810
814
_ , err := conn .leaveGroup (leaveGroupRequestV0 {
811
815
GroupID : groupID ,
812
816
})
813
- if err != UnknownMemberId && err != NotCoordinatorForGroup {
817
+ if ! errors . Is ( err , UnknownMemberId ) && ! errors . Is ( err , NotCoordinatorForGroup ) {
814
818
t .Fatalf ("expected %v or %v; got %v" , UnknownMemberId , NotCoordinatorForGroup , err )
815
819
}
816
820
}
@@ -822,7 +826,7 @@ func testConnSyncGroupErr(t *testing.T, conn *Conn) {
822
826
_ , err := conn .syncGroup (syncGroupRequestV0 {
823
827
GroupID : groupID ,
824
828
})
825
- if err != UnknownMemberId && err != NotCoordinatorForGroup {
829
+ if ! errors . Is ( err , UnknownMemberId ) && ! errors . Is ( err , NotCoordinatorForGroup ) {
826
830
t .Fatalf ("expected %v or %v; got %v" , UnknownMemberId , NotCoordinatorForGroup , err )
827
831
}
828
832
}
@@ -985,7 +989,7 @@ func testConnReadShortBuffer(t *testing.T, conn *Conn) {
985
989
b [3 ] = 0
986
990
987
991
n , err := conn .Read (b )
988
- if err != io .ErrShortBuffer {
992
+ if ! errors . Is ( err , io .ErrShortBuffer ) {
989
993
t .Error ("bad error:" , i , err )
990
994
}
991
995
if n != 4 {
@@ -1061,7 +1065,7 @@ func testDeleteTopicsInvalidTopic(t *testing.T, conn *Conn) {
1061
1065
}
1062
1066
conn .SetDeadline (time .Now ().Add (5 * time .Second ))
1063
1067
err = conn .DeleteTopics ("invalid-topic" , topic )
1064
- if err != UnknownTopicOrPartition {
1068
+ if ! errors . Is ( err , UnknownTopicOrPartition ) {
1065
1069
t .Fatalf ("expected UnknownTopicOrPartition error, but got %v" , err )
1066
1070
}
1067
1071
partitions , err := conn .ReadPartitions (topic )
@@ -1154,7 +1158,7 @@ func TestUnsupportedSASLMechanism(t *testing.T) {
1154
1158
}
1155
1159
defer conn .Close ()
1156
1160
1157
- if err := conn .saslHandshake ("FOO" ); err != UnsupportedSASLMechanism {
1161
+ if err := conn .saslHandshake ("FOO" ); ! errors . Is ( err , UnsupportedSASLMechanism ) {
1158
1162
t .Errorf ("Expected UnsupportedSASLMechanism but got %v" , err )
1159
1163
}
1160
1164
}
0 commit comments