|
75 | 75 | POSTGRES_BIGINT_MAX = 9223372036854775807 |
76 | 76 | POSTGRES_BIGINT_MIN = -9223372036854775808 |
77 | 77 |
|
78 | | -MINIMUM_QSV_VERSION = "0.133.0" |
| 78 | +MINIMUM_QSV_VERSION = "2.1.0" |
79 | 79 | MAX_CONTENT_LENGTH = tk.config.get("ckanext.datapusher_plus.max_content_length") |
80 | 80 |
|
81 | 81 | DATASTORE_URLS = { |
@@ -915,6 +915,38 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): |
915 | 915 | raise utils.JobError( |
916 | 916 | "Cannot infer data types and compile statistics: {}".format(e) |
917 | 917 | ) |
| 918 | + |
| 919 | + # remove the last four rows. Do this using the qsv slice command |
| 920 | + # the last four rows are qsv__rowcount, qsv__columncount, qsv__filesize_bytes, qsv__fingerprint_hash |
| 921 | + # they'll be used in later phases of DRUF, but let's remove them for now until then |
| 922 | + qsv_slice_csv = os.path.join(temp_dir, "qsv_slice.csv") |
| 923 | + try: |
| 924 | + subprocess.run( |
| 925 | + [ |
| 926 | + qsv_bin, |
| 927 | + "slice", |
| 928 | + "--start", |
| 929 | + "-4", |
| 930 | + "--invert", |
| 931 | + qsv_stats_csv, |
| 932 | + "--output", |
| 933 | + qsv_slice_csv, |
| 934 | + ], |
| 935 | + check=True, |
| 936 | + ) |
| 937 | + except subprocess.CalledProcessError as e: |
| 938 | + raise utils.JobError("Cannot slice CSV: {}".format(e)) |
| 939 | + |
| 940 | + # read the sliced CSV and remove the qsv__value column (the last column). |
| 941 | + # Do this using the qsv select command |
| 942 | + try: |
| 943 | + subprocess.run( |
| 944 | + [qsv_bin, "select", "!_", qsv_slice_csv, "--output", qsv_stats_csv], |
| 945 | + check=True, |
| 946 | + ) |
| 947 | + except subprocess.CalledProcessError as e: |
| 948 | + raise utils.JobError("Cannot select CSV: {}".format(e)) |
| 949 | + |
918 | 950 | with open(qsv_stats_csv, mode="r") as inp: |
919 | 951 | reader = csv.DictReader(inp) |
920 | 952 | for row in reader: |
@@ -1402,7 +1434,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None): |
1402 | 1434 | except psycopg2.Error as e: |
1403 | 1435 | logger.warning("Could not TRUNCATE: {}".format(e)) |
1404 | 1436 |
|
1405 | | - col_names_list = [h["id"] for h in headers_dicts if not h["id"].startswith("qsv_")] |
| 1437 | + col_names_list = [h["id"] for h in headers_dicts] |
1406 | 1438 | column_names = sql.SQL(",").join(sql.Identifier(c) for c in col_names_list) |
1407 | 1439 | copy_sql = sql.SQL( |
1408 | 1440 | "COPY {} ({}) FROM STDIN " |
|
0 commit comments