diff --git a/artifacts/definitions/Rsyslog/Events/Clients.yaml b/artifacts/definitions/Rsyslog/Events/Clients.yaml new file mode 100644 index 000000000..475eaadfa --- /dev/null +++ b/artifacts/definitions/Rsyslog/Events/Clients.yaml @@ -0,0 +1,44 @@ +name: RsyslogShipper.Events.Clients + +description: | + This server side event monitoring artifact will watch a selection of client + monitoring artifacts for new events and push them rsyslog. + + NOTE: You must ensure you are collecting these artifacts from the + clients by adding them to the "Client Events" GUI. + +type: SERVER_EVENT + +parameters: + - name: UnixSocket + description: Path to the Unix Domain Socket to send events to + type: string + default: /tmp/velo-socket + + - name: Threads + description: Number of threads to start up to post events + type: int + default: 2 + + - name: Artifacts + type: artifactset + artifact_type: CLIENT_EVENT + description: Client artifacts to monitor + +sources: + - query: | + LET _ <= SELECT log(message="ERROR: parameter Artifacts cannot be empty!") FROM scope() WHERE len(list=Artifacts) = 0 + + LET artifacts_to_watch = SELECT Artifact FROM Artifacts + WHERE log(message="Sending events from client artifact " + Artifact + " to rsyslog") + + LET events = SELECT * FROM foreach( + row=artifacts_to_watch, + async=TRUE, // Required for event queries in foreach() + query={ + SELECT *, Artifact, timestamp(epoch=now()) AS timestamp + FROM watch_monitoring(artifact=Artifact) + }) + + SELECT * FROM rsyslog_upload(query=events, unix_domain=UnixSocket, threads=Threads) + diff --git a/go.mod b/go.mod index c560c851e..9229e30b1 100644 --- a/go.mod +++ b/go.mod @@ -198,6 +198,7 @@ require ( github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/pgzip v1.2.5 // indirect github.com/kr/fs v0.1.0 // indirect + github.com/leodido/go-syslog v1.0.1 // indirect github.com/lestrrat-go/strftime v1.0.5 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect diff --git a/go.sum b/go.sum index 973f8998f..45e782b8f 100644 --- a/go.sum +++ b/go.sum @@ -402,6 +402,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/leodido/go-syslog v1.0.1 h1:/I6CcKOIT/Auo/vjvemIaQYnSNFmcv6Xd6kqXaKHZyM= +github.com/leodido/go-syslog v1.0.1/go.mod h1:iGQLav8eZdt0+DaWcqmGKurRtDPyxwD1Dvc4DG5GMoU= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= github.com/lestrrat-go/strftime v1.0.5 h1:A7H3tT8DhTz8u65w+JRpiBxM4dINQhUXAZnhBa2xeOE= diff --git a/vql/server/rsyslog.go b/vql/server/rsyslog.go new file mode 100644 index 000000000..df594601b --- /dev/null +++ b/vql/server/rsyslog.go @@ -0,0 +1,199 @@ +package server + +import ( + "context" + "io" + "net" + "os" + "strconv" + "sync" + "time" + + "github.com/Velocidex/ordereddict" + "github.com/leodido/go-syslog/rfc5424" + "golang.org/x/time/rate" + "www.velocidex.com/golang/velociraptor/acls" + "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/file_store/directory" + "www.velocidex.com/golang/velociraptor/utils" + "www.velocidex.com/golang/velociraptor/vql" + "www.velocidex.com/golang/vfilter" + "www.velocidex.com/golang/vfilter/arg_parser" +) + +const pluginName = "rsyslog_upload" + +type rsyslogUploadPluginArgs struct { + Query vfilter.StoredQuery `vfilter:"required,field=query,doc=Source for rows to upload."` + UnixDomain string `vfilter:"required,field=unix_domain,doc=path to unix domain socket rsyslog listens on"` + Threads int `vfilter:"optional,field=threads,doc=How many threads to use to send events."` +} + +type rsyslogUploadPlugin struct{} + +func (r rsyslogUploadPlugin) Info( + scope vfilter.Scope, typeMap *vfilter.TypeMap, +) *vfilter.PluginInfo { + return &vfilter.PluginInfo{ + Name: pluginName, + Doc: "Upload rows to rsyslog", + ArgType: typeMap.AddType(scope, &rsyslogUploadPluginArgs{}), + Metadata: vql.VQLMetadata().Permissions(acls.COLLECT_SERVER).Build(), + } +} + +func (r rsyslogUploadPlugin) Call( + ctx context.Context, scope vfilter.Scope, args *ordereddict.Dict, +) <-chan vfilter.Row { + // this plugin does not send anything to its output channel + outputCh := make(chan vfilter.Row) + + go func() { + defer close(outputCh) + + err := vql.CheckAccess(scope, acls.COLLECT_SERVER) + if err != nil { + scope.Log("%s: check access failed: %v", pluginName, err) + return + } + + arg := rsyslogUploadPluginArgs{} + err = arg_parser.ExtractArgsWithContext(ctx, scope, args, &arg) + if err != nil { + scope.Log("%s: parsing args: %v", pluginName, err) + return + } + if arg.UnixDomain == "" { + scope.Log("%s: parameter UnixDomain must be set", pluginName) + return + } + if arg.Threads == 0 { + arg.Threads = 1 + } + + configObj, ok := vql.GetServerConfig(scope) + if !ok { + scope.Log("%s: could not get config from scope", pluginName) + return + } + + options := api.QueueOptions{ + DisableFileBuffering: false, + FileBufferLeaseSize: 100, + OwnerName: pluginName, + } + + listenerCtx, cancelListener := context.WithCancel(context.Background()) + defer cancelListener() + + listener, err := directory.NewListener(configObj, listenerCtx, pluginName, options) + if err != nil { + scope.Log("%s: could not create listener: %v", pluginName, err) + return + } + + scope.Log("%s: starting %d worker threads", pluginName, arg.Threads) + wg := sync.WaitGroup{} + for i := 0; i < arg.Threads; i++ { + wg.Add(1) + go rsyslogSender(ctx, &wg, arg.UnixDomain, scope, listener.Output()) + } + + rowCh := arg.Query.Eval(ctx, scope) + + quitLoop := false + for !quitLoop { + select { + case <-ctx.Done(): + listener.Close() + quitLoop = true + case row, ok := <-rowCh: + if !ok { + continue + } + listener.Send(vfilter.RowToDict(ctx, scope, row)) + } + } + + // the workers will return when they detect that + // the listener had closed its output channel + wg.Wait() + }() + return outputCh +} + +func rsyslogSender( + ctx context.Context, wg *sync.WaitGroup, address string, + scope vfilter.Scope, rowCh <-chan *ordereddict.Dict, +) { + defer func() { + scope.Log("%s: worker done", pluginName) + wg.Done() + }() + + scope.Log("%s: worker started", pluginName) + var ( + pid = strconv.Itoa(os.Getpid()) + conn net.Conn + message string + rrDialLog = rate.Sometimes{Interval: time.Minute} + rrWriteLog = rate.Sometimes{Interval: time.Minute} + ) + for { + if conn == nil { + var err error + conn, err = net.DialTimeout("unixgram", address, time.Second) + if err != nil { + rrDialLog.Do(func() { scope.Log("%s: dialing: %v", pluginName, err) }) + utils.SleepWithCtx(ctx, time.Second) + if ctx.Err() != nil { + // avoid spinning here if rsyslogd is not + // listening when the plugin is shutting down. + return + } + conn = nil // probably not needed, but no harm. + continue // retry dial + } + scope.Log("%s: worker connected!", pluginName) + } + if message == "" { + row, ok := <-rowCh + if !ok { + // the listener closed its channel + return + } + var err error + message, err = rowToRsyslogString(row, pid) + if err != nil { + scope.Log("%s: creating rsyslog message: %v", pluginName, err) + return + } + } + conn.SetWriteDeadline(time.Now().Add(time.Second)) + _, err := io.WriteString(conn, message) + if err != nil { + rrWriteLog.Do(func() { scope.Log("%s: writing to conn: %v", pluginName, err) }) + conn.Close() + conn = nil // conn is an interface! + continue // Retry sending the same message on the next iteration. + } + + // the message was sent successfully. + message = "" + } +} + +func rowToRsyslogString(row *ordereddict.Dict, pid string) (string, error) { + message := rfc5424.SyslogMessage{} + message.SetPriority(0) + message.SetVersion(1) + message.SetAppname("velociraptor") + message.SetProcID(pid) + message.SetMessage(row.String()) // json + + return message.String() +} + +func init() { + vql.RegisterPlugin(&rsyslogUploadPlugin{}) +}