Skip to content

how could i use eventhub correctly? #6857

@dongyinq94

Description

@dongyinq94

Query/Question
When I use multi thread to create PartitionClient, I found that there is a chance of an error.
Could not open Claims Based Security object.
I'm certain my settings are correct because the minimum use case with a single thread works correctly.
This code works when I use the simulator.

My question is, is there something wrong with how I'm using it? Some answers suggest using a processor, but after looking at the code, it seems to have the same problem. Furthermore, I haven't found a working sample.

The example code is as follows, and the log is as follows.

#include <azure/messaging/eventhubs.hpp>
#include <azure/core/context.hpp>
#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <chrono>

// 每个线程执行的完整任务
void full_partition_creation_task(
    std::string conn_str,
    std::string eh_name,
    std::string consumer_group,
    std::string partition_id)
{
    try {
        // 步骤 1: 使用确定性延迟,错峰启动
        int partition_num = std::stoi(partition_id);
        auto delay = std::chrono::milliseconds(partition_num * 800); // 错开 250ms
        std::cout << "[线程 " << partition_id << "] 等待 " << delay.count() << "ms..." << std::endl;
        std::this_thread::sleep_for(delay);

        // 步骤 2: 创建此线程专属的 ConsumerClient
        std::cout << "[线程 " << partition_id << "] 尝试创建 ConsumerClient..." << std::endl;
        Azure::Messaging::EventHubs::ConsumerClient consumer_client(conn_str, eh_name, consumer_group);
        std::cout << "[线程 " << partition_id << "] -> 成功创建 ConsumerClient。" << std::endl;

        // 步骤 3: 从新建的 ConsumerClient 创建此线程专属的 PartitionClient
        std::cout << "[线程 " << partition_id << "] 尝试创建 PartitionClient..." << std::endl;
        Azure::Messaging::EventHubs::PartitionClientOptions partition_options;
        Azure::Messaging::EventHubs::PartitionClient partition_client = consumer_client.CreatePartitionClient(partition_id, partition_options);
        std::cout << "[线程 " << partition_id << "] -> 成功创建 PartitionClient。" << std::endl;

        // 步骤 4: 清理资源
        partition_client.Close({});
        consumer_client.Close({});
        std::cout << "[线程 " << partition_id << "] -> 资源已清理。" << std::endl;

    }
    catch (const std::exception& e) {
        std::cerr << "[线程 " << partition_id << "] !!! 失败: " << e.what() << std::endl;
    }
}

int main() {
    // !!! 请替换为你的连接字符串 !!!

    std::vector<std::string> partition_ids;
    try {
        // 预先获取一次分区列表
        Azure::Messaging::EventHubs::ConsumerClient client(conn_str, eventhub_name, consumer_group);
        partition_ids = client.GetEventHubProperties().PartitionIds;
        client.Close({});
        std::cout << "获取到 " << partition_ids.size() << " 个分区ID。" << std::endl;
    } catch(const std::exception& e) {
        std::cerr << "获取分区ID失败: " << e.what() << std::endl;
        return 1;
    }

    std::vector<std::thread> threads;
    std::cout << "\n======= 启动最终测试:并发创建 ConsumerClient 和 PartitionClient =======" << std::endl;
    for (const auto& pid : partition_ids) {
        threads.emplace_back(full_partition_creation_task, conn_str, eventhub_name, consumer_group, pid);
    }

    for (auto& t : threads) {
        if (t.joinable()) t.join();
    }

    std::cout << "\n======= 所有线程执行完毕。=======" << std::endl;
    return 0;
}
获取到 32 个分区ID。

======= 启动最终测试:并发创建 ConsumerClient 和 PartitionClient =======
[线程 0] 等待 0ms...
[线程 0] 尝试创建 ConsumerClient...
[线程 1] 等待 800ms...
[线程 2] 等待 1600ms...
[线程 3] 等待 2400ms...
[线程 0] -> 成功创建 ConsumerClient。
[线程 [线程 0] 尝试创建 PartitionClient...
4] 等待 3200ms...
[线程 5] 等待 4000ms...
[线程 [线程 8] 等待 6400ms...
[线程 6] 等待 4800ms...
7] 等待 5600[线程 ms...9[线程 ] 等待 720010ms...] 等待 8000ms...
[线程 11] 等待 8800ms...
[线程 12] 等待 9600ms...

[线程 13] 等待 10400ms...

