-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpnl_explain.py
More file actions
190 lines (158 loc) · 10.7 KB
/
pnl_explain.py
File metadata and controls
190 lines (158 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import asyncio
import copy
import sys
from datetime import timedelta, datetime
import pandas as pd
import streamlit as st
import plotly.express as px
from plex.plex import PnlExplainer
from utils.async_utils import safe_gather
from utils.db import SQLiteDB, RawDataDB
from plex.debank_api import DebankAPI

# walrus operators and PEP 604-era syntax below need a modern interpreter
assert (sys.version_info >= (3, 10)), "Please use Python 3.10 or higher"

# st.set_page_config must run before any other streamlit call and only once per
# session; guard it in session_state so streamlit reruns don't call it again
if 'set_config' not in st.session_state: # hack to have it run only once, and before any st is called (crash otherwise..)
    st.set_page_config(layout="wide")
    st.session_state.set_config =True

# deliberately imported AFTER set_page_config — presumably streamlit_utils
# issues st calls at import time (TODO confirm); keep this ordering
from utils.streamlit_utils import load_parameters, prompt_plex_interval, display_pivot, download_button, \
    download_db_button, prompt_snapshot_timestamp, display_multi_stacked_bars

# silence SettingWithCopyWarning for the slice assignments performed below
pd.options.mode.chained_assignment = None
# profile / db / api configuration for this run
st.session_state.parameters = load_parameters()

# TODO: no pb reloading each time ? bc of sql
# one-time (per browser session) construction of the db handles, the debank api
# wrapper and the pnl explainer; cached in session_state across streamlit reruns
if 'plex_db' not in st.session_state:
    # tamper with the db file name to add debank key
    plex_db_params = copy.deepcopy(st.session_state.parameters['input_data']['plex_db'])
    plex_db_params['remote_file'] = plex_db_params['remote_file'].replace('.db',
                                        f"_{st.session_state.parameters['profile']['debank_key']}.db")
    st.session_state.plex_db: SQLiteDB = SQLiteDB(plex_db_params, st.secrets)
    # raw json payloads from debank live in a separate store
    raw_data_db: RawDataDB = RawDataDB.build_RawDataDB(st.session_state.parameters['input_data']['raw_data_db'], st.secrets)
    st.session_state.api = DebankAPI(json_db=raw_data_db,
                                     plex_db=st.session_state.plex_db,
                                     parameters=st.session_state.parameters)
    # categories are persisted in the plex db; alchemy key used for chain lookups
    st.session_state.pnl_explainer = PnlExplainer(st.session_state.plex_db.query_categories(),st.secrets['alchemy_key'])

# wallet addresses tracked for this profile
addresses = st.session_state.parameters['profile']['addresses']

risk_tab, risk_history_tab, pnl_tab, pnl_history_tab = st.tabs(
    ["risk", "risk_history", "pnl", "pnl_history"])
with risk_tab:
    # --- snapshot acquisition: either replay a stored snapshot or pay debank for a live one ---
    with st.form("snapshot_form"):
        historical_tab, refresh_tab = st.columns(2)
        with historical_tab:
            # replay a snapshot already stored in the db
            historical = st.form_submit_button("fetch historical date", help="fetch from db")
            timestamp = prompt_snapshot_timestamp(st.session_state.plex_db, addresses)
        with refresh_tab:
            # live fetch from debank (consumes paid api credits)
            if refresh := st.form_submit_button("fetch live from debank", help="fetch from debank costs credits !"):
                timestamp = int(datetime.now().timestamp())
                # record credits before the fetch so cost can be reported after
                debank_credits = st.session_state.api.get_credits()
        if refresh or historical:
            # fetch snapshots for all addresses; on a live refresh also fetch transactions
            all_fetch = asyncio.run(
                safe_gather([st.session_state.api.fetch_snapshot(address, refresh=refresh, timestamp=timestamp)
                             for address in addresses] +
                            [st.session_state.api.fetch_transactions(address)
                             for address in addresses if refresh],
                            n=st.session_state.parameters['run_parameters']['async']['gather_limit']))
            if refresh:
                # debank units are priced at 200 $ per 1M credits
                st.write(f"Debank credits used: {(debank_credits - st.session_state.api.get_credits()) * 200 / 1e6} $")
                st.session_state.plex_db.upload_to_s3()
            # snapshot coroutines come first in all_fetch (transaction results, if any, follow)
            snapshots = all_fetch[:len(addresses)]
            st.session_state.snapshot = pd.concat(snapshots, axis=0, ignore_index=True)
    download_db_button(st.session_state.plex_db, file_name='snapshot.db', label='Download database')

    if 'snapshot' in st.session_state:
        # dynamic categorization: assets with no known underlying default to themselves
        if missing_category := set(st.session_state.snapshot['asset']) - set(st.session_state.pnl_explainer.categories.keys()):
            st.warning(f"New underlyings {missing_category} -> Edit 'underlying' below to group exposures by underlying")
        with st.form("categorization_form"):
            # categorization
            st.write("Edit 'underlying' below to group exposures by underlying")
            categorization = pd.DataFrame({'underlying': {coin: coin for coin in missing_category}
                                                         | st.session_state.pnl_explainer.categories})
            # fix: sum only the 'value' column — DataFrame-wide .sum() followed by
            # ['value'] sums every column and raises TypeError on non-numeric
            # columns under pandas >= 2.0
            categorization['exposure'] = st.session_state.snapshot.groupby('asset')['value'].sum()
            edited_categorization = st.data_editor(categorization, use_container_width=True)['underlying'].to_dict()
            if st.form_submit_button("Override categorization"):
                # persist the edited mapping both in memory and in the db
                st.session_state.pnl_explainer.categories = edited_categorization
                st.session_state.plex_db.overwrite_categories(edited_categorization)
                st.session_state.plex_db.upload_to_s3()
                st.success("Categories updated (not exposure!)")

        # display risk
        st.write("Risk pivot table: group exposures by underlying")
        st.session_state.snapshot['underlying'] = st.session_state.snapshot['asset'].map(
            st.session_state.pnl_explainer.categories)
        # hide dust: keep only rows above 1bp of total portfolio value.
        # threshold hoisted out of the filter — the original recomputed the
        # portfolio total once per row inside a lambda (accidental O(n^2))
        dust_threshold = st.session_state.snapshot['value'].sum() * 1e-4
        display_pivot(st.session_state.snapshot.loc[st.session_state.snapshot['value'].abs() > dust_threshold],
                      rows=['underlying', 'asset', 'chain', 'protocol'],
                      columns=['address'],
                      values=['value'],
                      hidden=['hold_mode', 'type', 'price', 'amount'])
        download_button(st.session_state.snapshot, file_name='snapshot.csv', label='Download snapshot')
with risk_history_tab:
    # interval prompt, defaulting to the last 7 days
    risk_start_timestamp, risk_end_timestamp = prompt_plex_interval(st.session_state.plex_db, addresses, nonce='risk', default_dt=timedelta(days=7))

    # snapshots within the interval. consistency: reuse the module-level
    # `addresses` instead of re-reading it from parameters (same value)
    risk_snapshots_within = st.session_state.plex_db.query_table_between(addresses,
                                                                        risk_start_timestamp, risk_end_timestamp, "snapshots")
    # epoch seconds -> tz-aware datetimes for plotting on a time axis
    risk_snapshots_within['timestamp'] = pd.to_datetime(risk_snapshots_within['timestamp'], unit='s', utc=True)
    risk_snapshots_within['underlying'] = risk_snapshots_within['asset'].map(
        st.session_state.pnl_explainer.categories)

    display_multi_stacked_bars(risk_snapshots_within,
                               categoricals=['underlying', 'asset', 'protocol', 'chain', 'hold_mode', 'type'],
                               values=['value'],
                               rows=['timestamp'],
                               default_stacking_field='protocol',
                               default_row_field='all')
    download_button(risk_snapshots_within, file_name='risk_history.csv', label='Download risk history')
with pnl_tab:
    # interval prompt, defaulting to the last day
    pnl_start_timestamp, pnl_end_timestamp = prompt_plex_interval(st.session_state.plex_db, addresses, nonce='pnl', default_dt=timedelta(days=1))

    ## display_pivot plex
    st.subheader("Pnl Explain")
    # attribution formulas: delta pnl, basis pnl, and pnl from amount changes
    st.latex(r'PnL_{\text{delta}} = \sum (\frac{P_{\text{underlying}}^1}{P_{\text{underlying}}^0}-1) \times N^{\text{start}} \frac{P_{\text{underlying}}^0}{P_{\text{asset}}^0}')
    st.latex(r'PnL_{\text{basis}} = \sum (P_{\text{asset}}^1-P_{\text{asset}}^0) \times N^{\text{start}} - PnL_{\text{delta}}')
    st.latex(r'PnL_{\text{amt\_chng}} = \sum \Delta N \times P^{1}')

    # explain pnl between the two snapshots bracketing the interval
    start_snapshot = st.session_state.plex_db.query_table_at(addresses, pnl_start_timestamp, "snapshots")
    end_snapshot = st.session_state.plex_db.query_table_at(addresses, pnl_end_timestamp, "snapshots")
    st.session_state.plex = st.session_state.pnl_explainer.explain(start_snapshot=start_snapshot, end_snapshot=end_snapshot)
    # hide dust: keep only pnl rows above 1bp of starting portfolio value.
    # threshold hoisted — the original recomputed start_snapshot['value'].sum()
    # once per row inside a lambda (accidental O(n^2))
    pnl_threshold = start_snapshot['value'].sum() * 1e-4
    display_pivot(st.session_state.plex.loc[st.session_state.plex['pnl'].abs() > pnl_threshold],
                  rows=['underlying', 'asset'],
                  columns=['pnl_bucket'],
                  values=['pnl'],
                  hidden=['protocol', 'chain', 'hold_mode', 'type'])
    download_button(st.session_state.plex, file_name='plex.csv', label='Download pnl explain')

    ## display_pivot transactions
    st.subheader("Transactions")
    transactions = st.session_state.plex_db.query_table_between(addresses, pnl_start_timestamp, pnl_end_timestamp, "transactions")
    st.session_state.transactions = st.session_state.pnl_explainer.format_transactions(pnl_start_timestamp, pnl_end_timestamp, transactions)
    # align column naming with the snapshot pivots ('pnl' -> 'value')
    st.session_state.transactions.rename(columns={'pnl': 'value'}, inplace=True)
    display_pivot(st.session_state.transactions,
                  rows=['underlying', 'asset'],
                  columns=['type'],
                  values=['gas', 'value'],
                  hidden=['id', 'protocol', 'chain'])
    download_button(st.session_state.transactions, file_name='tx.csv', label="Download tx data")
with pnl_history_tab:
    # interval prompt, defaulting to the last 7 days
    pnl_history_start_timestamp, pnl_history_end_timestamp = prompt_plex_interval(st.session_state.plex_db, addresses, nonce='pnl_history',
                                                                                  default_dt=timedelta(days=7))
    # snapshots. consistency: reuse the module-level `addresses` instead of
    # re-reading it from parameters (same value)
    pnl_snapshots_within = st.session_state.plex_db.query_table_between(addresses, pnl_history_start_timestamp, pnl_history_end_timestamp, "snapshots")

    # explains and transactions between consecutive snapshots.
    # hoisted: the original computed .unique() twice to build the pairs.
    # NOTE(review): assumes the query returns rows ordered by timestamp, since
    # unique() preserves order of appearance — confirm against query_table_between
    snapshot_timestamps = pnl_snapshots_within['timestamp'].unique()
    explain_list = []
    transactions_list = []
    for start, end in zip(snapshot_timestamps[:-1], snapshot_timestamps[1:]):
        start_snapshots = pnl_snapshots_within[pnl_snapshots_within['timestamp'] == start]
        end_snapshots = pnl_snapshots_within[pnl_snapshots_within['timestamp'] == end]
        explain = st.session_state.pnl_explainer.explain(start_snapshots, end_snapshots)
        explain_list.append(explain)
        # transactions
        transactions = st.session_state.plex_db.query_table_between(addresses, start, end, "transactions")
        transactions = st.session_state.pnl_explainer.format_transactions(start, end, transactions)
        transactions_list.append(transactions)

    if explain_list:
        explain_history = pd.concat(explain_list, axis=0, ignore_index=True)
        # NOTE(review): tx_pnl is assembled but never displayed below — kept for
        # parity with the original; confirm whether it should feed a chart/download
        tx_pnl = pd.concat(transactions_list, axis=0, ignore_index=True)
        display_multi_stacked_bars(explain_history,
                                   categoricals=['underlying', 'asset', 'protocol', 'pnl_bucket', 'chain',
                                                 'hold_mode', 'type'],
                                   values=['pnl'],
                                   rows=['timestamp_end'],
                                   default_stacking_field='protocol',
                                   default_row_field='pnl_bucket',
                                   cum_sum=True)
        download_button(pnl_snapshots_within, file_name='snapshot.csv', label='Download pnl history')
    else:
        # robustness: pd.concat([]) raises ValueError; fewer than two snapshots
        # in the interval means there is nothing to explain
        st.warning("Need at least two snapshots in the selected interval to compute a pnl history")