|
2 | 2 | from flask_babel import force_locale |
3 | 3 | import re |
4 | 4 | import simplejson as json |
5 | | -from urllib.parse import quote |
6 | 5 | from markupsafe import Markup |
7 | 6 |
|
8 | 7 | from typing import Union, Dict, Tuple, Any |
|
46 | 45 | from ckanext.recombinant.tables import get_chromo, get_geno |
47 | 46 | from ckanext.recombinant.helpers import ( |
48 | 47 | recombinant_primary_key_fields, recombinant_choice_fields) |
| 48 | +from ckanext.recombinant.utils import get_constraint_error_from_psql_error |
49 | 49 |
|
50 | 50 | from io import BytesIO |
51 | 51 |
|
|
58 | 58 | log = getLogger(__name__) |
59 | 59 | recombinant = Blueprint('recombinant', __name__) |
60 | 60 |
|
61 | | -FK_DETAILS_MATCH__KEYS = re.compile('(?<=DETAIL:).*(\((.*)\))=') |
62 | | -FK_DETAILS_MATCH__VALUES = re.compile('(?<=DETAIL:).*(\((.*)\))') |
63 | | -FK_DETAILS_MATCH__TABLE = re.compile('(?<=DETAIL:).*"(.*?)"') |
64 | | - |
65 | 61 |
|
66 | 62 | @recombinant.route('/recombinant/upload/<id>', methods=['POST']) |
67 | 63 | def upload(id: str) -> Response: |
@@ -208,15 +204,8 @@ def record_fail(err: Union[str, Markup]) -> Union[str, Markup]: |
208 | 204 | error_message = chromo.get('datastore_constraint_errors', {}).get( |
209 | 205 | 'delete', e.error_dict['foreign_constraints'][0]) # type: ignore |
210 | 206 | sql_error_string = e.error_dict['info']['orig'] |
211 | | - ref_keys, ref_values, ref_resource = \ |
212 | | - _get_constraint_info_from_psql_error(lc, sql_error_string) |
213 | | - error_message = _(error_message) |
214 | | - if 'refKeys' in error_message: |
215 | | - error_message = error_message.format(refKeys=ref_keys) |
216 | | - if 'refValues' in error_message: |
217 | | - error_message = error_message.format(refValues=ref_values) |
218 | | - if 'refTable' in error_message: |
219 | | - error_message = error_message.format(refTable=ref_resource) |
| 207 | + error_message = get_constraint_error_from_psql_error( |
| 208 | + lc, sql_error_string, error_message) |
220 | 209 | h.flash_error(error_message) |
221 | 210 | # type_ignore_reason: incomplete typing |
222 | 211 | return record_fail(Markup(error_message)) # type: ignore |
@@ -731,17 +720,9 @@ def _process_upload_file(lc: LocalCKAN, |
731 | 720 | foreign_error = chromo.get( |
732 | 721 | 'datastore_constraint_errors', {}).get('upsert') |
733 | 722 | sql_error_string = e.error_dict['upsert_info']['orig'] |
734 | | - ref_keys, ref_values, ref_resource = \ |
735 | | - _get_constraint_info_from_psql_error(lc, sql_error_string) |
736 | 723 | if foreign_error: |
737 | | - foreign_error = _(foreign_error) |
738 | | - if 'refKeys' in foreign_error: |
739 | | - foreign_error = foreign_error.format(refKeys=ref_keys) |
740 | | - if 'refValues' in foreign_error: |
741 | | - foreign_error = foreign_error.format(refValues=ref_values) |
742 | | - if 'refTable' in foreign_error: |
743 | | - foreign_error = foreign_error.format(refTable=ref_resource) |
744 | | - pgerror = Markup(foreign_error) |
| 724 | + pgerror = get_constraint_error_from_psql_error( |
| 725 | + lc, sql_error_string, foreign_error) |
745 | 726 | elif 'invalid input syntax for type integer' in pgerror: |
746 | 727 | if ':' in pgerror: |
747 | 728 | pgerror = _('Invalid input syntax for type integer: {}')\ |
@@ -769,49 +750,3 @@ def _process_upload_file(lc: LocalCKAN, |
769 | 750 | raise |
770 | 751 | finally: |
771 | 752 | ds_write_connection.close() |
772 | | - |
773 | | - |
774 | | -def _get_constraint_info_from_psql_error( |
775 | | - lc: LocalCKAN, sql_error_string: str) -> Tuple[str, str, str]: |
776 | | - """ |
777 | | - Parses the pSQL original constraint error string to determine |
778 | | - the referenced/referencing keys, values, and table. |
779 | | -
|
780 | | - Will return a tuple of the keys, values, and referenced/referencing table |
781 | | - as a URI to the CKAN resource. |
782 | | -
|
783 | | - If the resource is a PD type, it will append the key/value filters |
784 | | - for a DataTables query. |
785 | | - """ |
786 | | - keys_match = re.search(FK_DETAILS_MATCH__KEYS, sql_error_string) |
787 | | - values_match = re.search(FK_DETAILS_MATCH__VALUES, sql_error_string) |
788 | | - table_match = re.search(FK_DETAILS_MATCH__TABLE, sql_error_string) |
789 | | - ref_keys = keys_match.group(2) if keys_match else None |
790 | | - ref_values = values_match.group(2) if values_match else None |
791 | | - ref_resource = table_match.group(1) if table_match else None |
792 | | - if ref_resource: |
793 | | - try: |
794 | | - ref_res_dict = lc.action.resource_show(id=ref_resource) |
795 | | - ref_pkg_dict = lc.action.package_show(id=ref_res_dict['package_id']) |
796 | | - if ref_pkg_dict['type'] in h.recombinant_get_types(): |
797 | | - if ref_keys and ref_values: |
798 | | - dt_query = {} |
799 | | - _ref_keys = ref_keys.replace(' ', '').split(',') |
800 | | - _ref_values = ref_values.replace(' ', '').split(',') |
801 | | - for _i, key in enumerate(_ref_keys): |
802 | | - dt_query[key] = _ref_values[_i] |
803 | | - dt_query = json.dumps(dt_query, separators=(',', ':')) |
804 | | - ref_res_uri = h.url_for( |
805 | | - 'recombinant.preview_table', |
806 | | - resource_name=ref_res_dict['name'], |
807 | | - owner_org=ref_pkg_dict['organization']['name'], |
808 | | - dt_query=dt_query) |
809 | | - else: |
810 | | - ref_res_uri = h.url_for( |
811 | | - 'resource.read', |
812 | | - id=ref_res_dict['package_id'], |
813 | | - resource_id=ref_res_dict['id']) |
814 | | - ref_resource = ref_res_uri |
815 | | - except Exception: |
816 | | - pass |
817 | | - return ref_keys, ref_values, ref_resource |
0 commit comments