forked from TON-Punks/ton-nft-verify-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfunctions.py
More file actions
76 lines (65 loc) · 2.69 KB
/
functions.py
File metadata and controls
76 lines (65 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import asyncio
import json
import logging
import psycopg2
import requests
from address import detect_address
from config import russian, english, bot, CHAT_ID, COLLECTION, DATABASE
async def open_connection():
    """Open a new database connection together with a cursor on it.

    The connection is created with ``psycopg2.connect(DATABASE)``, where
    ``DATABASE`` is imported from ``config`` (presumably a DSN / connection
    string -- confirm against config).

    Returns:
        A ``(connection, cursor)`` tuple. The caller is responsible for
        committing and closing the connection (see ``close_connection``).
    """
    connection = psycopg2.connect(DATABASE)
    return connection, connection.cursor()
async def close_connection(connect):
    """Commit the pending transaction and close the connection.

    The connection is closed even when the commit fails, so a failed commit
    does not leak the connection. The commit error still propagates to the
    caller.

    Args:
        connect: Connection object exposing ``commit()`` and ``close()``,
            e.g. the one returned by ``open_connection``.
    """
    try:
        connect.commit()
    finally:
        # Always release the connection, whether or not the commit succeeded.
        connect.close()
async def kick_user(tgid):
    """
    Revoke a user's verification and their right to stay in the chat.

    Runs three independent best-effort steps. A failure in any step is logged
    and does not stop the remaining steps:

    1. Delete the user's row from the ``verify`` table.
    2. Ban and then immediately unban the user in ``CHAT_ID``.
    3. Send the user the "no_longer_owner" notice in Russian and English.

    Args:
        tgid: Telegram id of the user to remove.

    Returns:
        None. Exceptions derived from ``Exception`` are logged, not raised.
    """
    try:
        connect, cursor = await open_connection()
        try:
            # Bind tgid as a query parameter instead of interpolating it into
            # the SQL text. It is passed as a string so the driver renders a
            # quoted literal, just like the previous f-string did.
            cursor.execute("delete from verify where tgid = %s",
                           (str(tgid),))
        except Exception:
            # Drop the failed transaction instead of leaking the connection.
            connect.close()
            raise
        await close_connection(connect)
    except Exception as e:
        logging.error(f"Failed to Delete {tgid} from Database\n"
                      f"{e}")

    try:
        # Ban followed by an unban limited to banned users: the log message
        # below calls this a "kick" (removal without a lasting ban).
        await bot.ban_chat_member(CHAT_ID, tgid)
        await bot.unban_chat_member(CHAT_ID, tgid, only_if_banned=True)
    except Exception as e:
        logging.warning(f"Failed to Kick {tgid} from Chat {CHAT_ID}\n"
                        f"{e}")

    try:
        await bot.send_chat_action(tgid, 'typing')
        await bot.send_message(tgid,
                               f'{russian["no_longer_owner"]}\n\n{english["no_longer_owner"]}')
    except Exception as e:
        logging.warning(f"Failed to Send Message to {tgid}\n"
                        f"{e}")
async def get_ton_addresses(address):
    """Return the known textual forms of a TON address as a flat dict.

    Calls ``detect_address(address)`` and flattens its nested result.

    Args:
        address: Address value accepted by ``detect_address``.

    Returns:
        A dict with the keys:
            ``b64``      -- ``['bounceable']['b64']``
            ``b64url``   -- ``['bounceable']['b64url']``
            ``n_b64``    -- ``['non_bounceable']['b64']``
            ``n_b64url`` -- ``['non_bounceable']['b64url']``
            ``raw``      -- ``['raw_form']``
    """
    detected = detect_address(address)

    bounceable = detected['bounceable']
    forms = {
        'b64': bounceable['b64'],
        'b64url': bounceable['b64url'],
    }

    non_bounceable = detected['non_bounceable']
    forms['n_b64'] = non_bounceable['b64']
    forms['n_b64url'] = non_bounceable['b64url']

    forms['raw'] = detected['raw_form']
    return forms
async def get_user_nfts(address, *, timeout=30):
    """Fetch the NFTs from ``COLLECTION`` owned by the given address.

    Queries ``https://tonapi.io/v1/nft/searchItems`` for items owned by the
    bounceable base64url form of ``address``, including items on sale.

    NOTE(review): only the first page is requested (``limit=50``,
    ``offset=0``), so owners with more than 50 items get a truncated list.

    Args:
        address: Owner address in any form ``get_ton_addresses`` accepts.
        timeout: Seconds to wait for the HTTP request before giving up
            (passed to ``requests.get``).

    Returns:
        A list of dicts with the keys ``address`` (bounceable base64url form
        of the item address), ``name`` and ``image``. Items whose data cannot
        be read are skipped.

    Raises:
        Any error raised by the HTTP request or by JSON decoding, and
        ``KeyError`` if the response has no ``nft_items`` key.
    """
    owner = (await get_ton_addresses(address))['b64url']

    # Pause before the request -- presumably to stay under the API rate
    # limit (TODO confirm).
    await asyncio.sleep(1)

    def _fetch():
        # Blocking HTTP call; executed in a worker thread (see below).
        return requests.get("https://tonapi.io/v1/nft/searchItems",
                            params={"owner": owner,
                                    "collection": COLLECTION,
                                    "include_on_sale": "true",
                                    "limit": "50",
                                    "offset": 0},
                            timeout=timeout)

    # Run the blocking request in the default executor so the event loop
    # keeps serving other coroutines while waiting for the response.
    response = await asyncio.get_running_loop().run_in_executor(None, _fetch)
    all_nfts = json.loads(response.text)['nft_items']

    nfts = []
    for nft in all_nfts:
        try:
            name = nft['metadata']['name']
            nft_address = (await get_ton_addresses(nft['address']))['b64url']
            image = nft['metadata']['image']
        except Exception as e:
            # Best-effort: skip items with missing metadata or an address
            # that cannot be parsed, but do not swallow KeyboardInterrupt,
            # SystemExit or task cancellation like a bare ``except`` would.
            logging.debug("Skipping NFT item with unreadable data: %r", e)
            continue
        nfts.append({'address': nft_address, 'name': name, 'image': image})
    return nfts