[线程 14] 等待 11200ms...
[线程 15] 等待 12000ms...
[线程 16] 等待 12800ms...
[线程 17] 等待 13600ms...
[线程 18] 等待 14400ms...
[线程 19] 等待 15200ms...
[线程 20] 等待 16000ms...
[线程 21] 等待 16800ms...
[线程 22] 等待 17600ms...
[线程 23] 等待 18400ms...
[线程 24] 等待 19200ms...
[线程 25] 等待 20000ms...
[线程 26] 等待 20800ms...
[线程 27] 等待 21600ms...
[线程 28] 等待 22400ms...
[线程 30] 等待 24000ms...
[线程 29] 等待 23200ms...
[线程 31] 等待 24800ms...
[线程 1] 尝试创建 ConsumerClient...
[线程 1] -> 成功创建 ConsumerClient。
[线程 1] 尝试创建 PartitionClient...
[线程 2] 尝试创建 ConsumerClient...
[线程 2] -> 成功创建 ConsumerClient。
[线程 2] 尝试创建 PartitionClient...
[线程 0] -> 成功创建 PartitionClient。
[线程 0] -> 资源已清理。
[线程 3] 尝试创建 ConsumerClient...
[线程 3] -> 成功创建 ConsumerClient。
[线程 3] 尝试创建 PartitionClient...
[线程 1] -> 成功创建 PartitionClient。
[线程 1] -> 资源已清理。
[线程 4] 尝试创建 ConsumerClient...
[线程 4] -> 成功创建 ConsumerClient。
[线程 4] 尝试创建 PartitionClient...
[线程 5] 尝试创建 ConsumerClient...
[线程 5] -> 成功创建 ConsumerClient。
[线程 5] 尝试创建 PartitionClient...
[线程 4] -> 成功创建 PartitionClient。
[线程 4] -> 资源已清理。
[线程 6] 尝试创建 ConsumerClient...
[线程 6] -> 成功创建 ConsumerClient。
[线程 6] 尝试创建 PartitionClient...
[线程 7] 尝试创建 ConsumerClient...
[线程 7] -> 成功创建 ConsumerClient。
[线程 7] 尝试创建 PartitionClient...
[线程 2] -> 成功创建 PartitionClient。
[线程 2] -> 资源已清理。
[线程 8] 尝试创建 ConsumerClient...
[线程 8] -> 成功创建 ConsumerClient。
[线程 8] 尝试创建 PartitionClient...
[线程 6] -> 成功创建 PartitionClient。
[线程 6] -> 资源已清理。
[线程 9] 尝试创建 ConsumerClient...
[线程 9] -> 成功创建 ConsumerClient。
[线程 9] 尝试创建 PartitionClient...
[线程 8] -> 成功创建 PartitionClient。
[线程 8] -> 资源已清理。
[线程 10] 尝试创建 ConsumerClient...
[线程 10] -> 成功创建 ConsumerClient。
[线程 10] 尝试创建 PartitionClient...
[线程 5] -> 成功创建 PartitionClient。
[线程 5] -> 资源已清理。
[线程 9] -> 成功创建 PartitionClient。
[线程 9] -> 资源已清理。
[线程 11] 尝试创建 ConsumerClient...
[线程 11] -> 成功创建 ConsumerClient。
[线程 11] 尝试创建 PartitionClient...
[线程 10] -> 成功创建 PartitionClient。
[线程 10] -> 资源已清理。
[线程 12] 尝试创建 ConsumerClient...
[线程 12] -> 成功创建 ConsumerClient。
[线程 12] 尝试创建 PartitionClient...
[线程 13] 尝试创建 ConsumerClient...
[线程 13] -> 成功创建 ConsumerClient。
[线程 13] 尝试创建 PartitionClient...
[线程 14] 尝试创建 ConsumerClient...
[线程 14] -> 成功创建 ConsumerClient。
[线程 14] 尝试创建 PartitionClient...
[线程 13] -> 成功创建 PartitionClient。
[线程 13] -> 资源已清理。
[线程 15] 尝试创建 ConsumerClient...
[线程 15] -> 成功创建 ConsumerClient。
[线程 15] 尝试创建 PartitionClient...
[线程 12] -> 成功创建 PartitionClient。
[线程 12] -> 资源已清理。
[线程 3] !!! 失败: Could not open Claims Based Security object.
[线程 16] 尝试创建 ConsumerClient...
[线程 16] -> 成功创建 ConsumerClient。
[线程 16] 尝试创建 PartitionClient...
[线程 17] 尝试创建 ConsumerClient...
[线程 17] -> 成功创建 ConsumerClient。
[线程 17] 尝试创建 PartitionClient...
[线程 7] -> 成功创建 PartitionClient。
[线程 7] -> 资源已清理。
[线程 18] 尝试创建 ConsumerClient...
[线程 18] -> 成功创建 ConsumerClient。
[线程 18] 尝试创建 PartitionClient...
[线程 19] 尝试创建 ConsumerClient...
[线程 19] -> 成功创建 ConsumerClient。
[线程 19] 尝试创建 PartitionClient...
[线程 20] 尝试创建 ConsumerClient...
[线程 20] -> 成功创建 ConsumerClient。
[线程 20] 尝试创建 PartitionClient...
[线程 15] -> 成功创建 PartitionClient。
[线程 15] -> 资源已清理。
[线程 19] -> 成功创建 PartitionClient。
[线程 19] -> 资源已清理。
[线程 21] 尝试创建 ConsumerClient...
[线程 21] -> 成功创建 ConsumerClient。
[线程 21] 尝试创建 PartitionClient...
[线程 11] -> 成功创建 PartitionClient。
[线程 11] -> 资源已清理。
[线程 22] 尝试创建 ConsumerClient...
[线程 22] -> 成功创建 ConsumerClient。
[线程 22] 尝试创建 PartitionClient...
[线程 23] 尝试创建 ConsumerClient...
[线程 23] -> 成功创建 ConsumerClient。
[线程 23] 尝试创建 PartitionClient...
[线程 22] -> 成功创建 PartitionClient。
[线程 18] -> 成功创建 PartitionClient。
[线程 22] -> 资源已清理。
[线程 18] -> 资源已清理。
[线程 24] 尝试创建 ConsumerClient...
[线程 24] -> 成功创建 ConsumerClient。
[线程 24] 尝试创建 PartitionClient...
[线程 25] 尝试创建 ConsumerClient...
[线程 25] -> 成功创建 ConsumerClient。
[线程 25] 尝试创建 PartitionClient...
[线程 24] -> 成功创建 PartitionClient。
[线程 24] -> 资源已清理。
[线程 26] 尝试创建 ConsumerClient...
[线程 26] -> 成功创建 ConsumerClient。
[线程 26] 尝试创建 PartitionClient...
[线程 23] -> 成功创建 PartitionClient。
[线程 23] -> 资源已清理。
[线程 14] !!! 失败: Could not open Claims Based Security object.
[线程 21] -> 成功创建 PartitionClient。
[线程 21] -> 资源已清理。
[线程 27] 尝试创建 ConsumerClient...
[线程 27] -> 成功创建 ConsumerClient。
[线程 27] 尝试创建 PartitionClient...
[线程 17] -> 成功创建 PartitionClient。
[线程 17] -> 资源已清理。
[线程 28] 尝试创建 ConsumerClient...
[线程 28] -> 成功创建 ConsumerClient。
[线程 28] 尝试创建 PartitionClient...
[线程 16] !!! 失败: Could not open Claims Based Security object.
[线程 29] 尝试创建 ConsumerClient...
[线程 29] -> 成功创建 ConsumerClient。
[线程 29] 尝试创建 PartitionClient...
[线程 30] 尝试创建 ConsumerClient...
[线程 30] -> 成功创建 ConsumerClient。
[线程 30] 尝试创建 PartitionClient...
[线程 25] -> 成功创建 PartitionClient。
[线程 25] -> 资源已清理。
[线程 28] -> 成功创建 PartitionClient。
[线程 28] -> 资源已清理。
[线程 31] 尝试创建 ConsumerClient...
[线程 31] -> 成功创建 ConsumerClient。
[线程 31] 尝试创建 PartitionClient...
[线程 30] -> 成功创建 PartitionClient。
[线程 30] -> 资源已清理。
[线程 26] -> 成功创建 PartitionClient。
[线程 26] -> 资源已清理。
[线程 29] -> 成功创建 PartitionClient。
[线程 29] -> 资源已清理。
[线程 20] !!! 失败: Could not open Claims Based Security object.
[线程 27] -> 成功创建 PartitionClient。
[线程 27] -> 资源已清理。
[线程 31] -> 成功创建 PartitionClient。
[线程 31] -> 资源已清理。

======= 所有线程执行完毕。=======

Why is this not a Bug or a feature Request?
A multithreading error; it's unclear whether it's due to incorrect usage.

Setup (please complete the following information if applicable):

  • OS: [debian11]
  • IDE : [vscode]
  • Version of the Library used

Metadata

Metadata

Assignees

No one assigned

    Labels

    ClientThis issue points to a problem in the data-plane of the library.Event HubsService AttentionWorkflow: This issue is responsible by Azure service team.customer-reportedIssues that are reported by GitHub users external to the Azure organization.needs-team-attentionWorkflow: This issue needs attention from Azure service team or SDK teamquestionThe issue doesn't require a change to the product in order to be resolved. Most issues start as that

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions