-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
166 lines (141 loc) · 4.34 KB
/
app.py
File metadata and controls
166 lines (141 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
from flask import Flask, session, request, render_template, make_response
from werkzeug.utils import secure_filename
from flask_pymongo import PyMongo
import pylibmc
#from elasticsearch import Elasticsearch
#from elasticsearch_dsl import Search
import requests
import logging, time, json, uuid, os
from config import config
#from pprint import pprint
app = Flask(__name__)

# --- MongoDB ---------------------------------------------------------------
# Connection string assembled from service config credentials.
app.config['MONGO_URI'] = "mongodb://{}:{}@{}/{}".format(
    config['mongo_usr'],
    config['mongo_pwd'],
    config['mongo_ip'],
    config['mongo_db']
)
mongo = PyMongo(app)

# --- memcached -------------------------------------------------------------
# Local memcached daemon used as a cache for search-result id lists.
mc = pylibmc.Client(["127.0.0.1"], binary=True)
mc.behaviors = {"tcp_nodelay": True, "ketama": True}

# --- Sibling-service routes ------------------------------------------------
search_route = config["elasticsearch_route"]
profiles_route = config["profiles_route"]

# --- Logging ---------------------------------------------------------------
# When running under gunicorn (i.e. not executed directly), adopt its
# log handlers and level so app.logger output lands in gunicorn's logs.
if __name__ != '__main__':
    gunicorn_logger = logging.getLogger('gunicorn.error')
    app.logger.handlers = gunicorn_logger.handlers
    app.logger.setLevel(gunicorn_logger.level)
@app.route("/reset_search", methods=["POST"])
def reset():
    """Delete every document in the Elasticsearch ``posts`` index.

    Best-effort: the delete is fired at Elasticsearch and any failure is
    logged, but the endpoint always answers ``{"status": "OK"}`` / 200,
    matching the original contract.
    """
    app.logger.warning("/reset_search called")
    query = {
        "query": {
            "match_all": {}
        }
    }
    try:
        # timeout keeps a dead/slow Elasticsearch from hanging the worker
        r = requests.post(
            url='http://' + search_route + '/posts/_delete_by_query',
            json=query,
            timeout=10,
        )
        app.logger.debug("/reset_search ES responded %s", r.status_code)
    except requests.RequestException:
        # Previously the result was ignored entirely; at least log failures.
        app.logger.exception("/reset_search could not reach Elasticsearch")
    return {"status": "OK"}, 200
def _hydrate_items(item_collection, item_ids):
    """Fetch each post id from MongoDB, dropping the internal ``_id``.

    Ids with no matching document are silently skipped, so the returned
    list may be shorter than *item_ids*.
    """
    items = []
    for item_id in item_ids:
        doc = item_collection.find_one({"id": item_id})
        if doc:
            del doc['_id']
            items.append(doc)
    return items


@app.route("/search", methods=["POST"])
def search():
    """Search posts via Elasticsearch and hydrate hits from MongoDB.

    Request JSON fields (all optional):
      limit     -- max results, capped at 100 (default 25)
      timestamp -- only posts at/before this UNIX time (default: now)
      q         -- full-text match against post content
      username  -- restrict to a single author
      user      -- restrict to authors this user follows (via profiles svc)
      replies   -- when false, exclude replies
      parent    -- restrict to children of this post (only if ``replies``
                   did not already apply)
      hasMedia  -- when truthy, only posts with media attached
      rank      -- "time" sorts by timestamp desc (default: interest desc)

    Returns ``{"status": "OK", "items": [...]}``.  The list of matching
    post ids is cached in memcached keyed on the request payload.
    """
    data = request.json
    item_collection = mongo.db.items
    app.logger.debug("/search data: {}".format(data))

    # Cache key: serialize with sorted keys so logically-equal payloads
    # always map to the same memcached entry (plain json.dumps made the
    # key depend on dict insertion order and caused spurious misses).
    cache_key = json.dumps(data, sort_keys=True)
    cached_ids = mc.get(cache_key)
    if cached_ids:
        app.logger.debug("/search using cached results: {}".format(cached_ids))
        results = _hydrate_items(item_collection, cached_ids)
        app.logger.info("/search cached OK")
        return {"status": "OK", "items": results}, 200

    app.logger.debug("/search cache miss")

    # Result-count limit, capped at 100.
    limit = data.get("limit", 25)
    if limit > 100:
        limit = 100

    must_clauses = []   # scored full-text clauses
    filters = []        # non-scoring exact filters (renamed: was `filter`,
                        # which shadowed the builtin)

    # Only posts at/before the requested time (default: now), converted
    # to the millisecond precision the index stores.
    timestamp = data.get("timestamp", time.time())
    timestamp = int(round(timestamp * 1000))
    filters.append({"range": {"timestamp": {"lte": timestamp}}})

    # Full-text query over post content.
    if data.get('q'):
        must_clauses.append({"match": {"content": data['q']}})

    # Restrict by a single author, or by everyone a given user follows.
    if 'username' in data:
        filters.append({"term": {"username": data['username']}})
    elif 'user' in data:
        r = requests.post(
            url='http://' + profiles_route + '/user/following',
            json={'username': data['user']},
            timeout=10,
        )
        filters.append({"terms": {"username": r.json()['users']}})

    # Exclude replies entirely, or select children of a given parent.
    if 'replies' in data and not data['replies']:
        filters.append({"term": {"isReply": False}})
    elif 'parent' in data:
        filters.append({"term": {"parent": data['parent']}})

    # Only posts carrying media attachments.
    if data.get('hasMedia'):
        filters.append({"exists": {'field': "media"}})

    query = {
        "_source": ["username", "timestamp", "interest"],
        "query": {
            "bool": {
                "filter": filters
            }
        },
        "sort": [{"interest": "desc"}],
        "size": limit
    }
    if must_clauses:
        query['query']['bool']['must'] = must_clauses

    # Optional chronological ranking instead of the interest default.
    if data.get('rank') == "time":
        app.logger.debug("Sorting by time")
        query['sort'] = [{"timestamp": "desc"}]

    app.logger.debug("/search query: {}".format(query))
    # timeout keeps a dead/slow Elasticsearch from hanging the worker
    r = requests.get(
        url='http://' + search_route + '/posts/_search',
        json=query,
        timeout=10,
    )
    r_json = r.json()
    app.logger.debug(r_json)
    app.logger.debug(r_json['hits']['hits'])
    app.logger.debug(r_json['hits']['total'])

    hit_ids = [hit['_id'] for hit in r_json['hits']['hits']]
    results = _hydrate_items(item_collection, hit_ids)
    # Cache only ids that actually resolved in MongoDB (matches the
    # original behavior of appending inside the `if mongo_ret:` branch).
    mc.add(cache_key, [item['id'] for item in results])

    app.logger.info("/search OK")
    return {"status": "OK", "items": results}, 200