Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ with requests.post(url, headers=headers, json={"chat": ["Count orders last week"
continue
obj = json.loads(part)
print('STREAM:', obj)
```

Notes & tips
- Graph IDs are namespaced per-user. When calling the API directly use the plain graph id (the server will namespace by the authenticated user). For uploaded files the `database` field determines the saved graph id.
Expand Down
101 changes: 82 additions & 19 deletions api/loaders/postgres_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import decimal
import logging
from typing import AsyncGenerator, Dict, Any, List, Tuple
from urllib.parse import urlparse, parse_qs, unquote

import psycopg2
from psycopg2 import sql
Expand Down Expand Up @@ -96,19 +97,73 @@ def _serialize_value(value):
return None
return value

@staticmethod
def _parse_schema_from_url(connection_url: str) -> str:
"""
Parse the search_path from the connection URL's options parameter.

The options parameter follows PostgreSQL's libpq format:
postgresql://user:pass@host:port/db?options=-csearch_path%3Dschema_name

Args:
connection_url: PostgreSQL connection URL

Returns:
The first schema from search_path, or 'public' if not specified
"""
try:
parsed = urlparse(connection_url)
query_params = parse_qs(parsed.query)

# Get the options parameter
options = query_params.get('options', [])
if not options:
return 'public'

# Options can be URL-encoded, decode it
options_str = unquote(options[0])

# Parse -c search_path=value from options
# Format can be: -csearch_path=schema or -c search_path=schema
match = re.search(r'-c\s*search_path[=\s]+([^\s]+)', options_str, re.IGNORECASE)
if match:
search_path = match.group(1)
# search_path can be comma-separated, take the first schema
# Also handle quoted values and $user
first_schema = search_path.split(',')[0].strip().strip('"\'')
# Skip $user as it's a special variable
if first_schema == '$user':
schemas = search_path.split(',')
if len(schemas) > 1:
first_schema = schemas[1].strip().strip('"\'')
else:
return 'public'
return first_schema if first_schema else 'public'

return 'public'

except Exception: # pylint: disable=broad-exception-caught
# If parsing fails, default to public schema
return 'public'

@staticmethod
async def load(prefix: str, connection_url: str) -> AsyncGenerator[tuple[bool, str], None]:
"""
Load the graph data from a PostgreSQL database into the graph database.

Args:
connection_url: PostgreSQL connection URL in format:
postgresql://username:password@host:port/database
postgresql://username:password@host:port/database
Optionally with schema via options parameter:
postgresql://...?options=-csearch_path%3Dschema_name

Returns:
Tuple[bool, str]: Success status and message
"""
try:
# Parse schema from connection URL (defaults to 'public')
schema = PostgresLoader._parse_schema_from_url(connection_url)

# Connect to PostgreSQL database
conn = psycopg2.connect(connection_url)
cursor = conn.cursor()
Expand All @@ -120,11 +175,11 @@ async def load(prefix: str, connection_url: str) -> AsyncGenerator[tuple[bool, s

# Get all table information
yield True, "Extracting table information..."
entities = PostgresLoader.extract_tables_info(cursor)
entities = PostgresLoader.extract_tables_info(cursor, schema)

yield True, "Extracting relationship information..."
# Get all relationship information
relationships = PostgresLoader.extract_relationships(cursor)
relationships = PostgresLoader.extract_relationships(cursor, schema)

# Close database connection
cursor.close()
Expand All @@ -146,44 +201,46 @@ async def load(prefix: str, connection_url: str) -> AsyncGenerator[tuple[bool, s
yield False, "Failed to load PostgreSQL database schema"

@staticmethod
def extract_tables_info(cursor) -> Dict[str, Any]:
def extract_tables_info(cursor, schema: str = 'public') -> Dict[str, Any]:
"""
Extract table and column information from PostgreSQL database.

Args:
cursor: Database cursor
schema: Database schema to extract tables from (default: 'public')

Returns:
Dict containing table information
"""
entities = {}

# Get all tables in public schema
# Get all tables in the specified schema
cursor.execute("""
SELECT table_name, table_comment
FROM information_schema.tables t
LEFT JOIN (
SELECT schemaname, tablename, description as table_comment
FROM pg_tables pt
JOIN pg_class pc ON pc.relname = pt.tablename
JOIN pg_namespace pn ON pn.oid = pc.relnamespace AND pn.nspname = pt.schemaname
JOIN pg_description pd ON pd.objoid = pc.oid AND pd.objsubid = 0
WHERE pt.schemaname = 'public'
WHERE pt.schemaname = %s
) tc ON tc.tablename = t.table_name
WHERE t.table_schema = 'public'
WHERE t.table_schema = %s
AND t.table_type = 'BASE TABLE'
ORDER BY t.table_name;
""")
""", (schema, schema))

tables = cursor.fetchall()

for table_name, table_comment in tqdm.tqdm(tables, desc="Extracting table information"):
table_name = table_name.strip()

# Get column information for this table
columns_info = PostgresLoader.extract_columns_info(cursor, table_name)
columns_info = PostgresLoader.extract_columns_info(cursor, table_name, schema)

# Get foreign keys for this table
foreign_keys = PostgresLoader.extract_foreign_keys(cursor, table_name)
foreign_keys = PostgresLoader.extract_foreign_keys(cursor, table_name, schema)

# Generate table description
table_description = table_comment if table_comment else f"Table: {table_name}"
Expand All @@ -201,13 +258,14 @@ def extract_tables_info(cursor) -> Dict[str, Any]:
return entities

@staticmethod
def extract_columns_info(cursor, table_name: str) -> Dict[str, Any]:
def extract_columns_info(cursor, table_name: str, schema: str = 'public') -> Dict[str, Any]:
"""
Extract column information for a specific table.

Args:
cursor: Database cursor
table_name: Name of the table
schema: Database schema (default: 'public')

Returns:
Dict containing column information
Expand All @@ -231,6 +289,7 @@ def extract_columns_info(cursor, table_name: str) -> Dict[str, Any]:
JOIN information_schema.key_column_usage ku
ON tc.constraint_name = ku.constraint_name
WHERE tc.table_name = %s
AND tc.table_schema = %s
AND tc.constraint_type = 'PRIMARY KEY'
) pk ON pk.column_name = c.column_name
LEFT JOIN (
Expand All @@ -239,15 +298,17 @@ def extract_columns_info(cursor, table_name: str) -> Dict[str, Any]:
JOIN information_schema.key_column_usage ku
ON tc.constraint_name = ku.constraint_name
WHERE tc.table_name = %s
AND tc.table_schema = %s
AND tc.constraint_type = 'FOREIGN KEY'
) fk ON fk.column_name = c.column_name
LEFT JOIN pg_class pc ON pc.relname = c.table_name
LEFT JOIN pg_namespace pn ON pn.oid = pc.relnamespace AND pn.nspname = c.table_schema
LEFT JOIN pg_attribute pa ON pa.attrelid = pc.oid AND pa.attname = c.column_name
LEFT JOIN pg_description pgd ON pgd.objoid = pc.oid AND pgd.objsubid = pa.attnum
WHERE c.table_name = %s
AND c.table_schema = 'public'
AND c.table_schema = %s
ORDER BY c.ordinal_position;
""", (table_name, table_name, table_name))
""", (table_name, schema, table_name, schema, table_name, schema))

columns = cursor.fetchall()
columns_info = {}
Expand Down Expand Up @@ -289,13 +350,14 @@ def extract_columns_info(cursor, table_name: str) -> Dict[str, Any]:
return columns_info

@staticmethod
def extract_foreign_keys(cursor, table_name: str) -> List[Dict[str, str]]:
def extract_foreign_keys(cursor, table_name: str, schema: str = 'public') -> List[Dict[str, str]]:
"""
Extract foreign key information for a specific table.

Args:
cursor: Database cursor
table_name: Name of the table
schema: Database schema (default: 'public')

Returns:
List of foreign key dictionaries
Expand All @@ -315,8 +377,8 @@ def extract_foreign_keys(cursor, table_name: str) -> List[Dict[str, str]]:
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_name = %s
AND tc.table_schema = 'public';
""", (table_name,))
AND tc.table_schema = %s;
""", (table_name, schema))

foreign_keys = []
for constraint_name, column_name, foreign_table, foreign_column in cursor.fetchall():
Expand All @@ -330,12 +392,13 @@ def extract_foreign_keys(cursor, table_name: str) -> List[Dict[str, str]]:
return foreign_keys

@staticmethod
def extract_relationships(cursor) -> Dict[str, List[Dict[str, str]]]:
def extract_relationships(cursor, schema: str = 'public') -> Dict[str, List[Dict[str, str]]]:
"""
Extract all relationship information from the database.

Args:
cursor: Database cursor
schema: Database schema (default: 'public')

Returns:
Dict containing relationship information
Expand All @@ -355,9 +418,9 @@ def extract_relationships(cursor) -> Dict[str, List[Dict[str, str]]]:
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = 'public'
AND tc.table_schema = %s
ORDER BY tc.table_name, tc.constraint_name;
""")
""", (schema,))

relationships = {}
for (table_name, constraint_name, column_name,
Expand Down
28 changes: 28 additions & 0 deletions app/src/components/modals/DatabaseModal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const DatabaseModal = ({ open, onOpenChange }: DatabaseModalProps) => {
const [database, setDatabase] = useState("");
const [username, setUsername] = useState("");
const [password, setPassword] = useState("");
const [schema, setSchema] = useState("");
const [isConnecting, setIsConnecting] = useState(false);
const [connectionSteps, setConnectionSteps] = useState<ConnectionStep[]>([]);
const { refreshGraphs } = useDatabase();
Expand Down Expand Up @@ -94,6 +95,12 @@ const DatabaseModal = ({ open, onOpenChange }: DatabaseModalProps) => {
builtUrl.username = username;
builtUrl.password = password;
dbUrl = builtUrl.toString();

// Append schema option for PostgreSQL if provided
if (selectedDatabase === 'postgresql' && schema.trim()) {
const schemaOption = `options=-csearch_path%3D${encodeURIComponent(schema.trim())}`;
dbUrl += (dbUrl.includes('?') ? '&' : '?') + schemaOption;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to use:
const url = new URL(dbUrl);
url.searchParams.set('options', `-csearch_path=${schema.trim()}`);
dbUrl = url.toString();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, fixed

}
}

// Make streaming request
Expand Down Expand Up @@ -175,6 +182,7 @@ const DatabaseModal = ({ open, onOpenChange }: DatabaseModalProps) => {
setDatabase("");
setUsername("");
setPassword("");
setSchema("");
setConnectionSteps([]);
}, 1000);
} else {
Expand Down Expand Up @@ -382,6 +390,26 @@ const DatabaseModal = ({ open, onOpenChange }: DatabaseModalProps) => {
className="bg-muted border-border focus-visible:ring-purple-500"
/>
</div>

{/* Schema field - PostgreSQL only */}
{selectedDatabase === 'postgresql' && (
<div className="space-y-2">
<Label htmlFor="schema" className="text-sm font-medium">
Schema <span className="text-muted-foreground font-normal">(optional)</span>
</Label>
<Input
id="schema"
data-testid="schema-input"
placeholder="public"
value={schema}
onChange={(e) => setSchema(e.target.value)}
className="bg-muted border-border"
/>
<p className="text-xs text-muted-foreground">
Leave empty to use the default 'public' schema
</p>
</div>
)}
</>
)}

Expand Down
56 changes: 54 additions & 2 deletions docs/postgres_loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This loader connects to a PostgreSQL database and extracts the complete schema i
## Features

- **Complete Schema Extraction**: Retrieves all tables, columns, data types, constraints, and relationships
- **Custom Schema Support**: Configure which PostgreSQL schema to extract using the `search_path` option
- **Foreign Key Relationships**: Automatically discovers and maps foreign key relationships between tables
- **Column Metadata**: Extracts column comments, default values, nullability, and key types
- **Batch Processing**: Efficiently processes large schemas with progress tracking
Expand Down Expand Up @@ -90,6 +91,36 @@ postgresql://[username[:password]@][host[:port]][/database][?options]
- `postgresql://user:pass@example.com:5432/production_db`
- `postgresql://postgres@127.0.0.1/testdb`

### Custom Schema Configuration

By default, the loader extracts tables from the `public` schema. To use a different schema, add the `search_path` option to your connection URL using PostgreSQL's standard `options` parameter. More info here: https://www.postgresql.org/docs/18/runtime-config-client.html#GUC-SEARCH-PATH

**Format:**
postgresql://user:pass@host:port/database?options=-csearch_path%3Dschema_name

**Examples:**
Extract from 'sales' schema

postgresql://postgres:password@localhost:5432/mydb?options=-csearch_path%3Dsales

Extract from 'dbo' schema

postgresql://user:pass@host:5432/enterprise_db?options=-csearch_path%3Ddbo

Extract from 'inventory' schema

postgresql://admin:secret@192.168.1.100:5432/warehouse?options=-csearch_path%3Dinventory

**Notes:**
- The `%3D` is the URL-encoded form of `=`
- If `search_path` is not specified, the loader defaults to `public`
- The schema must exist and the user must have `USAGE` permission on it
- This follows PostgreSQL's native `search_path` configuration option

**Using the UI:**

When connecting via the QueryWeaver UI with "Manual Entry" mode for PostgreSQL, you can specify the schema in the optional "Schema" field. Leave it empty to use the default `public` schema.

### Integration with Graph Database

{% capture python_1 %}
Expand Down Expand Up @@ -222,7 +253,7 @@ PostgreSQL schema loaded successfully. Found 15 tables.
## Limitations

- Currently only supports PostgreSQL databases
- Extracts schema from the 'public' schema only
- Extracts schema from one schema at a time (defaults to 'public', configurable via `search_path`)
- Requires read permissions on information_schema and pg_* system tables
- Large schemas may take time to process due to embedding generation

Expand All @@ -232,8 +263,29 @@ PostgreSQL schema loaded successfully. Found 15 tables.

1. **Connection Failed**: Verify the connection URL format and database credentials
2. **Permission Denied**: Ensure the database user has read access to system tables
3. **Schema Not Found**: Check that tables exist in the 'public' schema
3. **No Tables Found**:
- Check that tables exist in the target schema
- If using a custom schema, verify the `search_path` option is correctly formatted
- Ensure the schema name matches exactly as stored in the database — the loader compares it as a literal string, and unquoted PostgreSQL identifiers are folded to lowercase
4. **Graph Database Error**: Verify that the graph database is running and accessible
5. **Schema Permission Error**: Ensure the database user has `USAGE` permission on the target schema:
```sql
GRANT USAGE ON SCHEMA my_schema TO my_user;
GRANT SELECT ON ALL TABLES IN SCHEMA my_schema TO my_user;
```

### Verifying Schema Configuration

To verify which schema will be used, you can test the search_path parsing:

{% capture python_1 %}
from api.loaders.postgres_loader import PostgresLoader

# Test URL parsing
url = "postgresql://user:pass@localhost:5432/mydb?options=-csearch_path%3Dmy_schema"
schema = PostgresLoader._parse_schema_from_url(url)
print(f"Schema to be used: {schema}") # Output: my_schema
{% endcapture %}

### Debug Mode

Expand Down
Loading
Loading