import pandas as pd
import psycopg
from psycopg import sql
from databricks.sdk import WorkspaceClient

import streamlit as st

st.header("Lakebase Postgres database", divider=True)
st.subheader("Read a table")
st.write(
    "This app connects to a [Databricks Lakebase](https://docs.databricks.com/aws/en/oltp/) OLTP database instance and reads the first 100 rows from any table. "
    "Provide the instance name, database, schema, and table name."
)


w = WorkspaceClient()


def get_connection(host: str, database: str, user: str) -> psycopg.Connection:
    """Get a connection to the Lakebase database using an OAuth token."""
    # A new connection is opened per query, so each one authenticates with a
    # freshly issued (short-lived) OAuth token as the Postgres password.
    token = w.config.oauth_token().access_token

    return psycopg.connect(
        host=host,
        port=5432,
        dbname=database,
        user=user,
        password=token,
        sslmode="require",
    )


def query_df(
    host: str, database: str, user: str, query: str | sql.Composed
) -> pd.DataFrame:
| 33 | + """Execute a SQL query and return results as a DataFrame.""" |
| 34 | + conn = get_connection(host, database, user) |
| 35 | + try: |
| 36 | + with conn.cursor() as cur: |
| 37 | + cur.execute(sql) |
| 38 | + if not cur.description: |
| 39 | + return pd.DataFrame() |
| 40 | + |
| 41 | + cols = [d.name for d in cur.description] |
| 42 | + rows = cur.fetchall() |
| 43 | + return pd.DataFrame(rows, columns=cols) |
| 44 | + finally: |
| 45 | + conn.close() |
| 46 | + |
| 47 | + |
| 48 | +tab_try, tab_code, tab_reqs = st.tabs( |
| 49 | + ["**Try it**", "**Code snippet**", "**Requirements**"] |
| 50 | +) |
| 51 | + |
| 52 | +with tab_try: |
| 53 | + instance_names = [i.name for i in w.database.list_database_instances()] |
| 54 | + instance_name = st.selectbox("Database instance:", instance_names) |
| 55 | + database = st.text_input("Database:", value="databricks_postgres") |
| 56 | + schema = st.text_input("Schema:", value="public") |
| 57 | + table = st.text_input("Table:", value="your_table_name") |
| 58 | + |
| 59 | + # Get user and host |
| 60 | + user = w.config.client_id or w.current_user.me().user_name |
| 61 | + host = "" |
| 62 | + if instance_name: |
| 63 | + host = w.database.get_database_instance(name=instance_name).read_write_dns |
| 64 | + |
| 65 | + if st.button("Read table"): |
| 66 | + if not all([instance_name, host, database, schema, table]): |
| 67 | + st.error("Please provide all required fields.") |
| 68 | + else: |
| 69 | + df = query_df( |
| 70 | + host, database, user, f"SELECT * FROM {schema}.{table} LIMIT 100" |
| 71 | + ) |
| 72 | + st.dataframe(df, use_container_width=True) |
| 73 | + st.caption(f"Showing first 100 rows from {schema}.{table}") |
| 74 | + |
| 75 | +with tab_code: |
    st.code(
        '''import os
import pandas as pd
import psycopg
from databricks.sdk import WorkspaceClient
import streamlit as st


w = WorkspaceClient()


def get_connection(host: str, database: str, user: str) -> psycopg.Connection:
    """Get a connection to the Lakebase database using an OAuth token."""
    token = w.config.oauth_token().access_token

    return psycopg.connect(
        host=host,
        port=5432,
        dbname=database,
        user=user,
        password=token,
        sslmode="require",
    )


def query_df(host: str, database: str, user: str, query: str) -> pd.DataFrame:
    """Execute a SQL query and return the results as a DataFrame."""
    conn = get_connection(host, database, user)
    try:
        with conn.cursor() as cur:
            cur.execute(query)
            if not cur.description:
                return pd.DataFrame()

            cols = [d.name for d in cur.description]
            rows = cur.fetchall()
            return pd.DataFrame(rows, columns=cols)
    finally:
        conn.close()


# Get connection parameters from environment variables (set by Databricks Apps)
# or fall back to manual configuration.
host = os.getenv("PGHOST")
database = os.getenv("PGDATABASE")
user = os.getenv("PGUSER")

if not all([host, database, user]):
    # Manual configuration if environment variables are not set.
    instance_name = "your_instance_name"
    database = "databricks_postgres"
    user = w.config.client_id or w.current_user.me().user_name
    host = w.database.get_database_instance(name=instance_name).read_write_dns

# Query the table.
schema = "public"
table = "your_table_name"
df = query_df(host, database, user, f"SELECT * FROM {schema}.{table} LIMIT 100")
st.dataframe(df)
''',
        language="python",
    )

with tab_reqs:
    st.info(
        "💡 **Tip:** Add your Lakebase instance as an App resource to automatically configure connection parameters via environment variables. "
        "See the [Lakebase resource documentation](https://docs.databricks.com/aws/en/dev-tools/databricks-apps/lakebase) for details."
    )

    col1, col2, col3 = st.columns(3)

    with col1:
        st.markdown(
            """
            **Permissions (app service principal)**
            * Add the Lakebase instance as an [**App resource**](https://docs.databricks.com/aws/en/dev-tools/databricks-apps/lakebase) to automatically configure permissions and environment variables (`PGHOST`, `PGDATABASE`, `PGUSER`, etc.).
            * Alternatively, manually create a Postgres role for the service principal. See [this guide](https://docs.databricks.com/aws/en/oltp/pg-roles?language=PostgreSQL#create-postgres-roles-and-grant-privileges-for-databricks-identities).
            * Example grants for read access (a schema-wide variant is noted below):
            """
        )
        st.code(
            """
GRANT CONNECT ON DATABASE databricks_postgres TO "<service-principal-id>";
GRANT USAGE ON SCHEMA public TO "<service-principal-id>";
GRANT SELECT ON TABLE your_table_name TO "<service-principal-id>";
            """,
            language="sql",
        )
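        # Optional note: standard Postgres also allows one schema-wide grant in
        # place of per-table statements (a convenience, not taken from the
        # linked guide).
        st.caption(
            "To cover every existing table in the schema at once, run "
            '`GRANT SELECT ON ALL TABLES IN SCHEMA public TO "<service-principal-id>";`.'
        )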

    with col2:
        st.markdown(
            """
            **Databricks resources**
            * [Lakebase](https://docs.databricks.com/aws/en/oltp/) database instance (Postgres).
            * An existing Postgres database, schema, and table with data.
            """
        )

    with col3:
        st.markdown(
            """
            **Dependencies**
            * [Databricks SDK](https://pypi.org/project/databricks-sdk/) - `databricks-sdk>=0.60.0`
            * [Psycopg](https://pypi.org/project/psycopg/) - `psycopg[binary]`
            * [Pandas](https://pypi.org/project/pandas/) - `pandas`
            * [Streamlit](https://pypi.org/project/streamlit/) - `streamlit`
            """
        )
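        # The same list as a copy-pasteable requirements.txt. Only the SDK
        # floor (>=0.60.0) comes from this page; leaving the other packages
        # unpinned is an assumption.
        st.code(
            """databricks-sdk>=0.60.0
psycopg[binary]
pandas
streamlit""",
            language="text",
        )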

    st.info(
        "[This guide](https://docs.databricks.com/aws/en/oltp/query/sql-editor#create-a-new-query) "
        "shows how to query your Lakebase instance from the SQL editor."
    )

    st.warning(
        "⚠️ OAuth tokens are short-lived; this app requests a fresh token for "
        "each new connection and enforces TLS (`sslmode=require`)."
    )