Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions runtime/sam/expr/function/grok.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (g *Grok) Call(_ super.Allocator, args []super.Value) super.Value {
if err != nil {
return g.error(err.Error(), defArg)
}
if patternArg.IsNull() || inputArg.IsNull() {
return super.NewValue(g.zctx.MustLookupTypeRecord(nil), nil)
}
p, err := h.getPattern(patternArg.AsString())
if err != nil {
return g.error(err.Error(), patternArg)
Expand Down
44 changes: 0 additions & 44 deletions runtime/sam/expr/function/ztests/grok.yaml

This file was deleted.

3 changes: 3 additions & 0 deletions runtime/vam/expr/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func New(zctx *super.Context, name string, narg int) (expr.Function, field.Path,
case "grep":
argmax = 2
f = &Grep{zctx: zctx}
case "grok":
argmin, argmax = 2, 3
f = newGrok(zctx)
case "hex":
f = &Hex{zctx}
case "join":
Expand Down
142 changes: 142 additions & 0 deletions runtime/vam/expr/function/grok.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package function

import (
"strings"

"github.com/brimdata/super"
"github.com/brimdata/super/pkg/grok"
"github.com/brimdata/super/vector"
"github.com/brimdata/super/zcode"
)

// Grok is the vector-runtime implementation of the grok() function. It
// holds per-instance caches and scratch buffers that are reused across
// rows and calls.
type Grok struct {
	zctx *super.Context
	// builder accumulates the encoded field values of one result record;
	// it is reset before each matching row is encoded.
	builder zcode.Builder
	// hosts caches one grok host per distinct definitions string.
	hosts map[string]*host
	// fields is scratch space, truncated and refilled for each matching
	// row, so that a new slice need not be allocated every time.
	fields []super.Field
}

// newGrok returns a Grok bound to zctx with an empty host cache.
func newGrok(zctx *super.Context) *Grok {
	g := new(Grok)
	g.zctx = zctx
	g.hosts = make(map[string]*host)
	return g
}

// Call evaluates grok(pattern, input [, definitions]) row by row.
//
// args[0] is the pattern vector, args[1] the input vector, and the optional
// args[2] the definitions vector. If any argument is not of string type the
// whole call yields a single wrapped error on that argument. Otherwise each
// row produces one of:
//
//   - an error on the definitions value, if the definitions cannot be loaded;
//   - a null empty record, if the pattern or the input is null;
//   - an error on the pattern value, if the pattern does not compile;
//   - an error on the input value, if the input does not match the pattern;
//   - a record with one string field per captured key, otherwise.
//
// Both null checks happen before the pattern is compiled so that a null
// input always yields a null record, matching the row-at-a-time (sam)
// implementation, which tests pattern and input for null together before
// compiling.
func (g *Grok) Call(args ...vector.Any) vector.Any {
	patternArg, inputArg := args[0], args[1]
	// When no definitions argument is supplied, substitute a constant null
	// string so the per-row logic below is the same in both cases.
	defArg := vector.Any(vector.NewConst(super.NullString, args[0].Len(), nil))
	if len(args) == 3 {
		defArg = args[2]
	}
	switch {
	case super.TypeUnder(defArg.Type()) != super.TypeString:
		return g.error("definitions argument must be a string", defArg)
	case super.TypeUnder(patternArg.Type()) != super.TypeString:
		return g.error("pattern argument must be a string", patternArg)
	case super.TypeUnder(inputArg.Type()) != super.TypeString:
		return g.error("input argument must be a string", inputArg)
	}
	builder := vector.NewDynamicBuilder()
	// Rows that fail are left out of builder; their row indexes (and, where
	// the message varies per row, their messages) are collected here and
	// merged back in by the combiner after the loop.
	var defErrs, patErrs []string
	var defErrsIdx, patErrsIdx, inErrsIdx []uint32
	for i := range patternArg.Len() {
		// The null flag is deliberately ignored: a null definitions value
		// is looked up under whatever string StringValue returns for it
		// (presumably the empty string — TODO confirm).
		def, _ := vector.StringValue(defArg, i)
		h, err := g.getHost(def)
		if err != nil {
			defErrs = append(defErrs, err.Error())
			defErrsIdx = append(defErrsIdx, i)
			continue
		}
		pat, patNull := vector.StringValue(patternArg, i)
		in, inNull := vector.StringValue(inputArg, i)
		if patNull || inNull {
			// A null pattern or input yields a null record, regardless of
			// whether the pattern would have compiled.
			builder.Write(super.NewValue(g.zctx.MustLookupTypeRecord(nil), nil))
			continue
		}
		p, err := h.getPattern(pat)
		if err != nil {
			patErrs = append(patErrs, err.Error())
			patErrsIdx = append(patErrsIdx, i)
			continue
		}
		keys, vals, match := p.ParseKeyValues(in)
		if !match {
			inErrsIdx = append(inErrsIdx, i)
			continue
		}
		g.fields = g.fields[:0]
		for _, key := range keys {
			g.fields = append(g.fields, super.NewField(key, super.TypeString))
		}
		typ := g.zctx.MustLookupTypeRecord(g.fields)
		g.builder.Reset()
		if len(vals) == 0 {
			// If we have a match but no key/vals return empty record.
			g.builder.Append(nil)
		} else {
			for _, s := range vals {
				g.builder.Append([]byte(s))
			}
		}
		builder.Write(super.NewValue(typ, g.builder.Bytes()))
	}
	combiner := vector.NewCombiner(builder.Build())
	if len(defErrsIdx) > 0 {
		combiner.Add(defErrsIdx, g.errorVec(defErrs, defErrsIdx, defArg))
	}
	if len(patErrsIdx) > 0 {
		combiner.Add(patErrsIdx, g.errorVec(patErrs, patErrsIdx, patternArg))
	}
	if len(inErrsIdx) > 0 {
		combiner.Add(inErrsIdx, g.error("value does not match pattern", vector.NewView(inputArg, inErrsIdx)))
	}
	return combiner.Result()
}

// errorVec builds a vector of wrapped errors with one message per failed
// row. Each entry of msgs is prefixed with "grok(): ", and the errors wrap
// a view of vec restricted to the row indexes in index.
func (g *Grok) errorVec(msgs []string, index []uint32, vec vector.Any) vector.Any {
	messages := vector.NewStringEmpty(0, nil)
	for i := range msgs {
		messages.Append("grok(): " + msgs[i])
	}
	failed := vector.NewView(vec, index)
	return vector.NewVecWrappedError(g.zctx, messages, failed)
}

// error wraps vec in an error vector whose message is msg prefixed with
// "grok(): ".
func (g *Grok) error(msg string, vec vector.Any) vector.Any {
	const prefix = "grok(): "
	return vector.NewWrappedError(g.zctx, prefix+msg, vec)
}

// getHost returns the host for the definitions string defs, creating and
// caching it on first use. A new host starts from grok.NewBase() and then
// loads defs; if loading fails the error is returned and nothing is cached.
func (g *Grok) getHost(defs string) (*host, error) {
	if cached, ok := g.hosts[defs]; ok {
		return cached, nil
	}
	fresh := &host{Host: grok.NewBase(), patterns: make(map[string]*grok.Pattern)}
	if err := fresh.AddFromReader(strings.NewReader(defs)); err != nil {
		return nil, err
	}
	g.hosts[defs] = fresh
	return fresh, nil
}

// host pairs a grok.Host with a cache of the patterns compiled against it.
type host struct {
	grok.Host
	// patterns maps a pattern string to its compiled form.
	patterns map[string]*grok.Pattern
}

// getPattern returns the compiled form of patternArg, compiling it with the
// embedded Host and caching the result on first use. A compile error is
// returned as is and nothing is cached for that pattern.
func (h *host) getPattern(patternArg string) (*grok.Pattern, error) {
	if cached, ok := h.patterns[patternArg]; ok {
		return cached, nil
	}
	compiled, err := h.Host.Compile(patternArg)
	if err != nil {
		return nil, err
	}
	h.patterns[patternArg] = compiled
	return compiled, nil
}
70 changes: 70 additions & 0 deletions runtime/ztests/expr/function/grok.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
zed: yield grok(pattern, input, defs)

vector: true

input: |
{
pattern: "%{TIMESTAMP_ISO8601:event_time} %{LOGLEVEL:log_level} %{GREEDYDATA:log_message}",
input: "2020-09-16T04:20:42.45+01:00 DEBUG This is a sample debug log message",
defs: null(string)
}
{
pattern: "%{ONE:zero}-%{TWO:three}",
input: "0-1-2",
defs: "ONE \\d\nTWO %{ONE:one}-%{ONE:two}"
}
// Ignores type annotation.
{
pattern: "%{INT:int:int64}",
input: "'0'",
defs: null(string)
}
// Check to see that duplicate fields are squashed. This is not great but
// this is what grokconstructor.appspot.com does.
{
pattern: "%{INT:one} %{INT:one}",
input: "1 2",
defs: null(string)
}
// Differentiate between no keys and a non-match
{
pattern: "%{INT}",
input: "1",
defs: null(string)
}
{
pattern: "%{INT}",
input: "foo",
defs: null(string)
}
{pattern: null(string),input: "foo",defs: null(string)}
{pattern: "%{INT:int}",input:null(string),defs: null(string)}
// Error cases
{
pattern: "%{INT:int}",
input: "string value",
defs: null(string)
}
{
pattern: "%{DOESNOTEXIST:dne}",
input: "foo",
defs: null(string)
}
{pattern: 1,input: "foo",defs: null(string)}
{pattern: "%{INT:int}",input:1,defs: null(string)}
{pattern: "%{INT:int}",input:"1",defs:1}

output: |
{event_time:"2020-09-16T04:20:42.45+01:00",log_level:"DEBUG",log_message:"This is a sample debug log message"}
{zero:"0",three:"1-2",one:"1",two:"2"}
{int:"0"}
{one:"2"}
{}
error({message:"grok(): value does not match pattern",on:"foo"})
null({})
null({})
error({message:"grok(): value does not match pattern",on:"string value"})
error({message:"grok(): the 'DOESNOTEXIST' pattern doesn't exist",on:"%{DOESNOTEXIST:dne}"})
error({message:"grok(): pattern argument must be a string",on:1})
error({message:"grok(): input argument must be a string",on:1})
error({message:"grok(): definitions argument must be a string",on:1})