diff --git a/geocoding_ban/custom-recipes/geocoding-ban/recipe.json b/geocoding_ban/custom-recipes/geocoding-ban/recipe.json
new file mode 100644
index 00000000..f5358dd4
--- /dev/null
+++ b/geocoding_ban/custom-recipes/geocoding-ban/recipe.json
@@ -0,0 +1,109 @@
+{
+    // Meta data for display purposes
+    "meta" : {
+        "label": "Geocode with BAN",
+        "author": "Ministère de l’Intérieur — EIG 2017",
+        "tags": ["geocoding", "dev"]
+    },
+
+    "kind" : "PYTHON",
+
+    "inputRoles" : [{
+        "name": "input",
+        "label": "Dataset to geocode",
+        "description": "The dataset containing the addresses to geocode",
+        "arity": "UNARY",
+        "required": true,
+        "acceptsDataset": true
+    }],
+    "selectableFromDataset": "input",
+    "outputRoles" : [{
+        "name": "output",
+        "label": "Output dataset",
+        "description": "The input dataset with the geocoding results appended",
+        "arity": "UNARY",
+        "required": true,
+        "acceptsDataset": true
+    }],
+
+    "params": [
+        {
+            "name": "server_address",
+            "label": "Server hosting Addok",
+            "type": "STRING",
+            "description": "Full URL, with http://. Do not include the /search path",
+            "mandatory": true,
+            "defaultValue": "http://api-adresse.data.gouv.fr"
+        },
+        {
+            "name": "columns",
+            "label": "Address columns",
+            "type": "COLUMNS",
+            "columnRole": "input",
+            "description": "Multiple columns will be concatenated",
+            "mandatory": true
+        },
+        {
+            "name": "post_code",
+            "label": "Column of the postcode",
+            "type": "COLUMN",
+            "columnRole": "input",
+            "description": "Optional: only results with that postcode will be returned",
+            "mandatory": false
+        },
+        {
+            "name": "city_code",
+            "label": "Column of the city code",
+            "type": "COLUMN",
+            "columnRole": "input",
+            "description": "Optional: only results with that city code will be returned. Do not confuse it with the postcode (despite their resemblance in France)",
+            "mandatory": false
+        },
+        {
+            "name": "lines_per_request",
+            "label": "Lines per request",
+            "type": "INT",
+            "defaultValue": 1000,
+            "mandatory": true,
+            "description": "Sending multiple lines per request saves time, but never exceed 8 MB per request"
+        },
+        {
+            "name": "concurent_requests",
+            "label": "Concurrent requests",
+            "type": "INT",
+            "defaultValue": 1,
+            "mandatory": true,
+            "description": "Use more than 1 only if you host your own instance; you might otherwise get banned from the public server"
+        },
+        {
+            "name": "http_proxy",
+            "label": "HTTP proxy",
+            "type": "STRING",
+            "mandatory": false,
+            "description": "Only needed if you have no direct internet access and must go through a web proxy. Use the full form, with http://"
+        },
+        {
+            "name": "timeout",
+            "label": "Timeout",
+            "type": "INT",
+            "defaultValue": 1000,
+            "mandatory": true,
+            "description": "Timeout of each HTTP request, in seconds"
+        },
+        {
+            "name": "prefix",
+            "label": "Result columns prefix",
+            "type": "STRING",
+            "defaultValue": "result_",
+            "mandatory": true,
+            "description": "Prefix added to the names of the columns returned by the geocoder"
+        },
+        {
+            "name": "error_col",
+            "label": "Column with error message",
+            "type": "STRING",
+            "mandatory": false,
+            "description": "Optional. The column name will also receive the result columns prefix"
+        }
+    ]
+}
diff --git a/geocoding_ban/custom-recipes/geocoding-ban/recipe.py b/geocoding_ban/custom-recipes/geocoding-ban/recipe.py
new file mode 100644
index 00000000..5d59ab02
--- /dev/null
+++ b/geocoding_ban/custom-recipes/geocoding-ban/recipe.py
@@ -0,0 +1,160 @@
+# coding=utf-8
+
+u"""Geocoding plugin for Dataiku Data Science Studio (DSS)
+
+By default, it uses the public geocoding server at http://adresse.data.gouv.fr
+
+It also works with any private instance of Addok: https://github.com/addok/addok
+
+This plugin was developed by the Ministère de l’Intérieur
+as part of the Entrepreneurs d’Intérêt Général 2017 program
+"""
+
+from concurrent import futures
+import dataiku
+from dataiku.customrecipe import get_input_names_for_role
+from dataiku.customrecipe import get_output_names_for_role
+from dataiku.customrecipe import get_recipe_config
+import itertools
+import logging
+import pandas as pd
+import requests
+import StringIO
+
+# We read the addresses from the input dataset
+# and write the coordinates to the output dataset
+input_name = get_input_names_for_role('input')[0]
+input_dataset = dataiku.Dataset(input_name)
+
+output_name = get_output_names_for_role('output')[0]
+output_dataset = dataiku.Dataset(output_name)
+
+# All the parameters needed to build the geocoding requests
+server_address = get_recipe_config()['server_address']
+columns = get_recipe_config()['columns']
+post_code = get_recipe_config().get('post_code', None)
+city_code = get_recipe_config().get('city_code', None)
+lines_per_request = int(get_recipe_config()['lines_per_request'])
+concurent_requests = int(get_recipe_config()['concurent_requests'])
+http_proxy = get_recipe_config().get('http_proxy', None)
+timeout = int(get_recipe_config()['timeout'])
+prefix = get_recipe_config().get('prefix', None)
+error = get_recipe_config().get('error_col', None)
+i = 0
+
+
+def datas():
+    """Returns the request form fields and the columns composing the address"""
+    result = {'columns': columns}
+    cols = list(columns)
+
+    if post_code:
+        result['postcode'] = post_code
+        cols.append(post_code)
+
+    if city_code:
+        result['citycode'] = city_code
+        cols.append(city_code)
+
+    return (result, cols)
+
+
+def adresse_submit(df):
+    """Posts one chunk of rows to the geocoding server and adds the result columns"""
+    global i
+    verbosechunksize = 2000
+    string_io = StringIO.StringIO()
+    i += lines_per_request
+    if (i % verbosechunksize) == 0:
+        logging.info("geocoding chunk %r to %r", i-verbosechunksize, i)
+
+    data, cols = datas()
+    df[cols].to_csv(string_io, encoding="utf-8", index=False)
+
+    kwargs = {
+        'data': data,
+        'files': {'data': string_io.getvalue()},
+        'timeout': timeout,  # requests interprets this value in seconds
+        'url': "{}/search/csv".format(server_address)
+    }
+
+    if http_proxy:
+        kwargs['proxies'] = {'http': http_proxy}
+
+    response = requests.post(**kwargs)
+
+    if error:
+        error_col = 'result_{}'.format(error)
+    else:
+        error_col = None
+
+    if response.status_code == 200:
+        content = StringIO.StringIO(response.content.decode('utf-8-sig'))
+        result = pd.read_csv(content, dtype=object)
+        if error_col:
+            result[error_col] = None
+        result = result.rename(columns={'longitude': 'result_longitude',
+                                        'latitude': 'result_latitude'})
+
+        # We only keep the new columns to avoid modifying the schema
+        diff = result.axes[1].difference(df.axes[1])
+
+        for new_column in diff:
+            if new_column[0:7] == "result_":
+                df[new_column.replace("result_", prefix)] = result[new_column]
+
+    else:
+        logging.warning("Chunk %r to %r: no valid response",
+                        i-lines_per_request, i)
+        df['{}score'.format(prefix)] = -1
+        if error_col:
df["{}{}".format(prefix, error)] = "HTTP Status: {}".format(response.status_code) + + return df + + +def grouper(iterable, n, fillvalue=None): + "Collect data into fixed-length chunks or blocks" + args = [iter(iterable)] * n + return itertools.izip_longest(*args, fillvalue=fillvalue) + + +# We make a first run with a sample to have a valid schema to build a writer +small = input_dataset.get_dataframe(sampling='head', + limit=1, + infer_with_pandas=False) + +initial_index = small.axes[1] +geocoded = adresse_submit(small) +output_index = geocoded.axes[1] + +if '{}longitude'.format(prefix) not in output_index: + raise Exception('Geocoding failed: unable to make a sample request') + +schema = input_dataset.read_schema() + +floats = [prefix + column for column in ['longitude', 'latitude', 'score']] +for column in output_index.difference(initial_index): + if column in floats: + schema.append({'name': column, 'type': 'float'}) + else: + schema.append({'name': column, 'type': 'string'}) + +output_dataset.write_schema(schema) +writer = output_dataset.get_writer() + +dataset_iter = input_dataset.iter_dataframes(chunksize=lines_per_request, + infer_with_pandas=False) + +with futures.ThreadPoolExecutor(max_workers=concurent_requests) as executor: + for chunks in grouper(dataset_iter, 10 * concurent_requests): + j = 0 + for s in executor.map(adresse_submit, chunks): + j += lines_per_request + try: + writer.write_dataframe(s) + except Exception as exc: + logging.warning("chunk %r to %r generated an exception: %r\n%r", + j-lines_per_request, j, exc, s) + +writer.close() diff --git a/geocoding_ban/plugin.json b/geocoding_ban/plugin.json new file mode 100644 index 00000000..cd05b389 --- /dev/null +++ b/geocoding_ban/plugin.json @@ -0,0 +1,29 @@ +// This file is the descriptor for the DSS Plugin geocoding_BAN +{ + // The identifier of the plugin. + // This must be globally unique, and only contain A-Za-z0-9_- + "id" : "geocoding-ban", + + // Version. It is highly recommended to use Semantic Versioning + "version" : "0.0.2", + + // Meta data for display purposes + "meta" : { + // Name of this plugin that appears in the interface. + "label": "French geocoding (BAN)", + + "description": "Uses the French BAN database to geocode addresses", + "author": "Ministère de l’Intérieur — EIG 2017", + + // The icon of a plugin must be one of the FontAwesome 3.1 icons + "icon": "icon-map-marker", + + "licenseInfo" : "Apache Software License", + + // URL where the user can learn more about the plugin + "url": "https://adresse.data.gouv.fr", + + // List of tags for filtering the list of plugins + "tags": ["geocoding", "ban"] + } +} diff --git a/geocoding_ban/requirements.json b/geocoding_ban/requirements.json new file mode 100644 index 00000000..0553bd6b --- /dev/null +++ b/geocoding_ban/requirements.json @@ -0,0 +1,5 @@ +{ + "python" : [ + {"name":"futures"} + ] +}