Add helper methods to support kafka output in beatreceivers #49768

Open
khushijain21 wants to merge 4 commits into elastic:main from khushijain21:dynamic-topic-kafka

Conversation

@khushijain21
Contributor

@khushijain21 khushijain21 commented Mar 30, 2026

Proposed commit message

This PR adds helper methods to the fmtstr package to help parse dynamic fields. This is required to support the Kafka output on beatreceivers.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

None

Related issues

@botelastic botelastic bot added the needs_team label (Indicates that the issue/PR needs a Team:* label) Mar 30, 2026
@github-actions
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@mergify
Contributor

mergify bot commented Mar 30, 2026

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @khushijain21? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@khushijain21 khushijain21 added the Team:Elastic-Agent-Data-Plane label (Label for the Agent Data Plane team) Mar 30, 2026
@botelastic botelastic bot removed the needs_team label (Indicates that the issue/PR needs a Team:* label) Mar 30, 2026
@khushijain21 khushijain21 added the skip-changelog and backport-8.19 (Automated backport to the 8.19 branch) labels Mar 30, 2026
@khushijain21 khushijain21 marked this pull request as ready for review March 30, 2026 11:44
@khushijain21 khushijain21 requested a review from a team as a code owner March 30, 2026 11:44
@elasticmachine
Contributor

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@khushijain21 khushijain21 requested review from belimawr and rdner March 30, 2026 11:44
@coderabbitai

coderabbitai bot commented Mar 30, 2026

📝 Walkthrough

The libbeat format string package exports three previously internal utilities. In formatevents.go, the parseEventPath function is renamed and exported as ParseEventPath. In formatstring.go, the makeLexer function is renamed to MakeLexer and exported, a new VariableToken type is introduced, and a new ParseRawTokens function is added to parse lexer tokens into a slice of literal strings and variable tokens. An internal helper parseVariableToken is added to support this functionality. Tests are added to verify ParseRawTokens handles variable tokens, string boundaries, default lookups, and escaped syntax correctly.
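
To make the shape of that return value concrete, here is a minimal sketch of how a caller might walk a mixed slice of literal strings and variable tokens. `VariableToken` is redeclared locally so the snippet compiles on its own; `render`, the lookup callback, and the token contents (`fields.env`) are hypothetical and not taken from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// VariableToken mirrors the type named in the walkthrough: the raw contents
// of one %{...} block. Redeclared here so the sketch is self-contained.
type VariableToken string

// render is a hypothetical consumer: literal segments are copied as-is and
// each VariableToken is resolved through the supplied lookup function.
func render(elems []any, lookup func(string) (string, bool)) (string, error) {
	var b strings.Builder
	for _, e := range elems {
		switch v := e.(type) {
		case string:
			b.WriteString(v)
		case VariableToken:
			val, ok := lookup(string(v))
			if !ok {
				return "", fmt.Errorf("field %q not found", string(v))
			}
			b.WriteString(val)
		default:
			return "", fmt.Errorf("unexpected element type %T", e)
		}
	}
	return b.String(), nil
}

func main() {
	// Hand-built stand-in for a parsed format string; the exact token
	// contents produced by the real parser are an assumption.
	elems := []any{"logs-", VariableToken("fields.env")}
	fields := map[string]string{"fields.env": "prod"}
	out, err := render(elems, func(k string) (string, bool) {
		v, ok := fields[k]
		return v, ok
	})
	fmt.Println(out, err)
}
```

The type switch has a default branch because `[]any` gives the compiler no way to rule out other element types.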


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@libbeat/common/fmtstr/formatstring.go`:
- Around line 340-365: ParseRawTokens can return early and leave the lexer
producer goroutine blocked; ensure we always drain lex.Tokens() on any early
exit by adding a deferred drain. At the top of ParseRawTokens, add a defer that
iterates over lex.Tokens() and discards remaining tokens (for range lex.Tokens()
{ }) unless the function completes normally; mark normal completion (e.g., a
local done flag set to true just before the final return) so the deferred drain
only runs on error/early returns. This ensures any early returns from cases like
tokErr or parseVariableToken failures do not leak the goroutine started by
MakeLexer while keeping normal behavior for the existing token handling
(tokString, tokOpen, tokClose, tokOperator) and existing use of
parseVariableToken and VariableToken.

📥 Commits

Reviewing files that changed from the base of the PR and between 2d10f57 and f53f390.

📒 Files selected for processing (3)
  • libbeat/common/fmtstr/formatevents.go
  • libbeat/common/fmtstr/formatstring.go
  • libbeat/common/fmtstr/formatstring_test.go

Comment on lines +340 to +365
func ParseRawTokens(lex lexer) ([]any, error) {
	var elems []any

	for token := range lex.Tokens() {
		switch token.typ {
		case tokErr:
			return nil, errors.New(token.val)

		case tokString:
			elems = append(elems, token.val)

		case tokOpen:
			elem, err := parseVariableToken(lex)
			if err != nil {
				return nil, err
			}
			elems = append(elems, VariableToken(elem))

		case tokClose, tokOperator:
			// should not happen, but let's return error just in case
			return nil, fmt.Errorf("Token '%v'(%v) not allowed", token.val, token.typ)
		}
	}

	return elems, nil
}

⚠️ Potential issue | 🟠 Major

Drain the lexer on all ParseRawTokens exits to prevent goroutine leaks.

At Line 340, ParseRawTokens can return early on malformed input (e.g., nested variable from Line 382) without draining lex. Since MakeLexer starts a producer goroutine, this can leave it blocked if callers forget Finish().

Proposed fix
 func ParseRawTokens(lex lexer) ([]any, error) {
+	defer lex.Finish()
 	var elems []any

 	for token := range lex.Tokens() {
 		switch token.typ {
 		case tokErr:
 			return nil, errors.New(token.val)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func ParseRawTokens(lex lexer) ([]any, error) {
	defer lex.Finish()
	var elems []any
	for token := range lex.Tokens() {
		switch token.typ {
		case tokErr:
			return nil, errors.New(token.val)
		case tokString:
			elems = append(elems, token.val)
		case tokOpen:
			elem, err := parseVariableToken(lex)
			if err != nil {
				return nil, err
			}
			elems = append(elems, VariableToken(elem))
		case tokClose, tokOperator:
			// should not happen, but let's return error just in case
			return nil, fmt.Errorf("Token '%v'(%v) not allowed", token.val, token.typ)
		}
	}
	return elems, nil
}

name: "with escaped % symbol",
input: `\%{abc}`,
expectedList: []any{`%{abc}`},
},
Member

@mauri870 mauri870 Mar 30, 2026

I think we are missing some test cases here. For example, what should happen with these inputs:

  • ""
  • 100% literal
  • %%
  • %%{}
  • %\{}
  • %{}
  • %{a:b:c}
  • %{a:b:?c}
  • %{a

Member

Also, maybe a fuzz test is a good idea.

Contributor

@belimawr belimawr left a comment

Overall LGTM, but I agree with Mauri: a more exhaustive test for ParseRawTokens would be great.

Comment on lines 271 to +340
	for token := range lex.Tokens() {
		switch token.typ {
		case tokErr:
			return nil, errors.New(token.val)

		case tokString:
			elems = append(elems, StringElement{token.val})

		case tokOpen:
			elem, err := parseVariable(lex)
			if err != nil {
				return nil, err
			}
			elems = append(elems, elem)

		case tokClose, tokOperator:
			// should not happen, but let's return error just in case
			return nil, fmt.Errorf("Token '%v'(%v) not allowed", token.val, token.typ)
		}
	}

	return elems, nil
}

func parseVariable(lex lexer) (formatElement, error) {
	var strings []string
	var ops []string

	for token := range lex.Tokens() {
		switch token.typ {
		case tokErr:
			return nil, errors.New(token.val)

		case tokOpen:
			return nil, errNestedVar

		case tokClose:
			if len(strings) == 0 {
				return nil, errEmptyFormat
			}
			return makeVariableElement(strings[0], ops, strings[1:])

		case tokString:
			if len(strings) != len(ops) {
				return nil, fmt.Errorf("Unexpected string token %v, expected operator", token.val)
			}
			strings = append(strings, token.val)

		case tokOperator:
			if len(strings) == 0 {
				return nil, errUnexpectedOperator
			}
			ops = append(ops, token.val)
			if len(ops) > len(strings) {
				return nil, fmt.Errorf("Consecutive operator tokens '%v'", token.val)
			}

		default:
			return nil, fmt.Errorf("Unexpected token '%v' (%v)", token.val, token.typ)
		}
	}

	return nil, errMissingClose
}

func makeLexer(in string) lexer {
type VariableToken string

// ParseRawTokens walks the lexer output and returns a slice in order: plain
// strings for literal segments and VariableToken for each %{...} block.
func ParseRawTokens(lex lexer) ([]any, error) {
Member

This is exactly the same as the private parse, except with the return type changed? Is there no way to reuse the existing code, for example by making the function generic on the element type?
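
A rough sketch of what such a generic version could look like, assuming the token loop can be parameterized by two constructors. It works on a simplified token slice rather than the package's lexer, and `tok`, `parseWith` and `parseRaw` are hypothetical names, so treat it as an illustration of the idea rather than a drop-in change.

```go
package main

import (
	"errors"
	"fmt"
)

// tok is a simplified stand-in for the package's token type.
type tok struct {
	kind string // "string" or "var"; the real lexer has more token types
	val  string
}

// parseWith holds the single copy of the loop. The two callbacks decide what
// a literal and a variable become, so T can be any element type.
func parseWith[T any](toks []tok, lit func(string) T, variable func(string) (T, error)) ([]T, error) {
	var elems []T
	for _, t := range toks {
		switch t.kind {
		case "string":
			elems = append(elems, lit(t.val))
		case "var":
			e, err := variable(t.val)
			if err != nil {
				return nil, err
			}
			elems = append(elems, e)
		default:
			return nil, fmt.Errorf("token %q (%s) not allowed", t.val, t.kind)
		}
	}
	return elems, nil
}

// VariableToken mirrors the type added in the PR.
type VariableToken string

// parseRaw is the "raw" flavour: literals stay strings and variables become
// VariableToken values.
func parseRaw(toks []tok) ([]any, error) {
	return parseWith(toks,
		func(s string) any { return s },
		func(s string) (any, error) {
			if s == "" {
				return nil, errors.New("empty format expansion")
			}
			return VariableToken(s), nil
		})
}

func main() {
	elems, err := parseRaw([]tok{{"string", "topic-"}, {"var", "field"}})
	fmt.Printf("%#v %v\n", elems, err)
}
```

A second wrapper could pass constructors that build the compiled element types instead, which would keep the switch over token types in one place.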

Labels

  • backport-8.19: Automated backport to the 8.19 branch
  • skip-changelog
  • Team:Elastic-Agent-Data-Plane: Label for the Agent Data Plane team

5 participants