|
1 | 1 | import colander |
| 2 | +from pyramid.httpexceptions import HTTPForbidden |
2 | 3 |
|
3 | | -from kinto.core import resource |
| 4 | +from kinto.core import Service, resource, utils |
| 5 | +from kinto.core.resource.schema import ErrorResponseSchema |
4 | 6 | from kinto.core.resource.viewset import ViewSet |
5 | | -from kinto.core.storage import Filter |
| 7 | +from kinto.core.storage import Filter, Sort |
6 | 8 | from kinto.core.utils import instance_uri |
7 | 9 |
|
8 | 10 |
|
@@ -64,3 +66,120 @@ def _extract_filters(self): |
64 | 66 | filters_str_id.append(filt) |
65 | 67 |
|
66 | 68 | return filters_str_id |
| 69 | + |
| 70 | + |
| 71 | +snapshot = Service( |
| 72 | + name="history_snapshot", |
| 73 | + path="/buckets/{bucket_id}/snapshot/collections/{collection_id}@{timestamp}", |
| 74 | + description="Reconstruct collection at given timestamp", |
| 75 | +) |
| 76 | + |
| 77 | + |
| 78 | +def timestamp_validator(request, **kwargs): |
| 79 | + """ |
| 80 | + Validates that the timestamp is an integer. |
| 81 | + """ |
| 82 | + timestamp = request.matchdict["timestamp"] |
| 83 | + try: |
| 84 | + if int(timestamp) < 0: |
| 85 | + raise ValueError |
| 86 | + except ValueError: |
| 87 | + request.errors.add("path", "timestamp", "Invalid timestamp %r" % timestamp) |
| 88 | + |
| 89 | + |
| 90 | +class SnapshotPathSchema(colander.MappingSchema): |
| 91 | + bucket_id = colander.SchemaNode(colander.String()) |
| 92 | + collection_id = colander.SchemaNode(colander.String()) |
| 93 | + timestamp = colander.SchemaNode(colander.Integer()) |
| 94 | + |
| 95 | + |
| 96 | +class SnapshotSchema(colander.MappingSchema): |
| 97 | + path = SnapshotPathSchema() |
| 98 | + |
| 99 | + |
| 100 | +snapshot_response_schemas = { |
| 101 | + "401": ErrorResponseSchema(description="The collection is not publicly readable."), |
| 102 | + "403": ErrorResponseSchema(description="No permission to read this collection."), |
| 103 | + "200": colander.SchemaNode( |
| 104 | + colander.Mapping(), |
| 105 | + description="Returns the records at the given timestamp.", |
| 106 | + ), |
| 107 | +} |
| 108 | + |
| 109 | + |
| 110 | +@snapshot.get( |
| 111 | + schema=SnapshotSchema(), |
| 112 | + validators=(timestamp_validator,), |
| 113 | + response_schemas=snapshot_response_schemas, |
| 114 | +) |
| 115 | +def get_snapshot(request): |
| 116 | + """Reconstructs the collection as it was at the given timestamp.""" |
| 117 | + bucket_id = request.matchdict["bucket_id"] |
| 118 | + collection_id = request.matchdict["collection_id"] |
| 119 | + timestamp = int(request.matchdict["timestamp"]) |
| 120 | + |
| 121 | + bucket_uri = instance_uri(request, "bucket", id=bucket_id) |
| 122 | + collection_uri = instance_uri(request, "collection", bucket_id=bucket_id, id=collection_id) |
| 123 | + |
| 124 | + # Check that user has read permission on the collection. |
| 125 | + # This is manual code, because we are outside the normal resource system. |
| 126 | + if not request.registry.permission.check_permission( |
| 127 | + request.prefixed_principals, |
| 128 | + [ |
| 129 | + (bucket_uri, "read"), |
| 130 | + (bucket_uri, "write"), |
| 131 | + (collection_uri, "read"), |
| 132 | + (collection_uri, "write"), |
| 133 | + ], |
| 134 | + ): |
| 135 | + raise HTTPForbidden() |
| 136 | + |
| 137 | + # List all the records that have changed since the given timestamp. |
| 138 | + all_records = request.registry.storage.list_all( |
| 139 | + parent_id=collection_uri, |
| 140 | + resource_name="record", |
| 141 | + include_deleted=True, # Include tombstones |
| 142 | + ) |
| 143 | + |
| 144 | + unchanged_records = [ |
| 145 | + r for r in all_records if r["last_modified"] <= timestamp and not r.get("deleted") |
| 146 | + ] |
| 147 | + changed_rids = [r["id"] for r in all_records if r["last_modified"] > timestamp] |
| 148 | + if not changed_rids: |
| 149 | + # No change after timestamp, return all records as-is. |
| 150 | + return {"data": sorted(unchanged_records, key=lambda r: r["last_modified"], reverse=True)} |
| 151 | + |
| 152 | + # History entries store the current version. We need to pick the most recent |
| 153 | + # entry before the timestamp for each record_id to obtain the records' state |
| 154 | + # before it was changed or deleted. |
| 155 | + history_entries = request.registry.storage.list_all( |
| 156 | + parent_id=bucket_uri, |
| 157 | + resource_name="history", |
| 158 | + filters=[ |
| 159 | + Filter("resource_name", "record", utils.COMPARISON.EQ), |
| 160 | + Filter("collection_id", collection_id, utils.COMPARISON.EQ), |
| 161 | + Filter("record_id", changed_rids, utils.COMPARISON.IN), |
| 162 | + Filter("target.data.last_modified", timestamp, utils.COMPARISON.MAX), |
| 163 | + ], |
| 164 | + sorting=[Sort("last_modified", -1)], # Most recent first |
| 165 | + # TODO: add storage option to keep only the latest entry per record_id |
| 166 | + ) |
| 167 | + |
| 168 | + most_recent_entry = {} |
| 169 | + for entry in history_entries: |
| 170 | + rid = entry["record_id"] |
| 171 | + if rid not in most_recent_entry: |
| 172 | + most_recent_entry[rid] = entry |
| 173 | + |
| 174 | + # Records created after the timestamp (not existing in history) should not appear. |
| 175 | + # Records deleted or updated after the timestamp should be reverted to their most recent |
| 176 | + # version before the timestamp. |
| 177 | + result_records = unchanged_records |
| 178 | + for rid in changed_rids: |
| 179 | + if rid not in most_recent_entry: |
| 180 | + # Record was created after the timestamp, skip it. |
| 181 | + continue |
| 182 | + history_entry = most_recent_entry[rid] |
| 183 | + result_records.append(history_entry["target"]["data"]) |
| 184 | + |
| 185 | + return {"data": sorted(result_records, key=lambda r: r["last_modified"], reverse=True)} |
0 commit comments