-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
236 lines (178 loc) · 6.44 KB
/
api.py
File metadata and controls
236 lines (178 loc) · 6.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""
backend API for mediacloud dashboard
"""
import os
import time
import urllib.parse
from typing import Any, TypeAlias, TypedDict
# PyPI
import aiohttp
import fastapi
from fastapi.middleware.cors import CORSMiddleware
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache
################ Types
JSON: TypeAlias = dict[str, Any]
V1_Response: TypeAlias = JSON | list[Any]
class V2_Response(TypedDict):
data: JSON | list[JSON | list]
created_ts: int
################ constants and config
# Get hostname of statsd/graphite/grafana container
# must run on same server, be linked via `dokku graphite:link OUR_APP`:
GRAPHITE_HOST = urllib.parse.urlparse(os.environ["STATSD_URL"]).netloc.split(":")[0]
GRAPHITE_PORT = 81
RENDER_URL = f"http://{GRAPHITE_HOST}:{GRAPHITE_PORT}/render"
REALM = "prod" # stats realm to get stats for
MCWEB_TOKEN = os.environ["MCWEB_TOKEN"]
DEFAULT_TTL = 60 # cache data one minute
################ FastAPI app & initialization
app = fastapi.FastAPI()
origins = ["*"] # Accept All Origins seems fine for our usecase
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
async def startup() -> None:
# all async tasks run in single thread, so use in-memory cache
FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache")
################ data formatting (named by version first appears in)
def maybe_round(x: float | None, digits: int) -> float | None:
if x is None:
return x
return round(x, digits)
def v1_zip_columns(j: list[JSON]) -> list[list[Any]]:
"""
take data from graphite:
return list where first row is column names,
followed by rows with values for each column
(more compact than list of dicts)
"""
headers = ["ts"]
for metric in j:
headers.append(metric["target"])
rows = [headers]
# list of lists of [value, ts]
datapoints = [col["datapoints"] for col in j]
# for now, assume that metrics
# come with an indentical series of timestamps
# if not, need to track index for each column separately!
i = 0
for metric0 in datapoints[0]:
value0 = metric0[0]
ts0 = time.strftime("%Y-%m-%d %H:%M", time.gmtime(metric0[1]))
out = [ts0, value0]
# loop for remaining columns
for dp_n in datapoints[1:]:
out.append(maybe_round(dp_n[i][0], 1))
rows.append(out)
i += 1
return rows
def v2_wrap(data: JSON | list[Any]) -> V2_Response:
"""
standard wrapper for v2 of API
"""
return {"data": data, "created_ts": int(time.time())}
################ stats endpoint
# create graphite stats paths
def g(path: str) -> str:
return f"stats.gauges.mc.{REALM}.{path}"
def c(path: str) -> str:
return f"stats.counters.mc.{REALM}.{path}"
def t(path: str) -> str:
return f"stats.timers.mc.{REALM}.{path}"
# functions to add graphite functions to metric paths
def ss(path: str) -> str:
"""sum (wildcard) series path into one series"""
return f"sumSeries({path})"
def asum(path: str, name: str, func: str = "sum") -> str:
"""
summarize samples for each minute (using `func`: default sum)
and apply an alias
"""
return f'alias(summarize({path},"1min","{func}"),"{name}")'
def amax(path: str, name: str) -> str:
"""
get maximum sample for each minute, and apply an alias
"""
return asum(path, name, "max")
def amedian(path: str, name: str) -> str:
"""
get median sample for each minute, and apply an alias
"""
return asum(path, name, "median")
####
# list of metrics for "stats" endpoint to return
# MUST include an alias name!
GRAPHITE_METRICS: list[str] = [
# sum across all ES sub-indices, return the maximum value for each minute
amax(
ss(
g(
"story-indexer.elastic-stats.indices.indices.primaries.docs.count.name_mc_search*"
)
),
"es_documents",
),
# sum total requests served each minute by gunicorn, regardless of status
asum(c("web-search.gunicorn.requests.count"), "requests"),
# Max working feeds (fetched and successfully parsed document in
# last 24 hrs regardless of whether there were new stories found)
amax(
g("rss-fetcher.rss-fetcher-stats.feeds.recent.hours_24.status_working"),
"feeds_working",
),
# Max sources with working (see above) feeds in last 24 hrs
amax(
g("rss-fetcher.rss-fetcher-stats.sources.recent.hours_24.status_working"),
"sources_feeds_working",
),
# median "college" count-over-time
amedian(
t("web-search.monitor-api.college.31d.count-over-time.median"),
"median_search_ms",
),
# sum of successful document fetches per minute
asum(t("rss-fetcher.fetcher.total.status_SUCC.count"), "feed_docs_fetched"),
]
# default is from -24h until now
GRAPHITE_PARAMS = "&".join(f"target={m}" for m in GRAPHITE_METRICS)
GRAPHITE_URL = f"{RENDER_URL}?format=json&{GRAPHITE_PARAMS}"
@app.get("/v2/stats")
@cache(expire=DEFAULT_TTL)
async def v2_stats_get() -> V2_Response:
"""
NOTE! Any non-backwards-compatible change should copy this routine
and increment the version number!
v2: uses v2_wrap()
"""
async with aiohttp.ClientSession() as session:
async with session.get(GRAPHITE_URL) as response:
j = await response.json()
return v2_wrap(v1_zip_columns(j))
################ stories endpoint: return recent stories
#### version 2; use random sample
SAMPLE_URL = "https://search.mediacloud.org/api/search/sample"
@app.get("/v2/stories")
@cache(expire=DEFAULT_TTL)
async def v2_stories_get() -> V2_Response:
"""
returns random sample from last 24 hours, to try to show
representative stories (last 20 may all be from same source)
"""
async with aiohttp.ClientSession() as session:
session.headers["Authorization"] = f"Token {MCWEB_TOKEN}"
now = time.time()
start = time.strftime("%Y-%m-%d", time.gmtime(now - 24 * 60 * 60))
end = time.strftime("%Y-%m-%d", time.gmtime(now))
# doesn't (currently) take count parameter!
url = f"{SAMPLE_URL}?start={start}&end={end}"
async with session.get(url) as response:
j = await response.json()
# perhaps trim any unwanted columns?
return v2_wrap(j["sample"])