diff --git a/supporting-blog-content/ingesting-data-with-big-query/ingesting_data_with_big_query_notebook.ipynb b/supporting-blog-content/ingesting-data-with-big-query/ingesting_data_with_big_query_notebook.ipynb new file mode 100644 index 000000000..49fb2d407 --- /dev/null +++ b/supporting-blog-content/ingesting-data-with-big-query/ingesting_data_with_big_query_notebook.ipynb @@ -0,0 +1,391 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "LlrEjmtJNpuX" + }, + "source": [ + "# Ingesting data with BigQuery\n", + "\n", + "This notebook demonstrates how to consume data contained in BigQuery and index it into Elasticsearch. This notebook is based on the article [Ingesting data with BigQuery](https://www.elastic.co/search-labs/blog/ingesting-data-with-big-query)." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "GNaAN-GNO5qp" + }, + "source": [ + "## Installing dependencies and importing packages" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "qgclZayCk1Ct", + "outputId": "7da1e962-ead6-4016-b2e5-12a019885d86" + }, + "outputs": [], + "source": [ + "!pip install google-cloud-bigquery elasticsearch==8.16 google-auth" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "rAesontNXpLu" + }, + "outputs": [], + "source": [ + "from elasticsearch import Elasticsearch, exceptions\n", + "from google.cloud import bigquery\n", + "from google.colab import auth\n", + "from getpass import getpass\n", + "\n", + "import json" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "NwOmnk99Pfh3" + }, + "source": [ + "## Declaring variables" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "-4sV9fiXdBwj" + }, + "source": [ + "This code will create inputs where you can enter your credentials.\n", + "Here you can learn how to retrieve your Elasticsearch credentials: [Finding Your Cloud ID](https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "GVKJKfFpPWuj", + "outputId": "21c6f4e3-8cb5-4a2c-8efe-0e45c3c6b1c4" + }, + "outputs": [], + "source": [ + "ELASTICSEARCH_ENDPOINT = getpass(\"Elasticsearch endpoint: \")\n", + "ELASTIC_API_KEY = getpass(\"Elastic Api Key: \")\n", + "\n", + "\n", + "# Google Cloud project name and BigQuery dataset name\n", + "PROJECT_ID = \"elasticsearch-bigquery\"\n", + "# dataset_id in format .\n", + "DATASET_ID = f\"{PROJECT_ID}.server_logs\"" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "3O2HclcYHEsS" + }, + "source": [ + "## Instance a Elasticsearch client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "1LWiop8NYiQF" + }, + "outputs": [], + "source": [ + "auth.authenticate_user()\n", + "\n", + "# Elasticsearch client\n", + "es_client = Elasticsearch(\n", + " ELASTICSEARCH_ENDPOINT,\n", + " api_key=ELASTIC_API_KEY,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "9lvPHaXjPlfu" + }, + "source": [ + "## Creating mappings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "tc88YzAYw31e" + }, + "outputs": [], + "source": [ + "try:\n", + " es_client.indices.create(\n", + " index=\"bigquery-logs\",\n", + " body={\n", + " \"mappings\": {\n", + " \"properties\": {\n", + " \"status_code_description\": {\"type\": \"match_only_text\"},\n", + " \"status_code\": {\"type\": \"keyword\"},\n", + " \"@timestamp\": {\"type\": \"date\"},\n", + " \"ip_address\": {\"type\": \"ip\"},\n", + " \"http_method\": {\"type\": \"keyword\"},\n", + " \"endpoint\": {\"type\": \"keyword\"},\n", + " \"response_time\": {\"type\": \"integer\"},\n", + " }\n", + " }\n", + " },\n", + " )\n", + "except exceptions.RequestError as e:\n", + " if e.error == \"resource_already_exists_exception\":\n", + " print(\"Index already exists.\")\n", + " else:\n", + " raise e" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "AOtwAfPXP38Z" + }, + "source": [ + "## Getting data from BigQuery" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "DyKtsjXRXB2S", + "outputId": "7734ad7e-cb11-42cc-b97d-e9241cab6b17" + }, + "outputs": [], + "source": [ + "client = bigquery.Client(project=PROJECT_ID)\n", + "# Getting tables from dataset\n", + "tables = client.list_tables(DATASET_ID)\n", + "\n", + "data = {}\n", + "\n", + "for table in tables:\n", + " # Table id must be in format .\n", + " table_id = f\"{DATASET_ID}.{table.table_id}\"\n", + "\n", + " print(f\"Processing table: {table.table_id}\")\n", + "\n", + " # Query to retrieve BigQuery tables data\n", + " query = f\"\"\"\n", + " SELECT *\n", + " FROM `{table_id}`\n", + " \"\"\"\n", + "\n", + " query_job = client.query(query)\n", + "\n", + " results = query_job.result()\n", + "\n", + " print(f\"Results for table: {table.table_id}:\")\n", + "\n", + " data[table.table_id] = []\n", + "\n", + " for row in results:\n", + " # Saving data with key=table_id\n", + " data[table.table_id].append(dict(row))\n", + " print(row)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "UAznwqXStJ39", + "outputId": "508a3255-43f7-4828-87d5-997cd04ca427" + }, + "outputs": [], + "source": [ + "# variable with data\n", + "logs_data = data[\"logs\"]\n", + "\n", + "\n", + "print(logs_data)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "2s4Tr6wBP773" + }, + "source": [ + "## Indexing to Elasticsearch" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "C4goyH6ZbDJK", + "outputId": "cfb4af41-1ef1-40da-a2da-6c0aa83633d3" + }, + "outputs": [], + "source": [ + "bulk_data = []\n", + "\n", + "for log_entry in logs_data:\n", + " # Convert timestamp to ISO 8601 string\n", + " timestamp_iso8601 = log_entry[\"_timestamp\"].isoformat()\n", + "\n", + " # Prepare action metadata\n", + " action_metadata = {\n", + " \"index\": {\n", + " \"_index\": \"bigquery-logs\",\n", + " \"_id\": f\"{log_entry['ip_address']}-{timestamp_iso8601}\",\n", + " }\n", + " }\n", + "\n", + " # Prepare document\n", + " document = {\n", + " \"ip_address\": log_entry[\"ip_address\"],\n", + " \"status_code\": log_entry[\"status_code\"],\n", + " \"@timestamp\": timestamp_iso8601,\n", + " \"http_method\": log_entry[\"http_method\"],\n", + " \"endpoint\": log_entry[\"endpoint\"],\n", + " \"response_time\": log_entry[\"response_time\"],\n", + " \"status_code_description\": log_entry[\"status_code_description\"],\n", + " }\n", + "\n", + " # Append to bulk data\n", + " bulk_data.append(action_metadata)\n", + " bulk_data.append(document)\n", + "\n", + "print(bulk_data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "WPEwsJrFbDHQ", + "outputId": "ab5904f7-21c1-4596-fb06-55569cd9eb17" + }, + "outputs": [], + "source": [ + "try:\n", + " # Indexing data\n", + " response = es_client.bulk(body=bulk_data)\n", + "\n", + " if response[\"errors\"]:\n", + " print(\"Errors while indexing:\")\n", + " for item in response[\"items\"]:\n", + " if \"error\" in item[\"index\"]:\n", + " print(item[\"index\"][\"error\"])\n", + " else:\n", + " print(\"Documents indexed successfully.\")\n", + "except Exception as e:\n", + " print(f\"Bulk indexing failed: {e}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "uxix_o8LQDup" + }, + "source": [ + "# Searching data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "285_MwI8yknk", + "outputId": "67d43aca-b05d-43f7-c730-1fa3bc0c4662" + }, + "outputs": [], + "source": [ + "response = es_client.search(\n", + " index=\"bigquery-logs\",\n", + " body={\n", + " \"query\": {\"match\": {\"status_code_description\": \"error\"}},\n", + " \"sort\": [{\"@timestamp\": {\"order\": \"desc\"}}],\n", + " \"aggs\": {\"by_ip\": {\"terms\": {\"field\": \"ip_address\", \"size\": 10}}},\n", + " },\n", + ")\n", + "\n", + "# Print results\n", + "formatted_json = json.dumps(response.body, indent=4)\n", + "print(formatted_json)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "S6WZMJayyzxh" + }, + "source": [ + "## Deleting\n", + "\n", + "Finally, we can delete the resources used to prevent them from consuming resources." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "9UcQwa41yy_x", + "outputId": "02a33d89-b57c-4273-ba93-5b6441a4f91e" + }, + "outputs": [], + "source": [ + "# Cleanup - Delete Index\n", + "es_client.indices.delete(index=\"bigquery-logs\", ignore=[400, 404])" + ] + } + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}