forked from luci/luci-py
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorage.py
More file actions
271 lines (204 loc) · 7.92 KB
/
storage.py
File metadata and controls
271 lines (204 loc) · 7.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed by the Apache v2.0 license that can be
# found in the LICENSE file.
"""Storage of config files."""
import hashlib
import logging
from google.appengine.api import app_identity
from google.appengine.ext import ndb
from google.appengine.ext.ndb import msgprop
from google.protobuf import text_format
from components import config
from components import utils
class Blob(ndb.Model):
  """Immutable blob of content, addressed by the hash of that content.

  Entity key:
    Id is the content hash in the form "v1:<sha>", where <sha> is the
    hex-encoded Git-compliant SHA-1 of the string 'blob {content len}',
    a NUL byte and then the content. It is produced by compute_hash().
    The entity has no parent.
  """
  # Filled in by ndb when the entity is created (auto_now_add).
  created_ts = ndb.DateTimeProperty(auto_now_add=True)
  # The blob content itself; must always be set.
  content = ndb.BlobProperty(required=True)
class ConfigSet(ndb.Model):
  """A versioned collection of config files.

  Entity key:
    Id is the config set name, for example "services/luci-config" or
    "projects/chromium".

  NOTE(review): an earlier version of this docstring said gitiles_import.py
  relies on this class having only one attribute, but several are declared
  below -- confirm whether that constraint still holds.
  """
  # Most recently imported revision of the config set. See also Revision and
  # File.
  latest_revision = ndb.StringProperty(required=True)
  # Unindexed metadata about the latest revision.
  latest_revision_url = ndb.StringProperty(indexed=False)
  latest_revision_time = ndb.DateTimeProperty(indexed=False)
  latest_revision_committer_email = ndb.StringProperty(indexed=False)
  # Required. Presumably where the config set is imported from -- TODO confirm.
  location = ndb.StringProperty(required=True)
class RevisionInfo(ndb.Model):
  """Metadata describing a revision.

  Meant to be embedded in other entities through StructuredProperty.
  """
  # Revision identifier; the only required field. Nothing here is indexed.
  id = ndb.StringProperty(required=True, indexed=False)
  url = ndb.StringProperty(indexed=False)
  time = ndb.DateTimeProperty(indexed=False)
  committer_email = ndb.StringProperty(indexed=False)
class ImportAttempt(ndb.Model):
  """Outcome of the most recent attempt to import a config set.

  Entity key:
    Parent is ConfigSet (the parent entity does not have to exist).
    ID is "last".
  """
  # Filled in by ndb when the entity is created (auto_now_add).
  time = ndb.DateTimeProperty(auto_now_add=True, required=True, indexed=False)
  # Metadata of the revision associated with the attempt.
  revision = ndb.StructuredProperty(RevisionInfo, indexed=False)
  # Whether the attempt succeeded, and a message describing it.
  success = ndb.BooleanProperty(required=True, indexed=False)
  message = ndb.StringProperty(required=True, indexed=False)

  class ValidationMessage(ndb.Model):
    """A validation message: a config.Severity level plus its text."""
    severity = msgprop.EnumProperty(config.Severity, indexed=False)
    text = ndb.StringProperty(indexed=False)

  # Validation messages recorded for the attempt; there may be any number.
  validation_messages = ndb.StructuredProperty(ValidationMessage, repeated=True)
class Revision(ndb.Model):
  """One revision of a config set. Immutable.

  Acts as the parent of File entities; the Revision entity itself does not
  have to exist.

  Entity key:
    Id is the revision name; for revisions imported from Git it is the commit
    hash.
    Parent is ConfigSet.
  """
class File(ndb.Model):
  """One file within a revision. Immutable.

  Entity key:
    Id is the filename, with no leading slash. Parent is Revision.
  """
  # Filled in by ndb when the entity is created (auto_now_add).
  created_ts = ndb.DateTimeProperty(auto_now_add=True)
  # Hash of the file content, as computed by compute_hash().
  # A Blob entity with this key must exist.
  content_hash = ndb.StringProperty(indexed=False, required=True)

  def _pre_put_hook(self):
    """Asserts that the key id is a str without a leading slash."""
    filename = self.key.id()
    assert isinstance(filename, str)
    assert not filename.startswith('/')
