forked from bitvavo/python-bitvavo-api
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbitvavo.py
More file actions
4368 lines (3949 loc) · 160 KB
/
bitvavo.py
File metadata and controls
4368 lines (3949 loc) · 160 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import contextlib
import datetime as dt
import json
import statistics
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Thread
from typing import TYPE_CHECKING, Any
import websocket as ws_lib
from httpx import delete, get, post, put
from structlog.stdlib import get_logger
from websocket import WebSocketApp # missing stubs for WebSocketApp
from bitvavo_api_upgraded.dataframe_utils import convert_candles_to_dataframe, convert_to_dataframe
from bitvavo_api_upgraded.helper_funcs import configure_loggers, time_ms, time_to_wait
from bitvavo_api_upgraded.settings import bitvavo_settings, bitvavo_upgraded_settings
from bitvavo_api_upgraded.type_aliases import OutputFormat, anydict, errordict, intdict, ms, s_f, strdict, strintdict
from bitvavo_client.auth.signing import create_signature
from bitvavo_client.endpoints.common import (
asks_compare,
bids_compare,
create_postfix,
default,
epoch_millis,
sort_and_insert,
)
if TYPE_CHECKING:
from collections.abc import Callable
configure_loggers()
logger = get_logger(__name__)
def process_local_book(ws: Bitvavo.WebSocketAppFacade, message: anydict) -> None:
market: str = ""
if "action" in message:
if message["action"] == "getBook":
market = message["response"]["market"]
ws.localBook[market]["bids"] = message["response"]["bids"]
ws.localBook[market]["asks"] = message["response"]["asks"]
ws.localBook[market]["nonce"] = message["response"]["nonce"]
ws.localBook[market]["market"] = market
elif "event" in message and message["event"] == "book":
market = message["market"]
if message["nonce"] != ws.localBook[market]["nonce"] + 1:
# I think I've fixed this, by looking at the other Bitvavo repos (search for 'nonce' or '!=' 😆)
ws.subscription_book(market, ws.callbacks[market])
return
ws.localBook[market]["bids"] = sort_and_insert(ws.localBook[market]["bids"], message["bids"], bids_compare)
ws.localBook[market]["asks"] = sort_and_insert(ws.localBook[market]["asks"], message["asks"], asks_compare)
ws.localBook[market]["nonce"] = message["nonce"]
if market != "":
ws.callbacks["subscriptionBookUser"][market](ws.localBook[market])
class ReceiveThread(Thread):
"""This used to be `class rateLimitThread`."""
def __init__(self, ws: WebSocketApp, ws_facade: Bitvavo.WebSocketAppFacade) -> None:
self.ws = ws
self.ws_facade = ws_facade
Thread.__init__(self)
def run(self) -> None:
"""This used to be `self.waitForReset`."""
try:
while self.ws_facade.keepAlive:
self.ws.run_forever()
self.ws_facade.reconnect = True
self.ws_facade.authenticated = False
time.sleep(self.ws_facade.reconnectTimer)
if self.ws_facade.bitvavo.debugging:
msg = f"we have just set reconnect to true and have waited for {self.ws_facade.reconnectTimer}"
logger.debug(msg)
self.ws_facade.reconnectTimer = self.ws_facade.reconnectTimer * 2
except KeyboardInterrupt:
if self.ws_facade.bitvavo.debugging:
logger.debug("keyboard-interrupt")
def stop(self) -> None:
self.ws_facade.keepAlive = False
def callback_example(response: Any) -> None:
"""
You can use this example as a starting point, for the websocket code, IF you want to
I made this so you can see what kind of function you'll need to stick into the websocket functions.
"""
if isinstance(response, dict):
# instead of printing, you could save the object to a file:
HERE = Path.cwd() # root of your project folder
filepath = HERE / "your_output.json"
# a = append; figure out yourself to create multiple callback functions, probably one for each type of call that
# you want to make
with filepath.open("a") as file:
file.write(json.dumps(response))
elif isinstance(response, list):
# Whether `item` is a list or a dict doesn't matter to print
for item in response:
print(item)
# You can also copy-paste stuff to write it to a file or something
# of maybe mess around with sqlite. ¯\_(ツ)_/¯
else:
# Normally, I would raise an exception here, but the websocket Thread would just eat it up anyway :/
# I don't even know if this log will be shown to you.
# Yes, I haven't tested this function; it's just some off-the-cuff example to get you started.
logger.critical("what in the blazes did I just receive!?")
def error_callback_example(msg: errordict) -> None:
"""
When using the websocket, I really REALLY recommend using `ws.setErrorCallback(error_callback_example)`, instead of
using the default (yes, there is a default on_error function, but that just prints the error, which in practice
means it won't show for the user, as the websocket has a tendency to silently fail printing).
I would recommand adding some alerting mechanism, where the error isn't written to a log,
but to some external system instead, like Discord, Slack, Email, Signal, Telegram, etc
As I said, this is due to the websocket silently dropping python Exceptions and Bitvavo Errors.
I can't speak for all options (yet), but the Discord one was VERY easy (mostly due to me already having a Discord channel :p)
```shell
pip install discord-webhook
```
Create a webhook for some channel (look for the cog icon) and copy it into a `DISCORD_WEBHOOK` variable
```python
from discord_webhook import DiscordWebhook
# send the message directly to your discord channel! :D
DiscordWebhook(
url=DISCORD_WEBHOOK,
rate_limit_retry=True,
content=f"{msg}",
).execute()
```
""" # noqa: E501
# easiest thing is to use the logger, but there's a good chance this message gets silently eaten.
logger.error("error", msg=msg)
class Bitvavo:
"""
Example code to get your started:
```python
# Single API key (backward compatible)
bitvavo = Bitvavo(
{
"APIKEY": "$YOUR_API_KEY",
"APISECRET": "$YOUR_API_SECRET",
"RESTURL": "https://api.bitvavo.com/v2",
"WSURL": "wss://ws.bitvavo.com/v2/",
"ACCESSWINDOW": 10000,
"DEBUGGING": True,
},
)
time_dict = bitvavo.time()
# Multiple API keys
bitvavo = Bitvavo(
{
"APIKEYS": [
{"key": "$YOUR_API_KEY_1", "secret": "$YOUR_API_SECRET_1"},
{"key": "$YOUR_API_KEY_2", "secret": "$YOUR_API_SECRET_2"},
{"key": "$YOUR_API_KEY_3", "secret": "$YOUR_API_SECRET_3"},
],
"RESTURL": "https://api.bitvavo.com/v2",
"WSURL": "wss://ws.bitvavo.com/v2/",
"ACCESSWINDOW": 10000,
"DEBUGGING": True,
},
)
time_dict = bitvavo.time()
# Keyless only (no API keys)
bitvavo = Bitvavo(
{
"RESTURL": "https://api.bitvavo.com/v2",
"WSURL": "wss://ws.bitvavo.com/v2/",
"ACCESSWINDOW": 10000,
"DEBUGGING": True,
},
)
markets = bitvavo.markets() # Only public endpoints will work
```
"""
def __init__(self, options: dict[str, str | int | list[dict[str, str]]] | None = None) -> None:
if options is None:
options = {}
_options = {k.upper(): v for k, v in options.items()}
# Options take precedence over settings
self.base: str = str(_options.get("RESTURL", bitvavo_settings.RESTURL))
self.wsUrl: str = str(_options.get("WSURL", bitvavo_settings.WSURL))
self.ACCESSWINDOW: int = int(_options.get("ACCESSWINDOW", bitvavo_settings.ACCESSWINDOW))
# Support for multiple API keys - options take absolute precedence
if "APIKEY" in _options and "APISECRET" in _options:
# Single API key explicitly provided in options - takes precedence
single_key = str(_options["APIKEY"])
single_secret = str(_options["APISECRET"])
self.api_keys: list[dict[str, str]] = [{"key": single_key, "secret": single_secret}]
elif "APIKEYS" in _options:
# Multiple API keys provided in options - takes precedence
api_keys = _options["APIKEYS"]
if isinstance(api_keys, list) and api_keys:
self.api_keys = api_keys
else:
self.api_keys = []
else:
# Fall back to settings only if no API key options provided
api_keys = bitvavo_settings.APIKEYS
if isinstance(api_keys, list) and api_keys:
self.api_keys = api_keys
else:
# Single API key from settings (backward compatibility)
single_key = str(bitvavo_settings.APIKEY)
single_secret = str(bitvavo_settings.APISECRET)
if single_key and single_secret:
self.api_keys = [{"key": single_key, "secret": single_secret}]
else:
self.api_keys = []
if not self.api_keys:
msg = "API keys are required"
raise ValueError(msg)
# Current API key index - options take precedence
self.current_api_key_index: int = 0
# Rate limiting per API key
self.rate_limits: dict[int, dict[str, int | ms]] = {}
# Get default rate limit from options or settings
default_rate_limit_option = _options.get("DEFAULT_RATE_LIMIT", bitvavo_upgraded_settings.DEFAULT_RATE_LIMIT)
default_rate_limit = (
int(default_rate_limit_option)
if isinstance(default_rate_limit_option, (int, str))
else bitvavo_upgraded_settings.DEFAULT_RATE_LIMIT
)
for i in range(len(self.api_keys)):
self.rate_limits[i] = {"remaining": default_rate_limit, "resetAt": ms(0)}
# Legacy properties for backward compatibility
self.APIKEY: str = self.api_keys[0]["key"] if self.api_keys else ""
self.APISECRET: str = self.api_keys[0]["secret"] if self.api_keys else ""
self._current_api_key: str = self.APIKEY
self._current_api_secret: str = self.APISECRET
self.rateLimitRemaining: int = default_rate_limit
self.rateLimitResetAt: ms = 0
# Options take precedence over settings for debugging
self.debugging: bool = bool(_options.get("DEBUGGING", bitvavo_settings.DEBUGGING))
def get_best_api_key_config(self, rateLimitingWeight: int = 1) -> tuple[str, str, int]:
"""Get the best API key configuration to use for a request."""
for i in range(len(self.api_keys)):
if self._has_rate_limit_available(i, rateLimitingWeight):
return self.api_keys[i]["key"], self.api_keys[i]["secret"], i
# No keys have available budget, use current key and let rate limiting handle the wait
return (
self.api_keys[self.current_api_key_index]["key"],
self.api_keys[self.current_api_key_index]["secret"],
self.current_api_key_index,
)
def _has_rate_limit_available(self, key_index: int, weight: int) -> bool:
"""Check if a specific API key has enough rate limit."""
if key_index not in self.rate_limits:
return False
remaining = self.rate_limits[key_index]["remaining"]
return (remaining - weight) > bitvavo_upgraded_settings.RATE_LIMITING_BUFFER
def _update_rate_limit_for_key(self, key_index: int, response: anydict | errordict) -> None:
"""Update rate limit for a specific API key index."""
if key_index not in self.rate_limits:
self.rate_limits[key_index] = {"remaining": 1000, "resetAt": ms(0)}
if "errorCode" in response and response["errorCode"] == 105: # noqa: PLR2004
self.rate_limits[key_index]["remaining"] = 0
# rateLimitResetAt is a value that's stripped from a string.
reset_time_str = str(response.get("error", "")).split(" at ")
if len(reset_time_str) > 1:
try:
reset_time = ms(int(reset_time_str[1].split(".")[0]))
self.rate_limits[key_index]["resetAt"] = reset_time
except (ValueError, IndexError):
# Fallback to current time + 60 seconds if parsing fails
self.rate_limits[key_index]["resetAt"] = ms(time_ms() + 60000)
else:
self.rate_limits[key_index]["resetAt"] = ms(time_ms() + 60000)
timeToWait = time_to_wait(ms(self.rate_limits[key_index]["resetAt"]))
key_name = f"API_KEY_{key_index}"
logger.warning(
"api-key-banned",
info={
"key_name": key_name,
"wait_time_seconds": timeToWait + 1,
"until": (dt.datetime.now(tz=dt.timezone.utc) + dt.timedelta(seconds=timeToWait + 1)).isoformat(),
},
)
if "bitvavo-ratelimit-remaining" in response:
with contextlib.suppress(ValueError, TypeError):
self.rate_limits[key_index]["remaining"] = int(response["bitvavo-ratelimit-remaining"])
if "bitvavo-ratelimit-resetat" in response:
with contextlib.suppress(ValueError, TypeError):
self.rate_limits[key_index]["resetAt"] = ms(int(response["bitvavo-ratelimit-resetat"]))
def _sleep_for_key(self, key_index: int) -> None:
"""Sleep until the specified API key's rate limit resets."""
if key_index not in self.rate_limits:
return
reset_at = ms(self.rate_limits[key_index]["resetAt"])
napTime = time_to_wait(reset_at)
key_name = f"API_KEY_{key_index}" if key_index >= 0 else "KEYLESS"
logger.warning(
"rate-limit-reached",
key_name=key_name,
rateLimitRemaining=self.rate_limits[key_index]["remaining"],
)
logger.info(
"napping-until-reset",
key_name=key_name,
napTime=napTime,
currentTime=dt.datetime.now(tz=dt.timezone.utc).isoformat(),
targetDatetime=dt.datetime.fromtimestamp(reset_at / 1000.0, tz=dt.timezone.utc).isoformat(),
)
time.sleep(napTime + 1) # +1 to add a tiny bit of buffer time
def calc_lag(self, samples: int = 5, timeout_seconds: float = 5.0) -> ms: # noqa: C901
"""
Calculate the time difference between the client and server using statistical analysis.
Uses multiple samples with outlier detection to get a more accurate lag measurement.
Args:
samples: Number of time samples to collect (default: 5)
timeout_seconds: Maximum time to spend collecting samples (default: 5.0)
Returns:
Average lag in milliseconds
Raises:
ValueError: If unable to collect sufficient valid samples
RuntimeError: If all API calls fail
"""
ARBITRARY = 3
if samples < ARBITRARY:
msg = f"Need at least {ARBITRARY} samples for statistical analysis"
raise ValueError(msg)
def measure_single_lag() -> ms | None:
"""Measure lag for a single request with error handling."""
try:
client_time_before = time_ms()
server_response = self.time()
client_time_after = time_ms()
if isinstance(server_response, dict) and "time" in server_response:
# Use midpoint of request duration for better accuracy
client_time_avg = (client_time_before + client_time_after) // 2
server_time = server_response["time"]
if isinstance(server_time, int):
return ms(server_time - client_time_avg)
return None
except (ValueError, TypeError, KeyError):
return None
else:
# If error or unexpected response
return None
lag_measurements: list[ms] = []
# Collect samples concurrently for better performance
with ThreadPoolExecutor(max_workers=min(samples, 5)) as executor:
try:
# Submit all measurement tasks
future_to_sample = {executor.submit(measure_single_lag): i for i in range(samples)}
# Collect results with timeout
for future in as_completed(future_to_sample, timeout=timeout_seconds):
lag = future.result()
if lag is not None:
lag_measurements.append(lag)
except TimeoutError:
if self.debugging:
logger.warning(
"lag-calculation-timeout",
collected_samples=len(lag_measurements),
requested_samples=samples,
)
if len(lag_measurements) < max(2, samples // 2):
msg = f"Insufficient valid samples: got {len(lag_measurements)}, need at least {max(2, samples // 2)}"
raise RuntimeError(msg)
# Remove outliers using interquartile range method
QUARTILES = 4
if len(lag_measurements) >= QUARTILES:
try:
q1 = statistics.quantiles(lag_measurements, n=QUARTILES)[0]
q3 = statistics.quantiles(lag_measurements, n=QUARTILES)[2]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
filtered_measurements = [lag for lag in lag_measurements if lower_bound <= lag <= upper_bound]
# Use filtered data if we still have enough samples
if len(filtered_measurements) >= 2: # noqa: PLR2004
lag_measurements = filtered_measurements
except statistics.StatisticsError:
# Fall back to original measurements if filtering fails
pass
# Calculate final lag using median for robustness
final_lag = ms(statistics.median(lag_measurements))
if self.debugging:
logger.debug(
"lag-calculated",
samples_collected=len(lag_measurements),
lag_ms=final_lag,
min_lag=min(lag_measurements),
max_lag=max(lag_measurements),
std_dev=statistics.stdev(lag_measurements) if len(lag_measurements) > 1 else 0,
)
return final_lag
def get_remaining_limit(self) -> int:
"""Get the remaining rate limit
---
Returns:
```python
1000 # or lower
```
"""
return self.rateLimitRemaining
def update_rate_limit(self, response: anydict | errordict) -> None:
"""
Update the rate limited
If you're banned, use the errordict to sleep until you're not banned
If you're not banned, then use the received headers to update the variables.
This method maintains backward compatibility by updating the legacy properties.
"""
# Update rate limit for the current API key being used
current_key = self.current_api_key_index if self.APIKEY else -1
self._update_rate_limit_for_key(current_key, response)
# Update legacy properties for backward compatibility
if current_key in self.rate_limits:
self.rateLimitRemaining = int(self.rate_limits[current_key]["remaining"])
self.rateLimitResetAt = ms(self.rate_limits[current_key]["resetAt"])
# Handle ban with sleep (legacy behavior)
if "errorCode" in response and response["errorCode"] == 105: # noqa: PLR2004
timeToWait = time_to_wait(self.rateLimitResetAt)
logger.warning(
"banned",
info={
"wait_time_seconds": timeToWait + 1,
"until": (dt.datetime.now(tz=dt.timezone.utc) + dt.timedelta(seconds=timeToWait + 1)).isoformat(),
},
)
logger.info("napping-until-ban-lifted")
time.sleep(timeToWait + 1) # plus one second to ENSURE we're able to run again.
def public_request(
self,
url: str,
rateLimitingWeight: int = 1,
) -> list[anydict] | list[list[str]] | intdict | strdict | anydict | errordict:
"""Execute a request to the public part of the API; no API key and/or SECRET necessary.
Will return the reponse as one of three types.
---
Args:
```python
url: str = "https://api.bitvavo.com/v2/time" # example of how the url looks like
```
---
Returns:
```python
# either of one:
dict[str, Any]
list[dict[str, Any]]
list[list[str]]
```
"""
# Get the best API key configuration
api_key, api_secret, key_index = self.get_best_api_key_config(rateLimitingWeight)
# Check if we need to wait for rate limit
if not self._has_rate_limit_available(key_index, rateLimitingWeight):
self._sleep_for_key(key_index)
# Update current API key for legacy compatibility
self._current_api_key = api_key
self._current_api_secret = api_secret
self.current_api_key_index = key_index
if self.debugging:
logger.debug(
"api-request",
info={"url": url, "key_index": key_index},
)
now = time_ms() + bitvavo_upgraded_settings.LAG
sig = create_signature(now, "GET", url.replace(self.base, ""), None, api_secret)
headers = {
"bitvavo-access-key": api_key,
"bitvavo-access-signature": sig,
"bitvavo-access-timestamp": str(now),
"bitvavo-access-window": str(self.ACCESSWINDOW),
}
r = get(url, headers=headers, timeout=(self.ACCESSWINDOW / 1000))
# Update rate limit for the specific key used
if "error" in r.json():
self._update_rate_limit_for_key(key_index, r.json())
else:
self._update_rate_limit_for_key(key_index, dict(r.headers))
# Also update legacy rate limit tracking
self.update_rate_limit(r.json() if "error" in r.json() else dict(r.headers))
return r.json() # type:ignore[no-any-return]
def private_request(
self,
endpoint: str,
postfix: str,
body: anydict | None = None,
method: str = "GET",
rateLimitingWeight: int = 1,
) -> list[anydict] | list[list[str]] | intdict | strdict | anydict | errordict:
"""Execute a request to the private part of the API. API key and SECRET are required.
Will return the reponse as one of three types.
---
Args:
```python
endpoint: str = "/order"
postfix: str = "" # ?key=value&key2=another_value&...
body: anydict = {"market" = "BTC-EUR", "side": "buy", "orderType": "limit"} # for example
method: Optional[str] = "POST" # Defaults to "GET"
```
---
Returns:
```python
# either of one:
dict[str, Any]
list[dict[str, Any]]
list[list[str]]
```
"""
# Private requests require an API key, so get the best available one
api_key, api_secret, key_index = self.get_best_api_key_config(rateLimitingWeight)
# If no API keys available, use the configured one (may fail)
if not api_key and self.api_keys:
api_key = self.api_keys[self.current_api_key_index]["key"]
api_secret = self.api_keys[self.current_api_key_index]["secret"]
key_index = self.current_api_key_index
elif not api_key:
# No API keys configured at all
api_key = self.APIKEY
api_secret = self.APISECRET
key_index = 0 if api_key else -1
# Check if we need to wait for rate limit
if not self._has_rate_limit_available(key_index, rateLimitingWeight):
self._sleep_for_key(key_index)
# Update current API key for legacy compatibility
self._current_api_key = api_key
self._current_api_secret = api_secret
now = time_ms() + bitvavo_upgraded_settings.LAG
sig = create_signature(now, method, (endpoint + postfix), body, api_secret)
url = self.base + endpoint + postfix
headers = {
"bitvavo-access-key": api_key,
"bitvavo-access-signature": sig,
"bitvavo-access-timestamp": str(now),
"bitvavo-access-window": str(self.ACCESSWINDOW),
}
if self.debugging:
logger.debug(
"api-request",
info={
"url": url,
"with_api_key": bool(api_key != ""),
"public_or_private": "private",
"method": method,
"key_index": key_index,
},
)
if method == "DELETE":
r = delete(url, headers=headers, timeout=(self.ACCESSWINDOW / 1000))
elif method == "POST":
r = post(url, headers=headers, json=body, timeout=(self.ACCESSWINDOW / 1000))
elif method == "PUT":
r = put(url, headers=headers, json=body, timeout=(self.ACCESSWINDOW / 1000))
else: # method == "GET"
r = get(url, headers=headers, timeout=(self.ACCESSWINDOW / 1000))
# Update rate limit for the specific key used
if "error" in r.json():
self._update_rate_limit_for_key(key_index, r.json())
else:
self._update_rate_limit_for_key(key_index, dict(r.headers))
# Also update legacy rate limit tracking
self.update_rate_limit(r.json() if "error" in r.json() else dict(r.headers))
return r.json()
def sleep_until_can_continue(self) -> None:
napTime = time_to_wait(self.rateLimitResetAt)
logger.warning("rate-limit-reached", rateLimitRemaining=self.rateLimitRemaining)
logger.info(
"napping-until-reset",
napTime=napTime,
currentTime=dt.datetime.now(tz=dt.timezone.utc).isoformat(),
targetDatetime=dt.datetime.fromtimestamp(self.rateLimitResetAt / 1000.0, tz=dt.timezone.utc).isoformat(),
)
time.sleep(napTime + 1) # +1 to add a tiny bit of buffer time
def time(self) -> intdict:
"""Get server-time, in milliseconds, since 1970-01-01
---
Examples:
* https://api.bitvavo.com/v2/time
---
Rate Limit Weight:
```python
1
```
---
Returns:
```python
{"time": 1539180275424 }
```
"""
return self.public_request(f"{self.base}/time") # type: ignore[return-value]
def markets(
self,
options: strdict | None = None,
output_format: OutputFormat = OutputFormat.DICT,
) -> list[anydict] | anydict | errordict | Any:
"""Get all available markets with some meta-information, unless options is given a `market` key.
Then you will get a single market, instead of a list of markets.
---
Examples:
* https://api.bitvavo.com/v2/markets
* https://api.bitvavo.com/v2/markets?market=BTC-EUR
* https://api.bitvavo.com/v2/markets?market=SHIB-EUR
---
Args:
```python
# Choose one:
options={} # returns all markets
options={"market": "BTC-EUR"} # returns only the BTC-EUR market
# If you want multiple markets, but not all, make multiple calls
# Output format selection:
output_format=OutputFormat.DICT # Default: returns standard Python dict/list
output_format=OutputFormat.PANDAS # Returns pandas DataFrame
output_format=OutputFormat.POLARS # Returns polars DataFrame
output_format=OutputFormat.CUDF # Returns NVIDIA cuDF (GPU-accelerated)
output_format=OutputFormat.MODIN # Returns modin (distributed pandas)
output_format=OutputFormat.PYARROW # Returns Apache Arrow Table
output_format=OutputFormat.DASK # Returns Dask DataFrame (distributed)
output_format=OutputFormat.DUCKDB # Returns DuckDB relation
output_format=OutputFormat.IBIS # Returns Ibis expression
output_format=OutputFormat.PYSPARK # Returns PySpark DataFrame
output_format=OutputFormat.PYSPARK_CONNECT # Returns PySpark Connect DataFrame
output_format=OutputFormat.SQLFRAME # Returns SQLFrame DataFrame
# Note: DataFrame formats require narwhals and the respective library to be installed.
# Install with: pip install 'bitvavo-api-upgraded[pandas]' or similar for other formats.
```
---
Rate Limit Weight:
```python
1
```
---
Returns:
```python
# When output_format=OutputFormat.DICT (default):
[
{
"market": "BTC-EUR",
"status": "trading",
"base": "BTC",
"quote": "EUR",
"pricePrecision": "5",
"minOrderInQuoteAsset": "10",
"minOrderInBaseAsset": "0.001",
"orderTypes": [
"market",
"limit",
"stopLoss",
"stopLossLimit",
"takeProfit",
"takeProfitLimit"
]
}
]
# When output_format is any DataFrame format (pandas, polars, cudf, etc.):
# Returns a DataFrame with columns: market, status, base, quote, pricePrecision,
# minOrderInQuoteAsset, minOrderInBaseAsset, orderTypes
# The specific DataFrame type depends on the selected format.
```
"""
postfix = create_postfix(options)
result = self.public_request(f"{self.base}/markets{postfix}") # type: ignore[return-value]
return convert_to_dataframe(result, output_format)
def assets(
self,
options: strdict | None = None,
output_format: OutputFormat = OutputFormat.DICT,
) -> list[anydict] | anydict | Any:
"""Get all available assets, unless `options` is given a `symbol` key.
Then you will get a single asset, instead of a list of assets.
---
Examples:
* https://api.bitvavo.com/v2/assets
* https://api.bitvavo.com/v2/assets?symbol=BTC
* https://api.bitvavo.com/v2/assets?symbol=SHIB
* https://api.bitvavo.com/v2/assets?symbol=ADA
* https://api.bitvavo.com/v2/assets?symbol=EUR
---
Args:
```python
# pick one
options={} # returns all assets
options={"symbol": "BTC"} # returns a single asset (the one of Bitcoin)
# Output format selection:
output_format=OutputFormat.DICT # Default: returns standard Python dict/list
output_format=OutputFormat.PANDAS # Returns pandas DataFrame
output_format=OutputFormat.POLARS # Returns polars DataFrame
output_format=OutputFormat.CUDF # Returns NVIDIA cuDF (GPU-accelerated)
output_format=OutputFormat.MODIN # Returns modin (distributed pandas)
output_format=OutputFormat.PYARROW # Returns Apache Arrow Table
output_format=OutputFormat.DASK # Returns Dask DataFrame (distributed)
output_format=OutputFormat.DUCKDB # Returns DuckDB relation
output_format=OutputFormat.IBIS # Returns Ibis expression
output_format=OutputFormat.PYSPARK # Returns PySpark DataFrame
output_format=OutputFormat.PYSPARK_CONNECT # Returns PySpark Connect DataFrame
output_format=OutputFormat.SQLFRAME # Returns SQLFrame DataFrame
# Note: DataFrame formats require narwhals and the respective library to be installed.
# Install with: pip install 'bitvavo-api-upgraded[pandas]' or similar for other formats.
```
---
Rate Limit Weight:
```python
1
```
---
Returns:
```python
# When output_format=OutputFormat.DICT (default):
[
{
"symbol": "BTC",
"name": "Bitcoin",
"decimals": 8,
"depositFee": "0",
"depositConfirmations": 10,
"depositStatus": "OK",
"withdrawalFee": "0.2",
"withdrawalMinAmount": "0.2",
"withdrawalStatus": "OK",
"networks": ["Mainnet"],
"message": ""
}
]
# When output_format is any DataFrame format (pandas, polars, cudf, etc.):
# Returns a DataFrame with columns: symbol, name, decimals, depositFee,
# depositConfirmations, depositStatus, withdrawalFee, withdrawalMinAmount,
# withdrawalStatus, networks, message
# The specific DataFrame type depends on the selected format.
```
"""
postfix = create_postfix(options)
result = self.public_request(f"{self.base}/assets{postfix}") # type: ignore[return-value]
return convert_to_dataframe(result, output_format)
def book(self, market: str, options: intdict | None = None) -> dict[str, str | int | list[str]] | errordict:
"""Get a book (with two lists: asks and bids, as they're called)
---
Examples:
* https://api.bitvavo.com/v2/BTC-EUR/book
* https://api.bitvavo.com/v2/SHIB-EUR/book?depth=10
* https://api.bitvavo.com/v2/ADA-EUR/book?depth=0
---
Args:
```python
market="ADA-EUR"
options={"depth": 3} # returns the best 3 asks and 3 bids
options={} # same as `{"depth": 0}`; returns all bids and asks for that book
```
---
Rate Limit Weight:
```python
1
```
---
Returns:
```python
{
"market": "ADA-EUR",
"nonce": 10378032,
"bids": [["1.1908", "600"], ["1.1902", "4091.359809"], ["1.1898", "7563"]],
"asks": [["1.1917", "2382.166997"], ["1.1919", "440.7"], ["1.192", "600"]],
"timestamp": 1700000000000,
}
# Notice how each bid and ask is also a list
bid = ["1.1908", "600"] # the first bid from the bids list
price = bid[0] # the price for one coin/token
size = bid[1] # how many tokens are asked (or bidded, in this case)
result = price * size
assert result == 714.48 # EUR can be gained from this bid if it's sold (minus the fee)
```
"""
postfix = create_postfix(options)
return self.public_request(f"{self.base}/{market}/book{postfix}") # type: ignore[return-value]
def public_trades(
self,
market: str,
options: strintdict | None = None,
output_format: OutputFormat = OutputFormat.DICT,
) -> list[anydict] | errordict | Any:
"""Publically available trades
---
Examples:
* https://api.bitvavo.com/v2/BTC-EUR/trades
* https://api.bitvavo.com/v2/SHIB-EUR/trades?limit=10
* https://api.bitvavo.com/v2/ADA-EUR/trades?tradeIdFrom=532f4d4d-f545-4a2d-a175-3d37919cb73c
* https://api.bitvavo.com/v2/NANO-EUR/trades
---
Args:
```python
market="NANO-EUR"
# note that any of these `options` are optional
# use `int(time.time() * 1000)` to get current timestamp in milliseconds
# or `int(datetime.datetime.now().timestamp()*1000)`
options={
"limit": [ 1 .. 1000 ], default 500
"start": int timestamp in ms >= 0
# (that's somewhere in the year 2243, or near the number 2^52)
"end": int timestamp in ms <= 8_640_000_000_000_000
"tradeIdFrom": "" # if you get a list and want everything AFTER a certain id, put that id here
"tradeIdTo": "" # if you get a list and want everything BEFORE a certain id, put that id here
}
# Output format selection:
output_format=OutputFormat.DICT # Default: returns standard Python dict/list
output_format=OutputFormat.PANDAS # Returns pandas DataFrame
output_format=OutputFormat.POLARS # Returns polars DataFrame
output_format=OutputFormat.CUDF # Returns NVIDIA cuDF (GPU-accelerated)
output_format=OutputFormat.MODIN # Returns modin (distributed pandas)
output_format=OutputFormat.PYARROW # Returns Apache Arrow Table
output_format=OutputFormat.DASK # Returns Dask DataFrame (distributed)
output_format=OutputFormat.DUCKDB # Returns DuckDB relation
output_format=OutputFormat.IBIS # Returns Ibis expression
output_format=OutputFormat.PYSPARK # Returns PySpark DataFrame
output_format=OutputFormat.PYSPARK_CONNECT # Returns PySpark Connect DataFrame
output_format=OutputFormat.SQLFRAME # Returns SQLFrame DataFrame
# Note: DataFrame formats require narwhals and the respective library to be installed.
# Install with: pip install 'bitvavo-api-upgraded[pandas]' or similar for other formats.
```
---
Rate Limit Weight:
```python
5
```
---
Returns:
```python
# When output_format='dict' (default):
[
{
"timestamp": 1542967486256,
"id": "57b1159b-6bf5-4cde-9e2c-6bd6a5678baf",
"amount": "0.1",
"price": "5012",
"side": "sell"
}
]
# When output_format is any DataFrame format:
# Returns the above data as a DataFrame in the requested format (pandas, polars, etc.)
```
"""
postfix = create_postfix(options)
result = self.public_request(f"{self.base}/{market}/trades{postfix}", 5) # type: ignore[return-value]
return convert_to_dataframe(result, output_format)
def candles(
self,
market: str,
interval: str,
options: strintdict | None = None,
limit: int | None = None,
start: dt.datetime | None = None,
end: dt.datetime | None = None,
output_format: OutputFormat = OutputFormat.DICT,
) -> list[list[str]] | errordict | Any:
"""Get up to 1440 candles for a market, with a specific interval (candle size)
Extra reading material: https://en.wikipedia.org/wiki/Candlestick_chart
## WARNING: RETURN TYPE IS WEIRD - CHECK BOTTOM OF THIS TEXT FOR EXPLANATION
---
Examples:
* https://api.bitvavo.com/v2/BTC-EUR/candles?interval=1h&limit=100
---
Args:
```python
market="BTC-EUR"
interval="1h" # Choose: 1m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d
# use `int(time.time() * 1000)` to get current timestamp in milliseconds
# or `int(datetime.datetime.now().timestamp()*1000)`
options={
"limit": [ 1 .. 1440 ], default 1440
"start": int timestamp in ms >= 0
"end": int timestamp in ms <= 8640000000000000
}
# Output format selection:
output_format=OutputFormat.DICT # Default: returns standard Python list/dict
output_format=OutputFormat.PANDAS # Returns pandas DataFrame
output_format=OutputFormat.POLARS # Returns polars DataFrame
output_format=OutputFormat.CUDF # Returns NVIDIA cuDF (GPU-accelerated)
output_format=OutputFormat.MODIN # Returns modin (distributed pandas)
output_format=OutputFormat.PYARROW # Returns Apache Arrow Table
output_format=OutputFormat.DASK # Returns Dask DataFrame (distributed)