-
Notifications
You must be signed in to change notification settings - Fork 121
Use journal table v2 as default #3921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
slinkydeveloper
wants to merge
13
commits into
restatedev:main
from
slinkydeveloper:issues/3184-add-feature-flag
Closed
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
8d319fc
Starting point: introduce experimental feature and use it in the serv…
slinkydeveloper 1d5ea2e
Allow to enable the feature flag via env, primarily for testing purpose
slinkydeveloper 4b81531
Invoker storage reader now tries to read first from journal table v2,…
slinkydeveloper c0c0f95
Handle the case where in storage we use journal table v2, but for wha…
slinkydeveloper 99563a4
Add some unit test coverage of journal table v2 feature, especially t…
slinkydeveloper cd0423b
Nit: rename experimental features to just features.
slinkydeveloper 2ff6495
Little change to the workaround for restart as new RPC handler
slinkydeveloper c2541b4
Little thing
slinkydeveloper 60b2632
Excluded few e2e tests when running journal v2 default test configura…
slinkydeveloper b45f03d
Write to journal table v2 by default also in the init_journal function.
slinkydeveloper 4f4528e
Test some corner case with suspension and signals
slinkydeveloper b30bef6
Little rename
slinkydeveloper a127b80
Figured the spell for exclusions
slinkydeveloper File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,7 @@ pub mod test_util { | |
| 0, | ||
| MillisSinceEpoch::UNIX_EPOCH, | ||
| 0, | ||
| true, | ||
| ), | ||
| futures::stream::empty(), | ||
| ))) | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -184,8 +184,13 @@ where | |
|
|
||
| // Execute the replay | ||
| crate::shortcircuit!( | ||
| self.replay_loop(&mut http_stream_tx, &mut decoder_stream, journal_stream) | ||
| .await | ||
| self.replay_loop( | ||
| &mut http_stream_tx, | ||
| &mut decoder_stream, | ||
| journal_stream, | ||
| journal_metadata.length | ||
| ) | ||
| .await | ||
| ); | ||
|
|
||
| // If we have the invoker_rx and the protocol type is bidi stream, | ||
|
|
@@ -305,13 +310,15 @@ where | |
| http_stream_tx: &mut InvokerRequestStreamSender, | ||
| http_stream_rx: &mut S, | ||
| journal_stream: JournalStream, | ||
| expected_entries_count: u32, | ||
| ) -> TerminalLoopState<()> | ||
| where | ||
| JournalStream: Stream<Item = JournalEntry> + Unpin, | ||
| S: Stream<Item = Result<DecoderStreamItem, InvokerError>> + Unpin, | ||
| { | ||
| let mut journal_stream = journal_stream.fuse(); | ||
| let mut got_headers = false; | ||
| let mut sent_entries = 0; | ||
|
|
||
| loop { | ||
| tokio::select! { | ||
|
|
@@ -334,10 +341,11 @@ where | |
| opt_je = journal_stream.next() => { | ||
| match opt_je { | ||
| Some(JournalEntry::JournalV2(entry)) => { | ||
| sent_entries += 1; | ||
| crate::shortcircuit!(self.write_entry(http_stream_tx, entry.inner).await); | ||
|
|
||
| } | ||
| Some(JournalEntry::JournalV1(old_entry)) => { | ||
| sent_entries += 1; | ||
| if let journal::Entry::Input(input_entry) = crate::shortcircuit!(old_entry.deserialize_entry::<ProtobufRawEntryCodec>()) { | ||
| crate::shortcircuit!(self.write_entry( | ||
| http_stream_tx, | ||
|
|
@@ -352,6 +360,14 @@ where | |
| } | ||
| }, | ||
| None => { | ||
| // Let's verify if we sent all the entries we promised, otherwise the stream will hang in a bad way! | ||
| if sent_entries < expected_entries_count { | ||
| return TerminalLoopState::Failed(InvokerError::UnexpectedEntryCount { | ||
| actual: sent_entries, | ||
| expected: expected_entries_count, | ||
| }) | ||
| } | ||
|
Comment on lines
+363
to
+369
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a situation that couldn't arise before and is now possible due to the changes of the PR or is this covering a case that could have happened before as well? |
||
|
|
||
| // No need to wait for the headers to continue | ||
| trace!("Finished to replay the journal"); | ||
| return TerminalLoopState::Continue(()) | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.