def last_import_attempt_key(config_set):
  """Returns the key of the "last" ImportAttempt under |config_set|."""
  # Flat (kind, id) path: the ConfigSet parent first, then the attempt itself.
  flat_path = (ConfigSet, config_set, ImportAttempt, 'last')
  return ndb.Key(*flat_path)
@ndb.tasklet
def get_config_sets_async(config_set=None):
  """Returns a list of ConfigSet entities.

  If |config_set| is given, the list holds exactly one element: the stored
  entity with that id or, when there is none, a new unsaved ConfigSet with
  that id. Otherwise the list holds every ConfigSet returned by a query.
  """
  if not config_set:
    all_sets = yield ConfigSet.query().fetch_async()
    raise ndb.Return(all_sets)

  entity = yield ConfigSet.get_by_id_async(config_set)
  if not entity:
    # Not stored yet: hand back a placeholder entity that was never put.
    entity = ConfigSet(id=config_set)
  raise ndb.Return([entity])
@ndb.tasklet
def get_latest_revision_async(config_set):
  """Returns the latest known revision of |config_set|.

  Returns None if there is no ConfigSet entity with that id.
  """
  entity = yield ConfigSet.get_by_id_async(config_set)
  if not entity:
    raise ndb.Return(None)
  raise ndb.Return(entity.latest_revision)
@ndb.tasklet
def get_config_hash_async(config_set, path, revision=None):
  """Resolves the content hash of a config file.

  Args:
    config_set: non-empty name of the config set.
    path: non-empty file path in the config set, without a leading slash.
    revision: revision to look the file up in. Defaults to the latest
      revision of the config set.

  Returns:
    Tuple (revision, content_hash). Both items are None if |revision| was not
    given and no latest revision is known for the config set, or if the
    revision has no such file (or the file has an empty content hash).
  """
  assert isinstance(config_set, basestring)
  assert config_set
  assert isinstance(path, basestring)
  assert path
  assert not path.startswith('/')

  if not revision:
    revision = yield get_latest_revision_async(config_set)
    if revision is None:
      # Pass the argument separately so that logging does the formatting.
      logging.warning('Config set not found: %s', config_set)
      raise ndb.Return(None, None)

  assert revision
  file_key = ndb.Key(
      ConfigSet, config_set,
      Revision, revision,
      File, path)
  file_entity = yield file_key.get_async()
  content_hash = file_entity.content_hash if file_entity else None
  if not content_hash:
    # Without a content hash the revision is not reported either.
    revision = None
  raise ndb.Return(revision, content_hash)
@ndb.tasklet
def get_config_by_hash_async(content_hash):
  """Returns the content of the Blob with id |content_hash|, or None."""
  entity = yield Blob.get_by_id_async(content_hash)
  if not entity:
    raise ndb.Return(None)
  raise ndb.Return(entity.content)
@ndb.tasklet
def get_latest_async(config_set, path):
  """Returns the content of |path| at the latest revision of |config_set|.

  Returns None if no content hash could be resolved for the file.
  """
  unused_revision, content_hash = yield get_config_hash_async(
      config_set, path)
  if not content_hash:  # pragma: no cover
    raise ndb.Return(None)
  blob_content = yield get_config_by_hash_async(content_hash)
  raise ndb.Return(blob_content)
