Skip to content

Commit 21875fc

Browse files
committed
proto: support queue
This patch adds the `push` field to `ExecuteRequest` and introduces `aeon_queue.proto` module. Needed for tarantool/aeon#488
1 parent 2d0e279 commit 21875fc

File tree

2 files changed

+120
-0
lines changed

2 files changed

+120
-0
lines changed

aeon_crud.proto

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ service CRUDService {
4343
// Transactionally executes a set of read and write operations.
4444

4545
message ExecuteRequest {
46+
// Message queue push section in a transaction request.
47+
message Push {
48+
// Definition of messages pushed to a shard-local queue.
49+
message Message {
50+
// Topic of the message.
51+
string topic = 1;
52+
// Data of the message.
53+
Value data = 2;
54+
// Time to live of the message.
55+
double ttl = 3;
56+
}
57+
// Push function.
58+
string func = 1;
59+
// Additional argument to the push function.
60+
Value func_arg = 2;
61+
// Messages to send if the push function is not provided.
62+
repeated Message messages = 3;
63+
}
4664
// Array of read operations.
4765
repeated Operation read_set = 1;
4866
// Array of write operations.
@@ -89,6 +107,8 @@ message ExecuteRequest {
89107
// Map : space name -> tuple format.
90108
// Contains formats of all provided tuples. Optional.
91109
map<string, TupleFormat> tuple_formats = 6;
110+
// Description of messages to push when executing a transaction.
111+
Push push = 7;
92112
}
93113

94114
message ExecuteResponse {
@@ -103,6 +123,9 @@ message ExecuteResponse {
103123
// Map : space name -> tuple format.
104124
// Contains formats of all returned tuples.
105125
map<string, TupleFormat> tuple_formats = 5;
126+
// PushErr is the error returned by the push function, or nil
127+
// if no error occurred.
128+
Error push_err = 6;
106129
}
107130

108131
// Transactionally inserts tuples into a space.

aeon_queue.proto

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
syntax = "proto3";
2+
3+
import "aeon_error.proto";
4+
import "aeon_value.proto";
5+
6+
package aeon;
7+
8+
// Queue API to Aeon - a distributed database based on Tarantool.
9+
service QueueService {
10+
// Takes messages from a shard-local queue.
11+
rpc TakeMessages(TakeMessagesRequest) returns (TakeMessagesResponse) {}
12+
13+
// Releases messages.
14+
rpc ReleaseMessages(ReleaseMessagesRequest)
15+
returns (ReleaseMessagesResponse) {}
16+
17+
// Returns the oldest message for all storages, or the oldest message
18+
// for each storage.
19+
rpc GetOldestMessages(GetOldestMessagesRequest)
20+
returns (GetOldestMessagesResponse) {}
21+
}
22+
23+
// Description of returned messages.
24+
message Message {
25+
// Shard name.
26+
string shard = 1;
27+
// The serial number of the message on the shard.
28+
uint64 lsn = 2;
29+
// Data of the message.
30+
Value data = 3;
31+
}
32+
33+
// Consumer description.
34+
message ConsumerRef {
35+
// Consumer shard.
36+
string shard = 1;
37+
// Consumer topic.
38+
string topic = 2;
39+
// Consumer name.
40+
string consumer = 3;
41+
}
42+
43+
message TakeMessagesRequest {
44+
// Topic name.
45+
string topic = 1;
46+
// Unique consumer name.
47+
string consumer = 2;
48+
// Max number of returned messages.
49+
uint64 limit = 3;
50+
// Time for the consumer to process the messages.
51+
double ttl = 4;
52+
// Exclusive mode flag.
53+
bool exclusive = 5;
54+
// Time to wait for messages.
55+
double timeout = 6;
56+
}
57+
58+
message TakeMessagesResponse {
59+
// Error information. Set only on failure.
60+
Error error = 1;
61+
// Returned messages.
62+
repeated Message messages = 2;
63+
// Consumer reference used to release messages.
64+
ConsumerRef ref = 3;
65+
// True if these messages have already been taken by the same consumer,
66+
// false otherwise.
67+
bool taken_earlier = 4;
68+
}
69+
70+
message ReleaseMessagesRequest {
71+
// Consumer reference.
72+
ConsumerRef ref = 1;
73+
// If true, released messages will no longer be returned to consumers.
74+
bool done = 2;
75+
}
76+
77+
message ReleaseMessagesResponse {
78+
// Error information. Set only on failure.
79+
Error error = 1;
80+
// True if messages were already released, false otherwise.
81+
bool released_earlier = 2;
82+
}
83+
84+
message GetOldestMessagesRequest {
85+
// Topic name.
86+
string topic = 1;
87+
// True if the oldest messages for each shard should be returned,
88+
// false if the oldest messages for all shards should be returned.
89+
bool for_each_shard = 2;
90+
}
91+
92+
message GetOldestMessagesResponse {
93+
// Error information. Set only on failure.
94+
Error error = 1;
95+
// Returned messages.
96+
repeated Message messages = 2;
97+
}

0 commit comments

Comments
 (0)