@@ -81,16 +81,18 @@ def __init__(
8181 user_data_json = data .get ("user_data" , "{}" )
8282 bot_data_json = data .get ("bot_data" , "{}" )
8383 conversations_json = data .get ("conversations" , "{}" )
84- callback_data_json = data .get ("callback_data_json " , "" )
84+ callback_data_json = data .get ("callback_data " , "" )
8585
8686 self .logger .info ("Database loaded successfully!" )
8787
8888 # if it is a fresh setup we'll add some placeholder data so we
8989 # can perform `UPDATE` operations on it, cause SQL only allows
9090 # `UPDATE` operations if column have some data already present inside it.
9191 if not data :
92- insert_qry = "INSERT INTO persistence (data) VALUES (:jsondata)"
93- self ._session .execute (text (insert_qry ), {"jsondata" : "{}" })
92+ upsert_qry = """
93+ INSERT INTO persistence (data) VALUES (:jsondata)
94+ ON CONFLICT (id) DO UPDATE SET data = :jsondata"""
95+ self ._session .execute (text (upsert_qry ), {"jsondata" : "{}" })
9496 self ._session .commit ()
9597
9698 super ().__init__ (
@@ -108,16 +110,69 @@ def __init_database(self) -> None:
108110 """
109111 creates table for storing the data if table
110112 doesn't exist already inside database.
113+ runs schema migration if necessary.
111114 """
112- create_table_qry = """
113- CREATE TABLE IF NOT EXISTS persistence(
114- data json NOT NULL);"""
115- self ._session .execute (text (create_table_qry ))
116- self ._session .commit ()
115+ try :
116+ create_table_qry = """
117+ CREATE TABLE IF NOT EXISTS persistence(
118+ id INT PRIMARY KEY DEFAULT 1,
119+ data json NOT NULL,
120+ CONSTRAINT single_row CHECK (id = 1));"""
121+ self ._session .execute (text (create_table_qry ))
122+
123+ # Check if id column exists, is an integer type, and is a primary key
124+ check_column_qry = """
125+ SELECT 1 FROM information_schema.columns c
126+ JOIN information_schema.key_column_usage kcu
127+ ON c.table_name = kcu.table_name
128+ AND c.column_name = kcu.column_name
129+ AND c.table_schema = kcu.table_schema
130+ JOIN information_schema.table_constraints tc
131+ ON kcu.constraint_name = tc.constraint_name
132+ AND kcu.table_schema = tc.table_schema
133+ WHERE c.table_schema = current_schema()
134+ AND c.table_name = 'persistence'
135+ AND c.column_name = 'id'
136+ AND c.data_type IN ('integer', 'smallint', 'bigint')
137+ AND tc.constraint_type = 'PRIMARY KEY';"""
138+ column_valid = self ._session .execute (text (check_column_qry )).first () is not None
139+
140+ # If column exists, check if there's a valid row with id = 1
141+ data_valid = False
142+ if column_valid :
143+ check_data_qry = """
144+ SELECT 1 FROM persistence WHERE id = 1;"""
145+ data_valid = self ._session .execute (text (check_data_qry )).first () is not None
146+
147+ needs_migration = not (column_valid and data_valid )
148+
149+ if needs_migration :
150+ self .logger .info ("Old database schema detected. Running migration..." )
151+ migration_commands = [
152+ "ALTER TABLE persistence ADD COLUMN id INT;" ,
153+ """
154+ UPDATE persistence SET id = 1 WHERE ctid = (
155+ SELECT ctid FROM persistence LIMIT 1
156+ );""" ,
157+ "DELETE FROM persistence WHERE id IS NULL;" ,
158+ "ALTER TABLE persistence ALTER COLUMN id SET NOT NULL;" ,
159+ "ALTER TABLE persistence ADD PRIMARY KEY (id);" ,
160+ "ALTER TABLE persistence ADD CONSTRAINT single_row CHECK (id = 1);" ,
161+ ]
162+ for command in migration_commands :
163+ self ._session .execute (text (command ))
164+ self .logger .info ("Database migration successful!" )
165+
166+ self ._session .commit ()
167+ except Exception as excp : # pylint: disable=W0703
168+ self .logger .error (
169+ "Database initialization or migration failed!" ,
170+ exc_info = excp ,
171+ )
172+ self ._session .rollback ()
117173
118174 def _dump_into_json (self ) -> Any :
119175 """Dumps data into json format for inserting in db."""
120-
121176 to_dump = {
122177 "chat_data" : self .chat_data_json ,
123178 "user_data" : self .user_data_json ,
@@ -131,16 +186,18 @@ def _dump_into_json(self) -> Any:
131186 def _update_database (self ) -> None :
132187 self .logger .debug ("Updating database..." )
133188 try :
134- insert_qry = "UPDATE persistence SET data = :jsondata"
189+ upsert_qry = """
190+ INSERT INTO persistence (data) VALUES (:jsondata)
191+ ON CONFLICT (id) DO UPDATE SET data = :jsondata"""
135192 params = {"jsondata" : self ._dump_into_json ()}
136- self ._session .execute (text (insert_qry ), params )
193+ self ._session .execute (text (upsert_qry ), params )
137194 self ._session .commit ()
138195 except Exception as excp : # pylint: disable=W0703
139- self ._session .close ()
140196 self .logger .error (
141197 "Failed to save data in the database.\n Logging exception: " ,
142198 exc_info = excp ,
143199 )
200+ self ._session .rollback ()
144201
145202 async def update_conversation (
146203 self , name : str , key : Tuple [int , ...], new_state : Optional [object ]
0 commit comments