forked from ekanite/ekanite
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine.go
More file actions
391 lines (338 loc) · 9.44 KB
/
engine.go
File metadata and controls
391 lines (338 loc) · 9.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
package ekanite
import (
"expvar"
"fmt"
"log"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/ekanite/ekanite/input"
)
// Defaults applied by NewEngine; callers may override the corresponding
// Engine fields before Open is called.
const (
	// DefaultNumShards is the default number of shards per created index.
	DefaultNumShards = 16
	// DefaultIndexDuration is the default time span covered by one index.
	DefaultIndexDuration = 24 * time.Hour
	// DefaultRetentionPeriod is how long data is kept after an index's end time.
	DefaultRetentionPeriod = 24 * time.Hour
	// RetentionCheckInterval is how often retention enforcement runs.
	RetentionCheckInterval = time.Hour
)
var (
	// stats holds the expvar counters published under the "engine" map
	// (batch, indexing, retention, and query metrics).
	stats = expvar.NewMap("engine")
)
// EventIndexer is the interface a Batcher delivers completed batches to.
type EventIndexer interface {
	// Index indexes the given batch of events, returning any error encountered.
	Index(events []*Event) error
}
// Batcher accepts "input events", and once it has a certain number, or a certain amount
// of time has passed, sends those as indexable Events to an Indexer. It also supports a
// maximum number of unprocessed Events it will keep pending. Once this limit is reached,
// it will not accept anymore until outstanding Events are processed.
type Batcher struct {
	indexer  EventIndexer      // Destination for each completed batch.
	size     int               // Event count that triggers a flush.
	duration time.Duration     // Maximum time a partial batch is held.
	c        chan *input.Event // Incoming events; capacity bounds pending work.
}
// NewBatcher returns a Batcher that delivers batches to EventIndexer e. A batch
// is flushed once it holds sz events, or once dur has elapsed since the first
// event of the batch arrived. At most max events may be outstanding at a time;
// senders block beyond that.
func NewBatcher(e EventIndexer, sz int, dur time.Duration, max int) *Batcher {
	b := &Batcher{
		indexer:  e,
		size:     sz,
		duration: dur,
	}
	b.c = make(chan *input.Event, max)
	return b
}
// Start starts the batching process on a background goroutine, consuming
// events from the channel returned by C. A batch is flushed when it reaches
// the configured size, or when the configured duration elapses after the
// first event of the batch arrives. The outcome of every flush attempt --
// nil on success, the indexing error on failure -- is sent to errChan if it
// is non-nil. On failure the batch is retained so its events are retried on
// the next flush.
func (b *Batcher) Start(errChan chan<- error) error {
	go func() {
		batch := make([]*Event, 0, b.size)
		timer := time.NewTimer(b.duration)
		timer.Stop() // Stop any first firing.

		send := func() {
			err := b.indexer.Index(batch)
			// Report every outcome. The previous code returned before the
			// errChan send on failure, so callers only ever saw nils and
			// indexing errors were silently dropped.
			if errChan != nil {
				errChan <- err
			}
			if err != nil {
				stats.Add("batchIndexedError", 1)
				return
			}
			stats.Add("batchIndexed", 1)
			stats.Add("eventsIndexed", int64(len(batch)))
			batch = make([]*Event, 0, b.size)
		}

		for {
			select {
			case event := <-b.c:
				idxEvent := &Event{
					event,
				}
				batch = append(batch, idxEvent)
				if len(batch) == 1 {
					// First event of a new batch arms the flush timer.
					timer.Reset(b.duration)
				}
				if len(batch) == b.size {
					timer.Stop()
					send()
				}
			case <-timer.C:
				stats.Add("batchTimeout", 1)
				send()
			}
		}
	}()
	return nil
}
// C returns the channel on the batcher to which events should be sent.
// The channel's buffer size is the max passed to NewBatcher, so sends block
// once that many events are pending.
func (b *Batcher) C() chan<- *input.Event {
	return b.c
}
// Engine is the component that performs all indexing.
type Engine struct {
	path            string        // Path to all indexed data
	NumShards       int           // Number of shards to use when creating an index.
	IndexDuration   time.Duration // Duration of created indexes.
	RetentionPeriod time.Duration // How long after Index end-time to hang onto data.

	mu      sync.RWMutex // Guards indexes.
	indexes Indexes      // Open indexes, kept sorted by createIndex.
	open    bool         // Set true by Open, false by Close.

	done chan struct{}  // Closed by Close to stop retention enforcement.
	wg   sync.WaitGroup // Tracks the retention-enforcement goroutine.

	Logger *log.Logger // Destination for engine log output.
}
// NewEngine returns a new indexing engine, which will use any data located at path.
// Shard count, index duration, and retention period start at the package
// defaults and may be adjusted on the returned Engine before Open is called.
func NewEngine(path string) *Engine {
	e := &Engine{
		path: path,
		done: make(chan struct{}),
	}
	e.NumShards = DefaultNumShards
	e.IndexDuration = DefaultIndexDuration
	e.RetentionPeriod = DefaultRetentionPeriod
	e.Logger = log.New(os.Stderr, "[engine] ", log.LstdFlags)
	return e
}
// Open opens the engine: it creates the data directory if necessary, opens
// every index found beneath it, and starts background retention enforcement.
// It returns an error if the directory cannot be created or read, or if any
// index fails to open.
func (e *Engine) Open() error {
	if err := os.MkdirAll(e.path, 0755); err != nil {
		return err
	}
	d, err := os.Open(e.path)
	if err != nil {
		return fmt.Errorf("failed to open engine: %s", err.Error())
	}
	fis, err := d.Readdir(0)
	d.Close() // The previous code never closed this handle, leaking a descriptor.
	if err != nil {
		return err
	}

	// Open all indexes. Hidden entries and plain files are skipped.
	for _, fi := range fis {
		if !fi.IsDir() || strings.HasPrefix(fi.Name(), ".") {
			continue
		}
		indexPath := filepath.Join(e.path, fi.Name())
		i, err := OpenIndex(indexPath)
		if err != nil {
			// Log via the engine's logger for consistency with the rest of the type
			// (the previous code used the global logger here).
			e.Logger.Printf("engine failed to open at index %s: %s", indexPath, err.Error())
			return err
		}
		e.Logger.Printf("engine opened index with %d shard(s) at %s", len(i.Shards), indexPath)
		e.indexes = append(e.indexes, i)
	}

	e.wg.Add(1)
	go e.runRetentionEnforcement()

	e.open = true
	return nil
}
// Close closes the engine, closing every index and stopping background
// retention enforcement. Closing an engine that is not open is a no-op.
func (e *Engine) Close() error {
	if !e.open {
		return nil
	}

	// Close every index; abort on the first failure.
	for _, idx := range e.indexes {
		if err := idx.Close(); err != nil {
			return err
		}
	}

	// Signal the retention goroutine to exit, then wait for it.
	close(e.done)
	e.wg.Wait()

	e.open = false
	return nil
}
// Total returns the total number of documents indexed across all open indexes.
func (e *Engine) Total() (uint64, error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	var n uint64
	for _, idx := range e.indexes {
		c, err := idx.Total()
		if err != nil {
			return 0, err
		}
		n += c
	}
	return n, nil
}
// runRetentionEnforcement periodically runs retention enforcement until the
// engine's done channel is closed. It must run on its own goroutine; the
// engine's WaitGroup is marked done on exit.
func (e *Engine) runRetentionEnforcement() {
	defer e.wg.Done()

	// A single Ticker replaces the previous per-iteration time.After call,
	// which allocated a new timer every interval.
	ticker := time.NewTicker(RetentionCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-e.done:
			return
		case <-ticker.C:
			e.Logger.Print("retention enforcement commencing")
			stats.Add("retentionEnforcementRun", 1)
			e.enforceRetention()
		}
	}
}
// enforceRetention deletes indexes whose retention period has expired and
// removes them from the engine's index set. Indexes whose deletion fails are
// still dropped from the set (preserving prior behavior); the failure is
// logged so the on-disk data can be cleaned up manually.
func (e *Engine) enforceRetention() {
	e.mu.Lock()
	defer e.mu.Unlock()

	// One timestamp for the whole pass; the previous code re-read the clock
	// for every index, so a single pass could use inconsistent "now" values.
	now := time.Now().UTC()

	// Filter in place, reusing the existing backing array.
	filtered := e.indexes[:0]
	for _, i := range e.indexes {
		if !i.Expired(now, e.RetentionPeriod) {
			filtered = append(filtered, i)
			continue
		}
		if err := DeleteIndex(i); err != nil {
			e.Logger.Printf("retention enforcement failed to delete index %s: %s", i.path, err.Error())
			continue
		}
		e.Logger.Printf("retention enforcement deleted index %s", i.path)
		stats.Add("retentionEnforcementDeletions", 1)
	}
	e.indexes = filtered
}
// indexForReferenceTime returns the index whose time span contains t, or nil
// if no such index exists. The caller must hold at least a read lock.
func (e *Engine) indexForReferenceTime(t time.Time) *Index {
	for _, idx := range e.indexes {
		if idx.Contains(t) {
			return idx
		}
	}
	return nil
}
// createIndex creates an index spanning startTime to endTime, registers it
// with the engine (keeping the index set sorted), and returns it. It must be
// called with the write lock held.
//
// No two indexes may share a start time, since the start time determines the
// on-disk path. If an existing index already starts at the requested start
// time, the new index starts at that index's end time instead.
func (e *Engine) createIndex(startTime, endTime time.Time) (*Index, error) {
	var clash *Index
	for _, existing := range e.indexes {
		if existing.startTime == startTime {
			clash = existing
			break
		}
	}
	if clash != nil {
		startTime = clash.endTime // XXX This could still align with another start time! Needs some sort of loop.
		assert(!startTime.After(endTime), "new start time after end time")
	}

	i, err := NewIndex(e.path, startTime, endTime, e.NumShards)
	if err != nil {
		return nil, err
	}
	e.indexes = append(e.indexes, i)
	sort.Sort(e.indexes)

	e.Logger.Printf("index %s created with %d shards, start time: %s, end time: %s",
		i.Path(), e.NumShards, i.StartTime(), i.EndTime())
	return i, nil
}
// createIndexForReferenceTime creates an index whose time span covers the
// given reference time, aligned to the engine's configured index duration.
func (e *Engine) createIndexForReferenceTime(rt time.Time) (*Index, error) {
	startTime := rt.Truncate(e.IndexDuration).UTC()
	return e.createIndex(startTime, startTime.Add(e.IndexDuration).UTC())
}
// Index indexes a batch of Events. It blocks until all processing has completed.
// Events are grouped by the index covering their reference time; missing
// indexes are created on demand, and each group is then indexed in parallel.
func (e *Engine) Index(events []*Event) error {
	e.mu.RLock()
	defer e.mu.RUnlock()

	var wg sync.WaitGroup

	// De-multiplex the batch into sub-batches, one sub-batch for each Index.
	subBatches := make(map[*Index][]Document, 0)

	for _, ev := range events {
		index := e.indexForReferenceTime(ev.ReferenceTime())
		if index == nil {
			// Take a RWLock, check again, and create a new index if necessary.
			// Doing this in a function makes lock management foolproof.
			// NOTE(review): between the RUnlock and Lock below, another
			// goroutine may mutate e.indexes — hence the re-check after the
			// write lock is acquired.
			func() {
				e.mu.RUnlock()
				defer e.mu.RLock()
				e.mu.Lock()
				defer e.mu.Unlock()
				index = e.indexForReferenceTime(ev.ReferenceTime())
				if index == nil {
					var err error
					index, err = e.createIndexForReferenceTime(ev.ReferenceTime())
					if err != nil || index == nil {
						// Failure to create an index is treated as fatal.
						panic(fmt.Sprintf("failed to create index for %s: %s", ev.ReferenceTime(), err))
					}
				}
			}()
		}

		if _, ok := subBatches[index]; !ok {
			subBatches[index] = make([]Document, 0)
		}
		subBatches[index] = append(subBatches[index], ev)
	}

	// Index each batch in parallel.
	for index, subBatch := range subBatches {
		wg.Add(1)
		go func(i *Index, b []Document) {
			defer wg.Done()
			// NOTE(review): the error from i.Index is discarded, so indexing
			// failures are never reported to the caller — confirm intended.
			i.Index(b)
		}(index, subBatch)
	}
	wg.Wait()
	return nil
}
// Search performs a search, streaming matching documents on the returned
// channel. The channel is closed once all results have been sent. Errors
// during the search are logged and terminate the stream early.
func (e *Engine) Search(query string) (<-chan string, error) {
	e.mu.RLock()
	defer e.mu.RUnlock()
	stats.Add("queriesRx", 1)

	// Snapshot the index set while the read lock is held. The goroutine below
	// outlives this call (and therefore the lock), and enforceRetention
	// rewrites e.indexes' backing array in place, so reading e.indexes from
	// the goroutine would be a data race.
	indexes := make([]*Index, len(e.indexes))
	copy(indexes, e.indexes)

	// Buffer channel to control how many docs are sent back. XXX Will this allow
	// the client to control? Possibly.
	c := make(chan string, 1)

	go func() {
		defer close(c)
		// Sequentially search each index, starting with the earliest in time.
		// This could be done in parallel but more sorting would be required.
		for i := len(indexes) - 1; i >= 0; i-- {
			e.Logger.Printf("searching index %s", indexes[i].Path())
			ids, err := indexes[i].Search(query)
			if err != nil {
				e.Logger.Println("error performing search:", err.Error())
				break
			}
			for _, id := range ids {
				b, err := indexes[i].Document(id)
				if err != nil {
					e.Logger.Println("error getting document:", err.Error())
					break
				}
				// NOTE(review): stat key typo ("Retrived") kept as-is so
				// existing dashboards keyed on this metric keep working.
				stats.Add("docsIDsRetrived", 1)
				c <- string(b) // There is excessive byte-slice-to-strings here.
			}
		}
	}()
	return c, nil
}
// Path returns the path to the indexed data directory the engine was
// created with.
func (e *Engine) Path() string {
	return e.path
}
// assert panics with the given formatted message, prefixed with
// "assert failed: ", when condition is false. It is a no-op otherwise.
func assert(condition bool, msg string, v ...interface{}) {
	if condition {
		return
	}
	panic(fmt.Sprintf("assert failed: "+msg, v...))
}