Skip to content

Commit 29dd70a

Browse files
Update client.py
1 parent 7a9bfd3 commit 29dd70a

File tree

1 file changed

+94
-85
lines changed

1 file changed

+94
-85
lines changed

supabase/_async/client.py

Lines changed: 94 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
import asyncio
2+
import logging
23
import re
34
from typing import Any, Dict, List, Optional, Union
45

56
from gotrue import AsyncMemoryStorage
67
from gotrue.types import AuthChangeEvent, Session
78
from httpx import Timeout
8-
from postgrest import (
9-
AsyncPostgrestClient,
10-
AsyncRequestBuilder,
11-
AsyncRPCFilterRequestBuilder,
12-
)
9+
from postgrest import AsyncPostgrestClient, AsyncRequestBuilder, AsyncRPCFilterRequestBuilder
1310
from postgrest.constants import DEFAULT_POSTGREST_CLIENT_TIMEOUT
1411
from realtime import AsyncRealtimeChannel, AsyncRealtimeClient, RealtimeChannelOptions
1512
from storage3 import AsyncStorageClient
@@ -20,7 +17,6 @@
2017
from .auth_client import AsyncSupabaseAuthClient
2118

2219

23-
# Create an exception class when user does not provide a valid url or key.
2420
class SupabaseException(Exception):
2521
def __init__(self, message: str):
2622
self.message = message
@@ -54,11 +50,9 @@ def __init__(
5450
if not supabase_key:
5551
raise SupabaseException("supabase_key is required")
5652

57-
# Check if the url and key are valid
53+
# Validate the URL and key
5854
if not re.match(r"^(https?)://.+", supabase_url):
5955
raise SupabaseException("Invalid URL")
60-
61-
# Check if the key is a valid JWT
6256
if not re.match(
6357
r"^[A-Za-z0-9-_=]+\.[A-Za-z0-9-_=]+\.?[A-Za-z0-9-_.+/=]*$", supabase_key
6458
):
@@ -77,7 +71,7 @@ def __init__(
7771
self.storage_url = f"{supabase_url}/storage/v1"
7872
self.functions_url = f"{supabase_url}/functions/v1"
7973

80-
# Instantiate clients.
74+
# Instantiate clients
8175
self.auth = self._init_supabase_auth_client(
8276
auth_url=self.auth_url,
8377
client_options=options,
@@ -99,6 +93,22 @@ async def create(
9993
supabase_key: str,
10094
options: Optional[ClientOptions] = None,
10195
):
96+
"""Create a Supabase client instance.
97+
98+
Parameters
99+
----------
100+
supabase_url: str
101+
The URL to the Supabase instance that should be connected to.
102+
supabase_key: str
103+
The API key to the Supabase instance that should be connected to.
104+
**options
105+
Any extra settings to be optionally specified - also see the
106+
`DEFAULT_OPTIONS` dict.
107+
108+
Returns
109+
-------
110+
AsyncClient
111+
"""
102112
auth_header = options.headers.get("Authorization") if options else None
103113
client = cls(supabase_url, supabase_key, options)
104114

@@ -116,68 +126,44 @@ async def create(
116126
return client
117127

118128
def table(self, table_name: str) -> AsyncRequestBuilder:
119-
"""Perform a table operation.
120-
121-
Note that the supabase client uses the `from` method, but in Python,
122-
this is a reserved keyword, so we have elected to use the name `table`.
123-
Alternatively you can use the `.from_()` method.
124-
"""
129+
"""Perform a table operation."""
125130
return self.from_(table_name)
126131

127132
def schema(self, schema: str) -> AsyncPostgrestClient:
128-
"""Select a schema to query or perform an function (rpc) call.
129-
130-
The schema needs to be on the list of exposed schemas inside Supabase.
131-
"""
133+
"""Select a schema to query or perform a function (RPC) call."""
132134
if self.options.schema != schema:
133135
self.options.schema = schema
134136
if self._postgrest:
135137
self._postgrest.schema(schema)
136138
return self.postgrest
137139

138140
def from_(self, table_name: str) -> AsyncRequestBuilder:
139-
"""Perform a table operation.
140-
141-
See the `table` method.
142-
"""
141+
"""Perform a table operation."""
143142
return self.postgrest.from_(table_name)
144143

145144
def rpc(
146145
self, fn: str, params: Optional[Dict[Any, Any]] = None
147146
) -> AsyncRPCFilterRequestBuilder:
148-
"""Performs a stored procedure call.
149-
150-
Parameters
151-
----------
152-
fn : callable
153-
The stored procedure call to be executed.
154-
params : dict of any
155-
Parameters passed into the stored procedure call.
156-
157-
Returns
158-
-------
159-
SyncFilterRequestBuilder
160-
Returns a filter builder. This lets you apply filters on the response
161-
of an RPC.
162-
"""
147+
"""Performs a stored procedure call."""
163148
if params is None:
164149
params = {}
165150
return self.postgrest.rpc(fn, params)
166151

167152
@property
168153
def postgrest(self):
154+
"""PostgREST client property."""
169155
if self._postgrest is None:
170156
self._postgrest = self._init_postgrest_client(
171157
rest_url=self.rest_url,
172158
headers=self.options.headers,
173159
schema=self.options.schema,
174160
timeout=self.options.postgrest_client_timeout,
175161
)
176-
177162
return self._postgrest
178163

179164
@property
180165
def storage(self):
166+
"""Storage client property."""
181167
if self._storage is None:
182168
self._storage = self._init_storage_client(
183169
storage_url=self.storage_url,
@@ -188,6 +174,7 @@ def storage(self):
188174

189175
@property
190176
def functions(self):
177+
"""Functions client property."""
191178
if self._functions is None:
192179
self._functions = AsyncFunctionsClient(
193180
self.functions_url,
@@ -207,20 +194,80 @@ def get_channels(self) -> List[AsyncRealtimeChannel]:
207194
return self.realtime.get_channels()
208195

209196
async def remove_channel(self, channel: AsyncRealtimeChannel) -> None:
210-
"""Unsubscribes and removes Realtime channel from Realtime client."""
197+
"""Unsubscribes and removes a Realtime channel from the Realtime client."""
211198
await self.realtime.remove_channel(channel)
212199

213200
async def remove_all_channels(self) -> None:
214-
"""Unsubscribes and removes all Realtime channels from Realtime client."""
201+
"""Unsubscribes and removes all Realtime channels from the Realtime client."""
215202
await self.realtime.remove_all_channels()
216203

204+
def _create_auth_header(self, token: str):
205+
"""Creates an authorization header."""
206+
return f"Bearer {token}"
207+
208+
def _get_auth_headers(self, authorization: Optional[str] = None) -> Dict[str, str]:
209+
"""Helper method to get auth headers."""
210+
if authorization is None:
211+
authorization = self.options.headers.get(
212+
"Authorization", self._create_auth_header(self.supabase_key)
213+
)
214+
return {
215+
"apiKey": self.supabase_key,
216+
"Authorization": authorization,
217+
}
218+
219+
def _listen_to_auth_events(
220+
self, event: AuthChangeEvent, session: Optional[Session]
221+
):
222+
"""Listens to authentication state changes."""
223+
access_token = self.supabase_key
224+
if event in ["SIGNED_IN", "TOKEN_REFRESHED", "SIGNED_OUT"]:
225+
self._postgrest = None
226+
self._storage = None
227+
self._functions = None
228+
access_token = session.access_token if session else self.supabase_key
229+
230+
self.options.headers["Authorization"] = self._create_auth_header(access_token)
231+
asyncio.create_task(self.realtime.set_auth(access_token))
232+
233+
async def connect_to_realtime(self):
234+
"""Connect to Supabase Realtime service and handle disconnections with retries."""
235+
try:
236+
await self.realtime.connect()
237+
logging.info("Connected to Supabase realtime successfully.")
238+
except Exception as e:
239+
logging.error(f"Connection to Supabase realtime failed: {e}")
240+
await self._retry_realtime_connection()
241+
242+
async def _retry_realtime_connection(self):
243+
"""Retries the connection to the Realtime service with exponential backoff."""
244+
retries = 0
245+
max_retries = 5
246+
base_delay = 2
247+
248+
while retries < max_retries:
249+
try:
250+
await asyncio.sleep(base_delay * (2 ** retries)) # Exponential backoff
251+
await self.realtime.connect()
252+
logging.info("Reconnected to Supabase realtime.")
253+
return
254+
except Exception as e:
255+
retries += 1
256+
logging.error(f"Retry {retries} failed: {e}")
257+
258+
logging.error("Max retries reached, could not reconnect to Supabase realtime.")
259+
217260
@staticmethod
218261
def _init_realtime_client(
219262
realtime_url: str, supabase_key: str, options: Optional[Dict[str, Any]] = None
220263
) -> AsyncRealtimeClient:
264+
"""Private method for creating an instance of the realtime client."""
221265
if options is None:
222266
options = {}
223-
"""Private method for creating an instance of the realtime-py client."""
267+
268+
# Configure connection options if needed
269+
options['timeout'] = 30 # Example timeout setting, adjust as needed
270+
224271
return AsyncRealtimeClient(realtime_url, token=supabase_key, **options)
225272

226273
@staticmethod
@@ -231,6 +278,7 @@ def _init_storage_client(
231278
verify: bool = True,
232279
proxy: Optional[str] = None,
233280
) -> AsyncStorageClient:
281+
"""Initializes the storage client."""
234282
return AsyncStorageClient(
235283
storage_url, headers, storage_client_timeout, verify, proxy
236284
)
@@ -263,7 +311,7 @@ def _init_postgrest_client(
263311
verify: bool = True,
264312
proxy: Optional[str] = None,
265313
) -> AsyncPostgrestClient:
266-
"""Private helper for creating an instance of the Postgrest client."""
314+
"""Initializes the PostgREST client."""
267315
return AsyncPostgrestClient(
268316
rest_url,
269317
headers=headers,
@@ -273,42 +321,13 @@ def _init_postgrest_client(
273321
proxy=proxy,
274322
)
275323

276-
def _create_auth_header(self, token: str):
277-
return f"Bearer {token}"
278-
279-
def _get_auth_headers(self, authorization: Optional[str] = None) -> Dict[str, str]:
280-
if authorization is None:
281-
authorization = self.options.headers.get(
282-
"Authorization", self._create_auth_header(self.supabase_key)
283-
)
284-
285-
"""Helper method to get auth headers."""
286-
return {
287-
"apiKey": self.supabase_key,
288-
"Authorization": authorization,
289-
}
290-
291-
def _listen_to_auth_events(
292-
self, event: AuthChangeEvent, session: Optional[Session]
293-
):
294-
access_token = self.supabase_key
295-
if event in ["SIGNED_IN", "TOKEN_REFRESHED", "SIGNED_OUT"]:
296-
# reset postgrest and storage instance on event change
297-
self._postgrest = None
298-
self._storage = None
299-
self._functions = None
300-
access_token = session.access_token if session else self.supabase_key
301-
302-
self.options.headers["Authorization"] = self._create_auth_header(access_token)
303-
asyncio.create_task(self.realtime.set_auth(access_token))
304-
305324

306325
async def create_client(
307326
supabase_url: str,
308327
supabase_key: str,
309328
options: Optional[ClientOptions] = None,
310329
) -> AsyncClient:
311-
"""Create client function to instantiate supabase client like JS runtime.
330+
"""Create client function to instantiate supabase client.
312331
313332
Parameters
314333
----------
@@ -320,19 +339,9 @@ async def create_client(
320339
Any extra settings to be optionally specified - also see the
321340
`DEFAULT_OPTIONS` dict.
322341
323-
Examples
324-
--------
325-
Instantiating the client.
326-
>>> import os
327-
>>> from supabase import create_client, Client
328-
>>>
329-
>>> url: str = os.environ.get("SUPABASE_TEST_URL")
330-
>>> key: str = os.environ.get("SUPABASE_TEST_KEY")
331-
>>> supabase: Client = create_client(url, key)
332-
333342
Returns
334343
-------
335-
Client
344+
AsyncClient
336345
"""
337346
return await AsyncClient.create(
338347
supabase_url=supabase_url, supabase_key=supabase_key, options=options

0 commit comments

Comments
 (0)