Skip to content

Commit f0f6a26

Browse files
committed
proto: support queue
This patch adds the `push` field to `ExecuteRequest` and introduces `aeon_queue.proto` module. Needed for tarantool/aeon#488
1 parent 51463ff commit f0f6a26

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

aeon_crud.proto

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ service CRUDService {
4343
// Transactionally executes a set of read and write operations.
4444

4545
message ExecuteRequest {
46+
// Message queue push section in a transaction request.
47+
message Push {
48+
// Definition of messages pushed to a shard-local queue.
49+
message Message {
50+
// Topic of the message.
51+
string topic = 1;
52+
// Data of the message.
53+
Value data = 2;
54+
// Time to live of the message.
55+
double ttl = 3;
56+
}
57+
// Push function.
58+
string func = 1;
59+
// Additional argument to the push function.
60+
Value func_arg = 2;
61+
// Messages to send if the push function is not provided.
62+
repeated Message messages = 3;
63+
}
4664
// Array of read operations.
4765
repeated Operation read_set = 1;
4866
// Array of write operations.
@@ -89,6 +107,8 @@ message ExecuteRequest {
89107
// Map : space name -> tuple format.
90108
// Contains formats of all provided tuples. Optional.
91109
map<string, TupleFormat> tuple_formats = 6;
110+
// Description of messages to push when executing a transaction.
111+
Push push = 7;
92112
}
93113

94114
message ExecuteResponse {

aeon_queue.proto

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
syntax = "proto3";
2+
3+
import "aeon_error.proto";
4+
import "aeon_value.proto";
5+
6+
package aeon;
7+
8+
// Queue API to Aeon - a distributed database based on Tarantool.
9+
service QueueService {
10+
// Takes messages from a shard-local queue.
11+
rpc TakeMessages(TakeMessagesRequest) returns (TakeMessagesResponse) {}
12+
13+
// Releases messages.
14+
rpc ReleaseMessages(ReleaseMessagesRequest) returns (ReleaseMessagesResponse) {}
15+
16+
// Returns the oldest message for all storages, or the oldest message
17+
// for each storage.
18+
rpc GetOldestMessages(GetOldestMessagesRequest) returns (GetOldestMessagesResponse) {}
19+
}
20+
21+
// The difinition of messages returned by the `take_messages()` and
22+
// `get_oldest_messages()`.
23+
message Message {
24+
// Shard name.
25+
string shard = 1;
26+
// LSN of the message.
27+
uint64 lsn = 2;
28+
// Data of the message.
29+
Value data = 3;
30+
}
31+
32+
// Message queue consumer reference.
33+
message ConsumerRef {
34+
// Shard of the message queue consumer.
35+
string shard = 1;
36+
// Topic of the message queue consumer.
37+
string topic = 2;
38+
// Name of the consumer.
39+
string consumer = 3;
40+
}
41+
42+
// Request for the `take_messages()`.
43+
message TakeMessagesRequest {
44+
// Topic of the messages.
45+
string topic = 1;
46+
// Consumer of the messages.
47+
string consumer = 2;
48+
// Max number of returned messages.
49+
uint64 limit = 3;
50+
// Time to live of the request.
51+
double ttl = 4;
52+
// Exclusivity of `take_messages()`.
53+
bool exclusive = 5;
54+
// The timeout for the request.
55+
double timeout = 6;
56+
}
57+
58+
// Response of the `take_messages()`.
59+
message TakeMessagesResponse {
60+
// Error information. Set only on failure.
61+
Error error = 1;
62+
// Returned messages.
63+
repeated Message messages = 2;
64+
// Reference to queue consumer.
65+
ConsumerRef ref = 3;
66+
// True if these messages have already been requested by this consumer,
67+
// false otherwise.
68+
bool taken_earlier = 4;
69+
}
70+
71+
// Request for the `release_messages()`.
72+
message ReleaseMessagesRequest {
73+
// Reference to queue consumer.
74+
ConsumerRef ref = 1;
75+
// If the flag is set, queue.take_messages() won't return released
76+
// messages to consumers anymore.
77+
bool done = 2;
78+
}
79+
80+
// Response of the `release_messages()`.
81+
message ReleaseMessagesResponse {
82+
// Error information. Set only on failure.
83+
Error error = 1;
84+
// True if messages were already released, false otherwise.
85+
bool released_earlier = 2;
86+
}
87+
88+
// Request for the `get_oldest_messages()`.
89+
message GetOldestMessagesRequest {
90+
// Topic of messages to get.
91+
string topic = 1;
92+
// True if the oldest messages for each shard should be returned,
93+
// false if the oldest messages for all shards should be returned.
94+
bool for_each_shard = 2;
95+
}
96+
97+
// Response of the `get_oldest_messages()`.
98+
message GetOldestMessagesResponse {
99+
// Error information. Set only on failure.
100+
Error error = 1;
101+
// Returned messages.
102+
repeated Message messages = 2;
103+
}

0 commit comments

Comments
 (0)