diff --git a/README.md b/README.md
index 0d45df4c..771c9117 100644
--- a/README.md
+++ b/README.md
@@ -432,7 +432,9 @@ import-borders load borders.csv # Load borders.csv into a table
 This utility requires PostgreSQL's PG* environment variables, and optionally uses `PBF_DATA_DIR, BORDERS_PBF_FILE, BORDERS_CSV_FILE, BORDERS_TABLE_NAME`.

 ## Importing into Postgres
-The `import-sql` script can execute a single SQL file in Postgres when the file is given as the first parameter.
+The `run-psql` helper script runs psql with one or more SQL files or directories, optionally in parallel. In case of an error, `run-psql` will not start any new files, but will wait for all already-started files to finish. Once they finish, it re-prints the last few lines of each failed run's STDERR to simplify debugging. Unlike the legacy `import-sql`, `run-psql` makes no assumptions about SQL directory layout.
+
+The legacy `import-sql` script can execute a single SQL file in Postgres when the file is given as the first parameter. The script is obsolete and should not be used. If run without any arguments, `import-sql` executes all of the following:
 * SQL files from `$SQL_TOOLS_DIR` - contains all SQL files to be imported before the ones generated by the OpenMapTiles project. By default, contains the OMT fork of Mapbox's [postgis-vt-util.sql](https://github.com/openmaptiles/postgis-vt-util) helper functions and the [sql/language.sql](sql/zzz_language.sql) script. (the helpers file is named `10_postgis-vt-util.sql` so that it is imported before the files in `/sql`)
@@ -441,7 +443,7 @@ If ran without any arguments, `import-sql` executes all of the following:
 Generating and importing SQL could be done in a single step with `&&`, e.g.

 ```bash
-generate-sqltomvt openmaptiles.yaml > "$SQL_DIR/mvt.sql" && import-sql
+generate-sqltomvt openmaptiles.yaml > "$SQL_DIR/mvt.sql" && run-psql "$SQL_DIR/mvt.sql"
 ```

 Optionally you may pass extra arguments to `psql` by using the `PSQL_OPTIONS` environment variable. For example `PSQL_OPTIONS=-a` makes psql echo all commands read from a file into stdout.
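
To make the new workflow concrete, here are a few illustrative invocations of `run-psql` (all paths are placeholders; the flags are the ones defined by the script added later in this PR). Note that anything after `--` replaces the default psql flags (`-v ON_ERROR_STOP=1 -c '\timing on'`) rather than extending them:

```bash
# Run one file, then every *.sql file in a directory (sorted order)
run-psql "$SQL_DIR/mvt.sql"
run-psql "$SQL_DIR"

# Run files from two directories, up to 4 at a time; on failure,
# re-print the last 20 lines of the failing file's STDERR at the end
run-psql --parallel 4 --print-err 20 "$SQL_TOOLS_DIR" "$SQL_DIR"

# Pass custom arguments through to psql (replaces the defaults)
run-psql "$SQL_DIR/mvt.sql" -- -a -v ON_ERROR_STOP=1
```
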
diff --git a/bin/import-sql b/bin/import-sql
index 7e9c7826..14dce8a7 100755
--- a/bin/import-sql
+++ b/bin/import-sql
@@ -1,7 +1,7 @@
 #!/usr/bin/env bash
-set -o errexit
+set -o errexit   # set -e
 set -o pipefail
-set -o nounset
+set -o nounset   # set -u
 shopt -s nullglob

 # For backward compatibility, allow both PG* and POSTGRES_* forms,
@@ -13,69 +13,52 @@ export PGUSER="${POSTGRES_USER:-${PGUSER?}}"
 export PGPASSWORD="${POSTGRES_PASSWORD:-${PGPASSWORD?}}"
 export PGPORT="${POSTGRES_PORT:-${PGPORT:-5432}}"

+PSQL="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/run-psql"

-function exec_psql_file() {
-  local file_name="$1"
-  # Allows additional parameters to be passed to psql
-  # For example, PSQL_OPTIONS='-a -A' would echo everything and disable output alignment
-  # Using eval allows complex cases with quotes, like PSQL_OPTIONS=" -a -c 'select ...' "
-  eval "local psql_opts=(${PSQL_OPTIONS:-})"
-
-  echo "Importing $file_name (md5 $(md5sum < "$file_name") $(wc -l < "$file_name") lines) into Postgres..."
-
-  # shellcheck disable=SC2154
-  psql \
-    -v ON_ERROR_STOP="1" \
-    -c '\timing on' \
-    -f "$file_name" \
-    "${psql_opts[@]}"
-}

 function import_all_sql_files() {
   local dir="$1"
-  local sql_file

   if [[ -d "$dir/parallel" ]]; then
     # Assume this dir may contain run_first.sql, parallel/*.sql, and run_last.sql
     # use parallel execution
     if [[ -f "$dir/run_first.sql" ]]; then
-      exec_psql_file "$dir/run_first.sql"
+      "${PSQL}" "$dir/run_first.sql"
     else
       echo "File $dir/run_first.sql not found, skipping"
     fi

-    # Run import-sql script in parallel, up to MAX_PARALLEL_PSQL processes at the same time
-    : "${MAX_PARALLEL_PSQL:=5}"
-    echo "Importing $(find "$dir/parallel" -name "*.sql" | wc -l) sql files from $dir/parallel/, up to $MAX_PARALLEL_PSQL files at the same time"
-    find "$dir/parallel" -name "*.sql" -print0 | xargs -0 -I{} -P "$MAX_PARALLEL_PSQL" sh -c "\"$0\" \"{}\" || exit 255"
-    echo "Finished importing sql files matching '$dir/parallel/*.sql'"
+    # Run all *.sql files in $dir/parallel, up to MAX_PARALLEL_PSQL files at the same time
+    "${PSQL}" --parallel "${MAX_PARALLEL_PSQL:=1}" "$dir/parallel"

     if [[ -f "$dir/run_last.sql" ]]; then
-      exec_psql_file "$dir/run_last.sql"
+      "${PSQL}" "$dir/run_last.sql"
     else
       echo "File $dir/run_last.sql not found, skipping"
     fi
   else
-    for sql_file in "$dir"/*.sql; do
-      exec_psql_file "$sql_file"
-    done
+    "${PSQL}" "$dir"
   fi
 }

 # If there are no arguments, imports everything,
 # otherwise the first argument is the name of a file to be imported.
 if [[ $# -eq 0 ]]; then
-  if [[ "${SQL_TOOLS_DIR:-}" == "" ]]; then
-    echo "ENV variable SQL_TOOLS_DIR is not set. It should contain directory with .sql files, e.g. postgis-vt-util.sql and language.sql"
+  if [[ "${SQL_TOOLS_DIR:-}" == "" ]] || [[ ! -d "${SQL_TOOLS_DIR}" ]]; then
+    echo "ENV variable SQL_TOOLS_DIR is not set or is not a directory. It should point to a directory with .sql files, e.g. postgis-vt-util.sql and language.sql"
     exit 1
+  else
+    echo "SQL_TOOLS_DIR=$SQL_TOOLS_DIR"
   fi
-  if [[ "${SQL_DIR:-}" == "" ]]; then
-    echo "ENV variable SQL_DIR is not set. It should contain directory with .sql files, e.g. tileset.sql, or run_first.sql, run_last.sql, and parallel/*.sql"
+  if [[ "${SQL_DIR:-}" == "" ]] || [[ ! -d "${SQL_DIR}" ]]; then
+    echo "ENV variable SQL_DIR is not set or is not a directory. It should point to a directory with .sql files, e.g. tileset.sql, or run_first.sql, run_last.sql, and parallel/*.sql"
     exit 1
+  else
+    echo "SQL_DIR=$SQL_DIR"
   fi

   import_all_sql_files "$SQL_TOOLS_DIR"   # import postgis-vt-util.sql and all other tools SQL files from /sql
   import_all_sql_files "$SQL_DIR"         # import compiled tileset.sql
 else
-  exec_psql_file "$1"
+  "${PSQL}" "$1"
 fi
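
As a sketch of the convention `import_all_sql_files` implements (directory and file names below are placeholders), a directory containing a `parallel/` subdirectory is split into a serial prologue, a parallel middle, and a serial epilogue; any other directory is passed to `run-psql` as-is:

```bash
# Hypothetical layout:
#   $SQL_DIR/run_first.sql    # run first, serially (skipped if absent)
#   $SQL_DIR/parallel/*.sql   # run via run-psql --parallel $MAX_PARALLEL_PSQL
#   $SQL_DIR/run_last.sql     # run last, serially (skipped if absent)

export SQL_TOOLS_DIR=/usr/src/app/sql   # placeholder paths
export SQL_DIR=/sql
MAX_PARALLEL_PSQL=4 import-sql          # import everything
import-sql "$SQL_DIR/run_first.sql"     # or import just one file
```
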
diff --git a/bin/run-psql b/bin/run-psql
new file mode 100755
index 00000000..e75a6053
--- /dev/null
+++ b/bin/run-psql
@@ -0,0 +1,186 @@
+#!/usr/bin/env python
+"""
+Usage:
+  run-psql <file>...
+           [--parallel=<num> [--print-err=<num>]]
+           [--pghost=<host>] [--pgport=<port>] [--dbname=<db>]
+           [--user=<user>] [--password=<pw>] [--verbose]
+           [-- <psql-args>...]
+  run-psql --help
+  run-psql --version
+
+Options:
+  <file>                One or more SQL files or directories to run using psql.
+  <psql-args>           Override default psql parameters.
+  -p --parallel=<num>   Run all files/dirs specified with <file>... in parallel,
+                        up to <num> at the same time.  [default: 1]
+  -e --print-err=<num>  If psql exits with an error, print the last <num> output lines
+                        after all other psql imports finish. Disabled if 0.  [default: 10]
+  -v --verbose          Print additional debugging information.
+  --help                Show this screen.
+  --version             Show version.
+
+PostgreSQL Options:
+  -h --pghost=<host>    Postgres hostname. By default uses PGHOST env or "localhost" if not set.
+  -P --pgport=<port>    Postgres port. By default uses PGPORT env or "5432" if not set.
+  -d --dbname=<db>      Postgres db name. By default uses PGDATABASE env or "openmaptiles" if not set.
+  -U --user=<user>      Postgres user. By default uses PGUSER env or "openmaptiles" if not set.
+  --password=<pw>       Postgres password. By default uses PGPASSWORD env or "openmaptiles" if not set.
+
+These legacy environment variables should not be used, but they are still supported:
+    POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD
+"""
+import collections
+import os
+from dataclasses import dataclass
+from hashlib import md5
+from multiprocessing import Pool, Value
+from pathlib import Path
+from select import select
+from subprocess import Popen, PIPE, list2cmdline
+from sys import stdout, stderr
+from typing import List, Optional
+
+# noinspection PyProtectedMember
+from docopt import docopt, DocoptExit
+
+import openmaptiles
+from openmaptiles.pgutils import parse_pg_args
+
+
+@dataclass
+class Params:
+    print_on_err: int
+    psql_args: Optional[List[str]]
+    pghost: str
+    pgport: str
+    dbname: str
+    user: str
+    password: str
+    verbose: bool
+
+
+stop_value: Optional[Value] = None
+
+
+def last_lines(output: List[bytes], count: int, name: str) -> str:
+    if output and count > 0:
+        text = b''.join(output).decode('utf-8').strip()
+        if text:
+            lines = text.split('\n')[-count:]
+            out = '\n'.join(lines)
+            return f'\n###### Last {len(lines)} lines of {name}:\n{out}'
+    return ''
+
+
+def run_psql(file: Path, params: Params):
+    # A non-zero flag means another file has already failed - do not start new work
+    if stop_value.value != 0:
+        print(f'Skipping {file}', file=stderr)
+        return None
+
+    # stdbuf forces line-buffered stdout and unbuffered stderr from psql
+    cmd = ['stdbuf', '-oL', '-e0', 'psql', '-f', file]
+    if params.psql_args is None:
+        cmd.extend(['-v', 'ON_ERROR_STOP=1', '-c', r'\timing on'])
+    else:
+        cmd.extend(params.psql_args)
+
+    # Connection parameters are passed to psql via its standard PG* env variables
+    env = {
+        'PGHOST': params.pghost,
+        'PGDATABASE': params.dbname,
+        'PGUSER': params.user,
+        'PGPASSWORD': params.password,
+        'PGPORT': params.pgport,
+    }
+
+    sql_content = file.read_bytes()
+    md5sum = md5(sql_content).hexdigest()
+    lines = sql_content.decode('utf-8').count('\n')
+    print(f'Importing {file} (md5 {md5sum} {lines} lines) into Postgres...')
+    if params.verbose:
+        print(f'  {list2cmdline(cmd)}')
+        print('Environment Variables:')
+        for k, v in sorted(env.items()):
+            print(f'  {k}={v}')
+
+    stderr_buf = []
+    with Popen(cmd, stdout=PIPE, stderr=PIPE, env=env) as proc:
+        readable = {
+            proc.stdout.fileno(): stdout.buffer,
+            proc.stderr.fileno(): stderr.buffer,
+        }
+        while readable:
+            for fd in select(readable, [], [])[0]:
+                data = os.read(fd, 1024)  # read available
+                if not data:
+                    del readable[fd]
+                else:
+                    if params.print_on_err > 0 and fd == proc.stderr.fileno():
+                        stderr_buf.append(data)
+                    readable[fd].write(data)
+                    readable[fd].flush()
+        # there should be no more output, should be safe to wait
+        exit_code = proc.wait()
+    if exit_code == 0:
+        print(f'Finished importing {file}...')
+        return None
+
+    stop_value.value = 1
+    result = f'File {file} failed with error {exit_code}'
+    print(result)
+    return result + last_lines(stderr_buf, params.print_on_err, 'STDERR')
+
+
+def main(args):
+    pghost, pgport, dbname, user, password = parse_pg_args(args)
+    paths = args['<file>']
+    # Everything after '--' is passed to psql verbatim, replacing the defaults
+    if '--' in paths:
+        pos = paths.index('--')
+        psql_args = paths[pos + 1:]
+        paths = paths[:pos]
+    else:
+        psql_args = None
+    params = Params(int(args['--print-err']), psql_args, pghost, pgport, dbname,
+                    user, password, args['--verbose'])
+
+    paths = [Path(v).resolve() for v in paths]
+    dups = [vv for vv, cnt in collections.Counter((str(v) for v in paths)).items() if cnt > 1]
+    if dups:
+        raise DocoptExit(f'ERROR: Duplicate paths: [{"], [".join(dups)}]')
+    missing = []
+    files = []
+    for path in paths:
+        if not path.exists():
+            missing.append(path)
+        elif path.is_dir():
+            for file in path.glob('*.sql'):
+                if file.is_file():
+                    files.append(file)
+        else:
+            files.append(path)
+    if missing:
+        raise DocoptExit(f'ERROR: Path does not exist: [{"], [".join(str(v) for v in missing)}]')
+    if not files:
+        # For compatibility, allow empty dir import
+        print('No .sql files were found')
+        exit(0)
+
+    global stop_value
+    stop_value = Value('b', 0)
+
+    parallel = int(args['--parallel'])
+    with Pool(processes=parallel) as pool:
+        print(f'Importing {len(files)} files...')
+        tasks = [(file, params) for file in sorted(files)]
+        async_result = pool.starmap_async(run_psql, tasks, chunksize=1)
+        results = async_result.get()
+
+    if stop_value.value != 0:
+        if parallel > 1:
+            print('-------------------------- ERROR SUMMARY ---------------------------------')
+            print('\n'.join(v for v in results if v))
+        exit(1)
+    else:
+        print('All SQL files successfully executed')
+
+
+if __name__ == '__main__':
+    main(docopt(__doc__, version=openmaptiles.__version__))
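
The error-handling path above can be exercised with a quick sketch (the scratch directory and file names are hypothetical):

```bash
# One bad file among several: run-psql stops launching new files, waits
# for the running ones, then prints an ERROR SUMMARY plus the last lines
# of the failing run's STDERR.
mkdir -p /tmp/sql-demo
echo 'SELECT 1;'    > /tmp/sql-demo/ok.sql
echo 'SELECT oops;' > /tmp/sql-demo/bad.sql
run-psql --parallel 2 --print-err 5 /tmp/sql-demo || echo "import failed"
```
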
diff --git a/tests/sql/test-sql.sh b/tests/sql/test-sql.sh
index efa07758..cbd19cc4 100755
--- a/tests/sql/test-sql.sh
+++ b/tests/sql/test-sql.sh
@@ -28,6 +28,7 @@ done
 sleep 1

 # Import all pre-required SQL code
+mkdir -p "$SQL_DIR"
 source import-sql

 echo "++++++++++++++++++++++++"
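
The test now creates `$SQL_DIR` up front because `run-psql` aborts with an error for a path that does not exist, while an empty directory is tolerated for compatibility. A sketch (the path is a placeholder):

```bash
run-psql /tmp/no-such-dir   # fails: "ERROR: Path does not exist: ..."
mkdir -p /tmp/empty-dir
run-psql /tmp/empty-dir     # prints "No .sql files were found" and exits 0
```
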