
Commit e88bd57

improvement over filter_list
1 parent 5256896 commit e88bd57


shuffle-tools/1.2.0/src/app.py

Lines changed: 105 additions & 26 deletions
@@ -9,6 +9,7 @@
 import tempfile
 import zipfile
 import base64
+import gzip
 import ipaddress
 import hashlib
 from io import StringIO
@@ -53,6 +54,8 @@ def __init__(self, redis, logger, console_logger=None):
         :param logger:
         :param console_logger:
         """
+        self.cache_update_buffer = []
+        self.shared_cache = {}
         super().__init__(redis, logger, console_logger)

     def router(self):
@@ -661,6 +664,62 @@ def check_wildcard(self, wildcardstring, matching_string):

         return False

+    def preload_cache(self, key):
+        org_id = self.full_execution["workflow"]["execution_org"]["id"]
+        url = f"{self.url}/api/v1/orgs/{org_id}/get_cache"
+        data = {
+            "workflow_id": self.full_execution["workflow"]["id"],
+            "execution_id": self.current_execution_id,
+            "authorization": self.authorization,
+            "org_id": org_id,
+            "key": key,
+        }
+        get_response = requests.post(url, json=data, verify=False)
+        response_data = get_response.json()
+        if "value" in response_data:
+            raw_value = response_data["value"]
+            if isinstance(raw_value, str):
+                try:
+                    parsed = json.loads(raw_value)
+                except json.JSONDecodeError:
+                    parsed = [raw_value]
+            else:
+                parsed = raw_value
+
+            if not isinstance(parsed, list):
+                parsed = [parsed]
+
+            response_data["value"] = parsed
+        return response_data
+
+    def check_compression(self, obj, threshold=1_000_000):
+        data_bytes = json.dumps(obj).encode("utf-8")
+        if len(data_bytes) > threshold:
+            return True
+        return False
+
+    def compress_data(self, obj):
+        data_bytes = json.dumps(obj).encode("utf-8")
+        compressed_data = gzip.compress(data_bytes)
+        return base64.b64encode(compressed_data).decode("utf-8")
+
+    def update_cache(self, key):
+        org_id = self.full_execution["workflow"]["execution_org"]["id"]
+        url = f"{self.url}/api/v1/orgs/{org_id}/set_cache"
+        data = {
+            "workflow_id": self.full_execution["workflow"]["id"],
+            "execution_id": self.current_execution_id,
+            "authorization": self.authorization,
+            "org_id": org_id,
+            "key": key,
+            "value": json.dumps(self.shared_cache["value"]),
+        }
+
+        get_response = requests.post(url, json=data, verify=False)
+        self.cache_update_buffer = []
+        return get_response.json()
+
     def filter_list(self, input_list, field, check, value, opposite):

         # Remove hashtags on the fly
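
For context, check_compression and compress_data boil down to a size gate plus a gzip-and-base64 round trip. A minimal standalone sketch; the decompress_data inverse is an assumption added for illustration, since this commit only adds the compressing half:

import base64
import gzip
import json

def compress_data(obj):
    # Same idea as the new helper: JSON-encode, gzip, then base64 so
    # the result can travel as a plain string.
    data_bytes = json.dumps(obj).encode("utf-8")
    return base64.b64encode(gzip.compress(data_bytes)).decode("utf-8")

def decompress_data(blob):
    # Hypothetical inverse for consumers; not part of this commit.
    raw = gzip.decompress(base64.b64decode(blob))
    return json.loads(raw.decode("utf-8"))

payload = {"valid": list(range(100_000)), "invalid": []}
blob = compress_data(payload)
assert decompress_data(blob) == payload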
@@ -876,12 +935,20 @@ def filter_list(self, input_list, field, check, value, opposite):
                     failed_list.append(item)

             elif check == "in cache key":
+                if item == input_list[0]:
+                    self.shared_cache = self.preload_cache(key=value)
+
                 ret = self.check_cache_contains(value, tmp, "true")
+
                 if ret["success"] == True and ret["found"] == True:
                     new_list.append(item)
                 else:
                     failed_list.append(item)

+                if len(self.cache_update_buffer) > 400 or (item == input_list[-1] and len(self.cache_update_buffer) > 0):
+                    self.update_cache(value)
+
                 #return {
                 #    "success": True,
                 #    "found": False,
@@ -931,13 +998,16 @@ def filter_list(self, input_list, field, check, value, opposite):
             failed_list = tmplist

         try:
-            return json.dumps(
-                {
+            data = {
                 "success": True,
                 "valid": new_list,
                 "invalid": failed_list,
             }
-            )
+            if self.check_compression(data):
+                data = self.compress_data(data)
+                return data
+
+            return json.dumps(data)
             # new_list = json.dumps(new_list)
         except json.decoder.JSONDecodeError as e:
             return json.dumps(
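
One consequence of this hunk: filter_list can now return either a plain JSON string or a base64-encoded gzip blob, so callers must distinguish the two. A hypothetical consumer-side helper, not part of this commit:

import base64
import gzip
import json

def parse_filter_result(result):
    # Try JSON first; anything that fails to parse is assumed to be
    # the base64-encoded gzip output from compress_data.
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        raw = gzip.decompress(base64.b64decode(result))
        return json.loads(raw.decode("utf-8"))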
@@ -1737,7 +1807,6 @@ def escape_html(self, input_data):

     def check_cache_contains(self, key, value, append):
         org_id = self.full_execution["workflow"]["execution_org"]["id"]
-        url = "%s/api/v1/orgs/%s/get_cache" % (self.url, org_id)
         data = {
             "workflow_id": self.full_execution["workflow"]["id"],
             "execution_id": self.current_execution_id,
@@ -1766,7 +1835,7 @@ def check_cache_contains(self, key, value, append):
                 value = json.dumps(value)
             except Exception as e:
                 pass
-
+
         if not isinstance(value, str):
             value = str(value)

@@ -1778,11 +1847,13 @@ def check_cache_contains(self, key, value, append):
             append = False

         if "success" not in allvalues:
-            get_response = requests.post(url, json=data, verify=False)
+            #get_response = requests.post(url, json=data, verify=False)
+            pass

         try:
             if "success" not in allvalues:
-                allvalues = get_response.json()
+                #allvalues = get_response.json()
+                allvalues = self.shared_cache

             try:
                 if allvalues["value"] == None or allvalues["value"] == "null":
@@ -1799,6 +1870,7 @@ def check_cache_contains(self, key, value, append):
                 set_response = requests.post(set_url, json=data, verify=False)
                 try:
                     allvalues = set_response.json()
+                    self.shared_cache = self.preload_cache(key=key)
                     #allvalues["key"] = key
                     #return allvalues

@@ -1830,19 +1902,26 @@ def check_cache_contains(self, key, value, append):
                 if allvalues["value"] == None or allvalues["value"] == "null":
                     allvalues["value"] = "[]"

-                allvalues["value"] = str(allvalues["value"])
+                if isinstance(allvalues["value"], str):
+                    try:
+                        allvalues["value"] = json.loads(allvalues["value"])
+                    except json.JSONDecodeError:
+                        self.logger.info("[WARNING] Failed inner value cache parsing")
+                        allvalues["value"] = [allvalues["value"]]
+
+                if not isinstance(allvalues["value"], list):
+                    allvalues["value"] = [allvalues["value"]]

                 try:
-                    parsedvalue = json.loads(allvalues["value"])
+                    parsedvalue = json.loads(str(allvalues["value"]))
                 except json.decoder.JSONDecodeError as e:
-                    parsedvalue = [str(allvalues["value"])]
-                except Exception as e:
-                    parsedvalue = [str(allvalues["value"])]
+                    parsedvalue = allvalues["value"]

                 try:
                     for item in parsedvalue:
                         #return "%s %s" % (item, value)
-                        if item == value:
+                        self.logger.info(f"{item} == {value}")
+                        if str(item) == str(value):
                             if not append:
                                 try:
                                     newdata = json.loads(json.dumps(data))
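
This normalization replaces the old blanket str() coercion: cache values may arrive as a JSON string, a bare string, a scalar, or a list, and everything is coerced to a list before membership checks. The same logic as a standalone sketch:

import json

def normalize_cache_value(value):
    # None/"null" become an empty list; JSON strings are parsed;
    # unparseable strings and scalars are wrapped in a list.
    if value is None or value == "null":
        value = "[]"
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            value = [value]
    if not isinstance(value, list):
        value = [value]
    return value

assert normalize_cache_value(None) == []
assert normalize_cache_value('["a", "b"]') == ["a", "b"]
assert normalize_cache_value("plain") == ["plain"]
assert normalize_cache_value(3) == [3]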
@@ -1858,7 +1937,7 @@ def check_cache_contains(self, key, value, append):
                                 "reason": "Found and not appending!",
                                 "key": key,
                                 "search": value,
-                                "value": json.loads(allvalues["value"]),
+                                "value": allvalues["value"],
                             }
                         else:
                             return {
@@ -1867,10 +1946,10 @@ def check_cache_contains(self, key, value, append):
                                 "reason": "Found, was appending, but item already exists",
                                 "key": key,
                                 "search": value,
-                                "value": json.loads(allvalues["value"]),
+                                "value": allvalues["value"],
                             }
-
-                        # Lol
+
+                        # Lol
                         break
                 except Exception as e:
                     parsedvalue = [str(parsedvalue)]
@@ -1886,18 +1965,18 @@ def check_cache_contains(self, key, value, append):
                         "value": json.loads(allvalues["value"]),
                     }

-            new_value = parsedvalue
-            if new_value == None:
-                new_value = [value]
+            #parsedvalue.append(value)

-            new_value.append(value)
-            data["value"] = json.dumps(new_value)
+            #data["value"] = json.dumps(parsedvalue)

-            set_url = "%s/api/v1/orgs/%s/set_cache" % (self.url, org_id)
-            response = requests.post(set_url, json=data, verify=False)
+            if value not in allvalues["value"] and isinstance(allvalues["value"], list):
+                self.cache_update_buffer.append(value)
+                allvalues["value"].append(value)
+            #set_url = "%s/api/v1/orgs/%s/set_cache" % (self.url, org_id)
+            #response = requests.post(set_url, json=data, verify=False)
             exception = ""
             try:
-                allvalues = response.json()
+                #allvalues = response.json()
                 #return allvalues

                 return {
@@ -1906,7 +1985,7 @@ def check_cache_contains(self, key, value, append):
                     "reason": "Appended as it didn't exist",
                     "key": key,
                     "search": value,
-                    "value": new_value,
+                    "value": parsedvalue,
                 }
             except Exception as e:
                 exception = e
