Skip to content
Merged
31 changes: 26 additions & 5 deletions backend/btrixcloud/colls.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ async def create_dedupe_index(self, coll: Collection, org: Organization):
# enable index by setting indexState to a non-null value
# and setting stats to zeroed out default
await self.update_dedupe_index_info(
coll.id, state="initing" if coll.crawlCount else "idle"
coll.id, org.id, state="initing" if coll.crawlCount else "idle"
)

await self.update_dedupe_index_stats(coll.id, DedupeIndexStats())
Expand Down Expand Up @@ -793,7 +793,7 @@ async def run_index_import_job(
if job_type in ("import", "purge"):
# if job created, update state here so its reflected in the UI more quickly
await self.update_dedupe_index_info(
coll_id, state="purging" if job_type == "purge" else "importing"
coll_id, oid, state="purging" if job_type == "purge" else "importing"
)

async def delete_dedupe_index(
Expand All @@ -817,6 +817,10 @@ async def delete_dedupe_index(
)
raise HTTPException(status_code=400, detail="file_deletion_error")

await self.orgs.inc_org_bytes_stored_field(
org.id, "bytesStoredDedupeIndexes", -coll.indexFile.size
)

await self.collections.find_one_and_update(
{"_id": coll.id},
{
Expand Down Expand Up @@ -848,7 +852,7 @@ async def update_dedupe_index_stats(
self, coll_id: UUID, stats: DedupeIndexStats, disk_space_used: int = 0
):
"""update dedupe index stats for specified collection"""
self.collections.find_one_and_update(
await self.collections.find_one_and_update(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoa, i'm surprised this was missed before by mypy, and no warnings about it either?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's because there's no return to check the type of. mypy doesn't seem to be making sure that async methods are awaited generally, at least with how we set it up.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be a contentious issue over in the mypy repo issues: python/mypy#2499

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might require some changes in our codebase (there are lots of places where we await without doing anything with the return value of async methods) but this could help: https://mypy.readthedocs.io/en/stable/error_code_list2.html#check-that-awaitable-return-value-is-used-unused-awaitable

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this may have more unintended consequence, does adding a return type help generally?

{"_id": coll_id, "indexState": {"$ne": None}},
{
"$set": {
Expand All @@ -861,11 +865,12 @@ async def update_dedupe_index_stats(
async def update_dedupe_index_info(
self,
coll_id: UUID,
oid: UUID,
state: TYPE_DEDUPE_INDEX_STATES,
index_file: Optional[DedupeIndexFile] = None,
dt: Optional[datetime] = None,
if_exists=False,
):
) -> bool:
"""update the state, and optionally, dedupe index file info"""
query: dict[str, Any] = {"indexState": state}
if index_file and dt:
Expand All @@ -877,7 +882,23 @@ async def update_dedupe_index_info(
if if_exists:
match["indexState"] = {"$ne": None}

res = self.collections.find_one_and_update(match, {"$set": query})
res = await self.collections.find_one_and_update(
match,
{"$set": query},
return_document=pymongo.ReturnDocument.BEFORE,
)

if index_file:
size_diff = index_file.size

prev_coll = Collection.from_dict(res) if res else None
if prev_coll and prev_coll.indexFile:
size_diff = index_file.size - prev_coll.indexFile.size

await self.orgs.inc_org_bytes_stored_field(
oid, "bytesStoredDedupeIndexes", size_diff
)

return res is not None

async def get_dedupe_index_saved(self, coll_id: UUID) -> Optional[datetime]:
Expand Down
3 changes: 3 additions & 0 deletions backend/btrixcloud/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2434,6 +2434,7 @@ class OrgOut(BaseMongoModel):
bytesStoredProfiles: int
bytesStoredSeedFiles: int = 0
bytesStoredThumbnails: int = 0
bytesStoredDedupeIndexes: int = 0
origin: Optional[AnyHttpUrl] = None

storageQuotaReached: Optional[bool] = False
Expand Down Expand Up @@ -2501,6 +2502,7 @@ class Organization(BaseMongoModel):
bytesStoredProfiles: int = 0
bytesStoredSeedFiles: int = 0
bytesStoredThumbnails: int = 0
bytesStoredDedupeIndexes: int = 0

# total usage + exec time
usage: Dict[str, int] = {}
Expand Down Expand Up @@ -2652,6 +2654,7 @@ class OrgMetrics(BaseModel):
storageUsedProfiles: int
storageUsedSeedFiles: int
storageUsedThumbnails: int
storageUsedDedupeIndexes: int
storageQuotaBytes: int
archivedItemCount: int
crawlCount: int
Expand Down
39 changes: 24 additions & 15 deletions backend/btrixcloud/operator/collindexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ async def sync_index(self, data: MCSyncData):
status = CollIndexStatus(**data.parent.get("status", {}))

coll_id = spec.id
oid = spec.oid
redis_name = f"redis-coll-{coll_id}"
new_children = []

Expand All @@ -159,7 +160,7 @@ async def sync_index(self, data: MCSyncData):
if data.finalizing:
is_done = False
if status.state == "saved" and status.finishedAt:
await self.set_state("idle", status, coll_id)
await self.set_state("idle", status, coll_id, oid)
is_done = True
elif status.state == "idle" and status.index.notFound:
is_done = True
Expand All @@ -168,7 +169,7 @@ async def sync_index(self, data: MCSyncData):
is_done = True
else:
try:
coll = await self.coll_ops.get_collection_raw(spec.id, spec.oid)
coll = await self.coll_ops.get_collection_raw(coll_id, oid)
# if index state is not set, index has been deleted
# also delete immediately
if not coll.get("indexState"):
Expand All @@ -184,7 +185,7 @@ async def sync_index(self, data: MCSyncData):
is_done = True

if is_done:
print(f"CollIndex removed: {spec.id}")
print(f"CollIndex removed: {coll_id}")
return {
"status": status.dict(),
"children": [],
Expand All @@ -195,19 +196,19 @@ async def sync_index(self, data: MCSyncData):
# determine if index was previously saved before initing redis
if not redis_pod:
if not status.indexLastSavedAt:
res = await self.coll_ops.get_dedupe_index_saved(spec.id)
res = await self.coll_ops.get_dedupe_index_saved(coll_id)
if res:
status.indexLastSavedAt = date_to_str(res)

if self.is_expired(status) or data.finalizing:
# do actual deletion here
if not data.finalizing:
self.run_task(self.do_delete(spec.id))
self.run_task(self.do_delete(coll_id))

# Saving process
# 1. run bgsave while redis is active
if status.index.running:
await self.do_save_redis(spec.id, status)
await self.do_save_redis(coll_id, oid, status)

elif status.index.finished and not status.index.savedAt:
await self.k8s.send_signal_to_pod(redis_name, "SIGUSR1", "save")
Expand All @@ -217,7 +218,7 @@ async def sync_index(self, data: MCSyncData):
await self.mark_index_saved(redis_name, spec, status)

else:
await self.update_state(data, spec.id, status)
await self.update_state(data, coll_id, oid, status)

# pylint: disable=broad-exception-caught
except Exception as e:
Expand Down Expand Up @@ -289,7 +290,9 @@ def sync_redis_pod_status(self, pod, status: CollIndexStatus):
except:
pass

async def update_state(self, data, coll_id: UUID, status: CollIndexStatus):
async def update_state(
self, data, coll_id: UUID, oid: UUID, status: CollIndexStatus
):
"""update state"""
desired_state = status.state
if not status.index.loaded:
Expand Down Expand Up @@ -317,7 +320,7 @@ async def update_state(self, data, coll_id: UUID, status: CollIndexStatus):
desired_state = "initing"

if desired_state != status.state:
await self.set_state(desired_state, status, coll_id)
await self.set_state(desired_state, status, coll_id, oid)

def is_expired(self, status: CollIndexStatus):
"""return true if collindex is considered expired and should be deleted"""
Expand All @@ -343,21 +346,27 @@ def is_last_active_exceeds(
return False

async def set_state(
self, state: TYPE_DEDUPE_INDEX_STATES, status: CollIndexStatus, coll_id: UUID
self,
state: TYPE_DEDUPE_INDEX_STATES,
status: CollIndexStatus,
coll_id: UUID,
oid: UUID,
):
"""set state after updating db"""
print(f"Setting coll index state {status.state} -> {state} {coll_id}")
status.state = state
status.lastStateChangeAt = date_to_str(dt_now())

await self.coll_ops.update_dedupe_index_info(coll_id, state, if_exists=True)
await self.coll_ops.update_dedupe_index_info(
coll_id, oid, state, if_exists=True
)

async def do_delete(self, coll_id: UUID):
"""delete the CollIndex object"""
print(f"Deleting collindex {coll_id}")
await self.k8s.delete_custom_object(f"collindex-{coll_id}", "collindexes")

async def do_save_redis(self, coll_id: UUID, status: CollIndexStatus):
async def do_save_redis(self, coll_id: UUID, oid: UUID, status: CollIndexStatus):
"""shutdown save redis"""
try:
redis = await self.k8s.get_redis_connected(f"coll-{coll_id}")
Expand All @@ -367,14 +376,14 @@ async def do_save_redis(self, coll_id: UUID, status: CollIndexStatus):
if status.state not in ("saving", "saved"):
await redis.bgsave(False)

await self.set_state("saving", status, coll_id)
await self.set_state("saving", status, coll_id, oid)

if await self.is_bgsave_done(redis):
await redis.shutdown()

# pylint: disable=broad-exception-caught
except Exception:
await self.set_state("ready", status, coll_id)
await self.set_state("ready", status, coll_id, oid)
traceback.print_exc()

async def is_bgsave_done(self, redis: Redis) -> bool:
Expand Down Expand Up @@ -576,5 +585,5 @@ async def update_saved_dedupe_index_state_in_db(
)

await self.coll_ops.update_dedupe_index_info(
coll_id, "idle", index_file, finished_at
coll_id, oid, "idle", index_file, finished_at
)
1 change: 1 addition & 0 deletions backend/btrixcloud/orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,7 @@ async def get_org_metrics(self, org: Organization) -> dict[str, int]:
"storageUsedProfiles": org.bytesStoredProfiles,
"storageUsedSeedFiles": org.bytesStoredSeedFiles or 0,
"storageUsedThumbnails": org.bytesStoredThumbnails or 0,
"storageUsedDedupeIndexes": org.bytesStoredDedupeIndexes or 0,
"storageQuotaBytes": storage_quota,
"archivedItemCount": archived_item_count,
"crawlCount": crawl_count,
Expand Down
6 changes: 4 additions & 2 deletions backend/test/test_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,16 +520,18 @@ def test_org_metrics(crawler_auth_headers, default_org_id):
assert data["storageUsedBytes"] > 0
assert data["storageUsedCrawls"] > 0
assert data["storageUsedUploads"] >= 0
assert data["storageUsedThumbnails"] >= 0
assert data["storageUsedThumbnails"] >= 0
assert data["storageUsedProfiles"] >= 0
assert data["storageUsedSeedFiles"] >= 0
assert data["storageUsedThumbnails"] >= 0
assert data["storageUsedDedupeIndexes"] >= 0
assert (
data["storageUsedBytes"]
== data["storageUsedCrawls"]
+ data["storageUsedUploads"]
+ data["storageUsedProfiles"]
+ data["storageUsedSeedFiles"]
+ data["storageUsedThumbnails"]
+ data["storageUsedDedupeIndexes"]
)
assert data["storageQuotaBytes"] >= 0
assert data["archivedItemCount"] > 0
Expand Down
5 changes: 4 additions & 1 deletion frontend/src/features/meters/storage/storage-meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ export class StorageMeter extends BtrixElement {
const hasQuota = Boolean(metrics.storageQuotaBytes);
const isStorageFull =
hasQuota && metrics.storageUsedBytes >= metrics.storageQuotaBytes;
const misc = metrics.storageUsedSeedFiles + metrics.storageUsedThumbnails;
const misc =
metrics.storageUsedSeedFiles +
metrics.storageUsedThumbnails +
metrics.storageUsedDedupeIndexes;

const values = {
crawls: metrics.storageUsedCrawls,
Expand Down
10 changes: 7 additions & 3 deletions frontend/src/pages/org/dashboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ export class Dashboard extends BtrixElement {
url: "/browser-profiles",
},
})}
${metrics.storageUsedSeedFiles || metrics.storageUsedThumbnails
${metrics.storageUsedSeedFiles ||
metrics.storageUsedThumbnails ||
metrics.storageUsedDedupeIndexes
? this.renderMiscStorage(metrics)
: nothing}

Expand Down Expand Up @@ -538,7 +540,7 @@ export class Dashboard extends BtrixElement {
${msg("Miscellaneous")}
<btrix-popover
content=${msg(
"Total size of all supplementary files in use by your organization, such as workflow URL list files and custom collection thumbnails.",
"Total size of all supplementary files in use by your organization, such as workflow URL list files, custom collection thumbnails, and deduplication indexes.",
)}
>
<sl-icon
Expand All @@ -549,7 +551,9 @@ export class Dashboard extends BtrixElement {
</dt>
<dd class="font-monostyle text-xs text-neutral-500">
${this.localize.bytes(
metrics.storageUsedSeedFiles + metrics.storageUsedThumbnails,
metrics.storageUsedSeedFiles +
metrics.storageUsedThumbnails +
metrics.storageUsedDedupeIndexes,
)}
</dd>
</div>
Expand Down
1 change: 1 addition & 0 deletions frontend/src/types/org.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export type Metrics = {
storageUsedProfiles: number;
storageUsedSeedFiles: number;
storageUsedThumbnails: number;
storageUsedDedupeIndexes: number;
storageQuotaBytes: number;
archivedItemCount: number;
crawlCount: number;
Expand Down
Loading