12
12
import threading
13
13
import time
14
14
15
- PYTH_TEST_SYMBOL_COUNT = int (os .environ .get ("PYTH_TEST_SYMBOL_COUNT" , "9" ))
16
-
17
15
class PythAccEndpoint (BaseHTTPRequestHandler ):
18
16
"""
19
17
A dumb endpoint to respond with a JSON containing Pyth symbol and mapping addresses
@@ -60,6 +58,9 @@ def accounts_endpoint():
60
58
61
59
62
60
def add_symbol (num : int ):
61
+ """
62
+ NOTE: Updates HTTP_ENDPOINT_DATA
63
+ """
63
64
symbol_name = f"Test symbol { num } "
64
65
# Add a product
65
66
prod_pubkey = pyth_admin_run_or_die (
@@ -96,6 +97,8 @@ def add_symbol(num: int):
96
97
97
98
sys .stdout .flush ()
98
99
100
+ print (f"New symbol: { num } " )
101
+
99
102
return num
100
103
101
104
# Fund the publisher
@@ -122,14 +125,14 @@ def add_symbol(num: int):
122
125
"--keypair" , PYTH_PUBLISHER_KEYPAIR
123
126
], capture_output = True ).stdout .strip ()
124
127
125
- with ThreadPoolExecutor (max_workers = 10 ) as executor :
128
+ with ThreadPoolExecutor (max_workers = PYTH_TEST_SYMBOL_COUNT ) as executor :
126
129
add_symbol_futures = {executor .submit (add_symbol , sym_id ) for sym_id in range (PYTH_TEST_SYMBOL_COUNT )}
127
130
128
131
for future in as_completed (add_symbol_futures ):
129
132
print (f"Completed { future .result ()} " )
130
133
131
134
print (
132
- f"Mock updates ready to roll. Updating every { str (PYTH_PUBLISHER_INTERVAL )} seconds" )
135
+ f"Mock updates ready to roll. Updating every { str (PYTH_PUBLISHER_INTERVAL_SECS )} seconds" )
133
136
134
137
# Spin off the readiness probe endpoint into a separate thread
135
138
readiness_thread = threading .Thread (target = readiness , daemon = True )
@@ -140,12 +143,26 @@ def add_symbol(num: int):
140
143
readiness_thread .start ()
141
144
http_service .start ()
142
145
143
- while True :
144
- for sym in HTTP_ENDPOINT_DATA ["symbols" ]:
145
- publisher_random_update (sym ["price" ])
146
+ next_new_symbol_id = PYTH_TEST_SYMBOL_COUNT
147
+ last_new_sym_added_at = time .monotonic ()
148
+
149
+ with ThreadPoolExecutor () as executor : # Used for async adding of products and prices
150
+ while True :
151
+ for sym in HTTP_ENDPOINT_DATA ["symbols" ]:
152
+ publisher_random_update (sym ["price" ])
153
+
154
+ # Add a symbol if new symbol interval configured
155
+ if PYTH_NEW_SYMBOL_INTERVAL_SECS > 0 :
156
+ # Do it if enough time passed
157
+ now = time .monotonic ()
158
+ if (now - last_new_sym_added_at ) >= PYTH_NEW_SYMBOL_INTERVAL_SECS :
159
+ executor .submit (add_symbol , next_new_symbol_id ) # Returns immediately, runs in background
160
+ last_sym_added_at = now
161
+ next_new_symbol_id += 1
162
+
163
+ time .sleep (PYTH_PUBLISHER_INTERVAL_SECS )
164
+ sys .stdout .flush ()
146
165
147
- time .sleep (PYTH_PUBLISHER_INTERVAL )
148
- sys .stdout .flush ()
149
166
150
167
readiness_thread .join ()
151
168
http_service .join ()
0 commit comments