Skip to content

Commit b4137d8

Browse files
committed
proto: support queue
This patch adds the `push` field to `ExecuteRequest` and introduces `aeon_queue.proto` module. Needed for tarantool/aeon#488
1 parent 51463ff commit b4137d8

File tree

2 files changed

+125
-0
lines changed

2 files changed

+125
-0
lines changed

aeon_crud.proto

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ service CRUDService {
4343
// Transactionally executes a set of read and write operations.
4444

4545
message ExecuteRequest {
46+
// Message queue push section in a transaction request.
47+
message Push {
48+
// Definition of messages pushed to a shard-local queue.
49+
message Message {
50+
// Topic of the message.
51+
string topic = 1;
52+
// Data of the message.
53+
Value data = 2;
54+
// Time to live of the message.
55+
double ttl = 3;
56+
}
57+
// Push function.
58+
string func = 1;
59+
// Additional argument to the push function.
60+
Value func_arg = 2;
61+
// Messages to send if the push function is not provided.
62+
repeated Message messages = 3;
63+
}
4664
// Array of read operations.
4765
repeated Operation read_set = 1;
4866
// Array of write operations.
@@ -89,6 +107,8 @@ message ExecuteRequest {
89107
// Map : space name -> tuple format.
90108
// Contains formats of all provided tuples. Optional.
91109
map<string, TupleFormat> tuple_formats = 6;
110+
// Description of messages to push when executing a transaction.
111+
Push push = 7;
92112
}
93113

94114
message ExecuteResponse {

aeon_queue.proto

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
syntax = "proto3";
2+
3+
import "aeon_error.proto";
4+
import "aeon_value.proto";
5+
6+
package aeon;
7+
8+
// Queue API to Aeon - a distributed database based on Tarantool.
9+
service QueueService {
10+
// Takes messages from a shard-local queue.
11+
rpc TakeMessages(TakeMessagesRequest) returns (TakeMessagesResponse) {}
12+
13+
// Releases messages.
14+
rpc ReleaseMessages(ReleaseMessagesRequest)
15+
returns (ReleaseMessagesResponse) {}
16+
17+
// Returns the oldest message for all storages, or the oldest message
18+
// for each storage.
19+
rpc GetOldestMessages(GetOldestMessagesRequest)
20+
returns (GetOldestMessagesResponse) {}
21+
}
22+
23+
// The difinition of messages returned by the `take_messages()` and
24+
// `get_oldest_messages()`.
25+
message Message {
26+
// Shard name.
27+
string shard = 1;
28+
// LSN of the message.
29+
uint64 lsn = 2;
30+
// Data of the message.
31+
Value data = 3;
32+
}
33+
34+
// Message queue consumer reference.
35+
message ConsumerRef {
36+
// Shard of the message queue consumer.
37+
string shard = 1;
38+
// Topic of the message queue consumer.
39+
string topic = 2;
40+
// Name of the consumer.
41+
string consumer = 3;
42+
}
43+
44+
// Request for the `take_messages()`.
45+
message TakeMessagesRequest {
46+
// Topic of the messages.
47+
string topic = 1;
48+
// Consumer of the messages.
49+
string consumer = 2;
50+
// Max number of returned messages.
51+
uint64 limit = 3;
52+
// Time to live of the request.
53+
double ttl = 4;
54+
// Exclusivity of `take_messages()`.
55+
bool exclusive = 5;
56+
// The timeout for the request.
57+
double timeout = 6;
58+
}
59+
60+
// Response of the `take_messages()`.
61+
message TakeMessagesResponse {
62+
// Error information. Set only on failure.
63+
Error error = 1;
64+
// Returned messages.
65+
repeated Message messages = 2;
66+
// Reference to queue consumer.
67+
ConsumerRef ref = 3;
68+
// True if these messages have already been requested by this consumer,
69+
// false otherwise.
70+
bool taken_earlier = 4;
71+
}
72+
73+
// Request for the `release_messages()`.
74+
message ReleaseMessagesRequest {
75+
// Reference to queue consumer.
76+
ConsumerRef ref = 1;
77+
// If the flag is set, queue.take_messages() won't return released
78+
// messages to consumers anymore.
79+
bool done = 2;
80+
}
81+
82+
// Response of the `release_messages()`.
83+
message ReleaseMessagesResponse {
84+
// Error information. Set only on failure.
85+
Error error = 1;
86+
// True if messages were already released, false otherwise.
87+
bool released_earlier = 2;
88+
}
89+
90+
// Request for the `get_oldest_messages()`.
91+
message GetOldestMessagesRequest {
92+
// Topic of messages to get.
93+
string topic = 1;
94+
// True if the oldest messages for each shard should be returned,
95+
// false if the oldest messages for all shards should be returned.
96+
bool for_each_shard = 2;
97+
}
98+
99+
// Response of the `get_oldest_messages()`.
100+
message GetOldestMessagesResponse {
101+
// Error information. Set only on failure.
102+
Error error = 1;
103+
// Returned messages.
104+
repeated Message messages = 2;
105+
}

0 commit comments

Comments
 (0)