Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions artifacts/definitions/Rsyslog/Events/Clients.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: RsyslogShipper.Events.Clients

description: |
This server side event monitoring artifact will watch a selection of client
monitoring artifacts for new events and push them rsyslog.

NOTE: You must ensure you are collecting these artifacts from the
clients by adding them to the "Client Events" GUI.

type: SERVER_EVENT

parameters:
- name: UnixSocket
description: Path to the Unix Domain Socket to send events to
type: string
default: /tmp/velo-socket

- name: Threads
description: Number of threads to start up to post events
type: int
default: 2

- name: Artifacts
type: artifactset
artifact_type: CLIENT_EVENT
description: Client artifacts to monitor

sources:
- query: |
LET _ <= SELECT log(message="ERROR: parameter Artifacts cannot be empty!") FROM scope() WHERE len(list=Artifacts) = 0

LET artifacts_to_watch = SELECT Artifact FROM Artifacts
WHERE log(message="Sending events from client artifact " + Artifact + " to rsyslog")

LET events = SELECT * FROM foreach(
row=artifacts_to_watch,
async=TRUE, // Required for event queries in foreach()
query={
SELECT *, Artifact, timestamp(epoch=now()) AS timestamp
FROM watch_monitoring(artifact=Artifact)
})

SELECT * FROM rsyslog_upload(query=events, unix_domain=UnixSocket, threads=Threads)

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ require (
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/leodido/go-syslog v1.0.1 // indirect
github.com/lestrrat-go/strftime v1.0.5 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/leodido/go-syslog v1.0.1 h1:/I6CcKOIT/Auo/vjvemIaQYnSNFmcv6Xd6kqXaKHZyM=
github.com/leodido/go-syslog v1.0.1/go.mod h1:iGQLav8eZdt0+DaWcqmGKurRtDPyxwD1Dvc4DG5GMoU=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/strftime v1.0.5 h1:A7H3tT8DhTz8u65w+JRpiBxM4dINQhUXAZnhBa2xeOE=
Expand Down
199 changes: 199 additions & 0 deletions vql/server/rsyslog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package server

import (
"context"
"io"
"net"
"os"
"strconv"
"sync"
"time"

"github.com/Velocidex/ordereddict"
"github.com/leodido/go-syslog/rfc5424"
"golang.org/x/time/rate"
"www.velocidex.com/golang/velociraptor/acls"
"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/velociraptor/file_store/directory"
"www.velocidex.com/golang/velociraptor/utils"
"www.velocidex.com/golang/velociraptor/vql"
"www.velocidex.com/golang/vfilter"
"www.velocidex.com/golang/vfilter/arg_parser"
)

const pluginName = "rsyslog_upload"

type rsyslogUploadPluginArgs struct {
Query vfilter.StoredQuery `vfilter:"required,field=query,doc=Source for rows to upload."`
UnixDomain string `vfilter:"required,field=unix_domain,doc=path to unix domain socket rsyslog listens on"`
Threads int `vfilter:"optional,field=threads,doc=How many threads to use to send events."`
}

type rsyslogUploadPlugin struct{}

func (r rsyslogUploadPlugin) Info(
scope vfilter.Scope, typeMap *vfilter.TypeMap,
) *vfilter.PluginInfo {
return &vfilter.PluginInfo{
Name: pluginName,
Doc: "Upload rows to rsyslog",
ArgType: typeMap.AddType(scope, &rsyslogUploadPluginArgs{}),
Metadata: vql.VQLMetadata().Permissions(acls.COLLECT_SERVER).Build(),
}
}

func (r rsyslogUploadPlugin) Call(
ctx context.Context, scope vfilter.Scope, args *ordereddict.Dict,
) <-chan vfilter.Row {
// this plugin does not send anything to its output channel
outputCh := make(chan vfilter.Row)

go func() {
defer close(outputCh)

err := vql.CheckAccess(scope, acls.COLLECT_SERVER)
if err != nil {
scope.Log("%s: check access failed: %v", pluginName, err)
return
}

arg := rsyslogUploadPluginArgs{}
err = arg_parser.ExtractArgsWithContext(ctx, scope, args, &arg)
if err != nil {
scope.Log("%s: parsing args: %v", pluginName, err)
return
}
if arg.UnixDomain == "" {
scope.Log("%s: parameter UnixDomain must be set", pluginName)
return
}
if arg.Threads == 0 {
arg.Threads = 1
}

configObj, ok := vql.GetServerConfig(scope)
if !ok {
scope.Log("%s: could not get config from scope", pluginName)
return
}

options := api.QueueOptions{
DisableFileBuffering: false,
FileBufferLeaseSize: 100,
OwnerName: pluginName,
}

listenerCtx, cancelListener := context.WithCancel(context.Background())
defer cancelListener()

listener, err := directory.NewListener(configObj, listenerCtx, pluginName, options)
if err != nil {
scope.Log("%s: could not create listener: %v", pluginName, err)
return
}

scope.Log("%s: starting %d worker threads", pluginName, arg.Threads)
wg := sync.WaitGroup{}
for i := 0; i < arg.Threads; i++ {
wg.Add(1)
go rsyslogSender(ctx, &wg, arg.UnixDomain, scope, listener.Output())
}

rowCh := arg.Query.Eval(ctx, scope)

quitLoop := false
for !quitLoop {
select {
case <-ctx.Done():
listener.Close()
quitLoop = true
case row, ok := <-rowCh:
if !ok {
continue
}
listener.Send(vfilter.RowToDict(ctx, scope, row))
}
}

// the workers will return when they detect that
// the listener had closed its output channel
wg.Wait()
}()
return outputCh
}

func rsyslogSender(
ctx context.Context, wg *sync.WaitGroup, address string,
scope vfilter.Scope, rowCh <-chan *ordereddict.Dict,
) {
defer func() {
scope.Log("%s: worker done", pluginName)
wg.Done()
}()

scope.Log("%s: worker started", pluginName)
var (
pid = strconv.Itoa(os.Getpid())
conn net.Conn
message string
rrDialLog = rate.Sometimes{Interval: time.Minute}
rrWriteLog = rate.Sometimes{Interval: time.Minute}
)
for {
if conn == nil {
var err error
conn, err = net.DialTimeout("unixgram", address, time.Second)
if err != nil {
rrDialLog.Do(func() { scope.Log("%s: dialing: %v", pluginName, err) })
utils.SleepWithCtx(ctx, time.Second)
if ctx.Err() != nil {
// avoid spinning here if rsyslogd is not
// listening when the plugin is shutting down.
return
}
conn = nil // probably not needed, but no harm.
continue // retry dial
}
scope.Log("%s: worker connected!", pluginName)
}
if message == "" {
row, ok := <-rowCh
if !ok {
// the listener closed its channel
return
}
var err error
message, err = rowToRsyslogString(row, pid)
if err != nil {
scope.Log("%s: creating rsyslog message: %v", pluginName, err)
return
}
}
conn.SetWriteDeadline(time.Now().Add(time.Second))
_, err := io.WriteString(conn, message)
if err != nil {
rrWriteLog.Do(func() { scope.Log("%s: writing to conn: %v", pluginName, err) })
conn.Close()
conn = nil // conn is an interface!
continue // Retry sending the same message on the next iteration.
}

// the message was sent successfully.
message = ""
}
}

func rowToRsyslogString(row *ordereddict.Dict, pid string) (string, error) {
message := rfc5424.SyslogMessage{}
message.SetPriority(0)
message.SetVersion(1)
message.SetAppname("velociraptor")
message.SetProcID(pid)
message.SetMessage(row.String()) // json

return message.String()
}

func init() {
vql.RegisterPlugin(&rsyslogUploadPlugin{})
}
Loading