-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreindex_elasticsearch.py
More file actions
136 lines (111 loc) · 5.05 KB
/
reindex_elasticsearch.py
File metadata and controls
136 lines (111 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import argparse
from elasticsearch import Elasticsearch
import sys
import re
def validate_index_name(index_name):
    """Validate index name format (e.g., fg-009783).

    A valid name is one or more ASCII letters, a single '-', and exactly
    six ASCII digits, with nothing before or after.

    Args:
        index_name: Candidate index name.

    Returns:
        bool: True when the whole string matches the format, else False
        (including when *index_name* is not a string).
    """
    if not isinstance(index_name, str):
        return False
    # fullmatch instead of match + '$': '$' also matches just before a
    # trailing newline, which would let "fg-009783\n" through.
    # [0-9] instead of \d: \d also matches non-ASCII Unicode digits.
    pattern = r'[A-Za-z]+-[0-9]{6}'
    return re.fullmatch(pattern, index_name) is not None
def parse_index_range(start_index, end_index):
    """Expand an inclusive index range into a list of index names.

    Each name is split on '-'; the last piece is the number and everything
    before it (re-joined with '-') is the prefix. Both names must share the
    same prefix.

    Args:
        start_index: First index name, e.g. ``fg-009783``.
        end_index: Last index name (inclusive), e.g. ``fg-009789``.

    Returns:
        list[str]: ``<prefix>-<number>`` for every number from start to end,
        with the number zero-padded to six digits.

    Raises:
        ValueError: If a name has no '-', the prefixes differ, a numeric
            suffix is not made only of digits 0-9, or start > end. The
            message is always prefixed with "Invalid index format: ".
    """
    try:
        start_parts = start_index.split('-')
        end_parts = end_index.split('-')
        if len(start_parts) < 2 or len(end_parts) < 2:
            raise ValueError("Index names must contain a prefix and number separated by '-'")

        prefix_start = '-'.join(start_parts[:-1])
        prefix_end = '-'.join(end_parts[:-1])
        if prefix_start != prefix_end:
            raise ValueError("Start and end index prefixes do not match.")

        # int() alone also accepts "+1", " 1" and "1_0"; those are not index
        # numbers and would silently expand into a range of other indexes.
        for suffix in (start_parts[-1], end_parts[-1]):
            if not suffix or any(ch not in "0123456789" for ch in suffix):
                raise ValueError(f"Index number '{suffix}' must contain only digits 0-9.")

        start_num = int(start_parts[-1])
        end_num = int(end_parts[-1])
        if start_num > end_num:
            raise ValueError("Start index number must be less than or equal to end index number.")

        return [f"{prefix_start}-{i:06d}" for i in range(start_num, end_num + 1)]
    except ValueError as e:
        # Every validation error above is re-raised with a common prefix;
        # 'from e' keeps the original error attached as the explicit cause.
        raise ValueError(f"Invalid index format: {e}") from e
def reindex(es_client, source_index, dest_index, alias=None):
    """Reindex a single index and optionally assign an alias.

    Steps, in order: check that the source index exists, create the
    destination index if it is missing, optionally add *alias* to the
    destination, set ``index.lifecycle.indexing_complete`` on the
    destination, then run the reindex and wait for it to finish.

    Alias and settings errors are printed and do not abort the reindex.

    Args:
        es_client: Elasticsearch client; must provide ``indices.exists``,
            ``indices.create``, ``indices.put_alias``,
            ``indices.put_settings`` and ``reindex``.
        source_index: Name of the index to copy from.
        dest_index: Name of the index to copy into.
        alias: Optional alias to add to *dest_index*.

    Returns:
        bool: True if the reindex call returned without reporting failures
        or a timeout; False if the source is missing, the response reports
        ``failures``/``timed_out``, or any other error was raised.
    """
    try:
        # Check if source index exists
        if not es_client.indices.exists(index=source_index):
            print(f"Source index {source_index} does not exist. Skipping.")
            return False

        # Create destination index if it doesn't exist
        if not es_client.indices.exists(index=dest_index):
            es_client.indices.create(index=dest_index)

        # NOTE(review): alias and indexing_complete are applied before the
        # copy runs; presumably so lifecycle management does not try to roll
        # over the new index - confirm before reordering.

        # Add alias to the reindexed index if specified (best effort)
        if alias:
            try:
                es_client.indices.put_alias(index=dest_index, name=alias)
                print(f"Added alias {alias} to {dest_index}")
            except Exception as e:
                print(f"Error adding alias {alias} to {dest_index}: {str(e)}")

        # Set index.lifecycle.indexing_complete to true (best effort)
        try:
            es_client.indices.put_settings(
                index=dest_index,
                body={
                    "settings": {
                        "index.lifecycle.indexing_complete": True
                    }
                }
            )
            print(f"Set index.lifecycle.indexing_complete=true for {dest_index}")
        except Exception as e:
            print(f"Warning: Could not set indexing_complete for {dest_index}: {e}")

        # Perform reindex
        response = es_client.reindex(
            body={
                "source": {"index": source_index},
                "dest": {"index": dest_index}
            },
            wait_for_completion=True
        )

        # A response can come back without an exception and still report
        # problems; do not announce success in that case.
        failures = response["failures"] if "failures" in response else []
        timed_out = response["timed_out"] if "timed_out" in response else False
        if failures or timed_out:
            print(f"Reindex of {source_index} to {dest_index} did not complete cleanly: "
                  f"timed_out={timed_out}, failures={failures}")
            return False

        print(f"Reindexed {source_index} to {dest_index}: {response['took']}ms, "
              f"Created: {response['created']}, Updated: {response['updated']}")
        return True
    except Exception as e:
        # Top-level boundary for one index: report and let the caller move on.
        print(f"Error reindexing {source_index} to {dest_index}: {str(e)}")
        return False
def main():
    """Command-line entry point: reindex every index in a numbered range.

    Parses the arguments, validates the start/end index names, builds the
    Elasticsearch client, expands the range, and reindexes each source index
    into ``<source>-reindexed``, optionally adding an alias.

    Exits with status 1 when the index names are invalid, the client cannot
    be created, the range cannot be parsed, or at least one index failed to
    reindex. Exits with an argparse usage error when only one of
    --username / --password is given.
    """
    parser = argparse.ArgumentParser(description="Reindex Elasticsearch indexes.")
    parser.add_argument('--host', default='localhost:9200', help='Elasticsearch host (default: localhost:9200)')
    parser.add_argument('--username', help='Elasticsearch username')
    parser.add_argument('--password', help='Elasticsearch password')
    parser.add_argument('--start-index', required=True, help='Start index name (e.g., fg-009783)')
    parser.add_argument('--end-index', required=True, help='End index name (e.g., fg-009789)')
    parser.add_argument('--alias', help='Alias to assign to reindexed indexes')
    parser.add_argument('--no-verify-certs', action='store_false', dest='verify_certs',
                        help='Disable SSL certificate verification')
    args = parser.parse_args()

    # A lone username or password used to be dropped silently, so the script
    # ran unauthenticated without telling the user.
    if bool(args.username) != bool(args.password):
        parser.error("--username and --password must be provided together")

    # Validate index names
    if not (validate_index_name(args.start_index) and validate_index_name(args.end_index)):
        print("Index names must be in format '<prefix>-######' (e.g., fg-009783)")
        sys.exit(1)

    # Initialize Elasticsearch client
    # NOTE(review): some client versions may require a scheme in the host
    # (e.g. "http://localhost:9200") - confirm the default works as is.
    try:
        es_client = Elasticsearch(
            [args.host],
            basic_auth=(args.username, args.password) if args.username and args.password else None,
            verify_certs=args.verify_certs
        )
    except Exception as e:
        print(f"Failed to connect to Elasticsearch: {str(e)}")
        sys.exit(1)

    # Get list of indexes to reindex
    try:
        indexes = parse_index_range(args.start_index, args.end_index)
    except ValueError as e:
        print(f"Error: {str(e)}")
        sys.exit(1)

    # Reindex each index, remembering the ones that did not succeed
    failed = []
    for source_index in indexes:
        dest_index = f"{source_index}-reindexed"
        print(f"Starting reindex from {source_index} to {dest_index}")
        if reindex(es_client, source_index, dest_index, args.alias):
            print(f"Successfully reindexed {source_index}")
        else:
            print(f"Failed to reindex {source_index}")
            failed.append(source_index)

    # Report failures through the exit status so callers can detect them.
    if failed:
        print(f"{len(failed)} of {len(indexes)} indexes failed to reindex: {', '.join(failed)}")
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()