@ndb.tasklet
def get_latest_multi_async(config_sets, path, hashes_only=False):
  """Fetches the latest version of |path| from each of |config_sets|.

  Config sets without a ConfigSet entity, and config sets whose latest
  revision has no File entity for |path|, are left out of the result.

  Returns:
    A list of dicts with keys 'config_set', 'revision', 'content_hash' and
    'content'. 'content' is None if |hashes_only| is True or if the Blob
    entity for the content hash is missing.
  """
  assert path
  assert not path.startswith('/')

  set_entities = yield ndb.get_multi_async(
      [ndb.Key(ConfigSet, name) for name in config_sets])

  # One File key per existing config set, at that set's latest revision.
  latest_file_keys = []
  for cs in set_entities:
    if not cs:
      continue
    latest_file_keys.append(ndb.Key(
        ConfigSet, cs.key.id(), Revision, cs.latest_revision, File, path))
  found_files = yield ndb.get_multi_async(latest_file_keys)

  results = []
  blob_futures = []
  for f in found_files:
    if not f:
      continue
    revision_key = f.key.parent()
    results.append({
        'config_set': revision_key.parent().id(),
        'revision': revision_key.id(),
        'content_hash': f.content_hash,
        'content': None,
    })
    if not hashes_only:
      # Start every blob fetch before waiting on any of them.
      blob_futures.append(ndb.Key(Blob, f.content_hash).get_async())

  # blob_futures is empty when only hashes were requested.
  for entry, future in zip(results, blob_futures):
    blob = yield future
    entry['content'] = blob.content if blob else None
  raise ndb.Return(results)
@utils.memcache_async('latest_message', ['config_set', 'path'], time=60)
@ndb.tasklet
def get_latest_as_message_async(config_set, path, message_factory):
  """Reads the latest config file as a text-formatted protobuf message.

  |message_factory| is a callable that creates the message, typically the
  message type itself. Values found in the config file are merged into the
  message it returns; if the file has no content, the message is returned as
  created.

  Results are memcached via the decorator (time=60, presumably seconds).
  """
  message = message_factory()
  config_text = yield get_latest_async(config_set, path)
  if not config_text:
    raise ndb.Return(message)
  text_format.Merge(config_text, message)
  raise ndb.Return(message)
@utils.cache
def get_self_config_set():
  """Returns the name of this app's own config set, "services/<app id>"."""
  app_id = app_identity.get_application_id()
  return 'services/%s' % app_id
def get_self_config_async(path, message_factory):
  """Reads |path| from the app's own config set as a protobuf message.

  Delegates to get_latest_as_message_async() and returns its result.
  """
  own_config_set = get_self_config_set()
  return get_latest_as_message_async(own_config_set, path, message_factory)
def compute_hash(content):
  """Computes the Blob id for |content|.

  The id is "v1:<sha>", where <sha> is the hex-encoded Git-compliant SHA-1 of
  'blob <content length>', a NUL byte and then the content itself. See the
  Blob docstring for the id format.

  Args:
    content: blob content as a byte string.

  Returns:
    The content hash string, e.g.
    "v1:e69de29bb2d1d6434b8b29ae775ad8c2e48c5391" for empty content.
  """
  # A bytes literal keeps the header hashable on Python 3 too, where hashlib
  # rejects text strings; on Python 2 it is the same as a plain str literal.
  header = b'blob %d\0' % len(content)
  sha = hashlib.sha1(header)
  sha.update(content)
  return 'v1:%s' % sha.hexdigest()
@ndb.tasklet
def import_blob_async(content, content_hash=None):
  """Saves |content| to a Blob entity, unless one already exists.

  Args:
    content: blob content to store.
    content_hash: precomputed hash of |content|. Computed with compute_hash()
      when omitted or empty.

  Returns:
    Content hash.
  """
  content_hash = content_hash or compute_hash(content)

  # Check for an existing blob with an async get so that the tasklet yields
  # to the event loop rather than blocking on a synchronous datastore call.
  existing = yield Blob.get_by_id_async(content_hash)
  if not existing:
    yield Blob(id=content_hash, content=content).put_async()
  raise ndb.Return(content_hash)
def import_blob(content, content_hash=None):
  """Blocking variant of import_blob_async(); returns the content hash."""
  future = import_blob_async(content, content_hash=content_hash)
  return future.get_result()