@@ -236,6 +236,10 @@ internal int Flush(int millisecondsTimeout)
236236 internal int AddBrokers ( string brokers )
237237 {
238238 ThrowIfHandleClosed ( ) ;
239+ if ( brokers == null )
240+ {
241+ throw new ArgumentNullException ( "brokers" , "Argument 'brokers' must not be null." ) ;
242+ }
239243 return ( int ) Librdkafka . brokers_add ( handle , brokers ) ;
240244 }
241245
@@ -645,6 +649,12 @@ internal void Assign(IEnumerable<TopicPartitionOffset> partitions)
645649 }
646650 foreach ( var partition in partitions )
647651 {
652+ if ( partition . Topic == null )
653+ {
654+ Librdkafka . topic_partition_list_destroy ( list ) ;
655+ throw new ArgumentException ( "Cannot assign partitions because one or more have a null topic." ) ;
656+ }
657+
648658 IntPtr ptr = Librdkafka . topic_partition_list_add ( list , partition . Topic , partition . Partition ) ;
649659 Marshal . WriteInt64 (
650660 ptr ,
@@ -1011,6 +1021,11 @@ private List<GroupInfo> ListGroupsImpl(string group, int millisecondsTimeout)
10111021 {
10121022 ThrowIfHandleClosed ( ) ;
10131023
1024+ if ( group == null )
1025+ {
1026+ throw new ArgumentNullException ( "group" , "Argument 'group' must not be null." ) ;
1027+ }
1028+
10141029 ErrorCode err = Librdkafka . list_groups ( handle , group , out IntPtr grplistPtr , ( IntPtr ) millisecondsTimeout ) ;
10151030 if ( err == ErrorCode . NoError )
10161031 {
@@ -1222,48 +1237,61 @@ internal void CreatePartitions(
12221237 setOption_completionSource ( optionsPtr , completionSourcePtr ) ;
12231238
12241239 IntPtr [ ] newPartitionsPtrs = new IntPtr [ newPartitions . Count ( ) ] ;
1225- int newPartitionsIdx = 0 ;
1226- foreach ( var newPartitionsForTopic in newPartitions )
1240+ try
12271241 {
1228- var topic = newPartitionsForTopic . Topic ;
1229- var increaseTo = newPartitionsForTopic . IncreaseTo ;
1230- var assignments = newPartitionsForTopic . ReplicaAssignments ;
1231-
1232- IntPtr ptr = Librdkafka . NewPartitions_new ( topic , ( UIntPtr ) increaseTo , errorStringBuilder , ( UIntPtr ) errorStringBuilder . Capacity ) ;
1233- if ( ptr == IntPtr . Zero )
1242+ int newPartitionsIdx = 0 ;
1243+ foreach ( var newPartitionsForTopic in newPartitions )
12341244 {
1235- throw new KafkaException ( new Error ( ErrorCode . Unknown , errorStringBuilder . ToString ( ) ) ) ;
1236- }
1245+ var topic = newPartitionsForTopic . Topic ;
1246+ var increaseTo = newPartitionsForTopic . IncreaseTo ;
1247+ var assignments = newPartitionsForTopic . ReplicaAssignments ;
12371248
1238- if ( assignments != null )
1239- {
1240- int assignmentsCount = 0 ;
1241- foreach ( var assignment in assignments )
1249+ if ( newPartitionsForTopic . Topic == null )
12421250 {
1243- errorStringBuilder = new StringBuilder ( Librdkafka . MaxErrorStringLength ) ;
1244- var brokerIds = assignments [ assignmentsCount ] . ToArray ( ) ;
1245- var errorCode = Librdkafka . NewPartitions_set_replica_assignment (
1246- ptr ,
1247- assignmentsCount ,
1248- brokerIds , ( UIntPtr ) brokerIds . Length ,
1249- errorStringBuilder , ( UIntPtr ) errorStringBuilder . Capacity ) ;
1250- if ( errorCode != ErrorCode . NoError )
1251+ throw new ArgumentException ( "Cannot add partitions to a null topic." ) ;
1252+ }
1253+
1254+ IntPtr ptr = Librdkafka . NewPartitions_new ( topic , ( UIntPtr ) increaseTo , errorStringBuilder , ( UIntPtr ) errorStringBuilder . Capacity ) ;
1255+ if ( ptr == IntPtr . Zero )
1256+ {
1257+ throw new KafkaException ( new Error ( ErrorCode . Unknown , errorStringBuilder . ToString ( ) ) ) ;
1258+ }
1259+
1260+ if ( assignments != null )
1261+ {
1262+ int assignmentsCount = 0 ;
1263+ foreach ( var assignment in assignments )
12511264 {
1252- throw new KafkaException ( CreatePossiblyFatalError ( errorCode , errorStringBuilder . ToString ( ) ) ) ;
1265+ errorStringBuilder = new StringBuilder ( Librdkafka . MaxErrorStringLength ) ;
1266+ var brokerIds = assignments [ assignmentsCount ] . ToArray ( ) ;
1267+ var errorCode = Librdkafka . NewPartitions_set_replica_assignment (
1268+ ptr ,
1269+ assignmentsCount ,
1270+ brokerIds , ( UIntPtr ) brokerIds . Length ,
1271+ errorStringBuilder , ( UIntPtr ) errorStringBuilder . Capacity ) ;
1272+ if ( errorCode != ErrorCode . NoError )
1273+ {
1274+ throw new KafkaException ( CreatePossiblyFatalError ( errorCode , errorStringBuilder . ToString ( ) ) ) ;
1275+ }
1276+ assignmentsCount += 1 ;
12531277 }
1254- assignmentsCount += 1 ;
12551278 }
1279+
1280+ newPartitionsPtrs [ newPartitionsIdx ] = ptr ;
1281+ newPartitionsIdx += 1 ;
12561282 }
12571283
1258- newPartitionsPtrs [ newPartitionsIdx ] = ptr ;
1259- newPartitionsIdx += 1 ;
1284+ Librdkafka . CreatePartitions ( handle , newPartitionsPtrs , ( UIntPtr ) newPartitionsPtrs . Length , optionsPtr , resultQueuePtr ) ;
12601285 }
1261-
1262- Librdkafka . CreatePartitions ( handle , newPartitionsPtrs , ( UIntPtr ) newPartitionsPtrs . Length , optionsPtr , resultQueuePtr ) ;
1263-
1264- foreach ( var newPartitionPtr in newPartitionsPtrs )
1286+ finally
12651287 {
1266- Librdkafka . NewPartitions_destroy ( newPartitionPtr ) ;
1288+ foreach ( var newPartitionPtr in newPartitionsPtrs )
1289+ {
1290+ if ( newPartitionPtr != IntPtr . Zero )
1291+ {
1292+ Librdkafka . NewPartitions_destroy ( newPartitionPtr ) ;
1293+ }
1294+ }
12671295 }
12681296
12691297 Librdkafka . AdminOptions_destroy ( optionsPtr ) ;
@@ -1284,19 +1312,32 @@ internal void DeleteTopics(
12841312 setOption_completionSource ( optionsPtr , completionSourcePtr ) ;
12851313
12861314 IntPtr [ ] deleteTopicsPtrs = new IntPtr [ deleteTopics . Count ( ) ] ;
1287- int idx = 0 ;
1288- foreach ( var deleteTopic in deleteTopics )
1315+ try
12891316 {
1290- var deleteTopicPtr = Librdkafka . DeleteTopic_new ( deleteTopic ) ;
1291- deleteTopicsPtrs [ idx ] = deleteTopicPtr ;
1292- idx += 1 ;
1293- }
1317+ int idx = 0 ;
1318+ foreach ( var deleteTopic in deleteTopics )
1319+ {
1320+ if ( deleteTopic == null )
1321+ {
1322+ throw new ArgumentException ( "Cannot delete topics because one or more topics were specified as null." ) ;
1323+ }
12941324
1295- Librdkafka . DeleteTopics ( handle , deleteTopicsPtrs , ( UIntPtr ) deleteTopicsPtrs . Length , optionsPtr , resultQueuePtr ) ;
1325+ var deleteTopicPtr = Librdkafka . DeleteTopic_new ( deleteTopic ) ;
1326+ deleteTopicsPtrs [ idx ] = deleteTopicPtr ;
1327+ idx += 1 ;
1328+ }
12961329
1297- foreach ( var deleteTopicPtr in deleteTopicsPtrs )
1330+ Librdkafka . DeleteTopics ( handle , deleteTopicsPtrs , ( UIntPtr ) deleteTopicsPtrs . Length , optionsPtr , resultQueuePtr ) ;
1331+ }
1332+ finally
12981333 {
1299- Librdkafka . DeleteTopic_destroy ( deleteTopicPtr ) ;
1334+ foreach ( var deleteTopicPtr in deleteTopicsPtrs )
1335+ {
1336+ if ( deleteTopicPtr != IntPtr . Zero )
1337+ {
1338+ Librdkafka . DeleteTopic_destroy ( deleteTopicPtr ) ;
1339+ }
1340+ }
13001341 }
13011342
13021343 Librdkafka . AdminOptions_destroy ( optionsPtr ) ;
@@ -1320,59 +1361,72 @@ internal void CreateTopics(
13201361 setOption_completionSource ( optionsPtr , completionSourcePtr ) ;
13211362
13221363 IntPtr [ ] newTopicPtrs = new IntPtr [ newTopics . Count ( ) ] ;
1323- int idx = 0 ;
1324- foreach ( var newTopic in newTopics )
1364+ try
13251365 {
1326- if ( newTopic . ReplicationFactor != - 1 && newTopic . ReplicasAssignments != null )
1366+ int idx = 0 ;
1367+ foreach ( var newTopic in newTopics )
13271368 {
1328- throw new ArgumentException ( "ReplicationFactor must be -1 when ReplicasAssignments are specified." ) ;
1329- }
1369+ if ( newTopic . ReplicationFactor != - 1 && newTopic . ReplicasAssignments != null )
1370+ {
1371+ throw new ArgumentException ( "ReplicationFactor must be -1 when ReplicasAssignments are specified." ) ;
1372+ }
13301373
1331- IntPtr newTopicPtr = Librdkafka . NewTopic_new (
1332- newTopic . Name ,
1333- ( IntPtr ) newTopic . NumPartitions ,
1334- ( IntPtr ) newTopic . ReplicationFactor ,
1335- errorStringBuilder ,
1336- ( UIntPtr ) errorStringBuilder . Capacity ) ;
1337- if ( newTopicPtr == IntPtr . Zero )
1338- {
1339- throw new KafkaException ( new Error ( ErrorCode . Unknown , errorStringBuilder . ToString ( ) ) ) ;
1340- }
1374+ if ( newTopic . Name == null )
1375+ {
1376+ throw new ArgumentException ( "Cannot create a topic with a name of null." ) ;
1377+ }
13411378
1342- if ( newTopic . ReplicasAssignments != null )
1343- {
1344- foreach ( var replicAssignment in newTopic . ReplicasAssignments )
1379+ IntPtr newTopicPtr = Librdkafka . NewTopic_new (
1380+ newTopic . Name ,
1381+ ( IntPtr ) newTopic . NumPartitions ,
1382+ ( IntPtr ) newTopic . ReplicationFactor ,
1383+ errorStringBuilder ,
1384+ ( UIntPtr ) errorStringBuilder . Capacity ) ;
1385+ if ( newTopicPtr == IntPtr . Zero )
1386+ {
1387+ throw new KafkaException ( new Error ( ErrorCode . Unknown , errorStringBuilder . ToString ( ) ) ) ;
1388+ }
1389+
1390+ if ( newTopic . ReplicasAssignments != null )
13451391 {
1346- var partition = replicAssignment . Key ;
1347- var brokerIds = replicAssignment . Value . ToArray ( ) ;
1348- var errorCode = Librdkafka . NewTopic_set_replica_assignment (
1349- newTopicPtr ,
1350- partition , brokerIds , ( UIntPtr ) brokerIds . Length ,
1351- errorStringBuilder , ( UIntPtr ) errorStringBuilder . Capacity ) ;
1352- if ( errorCode != ErrorCode . NoError )
1392+ foreach ( var replicAssignment in newTopic . ReplicasAssignments )
13531393 {
1354- throw new KafkaException ( CreatePossiblyFatalError ( errorCode , errorStringBuilder . ToString ( ) ) ) ;
1394+ var partition = replicAssignment . Key ;
1395+ var brokerIds = replicAssignment . Value . ToArray ( ) ;
1396+ var errorCode = Librdkafka . NewTopic_set_replica_assignment (
1397+ newTopicPtr ,
1398+ partition , brokerIds , ( UIntPtr ) brokerIds . Length ,
1399+ errorStringBuilder , ( UIntPtr ) errorStringBuilder . Capacity ) ;
1400+ if ( errorCode != ErrorCode . NoError )
1401+ {
1402+ throw new KafkaException ( CreatePossiblyFatalError ( errorCode , errorStringBuilder . ToString ( ) ) ) ;
1403+ }
13551404 }
13561405 }
1357- }
13581406
1359- if ( newTopic . Configs != null )
1360- {
1361- foreach ( var config in newTopic . Configs )
1407+ if ( newTopic . Configs != null )
13621408 {
1363- Librdkafka . NewTopic_set_config ( newTopicPtr , config . Key , config . Value ) ;
1409+ foreach ( var config in newTopic . Configs )
1410+ {
1411+ Librdkafka . NewTopic_set_config ( newTopicPtr , config . Key , config . Value ) ;
1412+ }
13641413 }
1414+
1415+ newTopicPtrs [ idx ] = newTopicPtr ;
1416+ idx += 1 ;
13651417 }
13661418
1367- newTopicPtrs [ idx ] = newTopicPtr ;
1368- idx += 1 ;
1419+ Librdkafka . CreateTopics ( handle , newTopicPtrs , ( UIntPtr ) newTopicPtrs . Length , optionsPtr , resultQueuePtr ) ;
13691420 }
1370-
1371- Librdkafka . CreateTopics ( handle , newTopicPtrs , ( UIntPtr ) newTopicPtrs . Length , optionsPtr , resultQueuePtr ) ;
1372-
1373- foreach ( var newTopicPtr in newTopicPtrs )
1421+ finally
13741422 {
1375- Librdkafka . NewTopic_destroy ( newTopicPtr ) ;
1423+ foreach ( var newTopicPtr in newTopicPtrs )
1424+ {
1425+ if ( newTopicPtr != IntPtr . Zero )
1426+ {
1427+ Librdkafka . NewTopic_destroy ( newTopicPtr ) ;
1428+ }
1429+ }
13761430 }
13771431
13781432 Librdkafka . AdminOptions_destroy ( optionsPtr ) ;
0 commit comments