Restrict input stream modification in existing seekable stream supervisors #17955
| } | ||
|
|
||
| @Override | ||
| public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException |
I probably want to add a javadoc here now that I notice I didn't
kfaraz
left a comment
This is a useful validation, @capistrant !
I have left some suggestions.
| * @param that the proposed supervisor spec | ||
| * @throws IllegalArgumentException if the evolution is not allowed | ||
| */ | ||
| void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException; |
Maybe use a simpler method name like validateSpecUpdateTo, and the arg can be called proposedSpec.
We should throw a DruidException (of type not supported or invalid input) instead.
Also, please add a default impl to avoid having to override in all impls where this is not needed right now.
Thanks for that default tip, cleaned up a ton :) also fixed the Exception type to use DruidException and made an attempt at rename
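A minimal sketch of the default-implementation suggestion above, for readers skimming the thread. The types here (`SpecSketch`, `CompactionSpecSketch`, `StreamSpecSketch`, `SpecUpdateRejectedException`) are hypothetical stand-ins, not the real Druid classes, and the exception is a plain `RuntimeException` rather than a `DruidException`.

```java
// Hypothetical stand-in for Druid's DruidException (the real class has its own API).
class SpecUpdateRejectedException extends RuntimeException {
    SpecUpdateRejectedException(String message) {
        super(message);
    }
}

// Simplified stand-in for the SupervisorSpec interface.
interface SpecSketch {
    // Default: accept every update, so specs without rules need no override.
    default void validateSpecUpdateTo(SpecSketch proposedSpec) {
        // Intentionally empty.
    }
}

// Inherits the permissive default and therefore needs no override.
class CompactionSpecSketch implements SpecSketch {
}

// Rejects any update that changes the input stream.
class StreamSpecSketch implements SpecSketch {
    private final String stream;

    StreamSpecSketch(String stream) {
        this.stream = stream;
    }

    @Override
    public void validateSpecUpdateTo(SpecSketch proposedSpec) {
        if (proposedSpec instanceof StreamSpecSketch
            && !stream.equals(((StreamSpecSketch) proposedSpec).stream)) {
            throw new SpecUpdateRejectedException(
                String.format(
                    "Update of stream from [%s] to [%s] is not supported",
                    stream,
                    ((StreamSpecSketch) proposedSpec).stream
                )
            );
        }
    }
}
```

With a default method, only the specs that actually restrict updates carry an override.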
| SupervisorSpec existingSpec = getSpec(spec.getId()); | ||
| if (existingSpec != null) { | ||
| existingSpec.validateProposedSpecEvolution(spec); | ||
| } |
This logic should probably live in shouldUpdateSupervisor method itself.
Made the move of this logic
| ::: | ||
|
|
||
| :::info | ||
| Migrating an existing supervisor to use `topicPattern` instead of `topic` is not supported. It is also not supported to change the `topicPattern` of an existing supervisor to a different regex pattern. |
Thanks for adding this.
Maybe you can put it in a list format for better readability like
The following updates to a supervisor spec are not supported:
- item 1
- item 2
- item 3
Follow these steps instead:
- Step 1
- Step 2
Updated with list
| @Override | ||
| public void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException | ||
| { | ||
| // No validation logic for compaction spec as of now |
Seems like this should have been the default impl in the interface itself.
| // Future enhancements could allow for topicPattern to be changed in a way where the new source is additive to the | ||
| // old source. If we did that now, there would be metadata issues due to {@link KafkaDataSourceMetadata} | ||
| // implementation details that aren't set up to handle evolution of metadata in this way. | ||
| if (!this.getSource().equals(other.getSource())) { |
Should this impl be present for all SeekableStreamSupervisorSpecs and not just Kafka?
Good question. I was not certain on if we wanted to modify behavior of other streaming ingestion types with this PR, but I can certainly look into it if you think it would make sense. I guess for each existing implementation (kinesis, etc.) we'd have to see if this source change is currently supported and if yes, add an override to their spec class to allow it
I looked into it and it turns out rabbit and kinesis also share the metadata management logic that wouldn't support updating streams so I made the update for all. I therefore put this in SeekableStreamSupervisorSpec. I still ended up having to have an override in Kafka cuz the multi topic single topic information created an edge case of moving between single/multi with the same exact string for topic/topicPattern. I'm not sure that my kafka override is the proper style. I do the edge case logic and then call into super implementation for the core logic. I thought that would save duplicate code, but wasn't sure if it is necessarily the right way from a java perspective
Thanks! I have left an alternative suggestion for this.
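The override pattern described in this thread (handle the Kafka-only edge case, then delegate to the shared check) could look roughly like the sketch below. All class names and the `multiTopic` flag are illustrative assumptions, not the actual Druid code; the real specs carry far more state.

```java
// Hypothetical stand-in for the exception Druid would throw.
class StreamChangeRejected extends RuntimeException {
    StreamChangeRejected(String message) {
        super(message);
    }
}

// Stand-in for the shared seekable-stream spec: rejects any change of source.
class BaseStreamSpecSketch {
    private final String source;

    BaseStreamSpecSketch(String source) {
        this.source = source;
    }

    String getSource() {
        return source;
    }

    void validateSpecUpdateTo(BaseStreamSpecSketch proposed) {
        if (source == null || proposed.getSource() == null || !source.equals(proposed.getSource())) {
            throw new StreamChangeRejected(
                String.format("Update of the input source stream from [%s] to [%s] is not supported",
                              source, proposed.getSource())
            );
        }
    }
}

// Stand-in for the Kafka spec: the same string may be a topic or a topicPattern.
class KafkaSpecSketch extends BaseStreamSpecSketch {
    private final boolean multiTopic;

    KafkaSpecSketch(String source, boolean multiTopic) {
        super(source);
        this.multiTopic = multiTopic;
    }

    @Override
    void validateSpecUpdateTo(BaseStreamSpecSketch proposed) {
        if (!(proposed instanceof KafkaSpecSketch)) {
            throw new StreamChangeRejected("Cannot change spec to a non-Kafka type");
        }
        // Edge case: identical string, but switching between topic and topicPattern.
        if (multiTopic != ((KafkaSpecSketch) proposed).multiTopic) {
            throw new StreamChangeRejected("Cannot switch between topic and topicPattern");
        }
        // Core check lives in the parent, so it is not duplicated here.
        super.validateSpecUpdateTo(proposed);
    }
}
```

Calling `super` after the subclass-specific checks is a common way to layer validation in Java; whether it is the best fit here is the judgment call the thread is discussing.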
| } | ||
| KafkaSupervisorSpec other = (KafkaSupervisorSpec) that; | ||
| if (this.getSource() == null || other.getSource() == null) { | ||
| // I don't think this is possible, but covering just in case. |
| // I don't think this is possible, but covering just in case. | |
| // Not likely to happen, but covering just in case. |
| "Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern " | ||
| + "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you " | ||
| + "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. " | ||
| + "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to " | ||
| + "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can " | ||
| + "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new " | ||
| + "one."); |
Suggestion for more concise language and formatted message.
| "Your proposed KafkaSupervisorSpec evolution is invalid. You are attempting to change the topic/topicPattern " | |
| + "from " + this.getSource() + " to " + other.getSource() + ". This is not supported. If you " | |
| + "want to change the topic or topicPattern for a supervisor, you must first terminate the supervisor. " | |
| + "Then create a new one in suspended state with the new topic or topicPattern. Lastly, you will have to " | |
| + "reset the supervisor offsets. Finally, you can resume the new supervisor. Note that doing this reset can " | |
| + "cause duplicate events or lost events if any topics who were in the previous supervisor remain in the new " | |
| + "one."); | |
| "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor." | |
| + "%nTo perform the update safely, follow these steps:" | |
| + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " | |
| + "%n(2) Create a new supervisor with the new topic or topicPattern." | |
| + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."); |
I have updated the steps, please fix it up if it doesn't seem correct.
Thanks, yes your step logic works and is more clear than mine!
|
@kfaraz Thanks for the review! All of the comments make sense to me, I will start working through them soon |
…e for Kafka Since all existing implementations of the SeekableStreamSupervisor do not support stream modification for existing supervisors, it makes sense to centralize a default implementation of spec update validation. Kafka requires some override because the idea of single and multi topic ingest within kafka is unique to kafka
kfaraz
left a comment
Thanks for incorporating the changes, @capistrant . Left some final suggestions.
We should be good to merge after these.
| /** | ||
| * Checks if a spec can be replaced with a proposed spec (proposedSpec). | ||
| * <p> | ||
| * By default, this method does no validation checks. Implementations of this method can choose to define rules |
| * By default, this method does no validation checks. Implementations of this method can choose to define rules | |
| * By default, this method does no validation checks. Implementations of this method can choose to define rules |
| * </p> | ||
| * | ||
| * @param proposedSpec the proposed supervisor spec | ||
| * @throws IllegalArgumentException if the spec update is not allowed |
I think the code throws DruidExceptions of type invalid input now.
| } | ||
|
|
||
| @Test | ||
| public void testValidateSpecUpdateToShortCircuits() |
Suggestion:
My latest personal preference on test naming convention is something like this:
test_validateSpecUpdateTo_shortCircuits_whenSomethingHappens()
This is only slightly different from the convention you have already followed here,
in that it uses underscores. The underscores help in clearly spelling out which method
is being tested and under what conditions.
That said, you may stick to the current convention if you find it more appealing since
there is no established Druid naming convention yet as long as the test names make sense.
I like this, it is easier on the eyes IMO. 👍
| return autoScalerConfig; | ||
| } | ||
|
|
||
| private static class OtherSupervisorSpec implements SupervisorSpec |
There is already a TestSupervisorSpec class that you may be able to use.
| { | ||
| if (!(proposedSpec instanceof KafkaSupervisorSpec)) { | ||
| throw InvalidInput.exception( | ||
| StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName()) |
| StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName()) | |
| "Cannot change spec from type[%s] to type[%s]", getClass().getName(), proposedSpec.getClass().getName() |
| ); | ||
|
|
||
| OtherSupervisorSpec otherSpec = new OtherSupervisorSpec(); | ||
| assertThrows( |
Maybe use DruidExceptionMatcher to verify error message too.
| } else { | ||
| if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { | ||
| // The spec bytes are different, so we need to check if the replacement is allowed | ||
| currentSupervisor.rhs.validateSpecUpdateTo(spec); | ||
| return true; | ||
| } else { | ||
| return false; | ||
| } | ||
| } |
Cleaner with the nesting reduced and positive condition first:
| } else { | |
| if (!Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { | |
| // The spec bytes are different, so we need to check if the replacement is allowed | |
| currentSupervisor.rhs.validateSpecUpdateTo(spec); | |
| return true; | |
| } else { | |
| return false; | |
| } | |
| } | |
| } else if (Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs))) { | |
| return false; | |
| } else { | |
| // The spec bytes are different, so we need to check if the update is allowed | |
| currentSupervisor.rhs.validateSpecUpdateTo(spec); | |
| return true; | |
| } |
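For reference, the flattened control flow from the suggestion can be exercised in isolation. This is a sketch under assumptions: `UpdateDecisionSketch` and its parameters are hypothetical, the first branch (no existing supervisor) is a guess at the condition that precedes the quoted `else`, and a `Runnable` stands in for the call to `validateSpecUpdateTo`.

```java
import java.util.Arrays;

class UpdateDecisionSketch {
    // Returns true when the proposed spec should replace the current one.
    // The validator is expected to throw if the update is not allowed.
    static boolean shouldUpdate(byte[] currentBytes, byte[] proposedBytes, Runnable validator) {
        if (currentBytes == null) {
            // Assumed branch: no existing supervisor, so there is nothing to validate.
            return true;
        } else if (Arrays.equals(currentBytes, proposedBytes)) {
            // Identical serialized specs: no update needed.
            return false;
        } else {
            // The spec bytes differ, so check whether the update is allowed.
            validator.run();
            return true;
        }
    }
}
```

Putting the equality check first keeps the "nothing changed" path free of validation work and removes one level of nesting.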
| * @param proposedSource The proposed input source stream | ||
| * @return A formatted error message | ||
| */ | ||
| protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource) |
This method shouldn't be a part of the contract of this interface.
I would suggest just making the string a constant and formatting it when needed.
| } | ||
|
|
||
| @Override | ||
| protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource) |
We shouldn't need to override this method.
We can either use the constant defined in SeekableStreamSupervisorSpec as is or a different String altogether.
A cleaner way to include the type name in that String would be to have a placeholder for type and fill it with getType() while formatting.
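One way to read the constant-plus-placeholder suggestion is sketched below. The class name, constant name, and message wording are illustrative assumptions; in the real code the type argument would come from `getType()` and the formatting would likely go through Druid's own utilities rather than `String.format`.

```java
class UpdateMessageSketch {
    // Hypothetical shared constant: placeholders for old source, new source, and supervisor type.
    static final String ILLEGAL_STREAM_UPDATE_FORMAT =
        "Update of the input source stream from [%s] to [%s] is not supported for a running %s supervisor.";

    // Formats the shared message; subclasses supply their type instead of overriding a method.
    static String format(String existingSource, String proposedSource, String type) {
        return String.format(ILLEGAL_STREAM_UPDATE_FORMAT, existingSource, proposedSource, type);
    }
}
```

For example, `UpdateMessageSketch.format("topicA", "topicB", "kafka")` fills all three placeholders without any per-subclass override.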
| ); | ||
|
|
||
| // Changing topic name is not allowed | ||
| String invalidDestSpecTwoJson = "{\n" |
Do we need to write the specs as json String or can we use objects and then just validate them, same as the other tests?
If using the json form is essential to the test, you can consider using the utility TestUtils.singleQuoteToStandardJson() to make the json easier to read.
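To illustrate why a single-quote helper makes inline JSON easier to read, here is a tiny stand-in. `JsonQuoteSketch.singleToDoubleQuotes` is hypothetical and only approximates what a utility like the one mentioned above might do; the real method may behave differently, and this naive version would corrupt values that contain apostrophes.

```java
class JsonQuoteSketch {
    // Naive conversion: every single quote becomes a double quote.
    static String singleToDoubleQuotes(String singleQuotedJson) {
        return singleQuotedJson.replace('\'', '"');
    }
}
```

A test could then write `"{'type':'kafka','topic':'metrics'}"` instead of escaping every double quote by hand.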
|
@capistrant , does this PR need the |
|
Yeah even I feel it should not require a design review. |
Description
Modify the supervisor API to add restrictions on modifications to existing Kafka Supervisor Specs. Prevent the changing of the "stream" for an existing spec. This effectively means that you cannot submit a spec update that changes the `topic`, migrates between `topic` and `topicPattern`, or changes the `topicPattern`. The reasoning for this is that the system is not designed to gracefully handle such migrations. In the best case, tasks will fail. In the worst case, tasks will succeed but the metadata will not be persisted correctly, leading to eventual data integrity issues.
SupervisorSpec Interface Modification
The core of the change lies here with a new interface method:
`void validateProposedSpecEvolution(SupervisorSpec that) throws IllegalArgumentException;`
This method is intended to determine if a proposed evolution of the existing spec to `that` proposed spec is allowed. As specified in this PR, an illegal proposed evolution results in the throwing of an `IllegalArgumentException`.
The only implementation that actually has logic is KafkaSupervisorSpec, which prevents the changing of the topic/topicPattern for the existing supervisor. All other spec evolution is allowed.
Alternatives
Support changes in topic/topicPattern
An alternative approach would be to modify the system to properly handle a change in the topic/topicPattern. I think that is still a good long-term plan, but it would require defining exactly what evolution is allowed to occur. You could technically allow any kind of change in the topic/topicPattern, but it is debatable whether you want to. For instance, giving users unbounded ability to change the topic/topicPattern could lead to mistakes that behave as designed but result in data issues for the user because they did something they didn't intend to, like removing a topic from their supervisor when they only meant to add a topic to the set of topics supplying data for the supervisor.
Allow the spec change but prevent the start of tasks if the underlying topic set doesn't match metadata store
Another approach, which I considered but did not pursue (so I don't know its true viability), would be to accept the spec submission but not start up new tasks if the topic set in metadata didn't match what the new supervisor was actually seeing from Kafka. I think this would have allowed the change to stay confined to the kafka extension, with the tradeoff being that the feedback to the user would not be as immediate as in my implementation.
Other thoughts
Perhaps an ideal solution would be to identify a way to give the user this immediate negative feedback while still not modifying code outside of the kafka extension. I am open to hearing these ideas, so I labeled this with design review.
Release note
Explicitly prevent Seekable Stream Supervisors (Kafka, Kinesis, and Rabbit) from updating the underlying "input stream" (i.e. topic for kafka) that is persisted for them. This action, while previously allowed by the API, is not fully supported by the underlying system. Going forward, a request to make such a change will result in a `400` error from the Supervisor API with details on the reason why it is not allowed. The docs and the message in the response describe a workaround for users who are adamant that they want to make such a change.
Key changed/added classes in this PR
- `SupervisorSpec`
- `KafkaSupervisorSpec`
This PR has: