Skip to content

Commit 0469ccd

Browse files
authored
add veAllocate (#910)
* add veAllocate
1 parent 6c8d91e commit 0469ccd

File tree

10 files changed

+157
-20
lines changed

10 files changed

+157
-20
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 4.4.2
2+
current_version = 4.5.1
33
commit = True
44
tag = True
55

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ The events monitor runs continuously to retrieve and index the chain Metadata. I
6363
- a Decryptor class that handles decompression and decryption on the chain data, through communication with Provider
6464
- a set of `ALLOWED_PUBLISHERS`, if such a restriction exists. You can set a limited number of allowed publisher addresses using this env variable.
6565
- a Purgatory, based on the `ASSET_PURGATORY_URL` and `ACCOUNT_PURGATORY_URL` env variables. These mark some assets as being in purgatory (`"isInPurgatory": True`), enabling restrictions for some assets or accounts.
66+
- a VeAllocate, based on the `VEALLOCATE_URL` and `VEALLOCATE_UPDATE_INTERVAL` env variables. This updates the veAllocation for datasets.
6667
- start blocks, if such defined using `BFACTORY_BLOCK` and `METADATA_CONTRACT_BLOCK`. These start blocks are coroborated with the last stored blocks per Elasticsearch, to avoid indexing multiple times
6768

6869
The EventsMonitor processes block chunks as defined using `BLOCKS_CHUNK_SIZE`. For each block, it retrieves all `MetadataCreated` and `MetadataUpdated` events, and these events are processed inside the `MetadataCreatedProcessor` and `MetadataUpdatedProcessor` classes. These processors run the following flow:
@@ -142,6 +143,12 @@ ACCOUNT_PURGATORY_URL
142143
# Customise purgatory update (refresh) time (in number of minutes)
143144
PURGATORY_UPDATE_INTERVAL
144145

146+
# URL for getting the veAllocation list. If not exists, the veAllocate will not be processed. Possible values are: https://df-sql.oceandao.org/nftinfo for mainnet and https://test-df-sql.oceandao.org/nftinfo for goerli, because veOCEAN is deployed only on this networks. All other networks SHOULD NOT HAVE this defined. The list should be formatted as a list of dictionaries containing chainID,nft_addr and ve_allocated
147+
VEALLOCATE_URL
148+
149+
# Customise veAllocate update (refresh) time (in number of minutes)
150+
VEALLOCATE_UPDATE_INTERVAL
151+
145152
# The URL of the RBAC Permissions Server. If set, Aquarius will check permissions with RBAC. Leave empty/unset to skip RBAC permission checks.
146153
RBAC_SERVER_URL
147154

aquarius/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@
99
__author__ = """OceanProtocol"""
1010
# fmt: off
1111
# bumpversion needs single quotes
12-
__version__ = '4.4.2'
12+
__version__ = '4.5.1'
1313
# fmt: on

aquarius/events/events_monitor.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
TokenURIUpdatedProcessor,
2828
)
2929
from aquarius.events.purgatory import Purgatory
30+
from aquarius.events.ve_allocate import VeAllocate
3031
from aquarius.events.util import (
3132
get_metadata_start_block,
3233
get_defined_block,
@@ -101,6 +102,10 @@ def __init__(self, web3, config_file):
101102
else None
102103
)
103104

105+
self.ve_allocate = (
106+
VeAllocate(self._es_instance) if (os.getenv("VEALLOCATE_URL")) else None
107+
)
108+
104109
self.retry_mechanism = RetryMechanism(
105110
config_file, self._es_instance, self._retries_db_index, self.purgatory
106111
)
@@ -146,6 +151,12 @@ def do_run_monitor(self):
146151
except (KeyError, Exception) as e:
147152
logger.error(f"Error updating purgatory list: {str(e)}.")
148153

154+
if self.ve_allocate:
155+
try:
156+
self.ve_allocate.update_lists()
157+
except (KeyError, Exception) as e:
158+
logger.error(f"Error updating ve_allocate list: {str(e)}.")
159+
149160
def process_current_blocks(self):
150161
"""Process all blocks from the last processed block to the current block."""
151162
process_queue = strtobool(os.getenv("PROCESS_RETRY_QUEUE", "0"))

aquarius/events/processors.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ def add_aqua_data(self, record):
9494
order_count, price = get_number_orders_price(
9595
self.dt_contract.address, self.block, self._chain_id
9696
)
97-
record[AquariusCustomDDOFields.STATS] = {"orders": order_count, "price": price}
97+
record[AquariusCustomDDOFields.STATS] = {
98+
"allocated": 0,
99+
"orders": order_count,
100+
"price": price,
101+
}
98102

99103
return record, block_time
100104

