Set unique flow id & timestamp to each flow #343
base: main
@jem-davies @gregfurman
@sananguliyev Hey! Thanks for the contribution and apologies for the delay. Will take a look this eve 😄
gregfurman
left a comment
Some questions relating to the metadata approach. Think better internal tracing is a brilliant idea so I'm keen to hear your thoughts on my comments 😄
gregfurman
left a comment
Thanks for the follow ups on this!
I'm wondering whether we ever want to set the flowID values outside of traceInput (i.e. when a message part is first created), although I suppose there's the case where we want to create child spans.
bento/internal/bundle/tracing/input.go
Lines 52 to 57 in 852a365
    if t.e.IsEnabled() {
    	_ = tran.Payload.Iter(func(i int, part *message.Part) error {
    		_ = atomic.AddUint64(t.ctr, 1)
    		t.e.Add(EventProduceOf(part))
    		return nil
    	})
Let me know!
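To make the question concrete, here is a minimal sketch of assigning a flow ID once, when a part is first seen, and letting derived parts inherit it. Everything in it is an assumption for illustration: the Part type, the flow_id metadata key, the Child method, and ensureFlowID are hypothetical and are not bento's message.Part API or the code in this PR.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flowIDKey is a hypothetical metadata key, not one defined by bento.
const flowIDKey = "flow_id"

// Part is a hypothetical stand-in for a message part that carries metadata.
type Part struct {
	meta map[string]string
}

// NewPart creates a part with empty metadata.
func NewPart() *Part {
	return &Part{meta: make(map[string]string)}
}

// Child returns a new part that copies the parent's metadata, so a flow ID
// that was already assigned travels with the derived part.
func (p *Part) Child() *Part {
	c := NewPart()
	for k, v := range p.meta {
		c.meta[k] = v
	}
	return c
}

// flowCounter backs the generated IDs; it is incremented atomically so that
// concurrent inputs never hand out the same value.
var flowCounter uint64

// ensureFlowID returns the part's existing flow ID, or assigns a fresh one
// if the part has none yet.
func ensureFlowID(p *Part) string {
	if id, ok := p.meta[flowIDKey]; ok {
		return id
	}
	id := fmt.Sprintf("flow-%d", atomic.AddUint64(&flowCounter, 1))
	p.meta[flowIDKey] = id
	return id
}

func main() {
	parent := NewPart()
	parentID := ensureFlowID(parent)
	childID := ensureFlowID(parent.Child())
	otherID := ensureFlowID(NewPart())
	fmt.Println(parentID == childID, parentID == otherID)
}
```

With this shape the ID is only ever written in one place, and anything that wants a child span can look the ID up instead of generating a new one.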
Why We Need Both
And this also helps to group events not only by section but also by flow. Please take another look. Thanks in advance. @gregfurman
gregfurman
left a comment
Thanks for your patience with this @sananguliyev 🙏 I've given it a final look and it's looking great! That said, I think we should maybe not enable this by default (in case of bugs).
I think a BuildTracedV2 approach would work best, and we can switch BuildTraced over once we're confident the new flowID is working as intended.
cc @jem-davies since this is kinda a far reaching change
    	"github.com/warpstreamlabs/bento/internal/component"
    	"github.com/warpstreamlabs/bento/internal/component/input"
    	"github.com/warpstreamlabs/bento/internal/message"
    	internaltracing "github.com/warpstreamlabs/bento/internal/tracing"
nit: why internaltracing alias here?
Without the alias, there would be a naming conflict. The current package is already tracing, so importing another package with the same name would be ambiguous. The alias internaltracing makes it clear which tracing package is being referenced when used in the code.
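As a small illustration of import aliasing in Go, the sketch below uses two standard-library packages that are both named template. It is not the bento code: the aliases texttemplate and htmltemplate and the renderBoth helper are made up for the example. Strictly speaking, the compiler only demands an alias when two imports share a name, as here; when an import merely shares its name with the package being written, an alias like internaltracing mainly helps readers tell the two apart.

```go
package main

import (
	"fmt"
	htmltemplate "html/template"
	"strings"
	texttemplate "text/template"
)

// renderBoth executes the same template source with both packages.
// Without the aliases, the two imports would both be called "template"
// and the file would not compile.
func renderBoth(value string) (plain, escaped string, err error) {
	const src = "{{.}}"

	tt, err := texttemplate.New("plain").Parse(src)
	if err != nil {
		return "", "", err
	}
	var plainOut strings.Builder
	if err := tt.Execute(&plainOut, value); err != nil {
		return "", "", err
	}

	ht, err := htmltemplate.New("escaped").Parse(src)
	if err != nil {
		return "", "", err
	}
	var escapedOut strings.Builder
	if err := ht.Execute(&escapedOut, value); err != nil {
		return "", "", err
	}

	return plainOut.String(), escapedOut.String(), nil
}

func main() {
	plain, escaped, err := renderBoth("<b>")
	if err != nil {
		panic(err)
	}
	// text/template writes the value as-is; html/template escapes it.
	fmt.Printf("%q %q\n", plain, escaped)
}
```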
    	shutSig *shutdown.Signaller
    }

    func wrapWithFlowID(i input.Streamed) input.Streamed {
Once this FlowID approach is GA, perhaps we unify this wrapWithFlowID approach with the traceInput approach. Perhaps we add a TODO for that. Thoughts?
Yes, sure. Is writing code comments enough, or do you want to somehow document it somewhere?
@gregfurman I have added new Please, take another look to finalize this implementation.
In the current build of the traced stream, tracing events are grouped only by section. This makes it difficult to link these events together and understand the complete journey of a message through the stream. The new implementation addresses this issue by:
EventsByFlowID, which organizes events by flow and arranges them in chronological order based on timestamp.
Additionally: if no tags exist, git describe --tags will fail, but the error message will be suppressed by 2>/dev/null, and the version will then fall back to "v0.0.0". Currently, it prints "fatal: No names found, cannot describe anything." when the Makefile tries to generate the version.
Fixes #344