forked from datastax-archive/logparse
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathnovadb_log_events.py
More file actions
162 lines (130 loc) · 5.48 KB
/
novadb_log_events.py
File metadata and controls
162 lines (130 loc) · 5.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# Copyright (c) Microsoft Corporation. All rights reserved.
# Utility methods to deal with log events in DB.
import json
import os
import shutil
import sqlite3
import statsd
import time
import traceback
from contextlib import closing
# Set up the statsd client used to push metrics to the local daemon.
stats = statsd.StatsClient('localhost', 8125)

# Monitoring identity, read from the environment (may be None if unset).
# (Fix: dropped the stray trailing semicolons — not idiomatic Python.)
account_name = os.getenv("MONITORING_MDM_ACCOUNT")
namespace = os.getenv("MONITORING_MDM_NAMESPACE")
tenant = os.getenv("MONITORING_TENANT")
role = os.getenv("MONITORING_ROLE")
role_instance = os.getenv("MONITORING_ROLE_INSTANCE")

# Base dimensions attached to every metric emitted by this module.
metric_identifier = {
    "Account": account_name,
    "Namespace": namespace,
}
def create_connection(database_file):
    """Open and return a sqlite3 connection to *database_file*."""
    connection = sqlite3.connect(database_file)
    return connection
def create_table(connection):
    """Create the LogEvent table if it does not already exist.

    An event is uniquely identified by (product, category, type); the
    date and serialized attributes are payload columns.
    """
    ddl = """CREATE TABLE IF NOT EXISTS LogEvent (
                 event_product text,
                 event_category text,
                 event_type text,
                 event_date text,
                 attributes text,
                 PRIMARY KEY(event_product, event_category, event_type)
             ); """
    with closing(connection.cursor()) as cursor:
        cursor.execute(ddl)
def upsert_events(connection, events_map):
    '''
    Upsert every value of *events_map* into the LogEvent table.

    All rows are written as part of a single transaction and committed at
    the end, to minimize disk I/O. On a sqlite3 error the transaction is
    rolled back and retried (up to 5 attempts, 1s apart); once retries are
    exhausted, a commit-error metric is emitted.
    '''
    # Idiomatic empty-check instead of len(...) == 0.
    if not events_map:
        return
    max_retry_attempts = 5
    for retry_attempt in range(1, max_retry_attempts + 1):
        with closing(connection.cursor()) as cursor:
            try:
                for event in events_map.values():
                    attributes = get_attributes(event)
                    upsert(connection, cursor,
                           event["event_product"], event["event_category"],
                           event["event_type"], event["date"], attributes)
                connection.commit()
                return  # success — done
            except sqlite3.Error as e:
                # Fix: corrected "comitting" typo in the log message.
                print("Error occurred while committing log events {0}. Retrying: {1}".format(str(events_map), str(e)))
                connection.rollback()
                if retry_attempt == max_retry_attempts:
                    print("Emitting metrics for commit error")
                    emit_commit_error_metrics()
                else:
                    time.sleep(1)
def get_attributes(event):
    """Return the type-specific attributes of *event* as a JSON string.

    Only certain (event_category, event_type) pairs carry extra
    attributes; every other event serializes to "{}". A malformed event
    (e.g. missing an expected key) is logged and also yields "{}" rather
    than propagating the error.
    """
    attributes = {}
    try:
        if event["event_category"] == 'startup':
            if event["event_type"] == 'ip_address_conflict':
                attributes = {"endpoint": event["endpoint"]}
        if event["event_category"] == 'tombstone':
            if event["event_type"] == 'warning_threshold_exceeded':
                attributes = {"tombstoned_cells": event["tombstoned_cells"],
                              "keyspace": event["keyspace"],
                              "table": event["table"]}
            if event["event_type"] == 'error_threshold_exceeded':
                # BUG FIX: the original used the bare name `slice_start` as a
                # dict key, raising NameError — silently swallowed below — so
                # these events always lost ALL of their attributes.
                attributes = {"live_cells": event['live_cells'],
                              "tombstoned_cells": event['tombstoned_cells'],
                              "keyspace": event['keyspace'],
                              "table": event['table'],
                              "key": event['key'],
                              "requested_columns": event['requested_columns'],
                              "slice_start": event['slice_start'],
                              "slice_end": event['slice_end'],
                              "deletion_info": event['deletion_info']}
    except Exception:
        # Narrowed from a bare `except:` (which also caught KeyboardInterrupt).
        ex = traceback.format_exc()
        print("get_attributes: exception encountered - " + ex)
    return json.dumps(attributes)