aquarius/events/ve_allocate.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#
2+
# Copyright 2021 Ocean Protocol Foundation
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
import json
6+
import logging
7+
import os
8+
from datetime import datetime
9+
10+
import elasticsearch
11+
import requests
12+
13+
from aquarius.events.util import make_did
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class VeAllocate:
19+
def __init__(self, es_instance):
20+
self.update_time = None
21+
self._es_instance = es_instance
22+
23+
def retrieve_new_list(self, env_var):
24+
"""
25+
:param env_var: Url of the file containing purgatory list.
26+
:return: Object as follows: {...('<did>', '<reason>'),...}
27+
"""
28+
response = requests.post(os.getenv(env_var))
29+
30+
if response.status_code == requests.codes.ok:
31+
logger.info(
32+
f"veAllocate: Successfully retrieved list from {env_var} env var."
33+
)
34+
return {
35+
(a["nft_addr"], a["ve_allocated"], a["chainID"])
36+
for a in response.json()
37+
if a and "nft_addr" in a and "ve_allocated" in a and "chainID" in a
38+
}
39+
40+
logger.info(f"veAllocate: Failed to retrieve list from {env_var} env var.")
41+
return set()
42+
43+
def update_asset(self, asset, veAllocated):
44+
"""
45+
Updates the field `state.allocated` in `asset` object.
46+
"""
47+
did = asset["id"]
48+
if "stats" not in asset:
49+
asset["stats"] = {}
50+
51+
asset["stats"]["allocated"] = veAllocated
52+
logger.info(
53+
f"veAllocate: updating asset {did} with state.allocated={veAllocated}."
54+
)
55+
try:
56+
self._es_instance.update(json.dumps(asset), did)
57+
except Exception as e:
58+
logger.warning(f"updating ddo {did} stats.allocated attribute failed: {e}")
59+
60+
def update_lists(self):
61+
"""
62+
:return: None
63+
"""
64+
now = int(datetime.now().timestamp())
65+
req_diff = int(os.getenv("VEALLOCATE_UPDATE_INTERVAL", "60")) * 60
66+
if self.update_time and (now - self.update_time) < req_diff:
67+
return
68+
69+
logger.info(
70+
f"veAllocate: updating veAllocate list and setting update time to {now}."
71+
)
72+
self.update_time = now
73+
74+
ve_list = self.retrieve_new_list("VEALLOCATE_URL")
75+
76+
for nft, ve_allocated, chain_id in ve_list:
77+
try:
78+
did = make_did(nft, chain_id)
79+
asset = self._es_instance.read(did)
80+
self.update_asset(asset, ve_allocated)
81+
except elasticsearch.exceptions.NotFoundError:
82+
continue

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
url="https://github.com/oceanprotocol/aquarius",
9696
# fmt: off
9797
# bumpversion needs single quotes
98-
version='4.4.2',
98+
version='4.5.1',
9999
# fmt: on
100100
zip_safe=False,
101101
)

tests/helpers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ def get_ddo(client, base_ddo_url, did):
6363
return None
6464

6565

66+
def publish_ddo(client, base_ddo_url, events_object):
67+
ddo = new_ddo(test_account1, get_web3(), "dt.0")
68+
did = ddo.id
69+
send_create_update_tx("create", ddo, bytes([0]), test_account1)
70+
events_object.process_current_blocks()
71+
72+
return did
73+
74+
6675
def send_create_update_tx(name, ddo, flags, account):
6776
provider_url = "http://172.15.0.4:8030"
6877
provider_address = "0xe2DD09d719Da89e5a3D0F2549c7E24566e947260"

tests/test_purgatory.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,7 @@
99
from requests.models import Response
1010

1111
from aquarius.events.purgatory import Purgatory
12-
from tests.helpers import (
13-
get_ddo,
14-
get_web3,
15-
new_ddo,
16-
send_create_update_tx,
17-
test_account1,
18-
)
12+
from tests.helpers import get_ddo, publish_ddo
1913

2014

2115
class PurgatoryForTesting(Purgatory):
@@ -32,15 +26,6 @@ def retrieve_new_list(self, env_var):
3226
)
3327

3428

35-
def publish_ddo(client, base_ddo_url, events_object):
36-
ddo = new_ddo(test_account1, get_web3(), "dt.0")
37-
did = ddo.id
38-
send_create_update_tx("create", ddo, bytes([0]), test_account1)
39-
events_object.process_current_blocks()
40-
41-
return did
42-
43-
4429
def test_purgatory_before_init(client, base_ddo_url, events_object, monkeypatch):
4530
monkeypatch.setenv(
4631
"ASSET_PURGATORY_URL",

tests/test_veallocate.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#
2+
# Copyright 2021 Ocean Protocol Foundation
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
from datetime import datetime, timedelta
6+
from unittest.mock import Mock, patch
7+
8+
from freezegun import freeze_time
9+
from requests.models import Response
10+
11+
from aquarius.events.ve_allocate import VeAllocate
12+
from tests.helpers import get_ddo, publish_ddo
13+
14+
15+
class VeAllocateForTesting(VeAllocate):
16+
def __init__(self, es_instance):
17+
self.current_test_asset_list = set()
18+
super(VeAllocateForTesting, self).__init__(es_instance)
19+
20+
def retrieve_new_list(self, env_var):
21+
return self.current_test_asset_list if env_var == "VEALLOCATE_URL" else None
22+
23+
24+
def test_ve_allocate_with_assets(client, base_ddo_url, events_object, monkeypatch):
25+
monkeypatch.setenv(
26+
"VEALLOCATE_URL",
27+
"https://test-df-sql.oceandao.org/nftinfo",
28+
)
29+
did = publish_ddo(client, base_ddo_url, events_object)
30+
31+
veAllocate = VeAllocateForTesting(events_object._es_instance)
32+
published_ddo = get_ddo(client, base_ddo_url, did)
33+
34+
veAllocate.current_test_asset_list = {
35+
(published_ddo["nftAddress"], 100, published_ddo["chainId"])
36+
}
37+
veAllocate.update_lists()
38+
published_ddo = get_ddo(client, base_ddo_url, did)
39+
assert published_ddo["stats"]["allocated"] == 100

0 commit comments

Comments
 (0)