-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathschema_creator.py
More file actions
47 lines (39 loc) · 1.03 KB
/
schema_creator.py
File metadata and controls
47 lines (39 loc) · 1.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import psycopg2
from app.core.config import settings
# Maps each accepted, user-facing column type name to the PostgreSQL type
# emitted in the generated DDL. Every row below lists one PostgreSQL type
# followed by all of the aliases that resolve to it.
ALLOWED_TYPES: dict[str, str] = {
    alias: pg_type
    for pg_type, aliases in (
        ("TEXT", ("text", "string")),
        ("INTEGER", ("int", "integer")),
        ("REAL", ("number", "float")),
        # Note: "date" is stored as a full TIMESTAMP, not a DATE column.
        ("TIMESTAMP", ("timestamp", "date")),
    )
    for alias in aliases
}
def _is_safe_identifier(name: object) -> bool:
    """Return True if *name* is a string matching ``[A-Za-z_][A-Za-z0-9_]*``."""
    # For ASCII-only strings, str.isidentifier() accepts exactly that pattern.
    return isinstance(name, str) and name.isascii() and name.isidentifier()


def _build_create_table_sql(table_name: str, columns: list[dict]) -> str:
    """Validate the inputs and return the CREATE TABLE statement text."""
    # Identifiers cannot be sent as bound query parameters, so they are
    # spliced into the statement text and therefore must be validated first.
    # A table name may be schema-qualified ("schema.table"); each dotted part
    # is checked on its own.
    if not isinstance(table_name, str) or not all(
        _is_safe_identifier(part) for part in table_name.split(".")
    ):
        raise ValueError(f"Invalid table name: {table_name!r}")

    # Collect every definition in one list so that an empty ``columns`` does
    # not leave a dangling comma after the primary key.
    definitions = ["id SERIAL PRIMARY KEY"]
    for col in columns:
        if not _is_safe_identifier(col["name"]):
            raise ValueError(f"Invalid column name: {col['name']!r}")
        if col["type"] not in ALLOWED_TYPES:
            raise ValueError(f"Unsupported column type: {col['type']}")
        definitions.append(f"{col['name']} {ALLOWED_TYPES[col['type']]}")

    return (
        f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(definitions)});"
    )


def create_table(table_name: str, columns: list[dict]):
    """Create a table with an auto-generated ``id`` key and the given columns.

    The statement is ``CREATE TABLE IF NOT EXISTS``, so calling this for a
    table that already exists leaves the existing table as it is.

    Args:
        table_name: Name of the table, optionally schema-qualified
            (``schema.table``). Each part must consist of ASCII letters,
            digits and underscores and must not start with a digit.
        columns: Column specifications; each dict must provide a ``"name"``
            (same character rules as ``table_name`` parts) and a ``"type"``
            that is a key of ``ALLOWED_TYPES``. May be empty, in which case
            the table only has the ``id`` column.

    Raises:
        ValueError: If the table name or a column name is not a safe
            identifier, or a column type is not in ``ALLOWED_TYPES``.
        KeyError: If a column dict lacks ``"name"`` or ``"type"``.
    """
    # Build (and validate) the statement before opening a connection so that
    # bad input never reaches the database.
    statement = _build_create_table_sql(table_name, columns)

    conn = psycopg2.connect(
        host=settings.DB_HOST,
        port=settings.DB_PORT,
        dbname=settings.DB_NAME,
        user=settings.DB_USER,
        password=settings.DB_PASSWORD,
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Always release the connection, including when execute/commit fails.
        conn.close()