77import email .utils
88import mailbox
99import email
10-
10+ import logging
1111
1212http_client = HTTPClient ()
1313
1414DEFAULT_BATCH_SIZE = 500
15- ES_URL = "http://localhost:9200/gmail "
16-
15+ DEFAULT_ES_URL = "http://localhost:9200"
16+ DEFAULT_INDEX_NAME = "gmail"
1717
def delete_index():
    """Delete the configured Elasticsearch index on a best-effort basis.

    The target URL is built from the ``es_url`` and ``index_name`` tornado
    options. A failed request (for example because the index does not exist
    yet) is logged and otherwise ignored so that the caller can go on to
    create the index.
    """
    options = tornado.options.options
    url = "%s/%s?refresh=true" % (options.es_url, options.index_name)
    request = HTTPRequest(url, method="DELETE", request_timeout=240)
    try:
        response = http_client.fetch(request)
    except Exception as err:
        # Still best-effort, but no longer silent: a bare ``except`` would
        # also swallow KeyboardInterrupt/SystemExit and hide real failures
        # such as a wrong es_url.
        logging.warning("Delete index failed (continuing): %s", err)
    else:
        logging.info('Delete index done %s', response.body)
2626
@@ -50,28 +50,31 @@ def create_index():
5050 }
5151
5252 body = json .dumps (schema )
53- request = HTTPRequest (ES_URL , method = "PUT" , body = body , request_timeout = 240 )
54- response = http_client .fetch (request )
55- print 'Create index done %s' % response .body
56-
57- total_uploaded = 0
53+ url = "%s/%s" % (tornado .options .options .es_url , tornado .options .options .index_name )
54+ try :
55+ request = HTTPRequest (url , method = "PUT" , body = body , request_timeout = 240 )
56+ response = http_client .fetch (request )
57+ logging .info ('Create index done %s' % response .body )
58+ except :
59+ pass
5860
5961
62+ total_uploaded = 0
def upload_batch(upload_data):
    """Index one batch of messages through the Elasticsearch ``_bulk`` API.

    Args:
        upload_data: iterable of JSON-serializable dicts, one per message.
            Each dict must contain a ``'message-id'`` key, which is used as
            the document ``_id``.

    Side effects:
        Issues a POST to ``<es_url>/_bulk``, adds the batch size to the
        module-level ``total_uploaded`` counter and logs a one-line summary.

    Raises:
        Whatever ``http_client.fetch`` raises on a failed request; it is not
        caught here.
    """
    global total_uploaded

    options = tornado.options.options

    # The bulk format is newline-delimited JSON: an action line followed by
    # the document itself, every line terminated by "\n".
    lines = []
    for item in upload_data:
        # Target the configured index (not a hard-coded 'gmail') so uploads
        # land in the same index that create_index/delete_index manage.
        cmd = {'index': {'_index': options.index_name,
                         '_type': 'email',
                         '_id': item['message-id']}}
        lines.append(json.dumps(cmd))
        lines.append(json.dumps(item))
    upload_data_txt = "".join(line + "\n" for line in lines)

    request = HTTPRequest(options.es_url + "/_bulk", method="POST",
                          body=upload_data_txt, request_timeout=240)
    response = http_client.fetch(request)
    result = json.loads(response.body)

    total_uploaded += len(upload_data)
    res_txt = "OK" if not result['errors'] else "FAILED"
    logging.info("Upload: %s - upload took: %4dms, total messages uploaded: %6d",
                 res_txt, result['took'], total_uploaded)
7578
7679
7780def normalize_email (email_in ):
@@ -122,13 +125,15 @@ def load_from_file():
122125
123126 if tornado .options .options .init :
124127 delete_index ()
125- create_index ()
128+ create_index ()
129+
126130
127131 if tornado .options .options .skip :
128- print "Skipping first %d messages from mbox file" % tornado .options .options .skip
132+ logging . info ( "Skipping first %d messages from mbox file" % tornado .options .options .skip )
129133
130134 count = 0
131135 upload_data = list ()
136+ logging .info ("Starting import from file %s" % tornado .options .options .infile )
132137 mbox = mailbox .UnixMailbox (open (tornado .options .options .infile , 'rb' ), email .message_from_file )
133138 for msg in mbox :
134139 count += 1
@@ -145,34 +150,28 @@ def load_from_file():
145150 if upload_data :
146151 upload_batch (upload_data )
147152
148- print "Done - total count %d" % count
153+ logging . info ( "Import done - total count %d" % count )
149154
150155
151156if __name__ == '__main__' :
152157
153- tornado .options .define (
154- "infile" ,
155- type = str ,
156- default = None ,
157- help = "The mbox input file" )
158-
159- tornado .options .define (
160- "init" ,
161- type = bool ,
162- default = False ,
163- help = "Delete and re-initialize the Elasticsearch index" )
164-
165- tornado .options .define (
166- "batch_size" ,
167- type = int ,
168- default = DEFAULT_BATCH_SIZE ,
169- help = "Elasticsearch bulk index batch size" )
170-
171- tornado .options .define (
172- "skip" ,
173- type = int ,
174- default = 0 ,
175- help = "Number of messages to skip from the mbox file" )
158+ tornado .options .define ("es_url" , type = str , default = DEFAULT_ES_URL ,
159+ help = "URL of your Elasticsearch node" )
160+
161+ tornado .options .define ("index_name" , type = str , default = DEFAULT_INDEX_NAME ,
162+ help = "Name of the index to store your messages" )
163+
164+ tornado .options .define ("infile" , type = str , default = None ,
165+ help = "The mbox input file" )
166+
167+ tornado .options .define ("init" , type = bool , default = False ,
168+ help = "Force deleting and re-initializing the Elasticsearch index" )
169+
170+ tornado .options .define ("batch_size" , type = int , default = DEFAULT_BATCH_SIZE ,
171+ help = "Elasticsearch bulk index batch size" )
172+
173+ tornado .options .define ("skip" , type = int , default = 0 ,
174+ help = "Number of messages to skip from the mbox file" )
176175
177176 tornado .options .parse_command_line ()
178177
0 commit comments