7
7
8
8
import codecs
9
9
import copy
10
- import json
11
10
import os
11
+ import tempfile
12
+ import uuid
12
13
from collections import OrderedDict
13
14
from decimal import Decimal
14
15
from warnings import warn
15
16
17
+ import BTrees .OOBTree
18
+ import ijson
19
+ import transaction
16
20
import xmltodict
21
+ import zc .zlibstorage
22
+ import ZODB .FileStorage
17
23
18
24
from flattentool .i18n import _
19
25
from flattentool .input import path_search
20
26
from flattentool .schema import make_sub_sheet_name
21
- from flattentool .sheet import Sheet
27
+ from flattentool .sheet import PersistentSheet
22
28
23
29
BASIC_TYPES = [str , bool , int , Decimal , type (None )]
24
30
@@ -112,9 +118,31 @@ def __init__(
112
118
remove_empty_schema_columns = False ,
113
119
rollup = False ,
114
120
truncation_length = 3 ,
121
+ persist = False ,
115
122
):
123
+ if persist :
124
+ # Use temp directories in OS agnostic way
125
+ self .zodb_db_location = (
126
+ tempfile .gettempdir () + "/flattentool-" + str (uuid .uuid4 ())
127
+ )
128
+ # zlibstorage lowers disk usage by a lot at very small performance cost
129
+ zodb_storage = zc .zlibstorage .ZlibStorage (
130
+ ZODB .FileStorage .FileStorage (self .zodb_db_location )
131
+ )
132
+ self .db = ZODB .DB (zodb_storage )
133
+ else :
134
+ # If None, in memory storage is used.
135
+ self .db = ZODB .DB (None )
136
+
137
+ self .connection = self .db .open ()
138
+
139
+ # ZODB root, only objects attached here will be persisted
140
+ root = self .connection .root
141
+ # OOBTree is a BTree whose keys and values are both objects (including strings)
142
+ root .sheet_store = BTrees .OOBTree .BTree ()
143
+
116
144
self .sub_sheets = {}
117
- self .main_sheet = Sheet ( )
145
+ self .main_sheet = PersistentSheet ( connection = self . connection , name = "" )
118
146
self .root_list_path = root_list_path
119
147
self .root_id = root_id
120
148
self .use_titles = use_titles
@@ -125,9 +153,19 @@ def __init__(
125
153
self .filter_value = filter_value
126
154
self .remove_empty_schema_columns = remove_empty_schema_columns
127
155
self .seen_paths = set ()
156
+ self .persist = persist
128
157
129
158
if schema_parser :
130
- self .main_sheet = copy .deepcopy (schema_parser .main_sheet )
159
+ # schema parser does not make sheets that are persistent,
160
+ # so use from_sheet, which deep copies everything in it.
161
+ self .main_sheet = PersistentSheet .from_sheet (
162
+ schema_parser .main_sheet , self .connection
163
+ )
164
+ for sheet_name , sheet in list (self .sub_sheets .items ()):
165
+ self .sub_sheets [sheet_name ] = PersistentSheet .from_sheet (
166
+ sheet , self .connection
167
+ )
168
+
131
169
self .sub_sheets = copy .deepcopy (schema_parser .sub_sheets )
132
170
if remove_empty_schema_columns :
133
171
# Don't use columns from the schema parser
@@ -194,18 +232,13 @@ def __init__(
194
232
_ ("Only one of json_file or root_json_dict should be supplied" )
195
233
)
196
234
197
- if json_filename :
198
- with codecs .open (json_filename , encoding = "utf-8" ) as json_file :
199
- try :
200
- self .root_json_dict = json .load (
201
- json_file , object_pairs_hook = OrderedDict , parse_float = Decimal
202
- )
203
- except UnicodeError as err :
204
- raise BadlyFormedJSONErrorUTF8 (* err .args )
205
- except ValueError as err :
206
- raise BadlyFormedJSONError (* err .args )
207
- else :
208
- self .root_json_dict = root_json_dict
235
+ if not json_filename :
236
+ if self .root_list_path is None :
237
+ self .root_json_list = root_json_dict
238
+ else :
239
+ self .root_json_list = path_search (
240
+ root_json_dict , self .root_list_path .split ("/" )
241
+ )
209
242
210
243
if preserve_fields :
211
244
# Extract fields to be preserved from input file (one path per line)
@@ -240,19 +273,41 @@ def __init__(
240
273
self .preserve_fields = None
241
274
self .preserve_fields_input = None
242
275
276
+ if json_filename :
277
+ if self .root_list_path is None :
278
+ path = "item"
279
+ else :
280
+ path = root_list_path .replace ("/" , "." ) + ".item"
281
+
282
+ json_file = codecs .open (json_filename , encoding = "utf-8" )
283
+
284
+ self .root_json_list = ijson .items (json_file , path , map_type = OrderedDict )
285
+
286
+ try :
287
+ self .parse ()
288
+ except ijson .common .IncompleteJSONError as err :
289
+ raise BadlyFormedJSONError (* err .args )
290
+ except UnicodeDecodeError as err :
291
+ raise BadlyFormedJSONErrorUTF8 (* err .args )
292
+ finally :
293
+ if json_filename :
294
+ json_file .close ()
295
+
243
296
def parse (self ):
244
- if self .root_list_path is None :
245
- root_json_list = self .root_json_dict
246
- else :
247
- root_json_list = path_search (
248
- self .root_json_dict , self .root_list_path .split ("/" )
249
- )
250
- for json_dict in root_json_list :
297
+ for num , json_dict in enumerate (self .root_json_list ):
251
298
if json_dict is None :
252
299
# This is particularly useful for IATI XML, in order to not
253
300
# fall over on empty activity, e.g. <iati-activity/>
254
301
continue
255
302
self .parse_json_dict (json_dict , sheet = self .main_sheet )
303
+ # only persist every 2000 objects. persisting more often slows down storing.
304
+ # 2000 top level objects normally not too much to store in memory.
305
+ if num % 2000 == 0 and num != 0 :
306
+ transaction .commit ()
307
+
308
+ # This commit could be removed which would mean that up to 2000 objects
309
+ # could be stored in memory without anything being persisted.
310
+ transaction .commit ()
256
311
257
312
if self .remove_empty_schema_columns :
258
313
# Remove sheets with no lines of data
@@ -501,7 +556,9 @@ def parse_json_dict(
501
556
parent_name , key , truncation_length = self .truncation_length
502
557
)
503
558
if sub_sheet_name not in self .sub_sheets :
504
- self .sub_sheets [sub_sheet_name ] = Sheet (name = sub_sheet_name )
559
+ self .sub_sheets [sub_sheet_name ] = PersistentSheet (
560
+ name = sub_sheet_name , connection = self .connection
561
+ )
505
562
506
563
for json_dict in value :
507
564
if json_dict is None :
@@ -518,4 +575,16 @@ def parse_json_dict(
518
575
raise ValueError (_ ("Unsupported type {}" ).format (type (value )))
519
576
520
577
if top :
521
- sheet .lines .append (flattened_dict )
578
+ sheet .append_line (flattened_dict )
579
+
580
def __enter__(self):
    """Enter the ``with`` block and hand back this same instance.

    No setup happens here; the method only returns ``self`` so the
    object can be bound by ``with ... as parser``.
    """
    return self
582
+
583
def __exit__(self, type, value, traceback):
    """Release the on-disk database when leaving the ``with`` block.

    When ``self.persist`` is false nothing is done. Otherwise the
    connection and the database are closed, and the database file at
    ``self.zodb_db_location`` plus its ``.lock``, ``.index`` and ``.tmp``
    companions are deleted.

    The three arguments are the standard context-manager exception
    details and are not inspected. The method returns ``None``, so any
    exception raised inside the ``with`` block keeps propagating.
    """
    if not self.persist:
        return

    try:
        self.connection.close()
        self.db.close()
    finally:
        # Always attempt the removal, even if closing failed, so the
        # temporary database is not left behind.
        for suffix in ("", ".lock", ".index", ".tmp"):
            try:
                os.remove(self.zodb_db_location + suffix)
            except FileNotFoundError:
                # NOTE(review): presumably not every companion file exists
                # at this point (depends on the storage library version);
                # a missing file must not abort the rest of the cleanup.
                pass
0 commit comments