def upsert(connection, cursor, event_product, event_category, event_type, event_date, attributes):
    """Insert one LogEvent row, or update date/attributes on key conflict.

    BUG FIX: the original INSERT omitted the `attributes` column, so a
    first-time event was stored with NULL attributes and only gained them
    after a second occurrence triggered the conflict UPDATE.
    """
    query = """INSERT INTO LogEvent(event_product, event_category, event_type, event_date, attributes)
               VALUES (?, ?, ?, ?, ?) ON CONFLICT(event_product, event_category, event_type)
               DO UPDATE SET event_date=?, attributes=?"""
    values = (event_product, event_category, event_type, event_date, attributes,
              event_date, attributes)
    cursor.execute(query, values)
def emit_commit_error_metrics():
    """Record a CommitLogEventsError metric via statsd and the local filesystem."""
    dims = dict(metric_identifier)
    dims['Metric'] = "CommitLogEventsError"
    dims['Dims'] = {
        'Tenant': tenant,
        "Role": role,
        "RoleInstance": role_instance,
        "Service": "cassandra",
    }
    emit_metrics(dims)
    emit_metrics_to_file(dims,
                         '/var/log/logevents_commit_error_metrics_new.json',
                         '/var/log/logevents_commit_error_metrics.json')
def emit_metrics(dims):
    """Serialize *dims* to JSON and emit it as a statsd gauge with value 1.

    Failures are logged and swallowed: metric emission must never take
    down the caller.
    """
    try:
        json_dims = json.dumps(dims)
        stats.gauge(json_dims, 1)
    except Exception as e:
        # BUG FIX: the original concatenated `jsonDims` here even when
        # json.dumps itself failed, in which case it was still None and
        # the handler raised its own TypeError.
        print("Error emitting metrics " + str(dims) + ": " + str(e))
def emit_metrics_to_file(dims, src, dst):
    """Append *dims* (as JSON) to the metrics file *dst* via scratch file *src*.

    The current destination file (if any) is first copied to the scratch
    path, the new record is appended there, and the scratch file is then
    renamed over the destination so readers always observe a complete file.
    Errors are logged and swallowed.
    """
    try:
        # Seed the scratch file with the existing contents of dst, if any.
        if os.path.exists(dst):
            shutil.copyfile(dst, src)
        with open(src, 'a+') as f:
            f.write(json.dumps(dims))
        # FIX: the original called f.close() redundantly inside the `with`
        # and renamed while the handle was still open; the rename now runs
        # after the file is safely closed. On POSIX the rename is atomic.
        os.rename(src, dst)
    except Exception as e:
        print("Error writing out metrics to file "+ dst +": " + str(e))
def init():
    """Open the Nova log-events database, enable WAL mode, and ensure
    the LogEvent schema exists. Returns the open connection."""
    db_path = r"/var/lib/cassandra/nova/logevents.db"
    conn = create_connection(db_path)
    # Write-ahead logging reduces writer/reader contention on commits.
    conn.execute('pragma journal_mode=wal')
    create_table(conn)
    return conn
def main():
    """Dump every row of the LogEvent table to stdout."""
    with closing(init()) as connection:
        print("Log events stored in Nova DB:")
        with closing(connection.cursor()) as cur:
            cur.execute('''SELECT * FROM LogEvent''')
            for row in cur.fetchall():
                print(row)
# Allow running this module directly to dump the stored log events.
if __name__ == '__main__':
    main()