Skip to content

Commit 979e0d8

Browse files
authored
AdminClient.DeleteRecords implementation (#1513)
* AdminClient.DeleteRecords implementation * Changes following review
1 parent 391a3c3 commit 979e0d8

20 files changed

+536
-44
lines changed

src/Confluent.Kafka/Admin/AlterConfigsReport.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@
1414
//
1515
// Refer to LICENSE for more information.
1616

17-
using System;
18-
using System.Linq;
19-
using System.Threading;
20-
using System.Threading.Tasks;
21-
using System.Collections.Generic;
22-
2317

2418
namespace Confluent.Kafka.Admin
2519
{

src/Confluent.Kafka/Admin/ConfigEntry.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
//
1515
// Refer to LICENSE for more information.
1616

17-
using System.Collections.Generic;
18-
1917

2018
namespace Confluent.Kafka.Admin
2119
{

src/Confluent.Kafka/Admin/ConfigResource.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
//
1515
// Refer to LICENSE for more information.
1616

17-
using System;
18-
1917

2018
namespace Confluent.Kafka.Admin
2119
{

src/Confluent.Kafka/Admin/ConfigSynonym.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
//
1515
// Refer to LICENSE for more information.
1616

17-
using System.Collections.Generic;
18-
1917

2018
namespace Confluent.Kafka.Admin
2119
{

src/Confluent.Kafka/Admin/CreatePartitionsReport.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@
1414
//
1515
// Refer to LICENSE for more information.
1616

17-
using System;
18-
using System.Linq;
19-
using System.Threading;
20-
using System.Threading.Tasks;
21-
using System.Collections.Generic;
22-
2317

2418
namespace Confluent.Kafka.Admin
2519
{

src/Confluent.Kafka/Admin/CreateTopicReport.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@
1414
//
1515
// Refer to LICENSE for more information.
1616

17-
using System;
18-
using System.Linq;
19-
using System.Threading;
20-
using System.Threading.Tasks;
21-
using System.Collections.Generic;
22-
2317

2418
namespace Confluent.Kafka.Admin
2519
{
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2021 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Linq;
19+
using System.Collections.Generic;
20+
21+
22+
namespace Confluent.Kafka.Admin
23+
{
24+
/// <summary>
25+
/// Represents an error that occured during a delete records request.
26+
/// </summary>
27+
public class DeleteRecordsException : KafkaException
28+
{
29+
/// <summary>
30+
/// Initializes a new DeleteRecordsException.
31+
/// </summary>
32+
/// <param name="results">
33+
/// The result corresponding to all topic partitions in the request
34+
/// (whether or not they were in error). At least one of these
35+
/// results will be in error.
36+
/// </param>
37+
public DeleteRecordsException(List<DeleteRecordsReport> results)
38+
: base(new Error(ErrorCode.Local_Partial,
39+
"An error occurred deleting records: [" +
40+
String.Join(", ", results.Where(r => r.Error.IsError).Select(r => r.Topic)) +
41+
"]: [" + String.Join(", ", results.Where(r => r.Error.IsError).Select(r => r.Error)) +
42+
"]."))
43+
{
44+
Results = results;
45+
}
46+
47+
/// <summary>
48+
/// The result corresponding to all topics partitions in the request
49+
/// (whether or not they were in error). At least one of these
50+
/// results will be in error.
51+
/// </summary>
52+
public List<DeleteRecordsReport> Results { get; }
53+
}
54+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2021 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Options for the DeleteRecordsAsync method.
24+
/// </summary>
25+
public class DeleteRecordsOptions
26+
{
27+
/// <summary>
28+
/// The overall request timeout, including broker lookup, request
29+
/// transmission, operation time on broker, and response. If set
30+
/// to null, the default request timeout for the AdminClient will
31+
/// be used.
32+
///
33+
/// Default: null
34+
/// </summary>
35+
public TimeSpan? RequestTimeout { get; set; }
36+
37+
/// <summary>
38+
/// The broker's operation timeout - the maximum time to wait for
39+
/// DeleteRecordsAsync before returning a result to the application.
40+
/// If set to null, will return immediately upon triggering record
41+
/// deletion.
42+
///
43+
/// Default: null
44+
/// </summary>
45+
public TimeSpan? OperationTimeout { get; set; }
46+
}
47+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2021 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
18+
namespace Confluent.Kafka.Admin
19+
{
20+
/// <summary>
21+
/// The per-partition result of delete records request
22+
/// (including error status).
23+
/// </summary>
24+
public class DeleteRecordsReport
25+
{
26+
/// <summary>
27+
/// The topic name.
28+
/// </summary>
29+
public string Topic { get; set; }
30+
31+
/// <summary>
32+
/// The partition.
33+
/// </summary>
34+
public Partition Partition { get; set; }
35+
36+
/// <summary>
37+
/// Post-deletion low-watermark (smallest available offset of all
38+
/// live replicas).
39+
/// </summary>
40+
public Offset Offset { get; set; }
41+
42+
/// <summary>
43+
/// Per-partition error status.
44+
/// </summary>
45+
public Error Error { get; set; }
46+
}
47+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2021 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
18+
namespace Confluent.Kafka.Admin
19+
{
20+
/// <summary>
21+
/// The per-partition result of delete records request.
22+
/// </summary>
23+
public class DeleteRecordsResult
24+
{
25+
/// <summary>
26+
/// The topic name.
27+
/// </summary>
28+
public string Topic { get; set; }
29+
30+
/// <summary>
31+
/// The partition.
32+
/// </summary>
33+
public Partition Partition { get; set; }
34+
35+
/// <summary>
36+
/// Post-deletion low-watermark offset (smallest available offset of all
37+
/// live replicas).
38+
/// </summary>
39+
public Offset Offset { get; set; }
40+
41+
internal Error Error { get; set; }
42+
}
43+
}

0 commit comments

Comments
 (0)