Skip to content

Commit 9fb9a10

Browse files
authored
Merge pull request #253 from Icinga/feature/migrate-history
cmd/ido2icingadb: migrate history from IDO to Icinga DB
2 parents e62a620 + c51d767 commit 9fb9a10

File tree

20 files changed

+2240
-40
lines changed

20 files changed

+2240
-40
lines changed

.github/workflows/compliance/check-licenses.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ set -eo pipefail
55
find_license_file() {
66
MOD_NAME="$1"
77
LICENSE_DIR="vendor/$MOD_NAME"
8-
LICENSE_FILES=({,../}LICENSE{,.txt,.md})
8+
LICENSE_FILES=({,../}{,UN}LICENSE{,.txt,.md})
99

1010
for LICENSE_FILE in "${LICENSE_FILES[@]}"; do
1111
LICENSE_FILE="${LICENSE_DIR}/$LICENSE_FILE"
@@ -29,6 +29,8 @@ list_all_deps() {
2929
COMPATIBLE_LINE=$(($LINENO + 2))
3030

3131
COMPATIBLE=(
32+
# public domain
33+
3cee2c43614ad4572d9d594c81b9348cf45ed5ac # vendor/github.com/vbauerster/mpb/v6/UNLICENSE
3234
# MIT
3335
66d504eb2f162b9cbf11b07506eeed90c6edabe1 # vendor/github.com/cespare/xxhash/v2/LICENSE.txt
3436
1513ff663e946fdcadb630bed670d253b8b22e1e # vendor/github.com/davecgh/go-spew/spew/../LICENSE

cmd/icingadb-migrate/cache.go

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
package main
2+
3+
import (
4+
"database/sql"
5+
_ "embed"
6+
"github.com/jmoiron/sqlx"
7+
"github.com/pkg/errors"
8+
"math"
9+
"strings"
10+
"time"
11+
)
12+
13+
// eventTimeCacheSchema is the SQLite schema (DDL) for the event-time cache,
// embedded at compile time from embed/event_time_cache_schema.sql.
//go:embed embed/event_time_cache_schema.sql
var eventTimeCacheSchema string

// previousHardStateCacheSchema is the SQLite schema (DDL) for the
// previous-hard-state cache, embedded at compile time from
// embed/previous_hard_state_cache_schema.sql.
//go:embed embed/previous_hard_state_cache_schema.sql
var previousHardStateCacheSchema string
18+
19+
// buildEventTimeCache rationale:
//
// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that.
// That would make the IDO reading even slower than the Icinga DB writing.
// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time
// and cache it into an SQLite database. Then stream from that database and the IDO.
//
// Similar for acknowledgements. (On non-recoverable errors the whole program exits.)
func buildEventTimeCache(ht *historyType, idoColumns []string) {
	// row mirrors the IDO columns selected below (scanned by sliceIdoHistory).
	type row = struct {
		Id            uint64
		EventTime     int64
		EventTimeUsec uint32
		EventIsStart  uint8
		ObjectId      uint64
	}

	chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
		// Resume support: find out how far a previous (interrupted) run got.
		var checkpoint struct {
			Cnt   int64
			MaxId sql.NullInt64
		}
		cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")

		// *2 because the bar also counts the start events consumed along the way.
		ht.bar.SetCurrent(checkpoint.Cnt * 2)

		// Stream source data...
		sliceIdoHistory(
			ht,
			"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
				// For actual migration icinga_objects will be joined anyway,
				// so it makes no sense to take vanished objects into account.
				" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
				ht.idoIdColumn+" <= :toid AND xh."+
				ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk",
			nil, checkpoint.MaxId.Int64, // ... since we were interrupted:
			func(idoRows []row) (checkpoint interface{}) {
				for _, idoRow := range idoRows {
					if idoRow.EventIsStart == 0 {
						// Ack/flapping end event. Get the start event time:
						var lst []struct {
							EventTime     int64
							EventTimeUsec uint32
						}
						cacheSelect(
							*tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
							idoRow.ObjectId,
						)

						// If we have that, ...
						if len(lst) > 0 {
							// ... save the start event time for the actual migration:
							cacheExec(
								*tx,
								"INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
								idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec,
							)

							// This previously queried info isn't needed anymore.
							cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
						}
					} else {
						// Ack/flapping start event directly after another start event (per checkable).
						// The old one won't have (but the new one will) an end event (which will need its time).
						cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)

						// An ack/flapping start event. The following end event (per checkable) will need its time.
						cacheExec(
							*tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
							idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec,
						)
					}

					commitPeriodically()
					checkpoint = idoRow.Id
				}

				ht.bar.IncrBy(len(idoRows))
				return
			},
		)

		// This never queried info isn't needed anymore.
		cacheExec(*tx, "DELETE FROM last_start_time")
	})

	ht.bar.SetTotal(ht.bar.Current(), true)
}
107+
108+
// buildPreviousHardStateCache rationale:
//
// Icinga DB's state_history#previous_hard_state would need a subquery.
// That would make the IDO reading even slower than the Icinga DB writing.
// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state
// and cache it into an SQLite database. Then stream from that database and the IDO.
//
// Similar for notifications. (On non-recoverable errors the whole program exits.)
func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
	// row mirrors the IDO columns selected below (scanned by sliceIdoHistory).
	type row = struct {
		Id            uint64
		ObjectId      uint64
		LastHardState uint8
	}

	chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
		// Resume support: inspect what a previous (interrupted) run left behind.
		var nextIds struct {
			Cnt   int64
			MinId sql.NullInt64
		}
		cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")

		var previousHardStateCnt int64
		cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")

		var checkpoint int64
		if nextIds.MinId.Valid { // there are next_ids
			checkpoint = nextIds.MinId.Int64 // this kind of caches is filled descending
		} else { // there aren't any next_ids
			// next_ids contains the most recently processed IDs and is only empty if...
			if previousHardStateCnt == 0 {
				// ... we didn't actually start yet...
				checkpoint = math.MaxInt64 // start from the largest (possible) ID
			} else {
				// ... or we've already finished.
				checkpoint = 0 // make following query no-op
			}
		}

		ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)

		// We continue where we finished before. As we build the cache in reverse chronological order:
		// 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds:
		//    a. Start migration after Icinga DB is up and running.
		//    b. Remove the cache before the next migration trial.
		// 2. If the history gets cleaned up between two migration trials,
		//    the difference either just doesn't appear in the cache or - if already there - will be ignored later.

		// Stream source data...
		sliceIdoHistory(
			ht,
			"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
				// For actual migration icinga_objects will be joined anyway,
				// so it makes no sense to take vanished objects into account.
				" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
				ht.idoIdColumn+" <= :toid AND xh."+
				ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk",
			nil, checkpoint, // ... since we were interrupted:
			func(idoRows []row) (checkpoint interface{}) {
				for _, idoRow := range idoRows {
					var nhs []struct{ NextHardState uint8 }
					cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)

					if len(nhs) < 1 { // we just started (per checkable)
						// At the moment (we're "travelling back in time") that's the checkable's hard state:
						cacheExec(
							*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
							idoRow.ObjectId, idoRow.LastHardState,
						)

						// But for the current time point the previous hard state isn't known, yet:
						cacheExec(
							*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
							idoRow.Id, idoRow.ObjectId,
						)
					} else if idoRow.LastHardState == nhs[0].NextHardState {
						// The hard state didn't change yet (per checkable),
						// so this time point also awaits the previous hard state.
						cacheExec(
							*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
							idoRow.Id, idoRow.ObjectId,
						)
					} else { // the hard state changed (per checkable)
						// That past hard state is now available for the processed future time points:
						cacheExec(
							*tx,
							"INSERT INTO previous_hard_state(history_id, previous_hard_state) "+
								"SELECT history_id, ? FROM next_ids WHERE object_id=?",
							idoRow.LastHardState, idoRow.ObjectId,
						)

						// Now they have what they wanted:
						cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
						cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId)

						// That's done.
						// Now do the same thing as in the "we just started" case above, for the same reason:

						cacheExec(
							*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
							idoRow.ObjectId, idoRow.LastHardState,
						)

						cacheExec(
							*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
							idoRow.Id, idoRow.ObjectId,
						)
					}

					commitPeriodically()
					checkpoint = idoRow.Id
				}

				ht.bar.IncrBy(len(idoRows))
				return
			},
		)

		// No past hard state is available for the processed future time points, assuming pending:
		cacheExec(
			*tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids",
		)

		// Now they should have what they wanted:
		cacheExec(*tx, "DELETE FROM next_hard_state")
		cacheExec(*tx, "DELETE FROM next_ids")
	})

	ht.bar.SetTotal(ht.bar.Current(), true)
}
238+
239+
// chunkCacheTx rationale: during do operate on cache via *tx. After every completed operation call commitPeriodically()
240+
// which periodically commits *tx and starts a new tx. (That's why tx is a **, not just a *.)
241+
// (On non-recoverable errors the whole program exits.)
242+
func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) {
243+
logger := log.With("backend", "cache")
244+
245+
tx, err := cache.Beginx()
246+
if err != nil {
247+
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
248+
}
249+
250+
const commitInterval = 5 * time.Minute
251+
nextCommit := time.Now().Add(commitInterval)
252+
253+
do(&tx, func() { // commitPeriodically
254+
if now := time.Now(); now.After(nextCommit) {
255+
if err := tx.Commit(); err != nil {
256+
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
257+
}
258+
259+
var err error
260+
261+
tx, err = cache.Beginx()
262+
if err != nil {
263+
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
264+
}
265+
266+
nextCommit = nextCommit.Add(commitInterval)
267+
}
268+
})
269+
270+
if err := tx.Commit(); err != nil {
271+
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
272+
}
273+
}
274+
275+
// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.)
276+
func cacheGet(cache interface {
277+
Get(dest interface{}, query string, args ...interface{}) error
278+
}, dest interface{}, query string, args ...interface{}) {
279+
if err := cache.Get(dest, query, args...); err != nil {
280+
log.With("backend", "cache", "query", query, "args", args).
281+
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
282+
}
283+
}
284+
285+
// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.)
286+
func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
287+
if err := cacheTx.Select(dest, query, args...); err != nil {
288+
log.With("backend", "cache", "query", query, "args", args).
289+
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
290+
}
291+
}
292+
293+
// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits.
294+
func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) {
295+
if _, err := cacheTx.Exec(dml, args...); err != nil {
296+
log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
297+
}
298+
}

0 commit comments

Comments
 (0)