-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfilter.py
More file actions
137 lines (117 loc) · 4.67 KB
/
filter.py
File metadata and controls
137 lines (117 loc) · 4.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
The basis of the code below was taken from https://github.com/jeffthibault/python-nostr with some modifications.
"""
from collections import UserList
from typing import List
from .event import Event, EventKind
class Filter:
"""
NIP-01 filtering.
Explicitly supports "#e" and "#p" tag filters via `event_refs` and `pubkey_refs`.
Arbitrary NIP-12 single-letter tag filters are also supported via `add_arbitrary_tag`.
If a particular single-letter tag gains prominence, explicit support should be
added. For example:
# arbitrary tag
filter.add_arbitrary_tag('t', [hashtags])
# promoted to explicit support
Filter(hashtag_refs=[hashtags])
"""
def __init__(
self,
event_ids: List[str] = None,
kinds: List[EventKind] = None,
authors: List[str] = None,
since: int = None,
until: int = None,
event_refs: List[
str
] = None, # the "#e" attr; list of event ids referenced in an "e" tag
pubkey_refs: List[
str
] = None, # The "#p" attr; list of pubkeys referenced in a "p" tag
limit: int = None,
) -> None:
self.event_ids: list[str] = event_ids
self.kinds: list[EventKind] = kinds
self.authors: list[str] = authors
self.since: int = since
self.until: int = until
self.event_refs = event_refs
self.pubkey_refs = pubkey_refs
self.limit: int = limit
self.tags = {}
if self.event_refs:
self.add_arbitrary_tag('e', self.event_refs)
if self.pubkey_refs:
self.add_arbitrary_tag('p', self.pubkey_refs)
def add_arbitrary_tag(self, tag: str, values: list):
"""
Filter on any arbitrary tag with explicit handling for NIP-01 and NIP-12
single-letter tags.
"""
# NIP-01 'e' and 'p' tags and any NIP-12 single-letter tags must be prefixed with "#"
tag_key = tag if len(tag) > 1 else f'#{tag}'
self.tags[tag_key] = values
def matches(self, event: Event) -> bool:
if self.event_ids is not None and event.id not in self.event_ids:
return False
if self.kinds is not None and event.kind not in self.kinds:
return False
if self.authors is not None and event.public_key not in self.authors:
return False
if self.since is not None and event.created_at < self.since:
return False
if self.until is not None and event.created_at > self.until:
return False
if (self.event_refs is not None or self.pubkey_refs is not None) and len(
event.tags
) == 0:
return False
if self.tags:
e_tag_identifiers = set([e_tag[0] for e_tag in event.tags])
for f_tag, f_tag_values in self.tags.items():
# Omit any NIP-01 or NIP-12 "#" chars on single-letter tags
f_tag = f_tag.replace('#', '')
if f_tag not in e_tag_identifiers:
# Event is missing a tag type that we're looking for
return False
# Multiple values within f_tag_values are treated as OR search; an Event
# needs to match only one.
# Note: an Event could have multiple entries of the same tag type
# (e.g. a reply to multiple people) so we have to check all of them.
match_found = False
for e_tag in event.tags:
if e_tag[0] == f_tag and e_tag[1] in f_tag_values:
match_found = True
break
if not match_found:
return False
return True
def to_json_object(self) -> dict:
res = {}
if self.event_ids is not None:
res['ids'] = self.event_ids
if self.kinds is not None:
res['kinds'] = self.kinds
if self.authors is not None:
res['authors'] = self.authors
if self.since is not None:
res['since'] = self.since
if self.until is not None:
res['until'] = self.until
if self.limit is not None:
res['limit'] = self.limit
if self.tags:
res.update(self.tags)
return res
class Filters(UserList):
def __init__(self, initlist: 'list[Filter]' = []) -> None:
super().__init__(initlist)
self.data: 'list[Filter]'
def match(self, event: Event):
for filter in self.data:
if filter.matches(event):
return True
return False
def to_json_array(self) -> list:
return [filter.to_json_object() for filter in self.data]