Skip to content

Commit 95b1dd3

Browse files
committed
Enhance serialization functions in utils.py for better data handling:
- Introduced new helper functions `_stringify_value` and `_prepare_for_serialization` to handle complex data types, including sumtypes and Pubkeys, ensuring compatibility with Arrow.
- Updated `serialize_perp_market` and `serialize_spot_market` functions to utilize these new helpers, improving the serialization process for market data.
- Added error handling for missing keys during data preparation and ensured final conversion of object columns to strings for better compatibility.
- Enhanced overall data integrity and clarity in the serialized output, facilitating easier data manipulation and analysis.
1 parent 54bffe5 commit 95b1dd3

File tree

1 file changed

+84
-23
lines changed

1 file changed

+84
-23
lines changed

src/utils.py

Lines changed: 84 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import datetime
22
import os
3-
from typing import Optional
3+
from typing import Optional, Any, Union
44
import pandas as pd
5+
import copy # Added for deepcopy
56

67
import requests
78
from driftpy.decode.utils import decode_name
@@ -19,10 +20,37 @@
1920
WebsocketConfig as UserMapWebsocketConfig,
2021
)
2122
from driftpy.user_map.userstats_map import UserStatsMap
23+
from solders.pubkey import Pubkey # Import Pubkey
2224

2325
from dotenv import load_dotenv
2426
load_dotenv()
2527

28+
# Helper function to stringify sumtypes, Pubkeys, and other problematic types
29+
def _stringify_value(value: Any) -> Any:
    """Recursively replace values Arrow cannot store with plain strings.

    Conversion rules, applied in order:

    * an object exposing a string ``kind`` attribute becomes that string
      (the original author notes this is the common driftpy sumtype
      pattern, e.g. MarketType, OracleSource);
    * an object whose class lives in a module with ``'sumtypes'`` in its
      name becomes its class name (noted for e.g. MarketStatus);
    * lists, tuples and dict *values* are converted element by element,
      producing a new container of the same type (dict keys are untouched);
    * a ``Pubkey`` becomes ``str(pubkey)``;
    * anything else is returned unchanged.

    The input is never mutated.
    """
    # Read ``kind`` once: it may be a property, and ``getattr`` with a
    # default swallows the same AttributeError that ``hasattr`` would.
    kind = getattr(value, 'kind', None)
    if isinstance(kind, str):
        return kind

    # Every object has a class and every class normally has ``__module__``,
    # so only guard against the rare non-string value.
    module = getattr(type(value), '__module__', None)
    if isinstance(module, str) and 'sumtypes' in module:
        return type(value).__name__

    if isinstance(value, list):
        return [_stringify_value(item) for item in value]
    if isinstance(value, tuple):
        # Tuples may hide Pubkeys/sumtypes just like lists do.
        return tuple(_stringify_value(item) for item in value)
    if isinstance(value, dict):
        return {k: _stringify_value(v) for k, v in value.items()}

    if isinstance(value, Pubkey):
        return str(value)

    # Add other specific type checks here if further non-sumtype objects
    # turn out to cause issues with Arrow.
    return value
47+
48+
def _prepare_for_serialization(obj_dict: dict) -> dict:
    """Return a new dict with every value passed through ``_stringify_value``.

    Keys are kept as-is. ``obj_dict`` itself is not modified, so callers
    may pass an object's live ``__dict__`` without copying it first.
    """
    return {key: _stringify_value(value) for key, value in obj_dict.items()}
53+
2654
def to_financial(num):
2755
num_str = str(num)
2856
decimal_pos = num_str.find(".")
@@ -260,61 +288,94 @@ def human_amm_df(df):
260288

261289

