Skip to content

Commit bee7e1b

Browse files
committed
wip
1 parent 017fd44 commit bee7e1b

File tree

3 files changed

+304
-302
lines changed

3 files changed

+304
-302
lines changed

cshared.go

Lines changed: 0 additions & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ import (
1111
"errors"
1212
"fmt"
1313
"io"
14-
"log"
1514
"os"
16-
"runtime"
1715
"strconv"
1816
"strings"
1917
"time"
@@ -121,41 +119,6 @@ func FLBPluginInit(ptr unsafe.Pointer) (respCode int) {
121119
return input.FLB_OK
122120
}
123121

124-
func (p *pluginInstance) inputPreRun() error {
125-
p.mu.Lock()
126-
defer p.mu.Unlock()
127-
128-
// Only input plugins have a pre-run step
129-
if p.meta.input == nil {
130-
return nil
131-
}
132-
133-
if p.state != instanceStateInitialized {
134-
return fmt.Errorf("invalid plugin state %q", p.state)
135-
}
136-
137-
runCtx, runCancel := context.WithCancel(context.Background())
138-
p.runCtx = runCtx
139-
p.runCancel = runCancel
140-
p.msgChannel = make(chan Message, p.maxBufferedMessages)
141-
p.state = instanceStateRunnable
142-
143-
go func() {
144-
err := p.meta.input.Collect(runCtx, p.msgChannel)
145-
if err != nil {
146-
fmt.Fprintf(os.Stderr, "collect error: %v\n", err)
147-
}
148-
}()
149-
150-
go func() {
151-
<-runCtx.Done()
152-
153-
log.Printf("goroutine will be stopping: name=%q\n", p.meta.name)
154-
}()
155-
156-
return nil
157-
}
158-
159122
// FLBPluginInputPreRun is invoked by the fluent-bit runtime after the plugin has been
160123
// initialized using FLBPluginRegister but before executing plugin callback functions.
161124
//
@@ -198,48 +161,6 @@ func FLBPluginInputPause() {
198161
}
199162
}
200163

201-
func (p *pluginInstance) stop() error {
202-
p.mu.Lock()
203-
defer p.mu.Unlock()
204-
205-
if p.state != instanceStateRunnable && p.state != instanceStatePreExit {
206-
return fmt.Errorf("cannot stop plugin in state %q", p.state)
207-
}
208-
209-
if p.runCancel != nil {
210-
p.runCancel()
211-
}
212-
213-
p.runningWG.Wait()
214-
215-
if p.msgChannel != nil {
216-
close(p.msgChannel)
217-
}
218-
219-
p.state = instanceStateInitialized
220-
p.runCtx = nil
221-
p.runCancel = nil
222-
p.msgChannel = nil
223-
224-
return nil
225-
}
226-
227-
func (p *pluginInstance) resume() error {
228-
p.mu.Lock()
229-
defer p.mu.Unlock()
230-
231-
if p.state != instanceStateInitialized {
232-
return fmt.Errorf("cannot resume plugin in state %q", p.state)
233-
}
234-
235-
if err := p.inputPreRun(); err != nil {
236-
return err
237-
}
238-
239-
p.state = instanceStateInitialized
240-
return nil
241-
}
242-
243164
// FLBPluginInputResume this method gets invoked by the fluent-bit runtime, once the plugin has been
244165
// resumed, the plugin invoked this method and re-running state.
245166
//
@@ -276,32 +197,6 @@ func FLBPluginOutputPreExit() {
276197
}
277198
}
278199

279-
func (p *pluginInstance) outputPreExit() error {
280-
p.mu.Lock()
281-
defer p.mu.Unlock()
282-
283-
// Only output plugins have a pre-exit step
284-
if p.meta.output == nil {
285-
return fmt.Errorf("plugin is not an output plugin")
286-
}
287-
288-
if p.state != instanceStateRunnable {
289-
return fmt.Errorf("invalid plugin state %q", p.state)
290-
}
291-
292-
p.runCancel()
293-
294-
// Wait for any running callback/flush to finish before closing the message channel
295-
p.runningWG.Wait()
296-
close(p.msgChannel)
297-
298-
p.state = instanceStatePreExit
299-
p.runCtx = nil
300-
p.runCancel = nil
301-
302-
return nil
303-
}
304-
305200
// FLBPluginOutputPreRun -
306201
//
307202
//export FLBPluginOutputPreRun
@@ -324,40 +219,6 @@ func FLBPluginOutputPreRun(useHotReload C.int) int {
324219
return input.FLB_OK
325220
}
326221

