KAFKA-7503: Connect integration test harness #5516
Conversation
Force-pushed 133161f to 334d430
Force-pushed f0da1e9 to 5ff2f8a
@rhauch @kkonstantine @ewencp any thoughts on this PR? Thanks!
This is really great, @wicknicks! A few questions inline below.
IIUC, we don't allow the tests to be run in parallel, because the ports are pre-defined. Is that right? If we can run in parallel, then should this be non-final and initialized in setup()?
Why use strings here rather than the configuration constants? Do we think not using the constants will help with verifying backward compatibility regressions?
Will we always want to test with only 1 embedded broker? Should this be a default that we use most of the time?
Yes, this should be a default, and it should be overridden by the constructor parameters passed in.
Would we need multiple brokers for integration tests? I did this thinking that would be more of "system tests" territory.
If I use the Avro converter, then these will be added even though they make no sense. Perhaps we should simply check the value of the key.converter and value.converter, and only set these if we're using the corresponding JSON converter.
Good point. Should we use StringConverter as the default (if the user has not passed converters in workerProps)?
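To make the discussion concrete, here is a hedged sketch of what that defaulting could look like. The converter class names and the `key.converter`/`value.converter` config keys are real Connect configs, but `withDefaults` is a hypothetical helper name, not the method in this PR:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: default to StringConverter when the caller supplied
// nothing, and only set the schemas.enable flags when the corresponding
// converter is actually the JsonConverter (they mean nothing to Avro et al.).
public class ConverterDefaults {
    public static final String JSON = "org.apache.kafka.connect.json.JsonConverter";
    public static final String STRING = "org.apache.kafka.connect.storage.StringConverter";

    public static Map<String, String> withDefaults(Map<String, String> workerProps) {
        Map<String, String> props = new HashMap<>(workerProps);
        props.putIfAbsent("key.converter", STRING);
        props.putIfAbsent("value.converter", STRING);
        // Only JsonConverter understands the schemas.enable flag.
        if (JSON.equals(props.get("key.converter"))) {
            props.putIfAbsent("key.converter.schemas.enable", "false");
        }
        if (JSON.equals(props.get("value.converter"))) {
            props.putIfAbsent("value.converter.schemas.enable", "false");
        }
        return props;
    }
}
```

With this shape, a user-supplied Avro converter never picks up JSON-only settings, while a bare config still gets sensible test defaults.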
I forget -- does Connect print the URL? If not, then it'd be nice to have a DEBUG (or INFO?) statement here reminding the developer what the URL is. WDYT?
The REST server prints out this URL here:
We don't want to go overboard, but since we're going to be using this in lots of downstream tests it'd be worth thinking about the public API. Have you thought about a builder API to make it easier to choose between different options rather than find a constructor that fits? Doing that would make it easier to add other options in the future, such as using multiple brokers and/or workers (e.g., for stopping some of them during tests), more easily setting individual properties, etc., without having to create a boatload of constructors or use setter methods that shouldn't be called after the workers are started.
```java
EmbeddedConnectCluster.builder()
    .name("MyCluster")
    .withAvroConverters(schemaRegistryUrl)
    .withProducerProp("batch.size", 10)
    .withWorkers(3)
    .withBrokers(3)
    .build()
    .start();
```
Or just accept a Map<String, String> props?
That's pretty symmetric to what EmbeddedKafkaCluster uses with Properties. We just use a map instead of Properties in Connect.
...ect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java (outdated, resolved)
Is there any way to subclass DistributedWorker to avoid duplicating the code and risk not keeping this class in sync?
How about writing a simple test that checks the behavior when a request to create a connector is performed with an invalid connector? Seems like it might be hard to discern the exact failure from a simple IOException.
I really anticipate using this! Thanks @wicknicks !
Left a round of comments.
I think CountDownLatch would be preferable, so that we don't call sleep anywhere in the code.
Feel free to declare more than one if you have more than one waiting condition.
It's not too straightforward to put a latch here, since latches can't be reset, and we would have to recreate them for every test.
And why not allocate a new one in every test?
I really believe that sleep eventually leads to flaky tests, and also introduces unneeded constant delays that eventually add up and make tests unbearable to sit and wait for.
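A hedged sketch of the pattern being suggested (names here are illustrative, not the PR's actual API): each test allocates a fresh latch sized to the number of records it expects, the task counts down once per record, and the test does a single bounded wait instead of sleeping in a polling loop:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a per-test CountDownLatch replaces Thread.sleep polling.
public class LatchSketch {

    public static boolean awaitRecords(CountDownLatch recordsRemaining, long timeoutMs) {
        try {
            // Returns false if the timeout elapsed before the count reached zero,
            // so the test can fail fast with a clear message instead of sleeping.
            return recordsRemaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because a latch cannot be reset, the allocation happens per test, which is exactly the point raised above.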
hmm, why not MonitorableSinkTask? It's a common convention.
Same ask as above: MonitorableSinkTask?
How about: return "unknown";?
It's a common default in connectors
The numbers alone are hardly self-explanatory when looking quickly, especially 18.
See also the comment about using a CountDownLatch to avoid constants and sleep.
You can do a timed wait on the latch if you want to log progress.
I think N in consumeNRecords is redundant. The fact that it accepts an argument (that ideally has a name vs a value) should be enough. consume?
Maybe it's not obvious that inside the parenthesis you'll have key and value.
Not obvious what this condition symbolizes with just the value 7. Use a variable name?
Do you prefer the simple class name here? Not sure.
Force-pushed 2a8763c to 7fd609a
@rhauch @kkonstantine this is ready for a second pass. Thanks!
Thanks for addressing reviews @wicknicks
One nit and one question still from me.
nit: Everywhere I remember, we use the _MS suffix for time units. You might not want to introduce yet another way (in case you grep or search some time): CONSUME_MAX_DURATION_MS?
Force-pushed 603cea3 to 1484b57
should this be final since it's already global? Are you planning to reassign it? (probably not given its name)
Thanks for addressing my latest comments @wicknicks
Took a fresh look and added a few more which I hope you'll find useful. I think we are close.
Just be ready, for an odd number of NUM_RECORDS_PRODUCED, for this not to do what you'd think it'd do (it will add one more record, I guess).
I also wonder why it's important to have different prefixes (hello- and world-) vs a common one such as hello-world-. I see this is the value, not the key.
What is 10 here? Retries? Let's give it a name.
The message is true here, but it's not clear if you don't look at the implementation of this class. I feel something like "Task has not been started" might make more sense and not expose implementation details.
nit: there's no no-upper-case policy in our messages, from what I know :)
If I'm not mistaken you know how many records you expect.
Should we print that to the message ("Fewer than X records were consumed by the task in Y ms") ?
Here's an idea. How about locking the whole map and waiting on the map itself (timed wait)? Once a thread adds a task to the map, it should notify on the map. Then you can check the size of the map and determine when to make progress.
Given that, you might want to use Collections.synchronizedMap, even though it doesn't expose its internal mutex for you to use. Still, it makes clear that you are not depending on the lock-free concurrent map version but are locking the whole map. No big deal for this use case; it's not critical contention-wise.
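A minimal sketch of that suggestion, with hypothetical names (`TaskRegistry`, `register`, `awaitTasks` are illustrative, not the PR's classes): writers synchronize on the map and notify after registering a handle, and the test thread does a timed wait on the same monitor until the expected number of tasks appears:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: use the map itself as the monitor for a timed wait.
public class TaskRegistry {
    private final Map<String, Object> tasks = new HashMap<>();

    public void register(String taskId, Object handle) {
        synchronized (tasks) {
            tasks.put(taskId, handle);
            tasks.notifyAll(); // wake any thread blocked in awaitTasks
        }
    }

    public boolean awaitTasks(int expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (tasks) {
            while (tasks.size() < expected) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false; // timed out before all tasks registered
                }
                try {
                    tasks.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }
}
```

The wait-in-a-loop guards against spurious wakeups, and recomputing `remaining` keeps the total wait bounded by the original timeout.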
Comments on the ConnectIntegrationTest apply here too. Please make sure the approach in both classes is the same.
@kkonstantine thanks so much for your multiple passes. I believe I've addressed all your comments (I tried something a bit different with the latches, though). Please take a look at the changes, and let me know if they work out.
I think it looks great and is a significant step forward in Connect's testing capabilities.
Thanks for addressing all the comments in detail.
LGTM by me.
Thanks so much, @kkonstantine. Highly appreciate the review comments. @rhauch I believe I have addressed all your comments as well. Would you mind taking another pass at this?
edit: ah, looks like the test jars are pushed to Maven Central as well. Please ignore this comment.
Fantastic, @wicknicks! I have a few suggestions below, but all are pretty minor.
It might be even more valuable to break this method into multiple protected methods and to have more of the components created in this method be members. However, that's probably work for another ticket.
I think we should have JavaDoc that explains the purpose and scope of each of these IntegrationTest classes. For example, what is the scope of ConnectIntegrationTest and DeadLetterQueueIntegrationTest? When should we create additional classes versus adding methods to one of these two? If each had a brief summary of the intended scope, it becomes easier to know whether to add a method or add a new integration test class.
Or, perhaps instead this class should be renamed to more precisely define the scope.
The added javadoc helps, but I'd also just rename this. It's so generic that it can easily become a dumping ground.
More generally re: integration tests, striking the right balance can be tough -- this name is probably too broad and open-ended, while the dead letter queue test is probably too narrow. This integration test is validating basic happy-path behavior, so I'd indicate that in the name somehow.
...ect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java (outdated, resolved)
What do these comments mean?
Should this be taskId to be more consistent with the parameters of the methods below?
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java (outdated, resolved)
How about one for POST?
do we need one right now? most of the connect API uses GET or PUT. POST is only for restarting connectors or tasks.
Might be nice to have an updateConnector method, since I'd probably want to have tests that check the behavior (at least within AK) to verify that connectors can be updated and they (re)start correctly.
I think the REST endpoint for updating the connector is the same as the one for creating. we simply reuse the name. That's why I didn't add a new method for updateConnector.
Yeah, what if I want to test updating a connector configuration to see if my connector responds correctly? Seems valuable enough, and if it's not in the integration test framework connector projects simply can't do it. (They could implement their own, but why not have it here?)
i think the follow up here given the operation is the same could be to just rename this as configureConnector or something like that, such that it is not specific to whether it's the first time you are doing it (which also creates the connector) or later (which just updates)
Just one worker? Why not allow multiple workers? We absolutely need integration tests that verify that the followers properly forward requests to the leader. Yes, you'll have to manage port numbers and maybe have a method that allows the caller to use a particular worker (the bulk of the methods in this class can just use the current worker).
The workers will conflict over the REST port. Right now, until we can make the REST port configurable, we will need to limit this to one worker per concurrent test.
Okay, that's a good start.
Not sure I understand the comment re: the REST port — that is configurable already. It seems like if there is a constraint re: ports, it would be whether we can go back and extract the bound port, since for tests you want to configure port 0 to avoid conflicts but then need to look up the actual bound port.
We should not commit these integration tests without fixing that; not using dynamic ports has historically been a huge source of flaky tests. (I'm particularly sensitive about this issue since I spent a bunch of time fixing https://issues.apache.org/jira/browse/KAFKA-1501 way back in the day :) )
Sorry, I meant to say that Connect simply uses the port number given to it, unlike EmbeddedKafkaCluster, where setting a port value of 0 will make it select any free port available on the system. I thought making that change would need a KIP; let me know if this is not the case.
P.S.: I couldn't open the link. The website is probably down at the moment.
Not sure I understand the issue. Why wouldn't the tests just override the port setting to 0 and then look up the bound port, just like EmbeddedKafkaCluster? Kafka also normally has a default port of 9092 in its config, but the tests override it to 0, and EmbeddedKafkaCluster calls that "default" because for tests it is the only way to make them reliable and not make tests take many hours to run. This will, however, likely require adding a bit of code in the Jetty/Jersey REST API code to extract the bound port.
We basically can't merge this without fixing that, since it breaks the common developer path of ./gradlew integrationTest (or anything that covers all tests), which is documented as a normal workflow in README.md.
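The "port 0" pattern under discussion can be sketched in a few lines. This uses a plain `ServerSocket` for illustration; the hypothetical `pickFreePort` helper is not from the PR, but Jetty exposes the same idea via `ServerConnector#getLocalPort()` after the server starts:

```java
import java.io.IOException;
import java.net.ServerSocket;

// Illustrative sketch: bind to port 0 so the OS picks a free ephemeral port,
// then ask the socket which port it actually got.
public class EphemeralPort {

    public static int pickFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) { // 0 = let the OS choose
            return socket.getLocalPort(); // the real bound port, never 0
        } catch (IOException e) {
            return -1; // no port available (should not happen in practice)
        }
    }
}
```

In a test harness, the worker would be configured with port 0 and the harness would read back the bound port after startup, instead of hard-coding port numbers that can collide across concurrent test runs.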
Woah, I almost missed this important call to remove(taskId) since it's within a log message. Not sure this is a good idea.
This won't work if the # of tasks is > 1, since the first task to be stopped will remove the Handle for the connector.
Wouldn't taskId be different for each task?
Force-pushed 1693f7d to 6da1e0a
@wicknicks, really nice work. This will be so valuable, and I can't wait to use it already!
@ewencp any thoughts on this PR?
I like this as a baseline integration test.
We should consider what set of happy path dimensions we want to validate here and if test parameterization or extension here would help with testing coverage.
In particular, I think there's a number of implicit decisions here: which converter is being tested, there are no transformations, that the default integration test uses distributed mode (and therefore specific offsetwriter, config source, etc), we're using connector overridden converter settings, which data types are used in the records, etc. There's always going to be a fair bit that is explicit, but some of this may be worth deciding whether to specifically test here, and to get coverage in a smaller number of tests.
In particular, things like covering many/all data types and including testing of transformation functionality are things I think we should probably try to include in a single test. Our experience elsewhere with these heavyweight "integration" tests which actually also pull up external services rather than mocking them is that they get very expensive, very fast, and that's part of why I avoided adding them for Connect initially. I think if we're doing these expensive "local system test" version of integration testing, we should try to write fewer more comprehensive tests.
As another example, this test could also validate various REST API functionality once the connector is submitted. This covers a bit of it with the create/delete, but we could also validate, e.g., the status APIs.
@ewencp yes, I agree that these tests should not be expensive. But when you say they must be comprehensive, does that mean creating multiple tests with different converters and transformations? I don't think that should be the purpose here. We need to ensure that the interfaces of these different components work well together. Plugging in different implementations of converters ensures their correctness, but that can be tested with unit tests. Here we want to make sure that all the components (through their interfaces) work correctly with each other. For example, a test could be extended to add a transformation and/or a dead letter queue to check if all of these pieces work well together (which is what DeadLetterQueueIntegrationTest does, btw). Variations of this test would be running the same test (move data through a connector pipeline) in the absence of the optional components and asserting the correct behavior.
In light of the above comment, what about calling this test EndToEndSinkConnectorIntegrationTest, and moving the test from DeadLetterQueueIntegrationTest into this class as a testWithDeadLetterQueue test method?
By comprehensive, I actually just meant that the integration test should evaluate things more end-to-end, i.e. that the integration of multiple components are working.
I agree that you are frequently testing interfaces, but also lots of frequently unspoken assumptions (or not enforced), e.g. even simple things like if null is a valid value for a particular argument or not, and whether other components calling the method will deliver them.
You can't cover the entire space of combinations, but I called out naming because I think the name says a lot about what you think you're testing. DeadLetterQueueIntegrationTest is named to be quite specifically targeted -- it's testing a single config/policy/mostly one class (DeadLetterQueueReporter).
The suggestion re: individual test methods is interesting as well -- this was also what I was getting at wrt this style of test being quite heavyweight and therefore better to keep broader. If it was an integration test with fast mocks (e.g. even if full Connect worker, mocked out ZK + broker), I'd be more inclined to have tests like ErrorHandlingIntegrationTest with separate tests for each. But given how expensive they are, why not just have a single test method in an ErrorHandlingIntegrationTest validate all the error handling cases end-to-end?
I guess high level what I'm getting at is that there's a tendency to write integration and system tests that validate individual features that are frequently effectively a single class + a couple of surrounding ones, but pull in a ton of other test surface area. I think getting the balance is especially hard with integration tests, but with system tests I've noticed some similar tests as well. But if you go that route, you end up with basically as many integration tests as you have unit tests, which shouldn't really be the case (the thing the integration test is testing should be larger than the unit test)?
wrt "testing the same thing", I think an end-to-end source connector and sink connector test where we swap out components in a parameterization that reasonably covers the space is a good set of integration tests (since it validates the source and sink data pipelines, integrated together, under at least a few different variants, e.g. different types of converter, transforms, etc). Some of those will ultimately land with the individual connectors as well (e.g. is the connector & upstream converters/transforms actually obeying the assumptions of interfaces that may not be explicit yet).
Another way to view this is that the javadoc should say what is under test and maybe even why we need this in addition to unit tests; currently this test's javadoc just says what it will do.
Yes, that makes sense. By interfaces I meant the glue code which sits between modules, not necessarily just the Java interfaces. I've made the changes based on this discussion. Ideally we could have removed the ConnectIntegrationTest class, but it's now renamed to ExampleConnectIntegrationTest, with the javadoc specifying that this class is simply meant to show how to use the integration test framework. In future, when we make enhancements, we can update this test accordingly, but otherwise we should not be adding any tests here. The DeadLetterQueueTest has been moved to ErrorHandlingTest and covers all the error handling policies. This also works out well, since the integration points of those policies are independent of each other, and we can test them in one test.
re: naming and how broad/narrow tests are, it might make sense to have a single test run separate connectors for each of the error handling strategies and validate them. i realize it makes the test a bit more complex and potentially a bit harder to debug, but I think for the cost of this style of integration tests, that's the right scope for a single test. (this is something I increasingly think we need to consider when writing system tests as well...)
...ect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java (outdated, resolved)
we should use all available broker addresses here. this would be important in any test cases that might cover failure of brokers.
we should just throw an exception if there aren't enough brokers. i think the behavior if we do this will just be confusing (something we've seen in practice too with the consumer offsets topic when there aren't enough active brokers and it gets created with too low of a replication factor).
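The fail-fast behavior suggested here is a simple precondition check: compare the number of live brokers against the requested replication factor before creating the topic, instead of silently creating it under-replicated. A sketch (method name and message are illustrative):

```java
public class ReplicationCheckExample {

    // Throw rather than silently creating a topic with a lower
    // replication factor than the test asked for.
    static void validateReplicationFactor(int liveBrokers, int replicationFactor) {
        if (liveBrokers < replicationFactor) {
            throw new IllegalStateException("Requested replication factor " + replicationFactor
                    + " but only " + liveBrokers + " broker(s) are running");
        }
    }

    public static void main(String[] args) {
        validateReplicationFactor(3, 3); // enough brokers: no exception
        try {
            validateReplicationFactor(1, 3); // too few brokers: fail fast
            System.out.println("no exception");
        } catch (IllegalStateException e) {
            System.out.println("rejected");
        }
    }
}
```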
why this breakdown of the max duration? it doesn't look like there is anything we do periodically (e.g. logging) that requires breaking this down
I suppose this was just my way of making sure we wait for at most maxDuration and consume n records. I think there are some instances where we wait for maxDuration, and if consumer returns sooner with fewer records, the logic will recalculate how much time is left, and then attempt again. having said that, looks like my logic would break if n records are spread over more than num intervals.
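The recalculate-remaining-time logic being discussed can be expressed with a single absolute deadline, which avoids the interval bookkeeping and the edge case where records are spread over more polls than expected. A sketch, with `pollOnce` standing in for a real consumer poll:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class DeadlinePollExample {

    // Keep polling until n records arrive or the absolute deadline passes,
    // recomputing the remaining wait implicitly on every iteration.
    static List<Integer> consumeAtLeast(Supplier<List<Integer>> pollOnce, int n, long maxDurationMs) {
        long deadline = System.currentTimeMillis() + maxDurationMs;
        List<Integer> records = new ArrayList<>();
        while (records.size() < n && System.currentTimeMillis() < deadline) {
            records.addAll(pollOnce.get());
        }
        return records;
    }

    public static void main(String[] args) {
        // A fake poll that returns two records per call.
        Supplier<List<Integer>> fakePoll = () -> Arrays.asList(1, 2);
        List<Integer> consumed = consumeAtLeast(fakePoll, 5, 1000L);
        System.out.println(consumed.size() >= 5);
    }
}
```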
is this used, and if so, how? doesn't seem useful to be creating consumers that don't subscribe to anything
this is used right below, line 312:
public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(String... topics) {
    KafkaConsumer<byte[], byte[]> consumer = createConsumer();
    consumer.subscribe(Arrays.asList(topics));
    ....
right, my point was usage outside of this class. We have multiple public createConsumer* methods here, but the only one that's actually useful externally is createConsumerAndSubscribe. Being able to also override consumer props seems potentially useful, but that should just be two methods -- one that takes only topics and another that takes topics and consumer props overrides.
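The two-method shape suggested here — one overload taking only topics, another also taking property overrides — might look like the following. This models the config plumbing with plain maps so it stands alone; the property keys and the `subscribed.topics` stand-in are illustrative, not the harness's actual API:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumerFactoryExample {

    // Base consumer config the harness would always set (values are made up).
    static Map<String, Object> baseProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        return props;
    }

    // Overload 1: subscribe with default properties.
    static Map<String, Object> consumerConfig(String... topics) {
        return consumerConfig(Collections.emptyMap(), topics);
    }

    // Overload 2: same, but caller-supplied overrides win over the defaults.
    static Map<String, Object> consumerConfig(Map<String, Object> overrides, String... topics) {
        Map<String, Object> props = baseProps();
        props.putAll(overrides);
        props.put("subscribed.topics", Arrays.asList(topics)); // stand-in for consumer.subscribe(...)
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("auto.offset.reset", "earliest");
        Map<String, Object> cfg = consumerConfig(overrides, "test-topic");
        System.out.println(cfg.get("auto.offset.reset"));
        System.out.println(((List<?>) cfg.get("subscribed.topics")).get(0));
    }
}
```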
f34485e to
3c3cf38
Compare
Signed-off-by: Arjun Satish <[email protected]>
This is useful when the exact expectations cannot be set for tasks (for the purposes of integration tests, tasks can be a bit unreliable since a rebalance can delay when a task comes up and gets ready to consume records). Signed-off-by: Arjun Satish <[email protected]>
This is necessary to keep the advertisedUrl consistent with the host:port bound to by the SocketServer when a REST_PORT_CONFIG of 0 is passed into RestServer from integration tests. Signed-off-by: Arjun Satish <[email protected]>
… doesn't count down Signed-off-by: Arjun Satish <[email protected]>
fc549dc to
027e68e
Compare
LGTM, merging to trunk and as far back as it'll cherry-pick cleanly and pass tests since this is just adding tests that would be useful on any branch. Thanks for all the iterations on this!
|
Thanks for your time and patience on this, @ewencp! Much appreciated! |
Expose a programmatic way to bring up a Kafka and Zk cluster through Java API to facilitate integration tests for framework level changes in Kafka Connect. The Kafka classes would be similar to KafkaEmbedded in streams. The new classes would reuse the kafka.server.KafkaServer classes from :core, and provide a simple interface to bring up brokers in integration tests. Signed-off-by: Arjun Satish <arjunconfluent.io> Author: Arjun Satish <[email protected]> Author: Arjun Satish <[email protected]> Reviewers: Randall Hauch <[email protected]>, Konstantine Karantasis <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #5516 from wicknicks/connect-integration-test (cherry picked from commit 69d8d2e) Signed-off-by: Ewen Cheslack-Postava <[email protected]>
Expose a programmatic way to bring up a Kafka and Zk cluster through Java API to facilitate integration tests for framework level changes in Kafka Connect. The Kafka classes would be similar to KafkaEmbedded in streams. The new classes would reuse the kafka.server.KafkaServer classes from :core, and provide a simple interface to bring up brokers in integration tests.
Signed-off-by: Arjun Satish [email protected]
Committer Checklist (excluded from commit message)