Skip to content

Commit 9a86358

Browse files
committed
Support newly introduced WorkerId instead of plain NodeId in ShardSessionRecords.
1 parent 18a242e commit 9a86358

File tree

3 files changed

+45
-25
lines changed

3 files changed

+45
-25
lines changed

datastore/src/main/java/io/spine/server/storage/datastore/DsSessionStorage.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.cloud.datastore.TimestampValue;
3535
import io.spine.server.delivery.ShardIndex;
3636
import io.spine.server.delivery.ShardSessionRecord;
37+
import io.spine.server.delivery.WorkerId;
3738
import org.checkerframework.checker.nullness.qual.Nullable;
3839

3940
import java.util.Iterator;
@@ -165,9 +166,11 @@ private enum Column implements MessageColumn<ShardSessionRecord> {
165166
.getOfTotal());
166167
}),
167168

168-
node((m) -> {
169-
return StringValue.of(m.getPickedBy()
170-
.getValue());
169+
worker((m) -> {
170+
WorkerId worker = m.getWorker();
171+
String value = worker.getNodeId().getValue() + '-' + worker.getValue();
172+
return StringValue.of(value);
173+
171174
}),
172175

173176
when_last_picked((m) -> {

datastore/src/main/java/io/spine/server/storage/datastore/DsShardedWorkRegistry.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.spine.server.delivery.ShardIndex;
3434
import io.spine.server.delivery.ShardProcessingSession;
3535
import io.spine.server.delivery.ShardSessionRecord;
36+
import io.spine.server.delivery.WorkerId;
3637
import org.checkerframework.checker.nullness.qual.Nullable;
3738

3839
import java.util.Iterator;
@@ -79,11 +80,26 @@ public DsShardedWorkRegistry(DatastoreStorageFactory factory) {
7980
public synchronized Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId nodeId) {
8081
checkNotNull(index);
8182
checkNotNull(nodeId);
83+
84+
WorkerId worker = currentWorkerFor(nodeId);
8285
Optional<ShardSessionRecord> result =
83-
storage.updateTransactionally(index, new UpdateNodeIfAbsent(index, nodeId));
86+
storage.updateTransactionally(index, new UpdateWorkerIfAbsent(index, worker));
8487
return result.map(this::asSession);
8588
}
8689

90+
/**
91+
* Creates a worker ID by combining the given node ID with the ID of the current Java thread,
92+
* in which the execution in performed.
93+
*/
94+
@Override
95+
protected WorkerId currentWorkerFor(NodeId id) {
96+
long threadId = Thread.currentThread().getId();
97+
return WorkerId.newBuilder()
98+
.setNodeId(id)
99+
.setValue(Long.toString(threadId))
100+
.vBuild();
101+
}
102+
87103
@Override
88104
public synchronized Iterable<ShardIndex> releaseExpiredSessions(Duration inactivityPeriod) {
89105
return super.releaseExpiredSessions(inactivityPeriod);
@@ -123,24 +139,24 @@ protected DsSessionStorage storage() {
123139
}
124140

125141
/**
126-
* Updates the {@code nodeId} for the {@link ShardSessionRecord} with the specified
142+
* Updates the {@code workerId} for the {@link ShardSessionRecord} with the specified
127143
* {@link ShardIndex} if the record has not been picked by anyone.
128144
*
129145
* <p>If there is no such a record, creates a new record.
130146
*/
131-
private static class UpdateNodeIfAbsent implements DsSessionStorage.RecordUpdate {
147+
private static class UpdateWorkerIfAbsent implements DsSessionStorage.RecordUpdate {
132148

133149
private final ShardIndex index;
134-
private final NodeId nodeToSet;
150+
private final WorkerId workerToSet;
135151

136-
private UpdateNodeIfAbsent(ShardIndex index, NodeId set) {
152+
private UpdateWorkerIfAbsent(ShardIndex index, WorkerId worker) {
137153
this.index = index;
138-
nodeToSet = set;
154+
workerToSet = worker;
139155
}
140156

141157
@Override
142158
public Optional<ShardSessionRecord> createOrUpdate(@Nullable ShardSessionRecord previous) {
143-
if (previous != null && previous.hasPickedBy()) {
159+
if (previous != null && previous.hasWorker()) {
144160
return Optional.empty();
145161
}
146162
ShardSessionRecord.Builder builder =
@@ -150,7 +166,7 @@ public Optional<ShardSessionRecord> createOrUpdate(@Nullable ShardSessionRecord
150166
: previous.toBuilder();
151167

152168
ShardSessionRecord updated =
153-
builder.setPickedBy(nodeToSet)
169+
builder.setWorker(workerToSet)
154170
.setWhenLastPicked(currentTime())
155171
.vBuild();
156172
return Optional.of(updated);

datastore/src/test/java/io/spine/server/storage/datastore/DsShardedWorkRegistryTest.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.spine.server.delivery.ShardSessionRecord;
3737
import io.spine.server.delivery.ShardedWorkRegistry;
3838
import io.spine.server.delivery.ShardedWorkRegistryTest;
39+
import io.spine.server.delivery.WorkerId;
3940
import io.spine.testing.server.storage.datastore.TestDatastoreStorageFactory;
4041
import org.junit.jupiter.api.AfterEach;
4142
import org.junit.jupiter.api.BeforeEach;
@@ -47,8 +48,6 @@
4748
import static com.google.common.truth.Truth.assertThat;
4849
import static com.google.common.truth.Truth8.assertThat;
4950
import static io.spine.server.storage.datastore.given.TestShardIndex.newIndex;
50-
import static org.junit.Assert.assertFalse;
51-
import static org.junit.Assert.assertTrue;
5251

5352
@DisplayName("`DsShardedWorkRegistry` should")
5453
class DsShardedWorkRegistryTest extends ShardedWorkRegistryTest {
@@ -82,60 +81,62 @@ protected ShardedWorkRegistry registry() {
8281
@DisplayName("pick up the shard and write a corresponding record to the storage")
8382
void pickUp() {
8483
Optional<ShardProcessingSession> session = registry.pickUp(index, nodeId);
85-
assertTrue(session.isPresent());
84+
WorkerId expectedWorker = registry.currentWorkerFor(nodeId);
85+
assertThat(session).isPresent();
8686
assertThat(session.get()
8787
.shardIndex()).isEqualTo(index);
8888

8989
ShardSessionRecord record = readSingleRecord(index);
9090
assertThat(record.getIndex()).isEqualTo(index);
91-
assertThat(record.getPickedBy()).isEqualTo(nodeId);
91+
assertThat(record.getWorker()).isEqualTo(expectedWorker);
9292
}
9393

9494
@Test
9595
@DisplayName("not be able to pick up the shard if it's already picked up")
9696
void cannotPickUpIfTaken() {
9797

9898
Optional<ShardProcessingSession> session = registry.pickUp(index, nodeId);
99-
assertTrue(session.isPresent());
99+
assertThat(session).isPresent();
100100

101101
Optional<ShardProcessingSession> sameIdxSameNode = registry.pickUp(index, nodeId);
102-
assertFalse(sameIdxSameNode.isPresent());
102+
assertThat(sameIdxSameNode).isEmpty();
103103

104104
Optional<ShardProcessingSession> sameIdxAnotherNode = registry.pickUp(index, newNode());
105-
assertFalse(sameIdxAnotherNode.isPresent());
105+
assertThat(sameIdxAnotherNode).isEmpty();
106106

107107
ShardIndex anotherIdx = newIndex(24, 100);
108108
Optional<ShardProcessingSession> anotherIdxSameNode = registry.pickUp(anotherIdx, nodeId);
109-
assertTrue(anotherIdxSameNode.isPresent());
109+
assertThat(anotherIdxSameNode).isPresent();
110110

111111
Optional<ShardProcessingSession> anotherIdxAnotherNode =
112112
registry.pickUp(anotherIdx, newNode());
113-
assertFalse(anotherIdxAnotherNode.isPresent());
113+
assertThat(anotherIdxAnotherNode).isEmpty();
114114
}
115115

116116
@Test
117117
@DisplayName("complete the shard session (once picked up) and make it available for picking up")
118118
void completeSessionAndMakeItAvailable() {
119119
Optional<ShardProcessingSession> optional = registry.pickUp(index, nodeId);
120-
assertTrue(optional.isPresent());
120+
assertThat(optional).isPresent();
121121

122122
Timestamp whenPickedFirst = readSingleRecord(index).getWhenLastPicked();
123123

124124
DsShardProcessingSession session = (DsShardProcessingSession) optional.get();
125125
session.complete();
126126

127127
ShardSessionRecord completedRecord = readSingleRecord(index);
128-
assertFalse(completedRecord.hasPickedBy());
128+
assertThat(completedRecord.hasWorker()).isFalse();
129129

130130
NodeId anotherNode = newNode();
131+
WorkerId anotherWorker = registry.currentWorkerFor(anotherNode);
131132
Optional<ShardProcessingSession> anotherOptional = registry.pickUp(index, anotherNode);
132-
assertTrue(anotherOptional.isPresent());
133+
assertThat(anotherOptional).isPresent();
133134

134135
ShardSessionRecord secondSessionRecord = readSingleRecord(index);
135-
assertThat(secondSessionRecord.getPickedBy()).isEqualTo(anotherNode);
136+
assertThat(secondSessionRecord.getWorker()).isEqualTo(anotherWorker);
136137

137138
Timestamp whenPickedSecond = secondSessionRecord.getWhenLastPicked();
138-
assertTrue(Timestamps.compare(whenPickedFirst, whenPickedSecond) < 0);
139+
assertThat(Timestamps.compare(whenPickedFirst, whenPickedSecond) < 0).isTrue();
139140
}
140141

141142
@Test

0 commit comments

Comments
 (0)