327-
func (p *pluginInstance) outputPreRun() error {
328-
p.mu.Lock()
329-
defer p.mu.Unlock()
330-
331-
// Only input plugins have a pre-run step
332-
if p.meta.output == nil {
333-
return fmt.Errorf("plugin is not an output plugin")
334-
}
335-
336-
if p.state != instanceStateInitialized {
337-
return fmt.Errorf("invalid plugin state %q", p.state)
338-
}
339-
340-
p.runCtx, p.runCancel = context.WithCancel(context.Background())
341-
p.msgChannel = make(chan Message)
342-
p.state = instanceStateRunnable
343-
344-
go func() {
345-
p.runningWG.Add(1)
346-
defer p.runningWG.Done()
347-
if err := p.meta.output.Flush(p.runCtx, p.msgChannel); err != nil {
348-
fmt.Fprintf(os.Stderr, "FLBPluginOutputPreRun error: %v\n", err)
349-
}
350-
}()
351-
352-
go func() {
353-
<-p.runCtx.Done()
354-
355-
log.Printf("goroutine will be stopping: name=%q\n", p.meta.name)
356-
}()
357-
358-
return nil
359-
}
360-
361222
// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
362223
// initialized, the plugin implementation is responsible for handling the incoming data and the context
363224
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
@@ -379,61 +240,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
379240
return instance.callback(data, csize)
380241
}
381242

382-
func (p *pluginInstance) callback(data *unsafe.Pointer, csize *C.size_t) int {
383-
p.mu.Lock()
384-
if p.state != instanceStateRunnable {
385-
return input.FLB_RETRY
386-
}
387-
388-
p.runningWG.Add(1)
389-
defer p.runningWG.Done()
390-
p.mu.Unlock()
391-
392-
buf := bytes.NewBuffer([]byte{})
393-
394-
for loop := min(len(p.msgChannel), p.maxBufferedMessages); loop > 0; loop-- {
395-
select {
396-
case msg, ok := <-p.msgChannel:
397-
if !ok {
398-
return input.FLB_ERROR
399-
}
400-
401-
b, err := msgpack.Marshal([]any{&EventTime{msg.Time}, msg.Record})
402-
if err != nil {
403-
fmt.Fprintf(os.Stderr, "msgpack marshal: %s\n", err)
404-
return input.FLB_ERROR
405-
}
406-
407-
buf.Grow(len(b))
408-
buf.Write(b)
409-
case <-p.runCtx.Done():
410-
err := p.runCtx.Err()
411-
if err != nil && !errors.Is(err, context.Canceled) {
412-
fmt.Fprintf(os.Stderr, "run: %s\n", err)
413-
return input.FLB_ERROR
414-
}
415-
// enforce a runtime gc, to prevent the thread finalizer on
416-
// fluent-bit to kick in before any remaining data has not been GC'ed
417-
// causing a sigsegv.
418-
defer runtime.GC()
419-
loop = 0
420-
default:
421-
loop = 0
422-
}
423-
}
424-
425-
if buf.Len() > 0 {
426-
b := buf.Bytes()
427-
cdata := C.CBytes(b)
428-
*data = cdata
429-
if csize != nil {
430-
*csize = C.size_t(len(b))
431-
}
432-
}
433-
434-
return input.FLB_OK
435-
}
436-
437243
// FLBPluginInputCleanupCallback releases the memory used during the input callback
438244
//
439245
//export FLBPluginInputCleanupCallback

cshared_test.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -624,14 +624,3 @@ func TestOutputFlush(t *testing.T) {
624624
assert.NoError(t, pluginFlush(inst, "foobar", b))
625625
wg.Wait()
626626
}
627-
628-
func assertType[T any](tb testing.TB, got any) T {
629-
tb.Helper()
630-
631-
var want T
632-
633-
v, ok := got.(T)
634-
assert.True(tb, ok, "Expected types to be equal:\n-%T\n+%T", want, got)
635-
636-
return v
637-
}

0 commit comments

Comments
 (0)