-
Notifications
You must be signed in to change notification settings - Fork 0
Manage file added again to same session, data is updated. #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 10 commits
df35686
caf6fcd
66d8df8
795cc16
19e094f
e425dcf
9eefa2e
f9cef7c
343e978
a0ff5e9
ba0f644
7aef366
70620ab
84bd591
b0d39a6
ee3b6c4
006d4cb
fa8e300
820ea31
c753000
01581a4
5e60ef1
385efb9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| # -*- coding: utf-8 -*- | ||
|
|
||
| from imio.esign.utils import get_sessions_for | ||
|
|
||
|
|
||
| def on_categorized_annex_updated(annex, event): | ||
| '''When an annex is modified, update check if need to update esign session.''' | ||
| old_values = event.old_values | ||
| # we are creating a new annex, not in a session | ||
| if not old_values: | ||
| return | ||
|
|
||
| sessions = get_sessions_for(event.parent.UID(), readonly=False) | ||
| if not sessions: | ||
| return | ||
|
|
||
| new_values = event.new_values | ||
| # if something usefull changed, we will update the session | ||
| update = False | ||
| checked_keys = ['title', 'filesize', 'relative_url'] | ||
| for checked_key in checked_keys: | ||
| if new_values[checked_key] != old_values[checked_key]: | ||
| update = True | ||
| break | ||
| if update: | ||
| annex_uid = annex.UID() | ||
| for session_id, session in sessions.items(): | ||
| # size | ||
| size_diff = new_values['filesize'] - old_values['filesize'] | ||
| session['size'] += size_diff | ||
| # title and filename | ||
| for file_data in session['files']: | ||
| if file_data['uid'] == annex_uid: | ||
| file_data['title'] = new_values['title'] | ||
| file_data['filename'] = annex.file.filename |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| <configure | ||
| xmlns="http://namespaces.zope.org/zope"> | ||
|
|
||
| <subscriber for="imio.annex.content.annex.IAnnex | ||
| collective.iconifiedcategory.interfaces.ICategorizedElementUpdatedEvent" | ||
| handler=".events.on_categorized_annex_updated" /> | ||
|
|
||
| </configure> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,8 @@ | |
| from plone.namedfile.file import NamedBlobFile | ||
| from plone.namedfile.file import NamedBlobImage | ||
| from zope.annotation import IAnnotations | ||
| from zope.event import notify | ||
| from zope.lifecycleevent import ObjectModifiedEvent | ||
|
|
||
| import collective.iconifiedcategory | ||
| import json | ||
|
|
@@ -452,6 +454,66 @@ def test_add_files_with_duplicate_filenames(self): | |
| self.assertIn("same_filename-1.pdf", filenames) | ||
| self.assertIn("same_filename-2.pdf", filenames) | ||
|
|
||
| def test_add_files_already_exist_is_updated(self): | ||
| """When adding a file to the same session, it is updated.""" | ||
| annot = get_session_annotation() | ||
| self.assertEqual(len(annot["sessions"]), 0) | ||
|
|
||
| signers = [ | ||
| ("user1", "user1@sign.com", "User 1", "Position 1"), | ||
| ] | ||
|
|
||
| annex0_uid = self.uids[0] | ||
| annex0 = api.content.get(UID=annex0_uid) | ||
|
|
||
| sid, session = add_files_to_session(signers, (annex0_uid,)) | ||
| self.assertEqual(sid, 0) | ||
| self.assertEqual(len(session["files"]), 1) | ||
| self.assertEqual(session["files"][0]["filename"], "annex0.pdf") | ||
| self.assertEqual(session["files"][0]["title"], "Annex 0") | ||
| self.assertEqual(session["size"], 6968) | ||
| # edit annex and add again, still one annex in session and data are updated | ||
| annex0.file.filename = u"new_annex0.pdf" | ||
| annex0.setTitle('New Annex 0') | ||
| sid, session = add_files_to_session(signers, (annex0_uid,)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Est-ce qu'il ne faudrait pas tester ici que le filename et le title sont déjà mis à jour dans la session même avant de l'ajouter à nouveau ? Si oui, alors on n'est plus vraiment dans le contexte de test_utils mais plutôt test_events
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Je vais compléter test_categorized_element_updated_event avec changement file, çà teste que c'est bien màj lors d'un edit, et donc ici c'est la création, on a le postulat que dans ce cas c'est bien à jour, d'ailleurs on le voit dans ce test, sinon on re-teste des choses déjà testées... Il me semble...
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. à voir car création ou ajout même annexe ne passe pas par l'event de modif, c'est juste le add_files_to_session qui gère cela, çà va mettre à jour les infos correctement, le code lié à l'event est ci-dessous dans le "edit file"...
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @chris-adam en effet le test a évolué petit à petit, et la partie "add" après le "modify" a été enlevée pour montrer que c'est màj sans re-add, j'avais lu trop vite car ton commentaire était "trop haut" dans le test, ici çà me semble bon comme çà on pourrait merger à mon sens |
||
| # same session_id | ||
| self.assertEqual(sid, 0) | ||
| self.assertEqual(len(session["files"]), 1) | ||
| self.assertEqual(session["files"][0]["filename"], "new_annex0.pdf") | ||
| self.assertEqual(session["files"][0]["title"], "New Annex 0") | ||
| self.assertEqual(session["size"], 6968) | ||
| # add again exact same file | ||
| sid, session = add_files_to_session(signers, (annex0_uid,)) | ||
| self.assertEqual(sid, 0) | ||
| self.assertEqual(len(session["files"]), 1) | ||
| self.assertEqual(session["files"][0]["filename"], "new_annex0.pdf") | ||
| self.assertEqual(session["files"][0]["title"], "New Annex 0") | ||
| self.assertEqual(session["size"], 6968) | ||
| # add second file 2 times | ||
| annex1_uid = self.uids[1] | ||
| annex1 = api.content.get(UID=annex1_uid) | ||
| sid, session = add_files_to_session(signers, (annex1_uid,)) | ||
| self.assertEqual(sid, 0) | ||
| self.assertEqual(len(session["files"]), 2) | ||
| self.assertEqual(session["files"][1]["filename"], "annex1.pdf") | ||
| self.assertEqual(session["files"][1]["title"], "Annex 1") | ||
| self.assertEqual(session["size"], 13982) | ||
| # edit and add again | ||
| annex1.setTitle('New Annex 1') | ||
| # edit file, filename and content so size changed | ||
| with open(os.path.join(os.path.dirname(__file__), "annex1.pdf"), "rb") as f: | ||
| annex1.file = NamedBlobFile(data=f.read(), filename=u"new_annex1.pdf", contentType="application/pdf") | ||
| notify(ObjectModifiedEvent(annex1)) | ||
| sid, session = add_files_to_session(signers, (annex1_uid,)) | ||
| self.assertEqual(sid, 0) | ||
| self.assertEqual(len(session["files"]), 2) | ||
| self.assertEqual(session["files"][1]["filename"], "new_annex1.pdf") | ||
| self.assertEqual(session["files"][1]["title"], "New Annex 1") | ||
| self.assertEqual(session["size"], 13936) | ||
| # just to check, remove annex1 | ||
| remove_files_from_session((annex0_uid,)) | ||
| self.assertEqual(session["size"], 6968) | ||
|
|
||
| def test_remove_context_from_session(self): | ||
| """Test removing a context from a session.""" | ||
| annot = get_session_annotation() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,15 @@ def add_files_to_session( | |
| annex = uuidToObject(uuid=uid, unrestricted=True) | ||
| context_uid_provider = getAdapter(annex, IContextUidProvider) | ||
| context_uid = context_uid_provider.get_context_uid() | ||
| # update data if adding same file to same session | ||
| if annot['uids'].get(uid, -1) == session_id: | ||
| logger.info('File with UID %s is already in session_id %s and data were updated!', uid, session_id) | ||
| # remove old filename to avoid filename being renamed | ||
| old_filename = path.splitext([fn for fn in session["files"] | ||
| if fn['uid'] == uid][0]['filename'])[0] | ||
| existing_files.remove(old_filename) | ||
| remove_files_from_session([uid], remove_empty_session=False) | ||
|
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prevent crash when UID mapping is stale. This lookup assumes the UID always exists in 💡 Proposed fix- old_filename = path.splitext([fn for fn in session["files"]
- if fn['uid'] == uid][0]['filename'])[0]
+ old_file = next((fn for fn in session["files"] if fn["uid"] == uid), None)
+ if old_file is None:
+ logger.error(
+ "File UID %s mapped to session %s but missing from session files.",
+ uid, session_id,
+ )
+ continue
+ old_filename = path.splitext(old_file["filename"])[0]🧰 Tools🪛 Ruff (0.15.4)[warning] 95-96: Prefer Replace with (RUF015) 🤖 Prompt for AI Agents |
||
|
|
||
| filename, ext = path.splitext(annex.file.filename or "no_filename.pdf") | ||
| new_filename = get_correct_id(existing_files, filename) | ||
| session["files"].append( | ||
|
|
@@ -342,6 +351,17 @@ def get_session_info(session_id, portal=None): | |
| return annot['sessions'][session_id] | ||
|
|
||
|
|
||
| def get_sessions_for(context_uid): | ||
|
||
| """ """ | ||
| sessions = [] | ||
| annot = get_session_annotation() | ||
| for f_uid in annot["c_uids"].get(context_uid, []): | ||
| session_id = annot["uids"].get(f_uid) | ||
| if session_id is not None: | ||
| sessions.append(annot["sessions"].get(session_id)) | ||
| return sessions | ||
|
|
||
|
|
||
| def remove_context_from_session(context_uids): | ||
| """Remove all files from a session that are linked to the given context UIDs. | ||
|
|
||
|
|
@@ -356,10 +376,11 @@ def remove_context_from_session(context_uids): | |
| remove_files_from_session(list(c_uids[context_uid])) | ||
|
|
||
|
|
||
| def remove_files_from_session(files_uids): | ||
| def remove_files_from_session(files_uids, remove_empty_session=True): | ||
| """Remove files from their corresponding sessions. | ||
|
|
||
| :param files_uids: list of file UIDs to remove | ||
| :param remove_empty_session: when the last file of a session is removed the session will be removed by default, except when False, the empty session is kept | ||
| """ | ||
| annot = get_session_annotation() | ||
| sessions = annot["sessions"] | ||
|
|
@@ -389,7 +410,7 @@ def remove_files_from_session(files_uids): | |
| continue | ||
|
|
||
| del session["files"][i] | ||
| if not session["files"]: | ||
| if not session["files"] and remove_empty_session: | ||
| del sessions[session_id] | ||
| else: | ||
| session["last_update"] = datetime.now() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imio.annex en double