Commit cc4f378

Merge pull request #2 from A-Baji/fix-#926
Fix #926
2 parents f65243d + 8718b2b

39 files changed: 623 additions & 225 deletions

CHANGELOG.md

Lines changed: 13 additions & 2 deletions

@@ -1,10 +1,21 @@
 ## Release notes
 
 ### 0.13.3 -- TBD
+* Bugfix - Fix error in listing ancestors, descendants with part tables.
+* Bugfix - Fix Python 3.10 compatibility (#983) PR #972
+* Bugfix - Allow renaming non-conforming attributes in proj (#982) PR #972
+* Add - Expose proxy feature for S3 external stores (#961) PR #962
+* Add - implement multiprocessing in populate (#695) PR #704, #969
 * Bugfix - Dependencies not properly loaded on populate. (#902) PR #919
 * Bugfix - Replace use of numpy aliases of built-in types with built-in type. (#938) PR #939
+* Bugfix - Deletes and drops must include the master of each part. (#151, #374) PR #957
 * Bugfix - `ExternalTable.delete` should not remove row on error (#953) PR #956
 * Bugfix - Fix error handling of remove_object function in `s3.py` (#952) PR #955
+* Bugfix - Fix regression issue with `DISTINCT` clause and `GROUP_BY` (#914) PR #963
+* Bugfix - Fix sql code generation to comply with sql mode `ONLY_FULL_GROUP_BY` (#916) PR #965
+* Bugfix - Fix count for left-joined `QueryExpressions` (#951) PR #966
+* Bugfix - Fix assertion error when performing a union into a join (#930) PR #967
+* Update `~jobs.error_stack` from blob to mediumblob to allow error stacks >64kB in jobs (#984) PR #986
 * Bugfix - Fix error when performing a union on multiple tables (#926) PR #964
 
 ### 0.13.2 -- May 7, 2021
@@ -126,7 +137,7 @@
 * Fix #628 - incompatibility with pyparsing 2.4.1
 
 ### 0.11.1 -- Nov 15, 2018
-* Fix ordering of attributes in proj (#483 and #516)
+* Fix ordering of attributes in proj (#483, #516)
 * Prohibit direct insert into auto-populated tables (#511)
 
 ### 0.11.0 -- Oct 25, 2018
@@ -239,7 +250,7 @@ Documentation and tutorials available at https://docs.datajoint.io and https://t
 * ERD() no longer text the context argument.
 * ERD.draw() now takes an optional context argument. By default uses the caller's locals.
 
-### 0.3.2.
+### 0.3.2.
 * Fixed issue #223: `insert` can insert relations without fetching.
 * ERD() now takes the `context` argument, which specifies in which context to look for classes. The default is taken from the argument (schema or relation).
 * ERD.draw() no longer has the `prefix` argument: class names are shown as found in the context.

LNX-docker-compose.yml

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@ services:
       interval: 1s
   fakeservices.datajoint.io:
     <<: *net
-    image: datajoint/nginx:v0.0.18
+    image: datajoint/nginx:v0.0.19
     environment:
       - ADD_db_TYPE=DATABASE
       - ADD_db_ENDPOINT=db:3306

datajoint/autopopulate.py

Lines changed: 142 additions & 68 deletions

@@ -8,12 +8,37 @@
 from .expression import QueryExpression, AndList
 from .errors import DataJointError, LostConnectionError
 import signal
+import multiprocessing as mp
 
 # noinspection PyExceptionInherit,PyCallingNonCallable
 
 logger = logging.getLogger(__name__)
 
 
+# --- helper functions for multiprocessing ---
+
+def _initialize_populate(table, jobs, populate_kwargs):
+    """
+    Initialize the process for multiprocessing.
+    Saves the unpickled copy of the table to the current process and reconnects.
+    """
+    process = mp.current_process()
+    process.table = table
+    process.jobs = jobs
+    process.populate_kwargs = populate_kwargs
+    table.connection.connect()  # reconnect
+
+
+def _call_populate1(key):
+    """
+    Call the current process' table._populate1()
+    :param key: dict specifying the job to compute
+    :return: (key, error) on error, otherwise None
+    """
+    process = mp.current_process()
+    return process.table._populate1(key, process.jobs, **process.populate_kwargs)
+
+
 class AutoPopulate:
     """
     AutoPopulate is a mixin class that adds the method populate() to a Relation class.
@@ -26,18 +51,20 @@ class AutoPopulate:
     @property
     def key_source(self):
         """
-        :return: the relation whose primary key values are passed, sequentially, to the
-        ``make`` method when populate() is called.
-        The default value is the join of the parent relations.
-        Users may override to change the granularity or the scope of populate() calls.
+        :return: the query expression that yields primary key values to be passed,
+        sequentially, to the ``make`` method when populate() is called.
+        The default value is the join of the parent tables referenced from the primary key.
+        Subclasses may override the key_source to change the scope or the granularity
+        of the make calls.
         """
         def _rename_attributes(table, props):
             return (table.proj(
                 **{attr: ref for attr, ref in props['attr_map'].items() if attr != ref})
-                if props['aliased'] else table)
+                if props['aliased'] else table.proj())
 
         if self._key_source is None:
-            parents = self.target.parents(primary=True, as_objects=True, foreign_key_info=True)
+            parents = self.target.parents(
+                primary=True, as_objects=True, foreign_key_info=True)
             if not parents:
                 raise DataJointError('A table must have dependencies '
                                      'from its primary key for auto-populate to work')
@@ -48,17 +75,19 @@ def _rename_attributes(table, props):
 
     def make(self, key):
         """
-        Derived classes must implement method `make` that fetches data from tables that are
-        above them in the dependency hierarchy, restricting by the given key, computes dependent
-        attributes, and inserts the new tuples into self.
+        Derived classes must implement method `make` that fetches data from tables
+        above them in the dependency hierarchy, restricting by the given key,
+        computes secondary attributes, and inserts the new tuples into self.
         """
-        raise NotImplementedError('Subclasses of AutoPopulate must implement the method `make`')
+        raise NotImplementedError(
+            'Subclasses of AutoPopulate must implement the method `make`')
 
     @property
     def target(self):
         """
         :return: table to be populated.
-        In the typical case, dj.AutoPopulate is mixed into a dj.Table class by inheritance and the target is self.
+        In the typical case, dj.AutoPopulate is mixed into a dj.Table class by
+        inheritance and the target is self.
         """
         return self
 
@@ -85,40 +114,45 @@ def _jobs_to_do(self, restrictions):
 
         if not isinstance(todo, QueryExpression):
             raise DataJointError('Invalid key_source value')
-        # check if target lacks any attributes from the primary key of key_source
+
         try:
+            # check if target lacks any attributes from the primary key of key_source
             raise DataJointError(
-                'The populate target lacks attribute %s from the primary key of key_source' % next(
-                    name for name in todo.heading.primary_key if name not in self.target.heading))
+                'The populate target lacks attribute %s '
+                'from the primary key of key_source' % next(
+                    name for name in todo.heading.primary_key
+                    if name not in self.target.heading))
         except StopIteration:
             pass
         return (todo & AndList(restrictions)).proj()
 
     def populate(self, *restrictions, suppress_errors=False, return_exception_objects=False,
                  reserve_jobs=False, order="original", limit=None, max_calls=None,
-                 display_progress=False):
+                 display_progress=False, processes=1):
         """
-        rel.populate() calls rel.make(key) for every primary key in self.key_source
-        for which there is not already a tuple in rel.
-        :param restrictions: a list of restrictions each restrict (rel.key_source - target.proj())
+        table.populate() calls table.make(key) for every primary key in self.key_source
+        for which there is not already a tuple in table.
+        :param restrictions: a list of restrictions each restrict
+            (table.key_source - target.proj())
         :param suppress_errors: if True, do not terminate execution.
        :param return_exception_objects: return error objects instead of just error messages
-        :param reserve_jobs: if true, reserves job to populate in asynchronous fashion
+        :param reserve_jobs: if True, reserve jobs to populate in asynchronous fashion
         :param order: "original"|"reverse"|"random" - the order of execution
+        :param limit: if not None, check at most this many keys
+        :param max_calls: if not None, populate at most this many keys
         :param display_progress: if True, report progress_bar
-        :param limit: if not None, checks at most that many keys
-        :param max_calls: if not None, populates at max that many keys
+        :param processes: number of processes to use; capped at the number of keys
+            and available CPU cores
        """
        if self.connection.in_transaction:
            raise DataJointError('Populate cannot be called during a transaction.')
 
        valid_order = ['original', 'reverse', 'random']
        if order not in valid_order:
            raise DataJointError('The order argument must be one of %s' % str(valid_order))
-        error_list = [] if suppress_errors else None
        jobs = self.connection.schemas[self.target.database].jobs if reserve_jobs else None
 
-        # define and setup signal handler for SIGTERM
+        # define and set up signal handler for SIGTERM:
        if reserve_jobs:
            def handler(signum, frame):
                logger.info('Populate terminated by SIGTERM')
@@ -131,60 +165,99 @@ def handler(signum, frame):
        elif order == "random":
            random.shuffle(keys)
 
-        call_count = 0
        logger.info('Found %d keys to populate' % len(keys))
 
-        make = self._make_tuples if hasattr(self, '_make_tuples') else self.make
+        keys = keys[:max_calls]
+        nkeys = len(keys)
 
-        for key in (tqdm(keys, desc=self.__class__.__name__) if display_progress else keys):
-            if max_calls is not None and call_count >= max_calls:
-                break
-            if not reserve_jobs or jobs.reserve(self.target.table_name, self._job_key(key)):
-                self.connection.start_transaction()
-                if key in self.target:  # already populated
-                    self.connection.cancel_transaction()
-                    if reserve_jobs:
-                        jobs.complete(self.target.table_name, self._job_key(key))
+        if processes > 1:
+            processes = min(processes, nkeys, mp.cpu_count())
+
+        error_list = []
+        populate_kwargs = dict(
+            suppress_errors=suppress_errors,
+            return_exception_objects=return_exception_objects)
+
+        if processes == 1:
+            for key in tqdm(keys, desc=self.__class__.__name__) if display_progress else keys:
+                error = self._populate1(key, jobs, **populate_kwargs)
+                if error is not None:
+                    error_list.append(error)
+        else:
+            # spawn multiple processes
+            self.connection.close()  # disconnect parent process from MySQL server
+            del self.connection._conn.ctx  # SSLContext is not pickleable
+            with mp.Pool(processes, _initialize_populate, (self, jobs, populate_kwargs)) as pool:
+                if display_progress:
+                    with tqdm(desc="Processes: ", total=nkeys) as pbar:
+                        for error in pool.imap(_call_populate1, keys, chunksize=1):
+                            if error is not None:
+                                error_list.append(error)
+                            pbar.update()
                else:
-                    logger.info('Populating: ' + str(key))
-                    call_count += 1
-                    self.__class__._allow_insert = True
-                    try:
-                        make(dict(key))
-                    except (KeyboardInterrupt, SystemExit, Exception) as error:
-                        try:
-                            self.connection.cancel_transaction()
-                        except LostConnectionError:
-                            pass
-                        error_message = '{exception}{msg}'.format(
-                            exception=error.__class__.__name__,
-                            msg=': ' + str(error) if str(error) else '')
-                        if reserve_jobs:
-                            # show error name and error message (if any)
-                            jobs.error(
-                                self.target.table_name, self._job_key(key),
-                                error_message=error_message, error_stack=traceback.format_exc())
-                        if not suppress_errors or isinstance(error, SystemExit):
-                            raise
-                        else:
-                            logger.error(error)
-                            error_list.append((key, error if return_exception_objects else error_message))
-                    else:
-                        self.connection.commit_transaction()
-                        if reserve_jobs:
-                            jobs.complete(self.target.table_name, self._job_key(key))
-                    finally:
-                        self.__class__._allow_insert = False
+                    for error in pool.imap(_call_populate1, keys):
+                        if error is not None:
+                            error_list.append(error)
+            self.connection.connect()  # reconnect parent process to MySQL server
 
-        # place back the original signal handler
+        # restore the original signal handler:
        if reserve_jobs:
            signal.signal(signal.SIGTERM, old_handler)
-        return error_list
+
+        if suppress_errors:
+            return error_list
+
+    def _populate1(self, key, jobs, suppress_errors, return_exception_objects):
+        """
+        Populates the table for one source key, calling self.make inside a transaction.
+        :param key: dict specifying the job to populate
+        :param jobs: the jobs table or None if not reserve_jobs
+        :param suppress_errors: if True, errors are suppressed and returned
+        :param return_exception_objects: if True, errors are returned as objects
+        :return: (key, error) when suppress_errors=True, otherwise None
+        """
+        make = self._make_tuples if hasattr(self, '_make_tuples') else self.make
+
+        if jobs is None or jobs.reserve(self.target.table_name, self._job_key(key)):
+            self.connection.start_transaction()
+            if key in self.target:  # already populated
+                self.connection.cancel_transaction()
+                if jobs is not None:
+                    jobs.complete(self.target.table_name, self._job_key(key))
+            else:
+                logger.info('Populating: ' + str(key))
+                self.__class__._allow_insert = True
+                try:
+                    make(dict(key))
+                except (KeyboardInterrupt, SystemExit, Exception) as error:
+                    try:
+                        self.connection.cancel_transaction()
+                    except LostConnectionError:
+                        pass
+                    error_message = '{exception}{msg}'.format(
+                        exception=error.__class__.__name__,
+                        msg=': ' + str(error) if str(error) else '')
+                    if jobs is not None:
+                        # show error name and error message (if any)
+                        jobs.error(
+                            self.target.table_name, self._job_key(key),
+                            error_message=error_message, error_stack=traceback.format_exc())
+                    if not suppress_errors or isinstance(error, SystemExit):
+                        raise
+                    else:
+                        logger.error(error)
+                        return key, error if return_exception_objects else error_message
+                else:
+                    self.connection.commit_transaction()
+                    if jobs is not None:
+                        jobs.complete(self.target.table_name, self._job_key(key))
+                finally:
+                    self.__class__._allow_insert = False
 
     def progress(self, *restrictions, display=True):
        """
-        report progress of populating the table
-        :return: remaining, total -- tuples to be populated
+        Report the progress of populating the table.
+        :return: (remaining, total) -- numbers of tuples to be populated
        """
        todo = self._jobs_to_do(restrictions)
        total = len(todo)
@@ -193,5 +266,6 @@ def progress(self, *restrictions, display=True):
            print('%-20s' % self.__class__.__name__,
                  'Completed %d of %d (%2.1f%%) %s' % (
                      total - remaining, total, 100 - 100 * remaining / (total+1e-12),
-                      datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')), flush=True)
+                      datetime.datetime.strftime(datetime.datetime.now(),
+                                                 '%Y-%m-%d %H:%M:%S')), flush=True)
        return remaining, total
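
Taken together, the `autopopulate.py` changes keep the single-process path intact and make multiprocessing opt-in through the new `processes` argument. A minimal usage sketch — the `Image`/`FilteredImage` tables and `my_filter` are illustrative stand-ins, not part of this commit:

```python
import datajoint as dj

schema = dj.schema('tutorial')


@schema
class FilteredImage(dj.Computed):
    # hypothetical computed table, used only to illustrate populate()
    definition = """
    -> Image
    ---
    filtered_image : longblob
    """

    def make(self, key):
        img = (Image & key).fetch1('image')
        self.insert1(dict(key, filtered_image=my_filter(img)))


# unchanged default: keys are processed sequentially in this process
FilteredImage.populate(display_progress=True)

# new in this commit: the parent disconnects, spawns a pool whose workers
# reconnect in _initialize_populate(), and fans keys out via pool.imap;
# reserve_jobs=True keeps concurrent workers from grabbing the same key
FilteredImage.populate(reserve_jobs=True, processes=4)
```

Note that `processes` is silently capped at `min(processes, nkeys, mp.cpu_count())`, so requesting a very large number simply uses all available cores.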

datajoint/blob.py

Lines changed: 2 additions & 0 deletions

@@ -166,6 +166,8 @@ def pack_blob(self, obj):
             return self.pack_array(np.array(obj))
         if isinstance(obj, (bool, np.bool_)):
             return self.pack_array(np.array(obj))
+        if isinstance(obj, (float, int, complex)):
+            return self.pack_array(np.array(obj))
         if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
             return self.pack_datetime(obj)
         if isinstance(obj, Decimal):
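
The two added lines route bare Python scalars through the same `pack_array` path already used for numpy scalars and booleans. A quick round-trip sketch, assuming the module-level `pack`/`unpack` helpers in `datajoint.blob`:

```python
import numpy as np
import datajoint.blob as blob

# Python scalars are now wrapped in a 0-d numpy array before packing
assert np.isclose(blob.unpack(blob.pack(3.14)), 3.14)
assert blob.unpack(blob.pack(42)) == 42
assert blob.unpack(blob.pack(1 + 2j)) == 1 + 2j
```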

datajoint/connection.py

Lines changed: 1 addition & 1 deletion

@@ -278,7 +278,7 @@ def query(self, query, args=(), *, as_dict=False, suppress_warnings=True, reconn
         # check cache first:
         use_query_cache = bool(self._query_cache)
         if use_query_cache and not re.match(r"\s*(SELECT|SHOW)", query):
-            raise errors.DataJointError("Only SELECT query are allowed when query caching is on.")
+            raise errors.DataJointError("Only SELECT queries are allowed when query caching is on.")
         if use_query_cache:
             if not config['query_cache']:
                 raise errors.DataJointError("Provide filepath dj.config['query_cache'] when using query caching.")
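
For context, the corrected message is raised when a write statement slips through while the query cache is active. A sketch of the surrounding workflow, assuming the 0.13-era `set_query_cache` API (treat the exact call names as an assumption; they are not part of this diff):

```python
import datajoint as dj

dj.config['query_cache'] = '/tmp/dj_query_cache'  # illustrative path for cached results

conn = dj.conn()
conn.set_query_cache(query_cache='main')  # assumed API: activate caching under a tag

# SELECT/SHOW statements are now served from (and saved to) the file cache;
# an INSERT, UPDATE, or DELETE would raise:
#   DataJointError: Only SELECT queries are allowed when query caching is on.

conn.set_query_cache(query_cache=None)  # deactivate before writing again
```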

datajoint/declare.py

Lines changed: 1 addition & 1 deletion

@@ -11,7 +11,7 @@
 
 UUID_DATA_TYPE = 'binary(16)'
 MAX_TABLE_NAME_LENGTH = 64
-CONSTANT_LITERALS = {'CURRENT_TIMESTAMP'}  # SQL literals to be used without quotes (case insensitive)
+CONSTANT_LITERALS = {'CURRENT_TIMESTAMP', 'NULL'}  # SQL literals to be used without quotes (case insensitive)
 EXTERNAL_TABLE_ROOT = '~external'
 
 TYPE_PATTERN = {k: re.compile(v, re.I) for k, v in dict(
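
Adding `NULL` to `CONSTANT_LITERALS` lets a `NULL` default pass into the generated SQL unquoted, just as `CURRENT_TIMESTAMP` already did (the set is matched case-insensitively). A hypothetical declaration exercising both literals — table and attribute names are illustrative:

```python
import datajoint as dj

schema = dj.schema('tutorial')


@schema
class Session(dj.Manual):
    definition = """
    session_id : int
    ---
    notes = NULL : varchar(255)               # default emitted as unquoted NULL
    created = CURRENT_TIMESTAMP : timestamp   # previously supported literal
    """
```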

datajoint/dependencies.py

Lines changed: 1 addition & 3 deletions

@@ -18,16 +18,14 @@ def unite_master_parts(lst):
     """
     for i in range(2, len(lst)):
         name = lst[i]
-        match = re.match(r'(?P<master>`\w+`.`\w+)__\w+`', name)
+        match = re.match(r'(?P<master>`\w+`.`#?\w+)__\w+`', name)
         if match:  # name is a part table
             master = match.group('master')
             for j in range(i-1, -1, -1):
                 if lst[j] == master + '`' or lst[j].startswith(master + '__'):
                     # move from the ith position to the (j+1)th position
                     lst[j+1:i+1] = [name] + lst[j+1:i]
                     break
-            else:
-                raise DataJointError("Found a part table {name} without its master table.".format(name=name))
     return lst
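The loosened pattern also recognizes part tables whose master is a lookup table (a back-quoted name starting with `#`), and dropping the `for`-`else` branch means a part-like name without a preceding master is now left in place instead of raising. A quick check of the new pattern against a few names:

```python
import re

pattern = r'(?P<master>`\w+`.`#?\w+)__\w+`'

names = [
    '`lab`.`session__trial`',    # part of an ordinary master
    '`lab`.`#protocol__step`',   # part of a lookup (#-prefixed) master: newly matched
    '`lab`.`session`',           # not a part table
]
for name in names:
    match = re.match(pattern, name)
    print(name, '->', match.group('master') + '`' if match else 'no match')
```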