@@ -65,18 +65,25 @@ def log_current_identity():
REGION = os.environ.get("AWS_REGION", "eu-west-1")
S3_OUTPUT = os.environ.get("ATHENA_S3_OUTPUT", "s3://mainnet-beta-data-ingestion-bucket/athena/")

+class UserVolumeDetails(BaseModel):
+    user_address: str
+    initial_selected_market_volume: float = 0.0
+    initial_other_market_volume: float = 0.0
+    selected_market_volume_14d: float = 0.0
+    other_market_volume_14d: float = 0.0
+    selected_market_volume_28d: float = 0.0
+    other_market_volume_28d: float = 0.0
+
class RetentionExplorerItem(BaseModel):
    market: str
    category: List[str]
    start_date: str
-    new_traders: int
-    new_traders_list: List[str]
-    retained_users_14d: int
+    new_traders_count: int
+    retained_14d_count: int
+    retained_28d_count: int
    retention_ratio_14d: float
-    retained_users_14d_list: List[str]
-    retained_users_28d: int
    retention_ratio_28d: float
-    retained_users_28d_list: List[str]
+    user_data: List[UserVolumeDetails]

UTC = tz.tzutc()
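For reference, a rough sketch of how the reshaped response might look when populated; "retention_api" is a hypothetical module name, and the market, addresses, and numbers below are made up for illustration:

# Rough usage sketch of the new nested response shape; all values are illustrative.
from retention_api import RetentionExplorerItem, UserVolumeDetails  # hypothetical module name

sample = RetentionExplorerItem(
    market="SOL-PERP",
    category=["perp"],
    start_date="2024-01-01",
    new_traders_count=2,
    retained_14d_count=1,
    retained_28d_count=1,
    retention_ratio_14d=0.5,
    retention_ratio_28d=0.5,
    user_data=[
        UserVolumeDetails(user_address="AddrA", initial_selected_market_volume=1250.0),
        UserVolumeDetails(user_address="AddrB", other_market_volume_14d=300.0),
    ],
)
print(sample.json(indent=2))  # pydantic v1; on v2 use sample.model_dump_json(indent=2)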
@@ -98,42 +105,93 @@ def partition_pred(parts: Set[Tuple[str, str, str]]) -> str:
def sql_new_traders(mkt_idx: int, start_dt: datetime) -> str:
    parts = partition_pred(partition_tuples(start_dt, NEW_TRADER_WINDOW_DAYS))
    return f"""
-    SELECT "user",
-           MIN(slot) AS first_slot,
-           MIN(ts) AS first_ts
-    FROM eventtype_orderrecord
-    WHERE ({parts})
-      AND "order".marketindex = {mkt_idx}
-      AND ("order".orderid = 0 OR "order".orderid = 1)
-    GROUP BY "user"
+    WITH potential_new_traders AS (
+        -- First, find users who made their first trade in the target market within the window
+        SELECT "user", MIN(ts) as first_ts
+        FROM eventtype_orderrecord
+        WHERE ({parts})
+          AND "order".marketindex = {mkt_idx}
+        GROUP BY "user"
+    ),
+    all_trades AS (
+        -- Get all trades for these potential new traders across all time
+        -- This is needed to ensure the trade in the window was truly their first one
+        SELECT "user", MIN(ts) as first_ever_ts
+        FROM eventtype_orderrecord
+        WHERE "user" IN (SELECT "user" FROM potential_new_traders)
+        GROUP BY "user"
+    ),
+    true_new_traders AS (
+        -- Filter to users whose first-ever trade matches their first trade in the window
+        SELECT pnt."user"
+        FROM potential_new_traders pnt
+        JOIN all_trades at ON pnt."user" = at."user" AND pnt.first_ts = at.first_ever_ts
+    )
+    -- Finally, join with newuserrecord to ensure it's their first subaccount (subaccountid=0)
+    SELECT tnt."user"
+    FROM true_new_traders tnt
+    JOIN eventtype_newuserrecord nur ON tnt."user" = nur."user"
+    WHERE nur.subaccountid = 0
+    """
+
+# --- New Helper Function for SQL ---
+# Define a scaling factor for quote asset amounts, assuming 6 decimal places (e.g., USDC)
+QUOTE_ASSET_SCALE_FACTOR = 1e6
+
+def sql_users_volume(
+    users: List[str],
+    start_dt: datetime,
+    end_dt: datetime,
+    partition_func: callable,
+    market_index: Optional[int] = None,
+    exclude_market_index: Optional[int] = None
+) -> str:
    """
+    Generates SQL to calculate the trading volume for a list of users in a given time period.
+    Volume is calculated from `quoteassetamountfilled` in `eventtype_traderecord`.
+    """
+    if not users:
+        return ""
+
+    user_list = "', '".join(users)
+
+    market_filter = ""
+    if market_index is not None:
+        market_filter = f"AND marketindex = {market_index}"
+    if exclude_market_index is not None:
+        market_filter = f"AND marketindex <> {exclude_market_index}"
+
+    start_ts = int(start_dt.timestamp())
+    # end_dt is exclusive, so we don't subtract one day
+    end_ts = int(end_dt.timestamp())

-def sql_retention_users_chunk(traders: List[str],
-                              mkt_idx: int,
-                              chunk_start: datetime,
-                              chunk_days: int) -> str:
-    chunk_end = chunk_start + timedelta(days=chunk_days)
-    start_ts = int(chunk_start.timestamp())
-    end_ts = int(chunk_end.timestamp())
-    from_date = chunk_start.strftime('%Y%m%d')
-    to_date = chunk_end.strftime('%Y%m%d')
-    trader_list = "', '".join(traders)
-
-    return f'''
-    WITH time_range AS (
-        SELECT
-            {start_ts} AS from_ts,
-            {end_ts} AS to_ts,
-            '{from_date}' AS from_date,
-            '{to_date}' AS to_date
+    # Use partitions for traderecord based on the provided function
+    num_days = (end_dt - start_dt).days
+    parts = partition_func(partition_tuples(start_dt, num_days))
+
+    # This query sums up volume for users as both takers and makers.
+    return f"""
+    WITH user_trades AS (
+        SELECT
+            taker AS user,
+            CAST(quoteassetamountfilled AS DOUBLE) / {QUOTE_ASSET_SCALE_FACTOR} AS quote_volume
+        FROM eventtype_traderecord
+        WHERE ({parts}) AND CAST(ts AS BIGINT) >= {start_ts} AND CAST(ts AS BIGINT) < {end_ts}
+          AND taker IN ('{user_list}') {market_filter}
+        UNION ALL
+        SELECT
+            maker AS user,
+            CAST(quoteassetamountfilled AS DOUBLE) / {QUOTE_ASSET_SCALE_FACTOR} AS quote_volume
+        FROM eventtype_traderecord
+        WHERE ({parts}) AND CAST(ts AS BIGINT) >= {start_ts} AND CAST(ts AS BIGINT) < {end_ts}
+          AND maker IN ('{user_list}') {market_filter}
    )
-    SELECT DISTINCT "user"
-    FROM eventtype_orderrecord, time_range
-    WHERE CAST(ts AS INT) BETWEEN time_range.from_ts AND time_range.to_ts
-      AND CONCAT(year, month, day) BETWEEN time_range.from_date AND time_range.to_date
-      AND "order".marketindex <> {mkt_idx}
-      AND "user" IN ('{trader_list}')
-    '''
+    SELECT
+        user,
+        SUM(quote_volume) AS total_volume
+    FROM user_trades
+    GROUP BY user
+    """

async def calculate_retention_for_market(market_name: str, start_date_str: str) -> Dict[str, Any]:
    conn = None
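A pure-pandas illustration of the aggregation that sql_users_volume expresses in SQL: every fill credits its quote volume to both the taker and the maker, scaled by the assumed 6-decimal quote precision. The addresses and amounts below are invented for the sketch.

# Pandas sketch of the taker/maker UNION ALL + GROUP BY volume aggregation.
import pandas as pd

QUOTE_ASSET_SCALE_FACTOR = 1e6  # assumed 6-decimal quote asset (e.g., USDC)

trades = pd.DataFrame({
    "taker": ["addr_A", "addr_B", "addr_A"],
    "maker": ["addr_B", "addr_C", "addr_C"],
    "quoteassetamountfilled": [5_000_000, 2_500_000, 1_000_000],  # raw integer amounts
})

# One leg per side of each fill, both labelled "user", then summed per user.
legs = pd.concat(
    [
        trades.rename(columns={"taker": "user"})[["user", "quoteassetamountfilled"]],
        trades.rename(columns={"maker": "user"})[["user", "quoteassetamountfilled"]],
    ],
    ignore_index=True,
)
volume = (
    legs.assign(quote_volume=legs["quoteassetamountfilled"] / QUOTE_ASSET_SCALE_FACTOR)
        .groupby("user", as_index=False)["quote_volume"]
        .sum()
)
print(volume)  # addr_A -> 6.0, addr_B -> 7.5, addr_C -> 3.5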
@@ -143,61 +201,77 @@ async def calculate_retention_for_market(market_name: str, start_date_str: str)
        if not market_config:
            raise HTTPException(status_code=404, detail=f"Market '{market_name}' not found.")

+        mkt_idx = market_config["index"]
+
        logger.info(f"Connecting to Athena. S3 staging: {S3_OUTPUT}, Region: {REGION}, DB: {DATABASE}")
        conn = connect(s3_staging_dir=S3_OUTPUT, region_name=REGION, schema_name=DATABASE)
        logger.info("Successfully connected to Athena.")
        log_current_identity()

-        # 1. Find new traders for the given market and date
+        # 1. Find new traders
        logger.info(f"Scanning for new traders for {market_name} from {start_date_str}...")
-        q_new_traders = sql_new_traders(market_config["index"], start_date)
+        q_new_traders = sql_new_traders(mkt_idx, start_date)
        new_traders_df = pd.read_sql(q_new_traders, conn)
-        logger.info(f"Found {len(new_traders_df)} new traders for {market_name}.")
-
        mkt_traders = new_traders_df["user"].tolist()
        new_traders_count = len(mkt_traders)
+
+        if not mkt_traders:
+            return {
+                "market": market_name, "category": market_config.get("category", []), "start_date": start_date_str,
+                "new_traders_count": 0, "retained_14d_count": 0, "retained_28d_count": 0,
+                "retention_ratio_14d": 0.0, "retention_ratio_28d": 0.0, "user_data": []
+            }
+
+        # --- Volume Calculation ---
+        traders_df = pd.DataFrame(mkt_traders, columns=['user'])
+
+        # Define time windows
+        initial_window_end = start_date + timedelta(days=NEW_TRADER_WINDOW_DAYS)
+        retention_14d_end = initial_window_end + timedelta(days=14)
+        retention_28d_end = initial_window_end + timedelta(days=28)
+
+        def get_and_merge_volume(
+            base_df: pd.DataFrame, column_name: str, start_dt: datetime, end_dt: datetime,
+            m_idx: Optional[int] = None, exclude_m_idx: Optional[int] = None
+        ) -> pd.DataFrame:
+            logger.info(f"Calculating volume for '{column_name}'...")
+            vol_sql = sql_users_volume(mkt_traders, start_dt, end_dt, partition_pred, m_idx, exclude_m_idx)
+            if not vol_sql: return base_df.assign(**{column_name: 0.0})
+
+            vol_df = pd.read_sql(vol_sql, conn)
+            merged_df = base_df.merge(vol_df, on='user', how='left')
+            merged_df['total_volume'] = merged_df['total_volume'].fillna(0)
+            return merged_df.rename(columns={'total_volume': column_name})
+
+        # Calculate all volume metrics
+        traders_df = get_and_merge_volume(traders_df, 'initial_selected_market_volume', start_date, initial_window_end, m_idx=mkt_idx)
+        traders_df = get_and_merge_volume(traders_df, 'initial_other_market_volume', start_date, initial_window_end, exclude_m_idx=mkt_idx)
+        traders_df = get_and_merge_volume(traders_df, 'selected_market_volume_14d', initial_window_end, retention_14d_end, m_idx=mkt_idx)
+        traders_df = get_and_merge_volume(traders_df, 'other_market_volume_14d', initial_window_end, retention_14d_end, exclude_m_idx=mkt_idx)
+        traders_df = get_and_merge_volume(traders_df, 'selected_market_volume_28d', initial_window_end, retention_28d_end, m_idx=mkt_idx)
+        traders_df = get_and_merge_volume(traders_df, 'other_market_volume_28d', initial_window_end, retention_28d_end, exclude_m_idx=mkt_idx)

+        traders_df = traders_df.rename(columns={'user': 'user_address'})
+
+        # --- Summary Statistics ---
+        retained_14d_count = int((traders_df['other_market_volume_14d'] > 0).sum())
+        retained_28d_count = int((traders_df['other_market_volume_28d'] > 0).sum())
+        retention_ratio_14d = (retained_14d_count / new_traders_count) if new_traders_count > 0 else 0.0
+        retention_ratio_28d = (retained_28d_count / new_traders_count) if new_traders_count > 0 else 0.0
+
        result = {
            "market": market_name,
            "category": market_config.get("category", []),
            "start_date": start_date_str,
-            "new_traders": new_traders_count,
-            "new_traders_list": mkt_traders,
+            "new_traders_count": new_traders_count,
+            "retained_14d_count": retained_14d_count,
+            "retained_28d_count": retained_28d_count,
+            "retention_ratio_14d": round(retention_ratio_14d, 4),
+            "retention_ratio_28d": round(retention_ratio_28d, 4),
+            "user_data": traders_df.to_dict('records')
        }
-
-        # 2. Calculate retention for each window
-        if not mkt_traders:
-            for win in RETENTION_WINDOWS_DAYS:
-                result[f"retained_users_{win}d"] = 0
-                result[f"retention_ratio_{win}d"] = 0.0
-                result[f"retained_users_{win}d_list"] = []
-            return result
-
-        retention_period_start_dt = start_date
-        for win in RETENTION_WINDOWS_DAYS:
-            offset = 0
-            retained_set: Set[str] = set()
-
-            while offset < win:
-                chunk_start_dt = retention_period_start_dt + timedelta(days=offset)
-                span = min(CHUNK_DAYS, win - offset)
-                if span <= 0: break
-
-                logger.info(f"Fetching retention for {market_name}, window {win}d, chunk: {chunk_start_dt.strftime('%Y-%m-%d')} for {span} days")
-                q_retention_chunk = sql_retention_users_chunk(mkt_traders, market_config["index"], chunk_start_dt, span)
-                retained_users_df = pd.read_sql(q_retention_chunk, conn)
-                retained_set.update(retained_users_df["user"].tolist())
-                offset += CHUNK_DAYS
-
-            retained_list = sorted(list(retained_set))
-            retained_count = len(retained_list)
-            retention_ratio = (retained_count / new_traders_count) if new_traders_count > 0 else 0.0
-
-            result[f"retained_users_{win}d"] = retained_count
-            result[f"retention_ratio_{win}d"] = round(retention_ratio, 4)
-            result[f"retained_users_{win}d_list"] = retained_list

-        logger.info(f"Successfully calculated retention for {market_name}.")
+        logger.info(f"Successfully calculated consolidated retention for {market_name}.")
        return result

    except Exception as e:
@@ -229,7 +303,6 @@ async def get_retention_for_market(
    """
    try:
        logger.info(f"Received request for /calculate: market='{market_name}', date='{start_date}'")
-        # Input validation for date format can be added here if needed
        result_data = await calculate_retention_for_market(market_name, start_date)
        return RetentionExplorerItem(**result_data)
    except HTTPException as http_exc:
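The dropped comment pointed at date-format validation that the handler still does not perform. A minimal sketch of such a guard, assuming start_date is expected as a YYYY-MM-DD string (the format is inferred from the log formatting elsewhere, not confirmed by this diff):

# Possible shape of the date guard (not part of this commit).
from datetime import datetime

from fastapi import HTTPException


def validate_start_date(start_date: str) -> datetime:
    # Reject malformed dates with a 400 before any Athena query runs.
    try:
        return datetime.strptime(start_date, "%Y-%m-%d")
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid start_date '{start_date}'; expected YYYY-MM-DD.",
        )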