262290
def serialize_perp_market(market: PerpMarketAccount) -> pd.DataFrame:
    """Flatten a perp market account into one wide DataFrame.

    The account and five of its nested structs (``amm``,
    ``amm.historical_oracle_data``, ``amm.fee_pool``, ``insurance_claim``,
    ``pnl_pool``) are each normalised into their own frame, run through
    ``human_market_df`` or ``human_amm_df``, given a dotted column prefix
    and concatenated side by side. Finally every object-dtype column is
    cast to ``str`` for Arrow compatibility.

    Returns:
        The concatenated frame with columns named ``market.*``,
        ``market.amm.*``, ``market.amm.historical_oracle_data.*``,
        ``market.amm.fee_pool.*``, ``market.insurance_claim.*`` and
        ``market.pnl_pool.*``.
    """
    # (column prefix, source object, nested columns to drop, humanizer)
    sections = (
        ('market.', market,
         ['amm', 'insurance_claim', 'pnl_pool'], human_market_df),
        ('market.amm.', market.amm,
         ['historical_oracle_data', 'fee_pool'], human_amm_df),
        ('market.amm.historical_oracle_data.',
         market.amm.historical_oracle_data, [], human_amm_df),
        ('market.amm.fee_pool.', market.amm.fee_pool, [], human_amm_df),
        ('market.insurance_claim.', market.insurance_claim,
         [], human_market_df),
        ('market.pnl_pool.', market.pnl_pool, [], human_amm_df),
    )

    frames = []
    for prefix, source, nested_cols, humanize in sections:
        # _prepare_for_serialization builds a fresh dict and never mutates
        # its input, so the account does not need to be deep-copied.
        prepared = _prepare_for_serialization(source.__dict__)
        frame = (
            pd.json_normalize(prepared)
            .drop(nested_cols, axis=1, errors='ignore')
            .pipe(humanize)
        )
        # Only the top-level account carries the encoded market name.
        # _stringify_value leaves it as-is apart from rebuilding lists, so
        # decode_name still receives the raw encoded value.
        if source is market and 'name' in frame.columns and prepared.get('name'):
            frame['name'] = decode_name(prepared['name'])
        frame.columns = [prefix + col for col in frame.columns]
        frames.append(frame)

    result_df = pd.concat(frames, axis=1)

    # Final conversion of object columns to string for Arrow compatibility.
    # NOTE: missing values become the literal strings 'None' / 'nan'.
    object_cols = [
        col for col, dtype in result_df.dtypes.items() if dtype == 'object'
    ]
    return result_df.astype({col: str for col in object_cols})
286331

287332

288333
def serialize_spot_market(spot_market: SpotMarketAccount) -> pd.DataFrame:
    """Flatten a spot market account into one wide DataFrame.

    The account and five of its nested structs (``insurance_fund``,
    ``historical_oracle_data``, ``historical_index_data``,
    ``revenue_pool``, ``spot_fee_pool``) are each normalised into their own
    frame, run through ``human_amm_df``, given a dotted column prefix and
    concatenated side by side. Finally every object-dtype column is cast
    to ``str`` for Arrow compatibility.

    Returns:
        The concatenated frame with columns named ``spot_market.*``,
        ``spot_market.insurance_fund.*``,
        ``spot_market.historical_oracle_data.*``,
        ``spot_market.historical_index_data.*``,
        ``spot_market.revenue_pool.*`` and ``spot_market.spot_fee_pool.*``.
    """
    # (column prefix, source object, nested columns to drop)
    sections = (
        ('spot_market.', spot_market, [
            'historical_oracle_data', 'historical_index_data',
            'insurance_fund',
            'spot_fee_pool', 'revenue_pool',
        ]),
        ('spot_market.insurance_fund.', spot_market.insurance_fund, []),
        ('spot_market.historical_oracle_data.',
         spot_market.historical_oracle_data, []),
        ('spot_market.historical_index_data.',
         spot_market.historical_index_data, []),
        ('spot_market.revenue_pool.', spot_market.revenue_pool, []),
        ('spot_market.spot_fee_pool.', spot_market.spot_fee_pool, []),
    )

    frames = []
    for prefix, source, nested_cols in sections:
        # _prepare_for_serialization builds a fresh dict and never mutates
        # its input, so the account does not need to be deep-copied.
        prepared = _prepare_for_serialization(source.__dict__)
        # Every section, including the top-level one, uses human_amm_df,
        # exactly as the original code did.
        frame = (
            pd.json_normalize(prepared)
            .drop(nested_cols, axis=1, errors='ignore')
            .pipe(human_amm_df)
        )
        # Only the top-level account carries the encoded market name; the
        # Pubkey fields were already stringified during preparation.
        if (source is spot_market and 'name' in frame.columns
                and prepared.get('name')):
            frame['name'] = decode_name(prepared['name'])
        frame.columns = [prefix + col for col in frame.columns]
        frames.append(frame)

    result_df = pd.concat(frames, axis=1)

    # Final conversion of object columns to string for Arrow compatibility.
    # NOTE: missing values become the literal strings 'None' / 'nan'.
    object_cols = [
        col for col, dtype in result_df.dtypes.items() if dtype == 'object'
    ]
    return result_df.astype({col: str for col in object_cols})

0 commit comments

Comments
 (0)