Fail Stream Replicator on startup if source or target isn't reachable#205
Fail Stream Replicator on startup if source or target isn't reachable#205TiganeteaRobert (TiganeteaRobert) wants to merge 6 commits intomasterfrom
Conversation
|
Duplicate of #183 |
|
Responded to comment about a removed unit test here https://github.com/snowplow-devops/stream-replicator/pull/183/files/adb452ba5f917aa0fc01e8f02a2cc25661c33ef5..5e9cf7c2dbc230c4449093f48efb17ab1cf305e4#r944267068 |
| assert.Equal(int64(10), ackOps) | ||
| } | ||
|
|
||
| func TestHttpWrite_RequestFailure(t *testing.T) { |
There was a problem hiding this comment.
Added test back after discussion here: #183 (comment)
| panic(err) | ||
| } | ||
| mutex.Lock() | ||
| if req.URL.Path == `/error` { |
There was a problem hiding this comment.
Added after discussion here: #183 (comment)
There was a problem hiding this comment.
Nice, this is clean and gives us better coverage of the cases we want.
pkg/source/sqs/sqs_source_test.go
Outdated
|
|
||
| err = source.Read(nil) | ||
| assert.True(strings.HasPrefix(err.Error(), `Failed to get SQS queue URL`)) | ||
| if err != nil { |
There was a problem hiding this comment.
Added after discussion here: #183 (comment)
|
Besides the fixed I've done I have responded to some comments on #183 |
colmsnowplow
left a comment
There was a problem hiding this comment.
Looks like we're missing the following:
- kinesis source
- kafka target
- kinesis target
- sqs target
Some of them have tests already written but commented out, some need tests to be written.
|
Well that's good- less work involved. I think every source and target needs a test for this - so as well as uncommenting the ones that are there already, if there's any that don't have tests for any, then let's add them. |
|
Added checks (or unit tests if check exists) for all sources and targets. Also updated PR desc. |
| // Append errors and crash | ||
| multierror.Append(kinesisPullErr, errors.Errorf("wg.Wait() took too long, forcing app close.")) | ||
| ks.log.WithFields(log.Fields{"error": err}).Fatal(err) | ||
| ks.log.WithFields(log.Fields{"error": kinesisPullErr}).Fatal(kinesisPullErr) |
There was a problem hiding this comment.
Noticed randomly that the error we logged here was the wrong one, fixed it.
There was a problem hiding this comment.
Good spot - Can we pull this into its own issue for auditability - it's a distinct bug which might be important to some debugging investigation in future - so I'd prefer it to have its own audit trail.
(Doesn't need a separate PR though, just its own commit and issue within this PR is fine)
There was a problem hiding this comment.
👍
There was a problem hiding this comment.
Created issue here: #209 and a separate commit here for it.
5b99e66 to
b5f0154
Compare
b5f0154 to
bfaa707
Compare
| assert.Nil(source) | ||
| assert.EqualError(err, `Failed to create Kinsumer client: leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency`) | ||
|
|
||
| if testing.Short() { |
There was a problem hiding this comment.
copy paste error here I think?
There was a problem hiding this comment.
Yup, weird how that got there
pkg/source/sqs/sqs_source_test.go
Outdated
|
|
||
| assert.Nil(source) | ||
| assert.NotNil(err) | ||
| assert.NotNil(source) |
There was a problem hiding this comment.
This is still the opposite behaviour to what we want: #183 (comment)
If the source can't be reached then newSQSSourceWithInterfaces should throw an error.
There was a problem hiding this comment.
I added a call to GetQueueUrl inside newSQSSourceWithInterfaces which should fail if there is no connection to SQS.
pkg/source/sqs/sqs_source_test.go
Outdated
| } | ||
|
|
||
| // TestNewSqsSource_ConnectionCheck tests that the SQS source fails on start-up if the connection to SQS fails | ||
| func TestNewSqsSource_ConnectionCheck(t *testing.T) { |
There was a problem hiding this comment.
This doesn't test a connection check with the SQS queue. It tests if an AWS client session can be created.
We can have a valid AWS session and without having a valid connection to the SQS queue, but not vice-versa.
There was a problem hiding this comment.
Indeed, this tests the connection to AWS. I am now testing the case where a connection to SQS cannot be done in TestNewSQSSourceWithInterfaces_Failure
| ) | ||
|
|
||
| // TestNewKinesisTarget_ConnectionCheck tests that the Kinesis target fails on start-up if the connection to Kinesis fails | ||
| func TestNewKinesisTarget_ConnectionCheck(t *testing.T) { |
There was a problem hiding this comment.
Same as my coment for the sqs source - there's no check for a connection to a stream, this failure only looks at the AWS session.
I don't see anywhere in the kinesis target codebase where we're doing a connection check at all
There was a problem hiding this comment.
I just added a DescribeStream call to the Kinesis target too. The case where the connection to Kinesis could not be done is now tested in TestKinesisTarget_WriteFailure
There was a problem hiding this comment.
Are you sure you've pushed all your changes? I can't see any TestKinesisTarget_WriteFailure test. I can see you've renamed it and replaced it with TestNewKinesisTarget_ConnectionCheck and TestKinesisTarget_KinesisConnectionFailure, neither of which appear to test this feature.
pkg/target/sqs_test.go
Outdated
| ) | ||
|
|
||
| // TestNewSqsTarget_ConnectionCheck tests that the SQS target fails on start-up if the connection to SQS fails | ||
| func TestNewSqsTarget_ConnectionCheck(t *testing.T) { |
There was a problem hiding this comment.
Same comment as for the sqs source and the kinesis target.
There was a problem hiding this comment.
Same as in the SQS source, I added a quick call to GetQueueUrl in newSQSTargetWithInterfaces so we test the connection to SQS instead of AWS.
e12d862 to
a1f30ad
Compare
420d2ca to
d1aa932
Compare
|
|
||
| err = source.Read(nil) | ||
| assert.NotNil(err) | ||
| if err != nil { |
There was a problem hiding this comment.
This test will pass when we don't throw any error. It needs to assert against the error existing first.
| assert := assert.New(t) | ||
|
|
||
| client := testutil.GetAWSLocalstackKinesisClient() | ||
| client := kinesis.New(session.Must(session.NewSession(&aws.Config{ |
There was a problem hiding this comment.
I don't understand why you're manually creating another new kinesis client here?
colmsnowplow
left a comment
There was a problem hiding this comment.
Please stop force pushing before requested changes have been reviewed. It just creates needless work and makes reviews of your PRs take longer than they need to.
It's easier for everyone to squash once at the end, when the entire review is done.
|
I'm gonna reopen this and move it to draft - there's been a lot of messing around in between but I still plan on picking this up from where we left off and pulling it into an upcoming release! |
d832b17 to
642c6cd
Compare
dd7c31c to
e4c848a
Compare
Added connection checks to: