Skip to content

Commit ec99582

Browse files
authored
Merge pull request #146 from SpineEventEngine/catch-up
Implement catch-up storage and use transactions for `Delivery` routines
2 parents 47c9682 + 17a94a8 commit ec99582

29 files changed

+1494
-457
lines changed

build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ subprojects {
9999

100100
defaultRepositories(project)
101101

102+
// Required to fetch `androidx.annotation:annotation:1.1.0`,
103+
// which is a transitive dependency of `com.google.cloud:google-cloud-datastore`.
104+
repositories {
105+
google()
106+
}
107+
102108
dependencies {
103109
errorprone deps.build.errorProneCore
104110
errorproneJavac deps.build.errorProneJavac

config

datastore/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ dependencies {
3434
}
3535

3636
testImplementation project(path: ":testutil-gcloud")
37-
testImplementation "io.spine:spine-server:"
37+
testImplementation "io.spine:spine-server:$spineCoreVersion"
3838
}
3939

4040
task startDatastore(type: com.github.psxpaul.task.ExecFork) {

datastore/config/README.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
This folder contains a sample Datastore index configuration in `index.yaml` file.
1+
This folder contains sample configurations of Datastore indexes
22

3-
The Datastore index must be configured as follows:
4-
- Contain the Spine internal types config.
5-
- Contain your custom `Aggregate` types config.
6-
7-
For more details, please see `index.yaml`.
3+
Depending on the mode in which your Datastore runs one should choose either
4+
5+
- `index-datastore-native.yaml` for the Datastore in native mode;
6+
- `index-firestore-in-datastore-mode.yaml` for the Firestore in Datastore mode.
7+
8+
Each of the configurations lists the indexes for the system records (e.g. `Event`) and the samples
9+
for the user-defined entities.
10+
11+
Feel free to modify the samples according to your needs.

datastore/config/index.yaml renamed to datastore/config/index-datastore-native.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# The GAE Datastore sample indexes.
1+
# The indexes for Datastore in native mode.
22

33
indexes:
44

@@ -19,7 +19,7 @@ indexes:
1919
- name: when_received
2020
- name: version
2121

22-
# Index required for `DsInboxStorage.oldestMessageToDeliver` query.
22+
# Index required for `DsInboxStorage.newestMessageToDeliver` query.
2323

2424
- kind: spine.server.delivery.InboxMessage
2525
properties:
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# The indexes for Firestore in Datastore mode.
2+
3+
indexes:
4+
5+
# Common index for all the applications.
6+
7+
- kind: spine.core.Event
8+
ancestor: no
9+
properties:
10+
- name: type
11+
- name: created
12+
13+
# Index required for `DsInboxStorage.readAll` query.
14+
15+
- kind: spine.server.delivery.InboxMessage
16+
ancestor: yes
17+
properties:
18+
- name: inbox_shard
19+
- name: of_total_inbox_shards
20+
- name: received_at
21+
- name: version
22+
23+
# Index required for `DsInboxStorage.newestMessageToDeliver` query.
24+
25+
- kind: spine.server.delivery.InboxMessage
26+
ancestor: yes
27+
properties:
28+
- name: inbox_shard
29+
- name: of_total_inbox_shards
30+
- name: status
31+
32+
# Indexes for the Aggregates.
33+
#
34+
# Each custom Aggregate type must have such indexes in order to be retrieved properly.
35+
#
36+
# The Datastore kind represents the Protobuf type name of the Aggregate state. Both the property
37+
# names and the index directions should be kept unchanged.
38+
39+
- kind: myapp.example.MyFirstAggregate # Replace with your Aggregate Protobuf type name.
40+
# Copy as is:
41+
# ------------------
42+
properties:
43+
- name: aggregate_id
44+
- name: created
45+
direction: desc
46+
- name: version
47+
direction: desc
48+
- name: snapshot
49+
# ------------------
50+
51+
- kind: myapp.example.MySecondAggregate # Replace with your Aggregate Protobuf type name.
52+
# Copy as is:
53+
# ------------------
54+
properties:
55+
- name: aggregate_id
56+
- name: created
57+
direction: desc
58+
- name: version
59+
direction: desc
60+
- name: snapshot
61+
# ------------------
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2020, TeamDev. All rights reserved.
3+
*
4+
* Redistribution and use in source and/or binary forms, with or without
5+
* modification, must retain the above copyright notice and the following
6+
* disclaimer.
7+
*
8+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
9+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
10+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
11+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
12+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
13+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
14+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
15+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
16+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
17+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
18+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
19+
*/
20+
21+
package io.spine.server.storage.datastore;
22+
23+
/**
24+
* A mode in which Google Datastore operates.
25+
*/
26+
public enum DatastoreMode {
27+
28+
/**
29+
* Native Datastore mode.
30+
*
31+
* <p>In this mode, Datastore is mostly eventually consistent. Also it has significant
32+
* <a href="https://cloud.google.com/datastore/docs/concepts/limits#Cloud_Datastore_limits">limitations</a>
33+
* on accessing the records.
34+
*
35+
* <p>Since recently, the projects in Google Cloud cannot be created in this mode.
36+
* So this one works primarily for the legacy applications.
37+
*/
38+
NATIVE,
39+
40+
/**
41+
* Firestore in Datastore mode.
42+
*
43+
* <p>In this mode, Datastore becomes strongly consistent in most cases. However, some limits
44+
* still <a href="https://cloud.google.com/datastore/docs/concepts/limits#limits">apply.</a>
45+
*
46+
* <p>All new Cloud projects provide this mode as a default one.
47+
*/
48+
FIRESTORE_AS_DATASTORE
49+
}

datastore/src/main/java/io/spine/server/storage/datastore/DatastoreStorageFactory.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@
2626
import com.google.common.collect.Iterables;
2727
import com.google.errorprone.annotations.CanIgnoreReturnValue;
2828
import io.spine.annotation.Internal;
29+
import io.spine.annotation.SPI;
2930
import io.spine.server.BoundedContextBuilder;
3031
import io.spine.server.ContextSpec;
3132
import io.spine.server.aggregate.Aggregate;
3233
import io.spine.server.aggregate.AggregateStorage;
34+
import io.spine.server.delivery.CatchUpStorage;
3335
import io.spine.server.delivery.InboxStorage;
3436
import io.spine.server.entity.Entity;
3537
import io.spine.server.entity.storage.ColumnMapping;
@@ -140,21 +142,54 @@ public BoundedContextBuilder configureTenantIndex(BoundedContextBuilder builder)
140142

141143
DsProjectionStorageDelegate<I> recordStorage =
142144
configure(DsProjectionStorageDelegate.newDelegateBuilder(), cls, context);
143-
DsPropertyStorage propertyStorage = createPropertyStorage(context);
144145
DsProjectionStorage<I> result =
145146
new DsProjectionStorage<>(cls,
146147
recordStorage,
147-
propertyStorage,
148148
context.isMultitenant());
149149
return result;
150150
}
151151

152+
/**
153+
* {@inheritDoc}
154+
*
155+
* Creates a Datastore-specific {@link InboxStorage} taking into account the support
156+
* of multi-tenant storage required.
157+
*
158+
* <p>By default, creates an instance of storage for Datastore in native mode.
159+
*
160+
* @apiNote In order to change this behavior and supply a custom implementation, an SPI
161+
* user should override {@link #inboxStorageWith(DatastoreWrapper, boolean)
162+
* inboxStorageWith(multitenant, DatastoreWrapper)} method.
163+
* @see DsInboxStorage for more details on supported modes
164+
*/
152165
@Override
153-
public InboxStorage createInboxStorage(boolean multitenant) {
166+
public final InboxStorage createInboxStorage(boolean multitenant) {
154167
DatastoreWrapper wrapper = systemWrapperFor(InboxStorage.class, multitenant);
168+
return inboxStorageWith(wrapper, multitenant);
169+
}
170+
171+
/**
172+
* Creates a Datastore-specific {@link InboxStorage}.
173+
*
174+
* <p>SPI users should override this method in order to supply a custom implementation.
175+
*
176+
* @param wrapper
177+
* a wrapper over Datastore
178+
* @param multitenant
179+
* whether the created storage should support multi-tenancy
180+
* @return a new instance of {@code InboxStorage}
181+
*/
182+
@SPI
183+
protected InboxStorage inboxStorageWith(DatastoreWrapper wrapper, boolean multitenant) {
155184
return new DsInboxStorage(wrapper, multitenant);
156185
}
157186

187+
@Override
188+
public CatchUpStorage createCatchUpStorage(boolean multitenant) {
189+
DatastoreWrapper wrapper = systemWrapperFor(CatchUpStorage.class, multitenant);
190+
return new DsCatchUpStorage(wrapper, multitenant);
191+
}
192+
158193
public ColumnMapping<Value<?>> columnMapping() {
159194
return columnMapping;
160195
}
@@ -196,7 +231,8 @@ private NsConverterFactory converterFactory() {
196231
}
197232

198233
private String namespaceFromOptions() {
199-
return nullToEmpty(datastore.getOptions().getNamespace());
234+
return nullToEmpty(datastore.getOptions()
235+
.getNamespace());
200236
}
201237

202238
/**

datastore/src/main/java/io/spine/server/storage/datastore/DatastoreWrapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ private static <R> StructuredQuery<R> limit(StructuredQuery<R> query,
337337
* Deletes all existing {@link Entity Entities} with the given keys.
338338
*
339339
* @param keys
340-
* {@link Key Keys} of the {@link Entity Entities} to delete. May be nonexistent
340+
* {@code Keys} of the {@code Entities} to delete. May be nonexistent
341341
*/
342342
public void delete(Key... keys) {
343343
datastore.delete(keys);
@@ -418,7 +418,7 @@ public final TransactionWrapper newTransaction() {
418418
public KeyFactory keyFactory(Kind kind) {
419419
checkNotNull(kind);
420420
KeyFactory keyFactory = datastore.newKeyFactory()
421-
.setKind(kind.value());
421+
.setKind(kind.value());
422422
Namespace namespace = namespaceSupplier.get();
423423
_trace().log("Retrieving KeyFactory for kind `%s` in `%s` namespace.",
424424
kind, namespace.value());
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright 2020, TeamDev. All rights reserved.
3+
*
4+
* Redistribution and use in source and/or binary forms, with or without
5+
* modification, must retain the above copyright notice and the following
6+
* disclaimer.
7+
*
8+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
9+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
10+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
11+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
12+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
13+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
14+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
15+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
16+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
17+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
18+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
19+
*/
20+
21+
package io.spine.server.storage.datastore;
22+
23+
import com.google.cloud.datastore.EntityQuery;
24+
import com.google.cloud.datastore.LongValue;
25+
import com.google.cloud.datastore.Query;
26+
import com.google.cloud.datastore.StringValue;
27+
import com.google.common.collect.ImmutableList;
28+
import com.google.protobuf.Timestamp;
29+
import io.spine.server.delivery.CatchUp;
30+
import io.spine.server.delivery.CatchUpId;
31+
import io.spine.server.delivery.CatchUpReadRequest;
32+
import io.spine.server.delivery.CatchUpStorage;
33+
import io.spine.type.TypeUrl;
34+
35+
import java.util.Iterator;
36+
37+
import static com.google.cloud.datastore.StructuredQuery.PropertyFilter.eq;
38+
import static com.google.protobuf.util.Timestamps.toNanos;
39+
40+
/**
41+
* A Google Datastore-backed storage implementation of {@link CatchUpStorage}.
42+
*/
43+
public class DsCatchUpStorage extends DsMessageStorage<CatchUpId, CatchUp, CatchUpReadRequest>
44+
implements CatchUpStorage {
45+
46+
protected DsCatchUpStorage(DatastoreWrapper datastore, boolean multitenant) {
47+
super(datastore, multitenant);
48+
}
49+
50+
@Override
51+
public Iterable<CatchUp> readAll() {
52+
EntityQuery.Builder builder = Query.newEntityQueryBuilder();
53+
return readAsIterable(builder);
54+
}
55+
56+
private Iterable<CatchUp> readAsIterable(EntityQuery.Builder builder) {
57+
Iterator<CatchUp> iterator = read(builder);
58+
return ImmutableList.copyOf(iterator);
59+
}
60+
61+
@Override
62+
public Iterable<CatchUp> readByType(TypeUrl url) {
63+
EntityQuery.Builder builder = Query.newEntityQueryBuilder();
64+
builder.setFilter(eq(Column.projectionType.columnName(), url.value()));
65+
return readAsIterable(builder);
66+
}
67+
68+
@Override
69+
protected CatchUpId idOf(CatchUp message) {
70+
return message.getId();
71+
}
72+
73+
@Override
74+
protected MessageColumn<CatchUp>[] columns() {
75+
return Column.values();
76+
}
77+
78+
/**
79+
* The columns of the {@code InboxMessage} kind in Datastore.
80+
*/
81+
private enum Column implements MessageColumn<CatchUp> {
82+
83+
status("catchup_status", (m) -> {
84+
return StringValue.of(m.getStatus()
85+
.toString());
86+
}),
87+
88+
whenLastRead("when_last_read", (m) -> {
89+
Timestamp timestamp = m.getWhenLastRead();
90+
return LongValue.of(toNanos(timestamp));
91+
}),
92+
93+
projectionType("projection_type", (m) -> {
94+
return StringValue.of(m.getId()
95+
.getProjectionType());
96+
});
97+
98+
/**
99+
* The column name.
100+
*/
101+
private final String name;
102+
103+
/**
104+
* Obtains the value of the column from the given message.
105+
*/
106+
private final Getter<CatchUp> getter;
107+
108+
Column(String name, Getter<CatchUp> getter) {
109+
this.name = name;
110+
this.getter = getter;
111+
}
112+
113+
@Override
114+
public String columnName() {
115+
return name;
116+
}
117+
118+
@Override
119+
public Getter<CatchUp> getter() {
120+
return getter;
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)