Skip to content

Commit eecb3a6

Browse files
[ISSUE #891] Implement message recalling API in C# SDK
1 parent 214761b commit eecb3a6

File tree

11 files changed

+176
-3
lines changed

11 files changed

+176
-3
lines changed

csharp/rocketmq-client-csharp/ClientManager.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,15 @@ public async Task Shutdown()
120120
request, response, metadata);
121121
}
122122

123+
public async Task<RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>>
124+
RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest request, TimeSpan timeout)
125+
{
126+
var metadata = _client.Sign();
127+
var response = await GetRpcClient(endpoints).RecallMessage(metadata, request, timeout);
128+
return new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(
129+
request, response, metadata);
130+
}
131+
123132
public async Task<RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>> SendMessage(
124133
Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout)
125134
{

csharp/rocketmq-client-csharp/IClientManager.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ Task<RpcInvocation<HeartbeatRequest, HeartbeatResponse>> Heartbeat(Endpoints end
6262
Task<RpcInvocation<NotifyClientTerminationRequest, NotifyClientTerminationResponse>> NotifyClientTermination(
6363
Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout);
6464

65+
/// <summary>
66+
/// Recall messages.
67+
/// </summary>
68+
/// <param name="endpoints">The target endpoints.</param>
69+
/// <param name="request">gRPC request of recalling messages.</param>
70+
/// <param name="timeout">Request max duration.</param>
71+
/// <returns>Task of response.</returns>
72+
Task<RpcInvocation<RecallMessageRequest, RecallMessageResponse>> RecallMessage(
73+
Endpoints endpoints, RecallMessageRequest request, TimeSpan timeout);
74+
6575
/// <summary>
6676
/// Send message to remote endpoints.
6777
/// </summary>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
namespace Org.Apache.Rocketmq
19+
{
20+
public interface IRecallReceipt
21+
{
22+
string MessageId { get; }
23+
}
24+
}

csharp/rocketmq-client-csharp/IRpcClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ Task<ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(Me
5252
Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
5353
NotifyClientTerminationRequest request, TimeSpan timeout);
5454

55+
Task<RecallMessageResponse> RecallMessage(Metadata metadata, RecallMessageRequest request, TimeSpan timeout);
56+
5557
Task Shutdown();
5658
}
5759
}

csharp/rocketmq-client-csharp/ISendReceipt.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ namespace Org.Apache.Rocketmq
2020
public interface ISendReceipt
2121
{
2222
string MessageId { get; }
23+
string RecallHandle { get; }
2324
}
2425
}

csharp/rocketmq-client-csharp/Producer.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,33 @@ internal async Task EndTransaction(Endpoints endpoints, string topic, string mes
342342
StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
343343
}
344344

345+
public async Task<IRecallReceipt> RecallMessage(string topic, string recallhandle)
346+
{
347+
var recallReceipt = await RecallMessage0(topic, recallhandle);
348+
return recallReceipt;
349+
}
350+
351+
private async Task<RecallReceipt> RecallMessage0(string topic, string recallhandle)
352+
{
353+
if (State.Running != State)
354+
{
355+
throw new InvalidOperationException("Producer is not running");
356+
}
357+
if (recallhandle == null)
358+
{
359+
throw new InvalidOperationException("Recall handle is invalid");
360+
}
361+
var request = new Proto.RecallMessageRequest
362+
{
363+
Topic = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, Name = topic },
364+
RecallHandle = recallhandle
365+
};
366+
var invocation =
367+
await ClientManager.RecallMessage(new Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout);
368+
StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
369+
return new RecallReceipt(invocation.Response.MessageId);
370+
}
371+
345372
public class Builder
346373
{
347374
private ClientConfig _clientConfig;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
namespace Org.Apache.Rocketmq
19+
{
20+
public sealed class RecallReceipt : IRecallReceipt
21+
{
22+
public RecallReceipt(string messageId)
23+
{
24+
MessageId = messageId;
25+
}
26+
27+
public string MessageId { get; }
28+
29+
public override string ToString()
30+
{
31+
return $"{nameof(MessageId)}: {MessageId}";
32+
}
33+
}
34+
}

csharp/rocketmq-client-csharp/RpcClient.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,5 +189,14 @@ internal static HttpMessageHandler CreateHttpHandler()
189189
var call = _stub.NotifyClientTerminationAsync(request, callOptions);
190190
return await call.ResponseAsync;
191191
}
192+
193+
public async Task<Proto::RecallMessageResponse> RecallMessage(Metadata metadata, Proto.RecallMessageRequest request, TimeSpan timeout)
194+
{
195+
var deadline = DateTime.UtcNow.Add(timeout);
196+
var callOptions = new CallOptions(metadata, deadline);
197+
198+
var call = _stub.RecallMessageAsync(request, callOptions);
199+
return await call.ResponseAsync;
200+
}
192201
}
193202
}

csharp/rocketmq-client-csharp/SendReceipt.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,21 @@ namespace Org.Apache.Rocketmq
2323
{
2424
public sealed class SendReceipt : ISendReceipt
2525
{
26-
private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue)
26+
private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue, long offset, string recallHandle)
2727
{
2828
MessageId = messageId;
2929
TransactionId = transactionId;
3030
MessageQueue = messageQueue;
31+
Offset = offset;
32+
RecallHandle = recallHandle;
3133
}
3234

3335
public string MessageId { get; }
3436

37+
public string RecallHandle { get; }
38+
39+
public long Offset { get; }
40+
3541
public string TransactionId { get; }
3642

3743
private MessageQueue MessageQueue { get; }
@@ -40,7 +46,7 @@ private SendReceipt(string messageId, string transactionId, MessageQueue message
4046

4147
public override string ToString()
4248
{
43-
return $"{nameof(MessageId)}: {MessageId}";
49+
return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: {RecallHandle}";
4450
}
4551

4652
public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq,
@@ -58,7 +64,7 @@ public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue m
5864

5965
// May throw exception.
6066
StatusChecker.Check(status, invocation.Request, invocation.RequestId);
61-
return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
67+
return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, entry.RecallHandle)).ToList();
6268
}
6369
}
6470
}

csharp/tests/ClientManagerTest.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,15 @@ public void TestNotifyClientTermination()
121121
// Expect no exception thrown.
122122
}
123123

124+
[TestMethod]
125+
public void TestRecallMessage()
126+
{
127+
var request = new RecallMessageRequest();
128+
_clientManager.RecallMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
129+
_clientManager.RecallMessage(null, request, TimeSpan.FromSeconds(1));
130+
// Expect no exception thrown.
131+
}
132+
124133
private Client CreateTestClient()
125134
{
126135
return new Producer(_clientConfig, new ConcurrentDictionary<string, bool>(), 1, null);

0 commit comments

Comments
 (0)