Skip to content

Commit 4a66d0c

Browse files
authored
fix: remove delete tags for cell_table and meta_table (#215)
* fix: remove delete tags for cell_table and meta_table
* refactor: remove unused code blocks
* refactor: remove unused code blocks
* test: fix filter test
* refactor: simplify if else expression
1 parent eee752f commit 4a66d0c

File tree

6 files changed

+69
-56
lines changed

6 files changed

+69
-56
lines changed

annlite/container.py

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,7 @@ def ivf_search(
105105
continue
106106

107107
indices = None
108-
if where_clause or (cell_table.deleted_count() > 0):
108+
if where_clause:
109109
indices = cell_table.query(
110110
where_clause=where_clause, where_params=where_params
111111
)
@@ -340,7 +340,6 @@ def update(
340340
_cell_id, _offset = self._meta_table.get_address(doc.id)
341341
if cell_id == _cell_id:
342342
self.vec_index(cell_id).add_with_ids(x.reshape(1, -1), [_offset])
343-
self.cell_table(cell_id).undo_delete_by_offset(_offset)
344343
self.doc_store(cell_id).update([doc])
345344
self.meta_table.add_address(doc.id, cell_id, _offset)
346345
update_success += 1

annlite/executor.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ def __init__(
4949
index_access_paths: str = '@r',
5050
search_access_paths: str = '@r',
5151
columns: Optional[Union[List[Tuple[str, str]], Dict[str, str]]] = None,
52+
list_like: Optional[bool] = False,
5253
dim: int = None,
5354
*args,
5455
**kwargs,
@@ -103,6 +104,7 @@ def __init__(
103104
'max_connection': max_connection,
104105
'data_path': data_path or self.workspace or './workspace',
105106
'columns': columns,
107+
'list_like': list_like,
106108
}
107109
self._index = DocumentArray(storage='annlite', config=config)
108110

annlite/index.py

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -606,8 +606,14 @@ def index_hash(self):
606606
if platform.system() == 'Windows':
607607
return date_time.isoformat('#', 'hours')
608608
return date_time.isoformat('#', 'seconds')
609+
else:
610+
import datetime
609611

610-
return None
612+
return (
613+
datetime.datetime.utcnow().isoformat('#', 'hours')
614+
if platform.system() == 'Windows'
615+
else datetime.datetime.utcnow().isoformat('#', 'seconds')
616+
)
611617

612618
@property
613619
def index_path(self):

annlite/storage/table.py

Lines changed: 31 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -191,7 +191,7 @@ def add_column(self, name: str, dtype: str, create_index: bool = True):
191191

192192
def create_index(self, column: str, commit: bool = True):
193193
sql_statement = f'''CREATE INDEX idx_{column}_
194-
ON {self.name}(_deleted, {column})'''
194+
ON {self.name}({column})'''
195195
self._conn.execute(sql_statement)
196196

197197
if commit:
@@ -200,17 +200,12 @@ def create_index(self, column: str, commit: bool = True):
200200
def create_table(self):
201201
sql = f'''CREATE TABLE {self.name}
202202
(_id INTEGER PRIMARY KEY AUTOINCREMENT,
203-
_doc_id TEXT NOT NULL UNIQUE,
204-
_deleted NUMERIC DEFAULT 0'''
203+
_doc_id TEXT NOT NULL UNIQUE'''
205204
if len(self._columns) > 0:
206205
sql += ', ' + ', '.join(self._columns)
207206
sql += ')'
208207
self._conn.execute(sql)
209208

210-
sql_statement = f'''CREATE INDEX idx__delete_
211-
ON {self.name}(_deleted)'''
212-
self._conn.execute(sql_statement)
213-
214209
for name in self._indexed_keys:
215210
self.create_index(name, commit=False)
216211
self._conn.commit()
@@ -281,18 +276,22 @@ def query(
281276
:return: offsets list of matched docs
282277
"""
283278

284-
where_conds = ['_deleted = ?']
279+
where_conds = []
280+
where = None
285281
if where_clause:
286282
where_conds.append(where_clause)
287-
where = ' and '.join(where_conds)
283+
where = ' and '.join(where_conds)
288284

289285
_order_by = f'{order_by or "_id"} {"ASC" if ascending else "DESC"}'
290286
_limit = f'LIMIT {limit}' if limit > 0 else ''
291287
_offset = f'OFFSET {offset}' if offset > 0 else ''
292288

293-
sql = f'SELECT _id from {self.name} WHERE {where} ORDER BY {_order_by} {_limit} {_offset}'
289+
if where:
290+
sql = f'SELECT _id from {self.name} WHERE {where} ORDER BY {_order_by} {_limit} {_offset}'
291+
else:
292+
sql = f'SELECT _id from {self.name} ORDER BY {_order_by} {_limit} {_offset}'
294293

295-
params = (0,) + tuple([_converting(p) for p in where_params])
294+
params = tuple([_converting(p) for p in where_params])
296295

297296
# # EXPLAIN SQL query
298297
# for row in self._conn.execute('EXPLAIN QUERY PLAN ' + sql, params):
@@ -308,7 +307,10 @@ def _offset_factory(_, record):
308307
cursor = self._conn.cursor()
309308

310309
try:
311-
offsets = cursor.execute(sql, params).fetchall()
310+
if where:
311+
offsets = cursor.execute(sql, params).fetchall()
312+
else:
313+
offsets = cursor.execute(sql).fetchall()
312314
self._conn.row_factory = None
313315
return offsets if offsets else []
314316
except Exception as e:
@@ -320,12 +322,12 @@ def delete(self, doc_ids: List[str]):
320322
321323
:param doc_ids: The IDs of docs
322324
"""
323-
sql = f'UPDATE {self.name} SET _deleted = 1 WHERE _doc_id = ?'
325+
sql = f'DELETE from {self.name} WHERE _doc_id = ?'
324326
self._conn.executemany(sql, doc_ids)
325327
self._conn.commit()
326328

327329
def get_docid_by_offset(self, offset: int):
328-
sql = f'SELECT _doc_id from {self.name} WHERE _id = ? and _deleted = 0 LIMIT 1;'
330+
sql = f'SELECT _doc_id from {self.name} WHERE _id = ? LIMIT 1;'
329331
result = self._conn.execute(sql, (offset + 1,)).fetchone()
330332
if result:
331333
return result[0]
@@ -336,17 +338,12 @@ def delete_by_offset(self, offset: int):
336338
337339
:param offset: The offset of the doc
338340
"""
339-
sql = f'UPDATE {self.name} SET _deleted = 1 WHERE _id = ?'
340-
self._conn.execute(sql, (offset + 1,))
341-
self._conn.commit()
342-
343-
def undo_delete_by_offset(self, offset: int):
344-
sql = f'UPDATE {self.name} SET _deleted = 0 WHERE _id = ?'
341+
sql = f'DELETE FROM {self.name} WHERE _id = ?'
345342
self._conn.execute(sql, (offset + 1,))
346343
self._conn.commit()
347344

348345
def exist(self, doc_id: str):
349-
sql = f'SELECT count(*) from {self.name} WHERE _deleted = 0 and _doc_id = ?;'
346+
sql = f'SELECT count(*) from {self.name} WHERE _doc_id = ?;'
350347
return self._conn.execute(sql, (doc_id,)).fetchone()[0] > 0
351348

352349
def count(self, where_clause: str = '', where_params: Tuple = ()):
@@ -358,27 +355,22 @@ def count(self, where_clause: str = '', where_params: Tuple = ()):
358355

359356
if where_clause:
360357
sql = 'SELECT count(_id) from {table} WHERE {where} LIMIT 1;'
361-
where = f'_deleted = ? and {where_clause}'
358+
where = where_clause
362359
sql = sql.format(table=self.name, where=where)
363360

364-
params = (0,) + tuple([_converting(p) for p in where_params])
361+
params = tuple([_converting(p) for p in where_params])
365362

366363
# # EXPLAIN SQL query
367364
# for row in self._conn.execute('EXPLAIN QUERY PLAN ' + sql, params):
368365
# print(row)
369366
return self._conn.execute(sql, params).fetchone()[0]
370367
else:
371-
sql = f'SELECT MAX(_id) from {self.name} LIMIT 1;'
368+
sql = f'SELECT count(_id) from {self.name};'
372369
result = self._conn.execute(sql).fetchone()
373370
if result[0]:
374-
return result[0] - self.deleted_count()
371+
return result[0]
375372
return 0
376373

377-
def deleted_count(self):
378-
"""Return the total number of record what is marked as soft-deleted."""
379-
sql = f'SELECT count(_id) from {self.name} WHERE _deleted = 1 LIMIT 1'
380-
return self._conn.execute(sql).fetchone()[0]
381-
382374
@property
383375
def size(self):
384376
return self.count()
@@ -404,24 +396,20 @@ def create_table(self):
404396
(_doc_id TEXT NOT NULL PRIMARY KEY,
405397
cell_id INTEGER NOT NULL,
406398
offset INTEGER NOT NULL,
407-
time_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
408-
_deleted NUMERIC DEFAULT 0)'''
399+
time_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)'''
409400

410401
self._conn.execute(sql)
411402

412403
self._conn.execute(
413404
f'CREATE INDEX if not exists idx_time_at_ ON {self.name}(time_at)'
414405
)
415-
self._conn.execute(
416-
f'CREATE INDEX if not exists idx__delete_ ON {self.name}(_deleted)'
417-
)
418406

419407
self._conn.commit()
420408

421409
def iter_addresses(
422410
self, time_since: 'datetime.datetime' = datetime.datetime(2020, 2, 2, 0, 0)
423411
):
424-
sql = f'SELECT _doc_id, cell_id, offset from {self.name} WHERE time_at >= ? AND _deleted = 0 ORDER BY time_at ASC;'
412+
sql = f'SELECT _doc_id, cell_id, offset from {self.name} WHERE time_at >= ? ORDER BY time_at ASC;'
425413

426414
cursor = self._conn.cursor()
427415
for doc_id, cell_id, offset in cursor.execute(sql, (time_since,)):
@@ -435,29 +423,23 @@ def get_latest_commit(self):
435423
return row
436424

437425
def get_address(self, doc_id: str):
438-
sql = f'SELECT cell_id, offset from {self.name} WHERE _doc_id = ? AND _deleted = 0 LIMIT 1;'
426+
sql = f'SELECT cell_id, offset from {self.name} WHERE _doc_id = ? LIMIT 1;'
439427
cursor = self._conn.execute(sql, (doc_id,))
440428
row = cursor.fetchone()
441429
return (row[0], row[1]) if row else (None, None)
442430

443431
def delete_address(self, doc_id: str, commit: bool = True):
444-
sql = f'UPDATE {self.name} SET _deleted = 1, time_at = ? WHERE _doc_id = ?'
445-
self._conn.execute(
446-
sql,
447-
(
448-
time_now(),
449-
doc_id,
450-
),
451-
)
432+
sql = f'DELETE from {self.name} WHERE _doc_id = ?'
433+
self._conn.execute(sql, (doc_id,))
452434
print(f'Deleted {doc_id} at: {time_now()}')
453435
if commit:
454436
self._conn.commit()
455437

456438
def add_address(self, doc_id: str, cell_id: int, offset: int, commit: bool = True):
457-
sql = f'INSERT OR REPLACE INTO {self.name}(_doc_id, cell_id, offset, time_at, _deleted) VALUES (?, ?, ?, ?, ?);'
439+
sql = f'INSERT OR REPLACE INTO {self.name}(_doc_id, cell_id, offset, time_at) VALUES (?, ?, ?, ?);'
458440
self._conn.execute(
459441
sql,
460-
(doc_id, cell_id, offset, time_now(), 0),
442+
(doc_id, cell_id, offset, time_now()),
461443
)
462444
if commit:
463445
self._conn.commit()
@@ -469,11 +451,11 @@ def bulk_add_address(
469451
offsets: Union[List[int], np.ndarray],
470452
commit: bool = True,
471453
):
472-
sql = f'INSERT OR REPLACE INTO {self.name}(_doc_id, cell_id, offset, time_at, _deleted) VALUES (?, ?, ?, ?, ?);'
454+
sql = f'INSERT OR REPLACE INTO {self.name}(_doc_id, cell_id, offset, time_at) VALUES (?, ?, ?, ?);'
473455
self._conn.executemany(
474456
sql,
475457
[
476-
(doc_id, cell_id, offset, time_now(), 0)
458+
(doc_id, cell_id, offset, time_now())
477459
for doc_id, cell_id, offset in zip(doc_ids, cell_ids, offsets)
478460
],
479461
)

tests/executor/test_executor.py

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -403,3 +403,30 @@ def test_local_storage_with_delete(tmpfile):
403403

404404
assert int(status.tags['total_docs']) == N - 1
405405
assert int(status.tags['index_size']) == N - 1
406+
407+
408+
def test_local_storage_delete_update(tmpfile):
409+
docs = gen_docs(N)
410+
f = Flow().add(
411+
uses=AnnLiteIndexer,
412+
uses_with={
413+
'n_dim': D,
414+
},
415+
workspace=tmpfile,
416+
shards=1,
417+
)
418+
with f:
419+
f.post(on='/index', inputs=docs)
420+
time.sleep(2)
421+
f.post(on='/backup')
422+
423+
f.post(on='/delete', parameters={'ids': ['0']})
424+
time.sleep(2)
425+
f.post(on='/backup')
426+
427+
new_doc = gen_docs(N)[1:]
428+
f.post(on='/update', inputs=new_doc)
429+
status = f.post(on='/status', return_results=True)[0]
430+
431+
assert int(status.tags['total_docs']) == N - 1
432+
assert int(status.tags['index_size']) == N - 1

tests/test_table.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ def test_create_cell_table():
4444

4545
def test_schema(dummy_cell_table):
4646
schema = dummy_cell_table.schema
47-
assert len(schema.split('\n')) == 6
47+
assert len(schema.split('\n')) == 5
4848

4949

5050
def test_query(table_with_data):
@@ -134,6 +134,3 @@ def test_create_meta_table(tmpdir):
134134
table.delete_address('0')
135135
addresses = list(table.iter_addresses(time_since=time_since))
136136
assert addresses == []
137-
138-
addr = table.get_latest_commit()
139-
assert addr[-1] >= time_since

0 commit comments

Comments
 (0)