|
| 1 | +import requests |
| 2 | +import pandas as pd |
| 3 | +from sempy.fabric.exceptions import FabricHTTPException |
| 4 | +from sempy._utils._log import log |
| 5 | +import sempy_labs._icons as icons |
| 6 | +from typing import Optional |
| 7 | +from uuid import UUID |
| 8 | +from sempy_labs._kql_databases import _resolve_cluster_uri |
| 9 | +from sempy_labs._helper_functions import resolve_item_id |
| 10 | + |
| 11 | + |
| 12 | +@log |
| 13 | +def query_kusto( |
| 14 | + query: str, |
| 15 | + kql_database: str | UUID, |
| 16 | + workspace: Optional[str | UUID] = None, |
| 17 | + language: str = "kql", |
| 18 | +) -> pd.DataFrame: |
| 19 | + """ |
| 20 | + Runs a KQL query against a KQL database. |
| 21 | +
|
| 22 | + Parameters |
| 23 | + ---------- |
| 24 | + query : str |
| 25 | + The query (supports KQL or SQL - make sure to specify the language parameter accordingly). |
| 26 | + kql_database : str | uuid.UUID |
| 27 | + The KQL database name or ID. |
| 28 | + workspace : str | uuid.UUID, default=None |
| 29 | + The Fabric workspace name or ID. |
| 30 | + Defaults to None which resolves to the workspace of the attached lakehouse |
| 31 | + or if no lakehouse attached, resolves to the workspace of the notebook. |
| 32 | + language : str, default="kql" |
| 33 | + The language of the query. Currently "kql' and "sql" are supported. |
| 34 | +
|
| 35 | + Returns |
| 36 | + ------- |
| 37 | + pandas.DataFrame |
| 38 | + A pandas dataframe showing the result of the KQL query. |
| 39 | + """ |
| 40 | + |
| 41 | + import notebookutils |
| 42 | + |
| 43 | + language = language.lower() |
| 44 | + if language not in ["kql", "sql"]: |
| 45 | + raise ValueError( |
| 46 | + f"{icons._red_dot} Invalid language '{language}'. Only 'kql' and 'sql' are supported." |
| 47 | + ) |
| 48 | + |
| 49 | + cluster_uri = _resolve_cluster_uri(kql_database=kql_database, workspace=workspace) |
| 50 | + token = notebookutils.credentials.getToken(cluster_uri) |
| 51 | + |
| 52 | + headers = { |
| 53 | + "Authorization": f"Bearer {token}", |
| 54 | + "Content-Type": "application/json", |
| 55 | + "Accept": "application/json", |
| 56 | + } |
| 57 | + |
| 58 | + kql_database_id = resolve_item_id( |
| 59 | + item=kql_database, type="KQLDatabase", workspace=workspace |
| 60 | + ) |
| 61 | + payload = {"db": kql_database_id, "csl": query} |
| 62 | + if language == "sql": |
| 63 | + payload["properties"] = {"Options": {"query_language": "sql"}} |
| 64 | + |
| 65 | + response = requests.post( |
| 66 | + f"{cluster_uri}/v1/rest/query", |
| 67 | + headers=headers, |
| 68 | + json=payload, |
| 69 | + ) |
| 70 | + |
| 71 | + if response.status_code != 200: |
| 72 | + raise FabricHTTPException(response) |
| 73 | + |
| 74 | + results = response.json() |
| 75 | + columns_info = results["Tables"][0]["Columns"] |
| 76 | + rows = results["Tables"][0]["Rows"] |
| 77 | + |
| 78 | + df = pd.DataFrame(rows, columns=[col["ColumnName"] for col in columns_info]) |
| 79 | + |
| 80 | + for col_info in columns_info: |
| 81 | + col_name = col_info["ColumnName"] |
| 82 | + data_type = col_info["DataType"] |
| 83 | + |
| 84 | + try: |
| 85 | + if data_type == "DateTime": |
| 86 | + df[col_name] = pd.to_datetime(df[col_name]) |
| 87 | + elif data_type in ["Int64", "Int32", "Long"]: |
| 88 | + df[col_name] = ( |
| 89 | + pd.to_numeric(df[col_name], errors="coerce") |
| 90 | + .fillna(0) |
| 91 | + .astype("int64") |
| 92 | + ) |
| 93 | + elif data_type == "Real" or data_type == "Double": |
| 94 | + df[col_name] = pd.to_numeric(df[col_name], errors="coerce") |
| 95 | + else: |
| 96 | + # Convert any other type to string, change as needed |
| 97 | + df[col_name] = df[col_name].astype(str) |
| 98 | + except Exception as e: |
| 99 | + print( |
| 100 | + f"{icons.yellow_dot} Could not convert column {col_name} to {data_type}, defaulting to string: {str(e)}" |
| 101 | + ) |
| 102 | + df[col_name] = df[col_name].astype(str) |
| 103 | + |
| 104 | + return df |
| 105 | + |
| 106 | + |
| 107 | +def query_workspace_monitoring( |
| 108 | + query: str, workspace: Optional[str | UUID] = None, language: str = "kql" |
| 109 | +) -> pd.DataFrame: |
| 110 | + """ |
| 111 | + Runs a query against the Fabric workspace monitoring database. Workspace monitoring must be enabled on the workspace to use this function. |
| 112 | +
|
| 113 | + Parameters |
| 114 | + ---------- |
| 115 | + query : str |
| 116 | + The query (supports KQL or SQL - make sure to specify the language parameter accordingly). |
| 117 | + workspace : str | uuid.UUID, default=None |
| 118 | + The Fabric workspace name or ID. |
| 119 | + Defaults to None which resolves to the workspace of the attached lakehouse |
| 120 | + or if no lakehouse attached, resolves to the workspace of the notebook. |
| 121 | + language : str, default="kql" |
| 122 | + The language of the query. Currently "kql' and "sql" are supported. |
| 123 | +
|
| 124 | + Returns |
| 125 | + ------- |
| 126 | + pandas.DataFrame |
| 127 | + A pandas dataframe showing the result of the query. |
| 128 | + """ |
| 129 | + |
| 130 | + return query_kusto( |
| 131 | + query=query, |
| 132 | + kql_database="Monitoring KQL database", |
| 133 | + workspace=workspace, |
| 134 | + language=language, |
| 135 | + ) |
0 commit comments