Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/brpc/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
if (idle_timeout_sec > 0) {
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _bthread_tag;
bthread_attr_set_name(&tmp, "CloseIdleConnections");
if (bthread_start_background(&_close_idle_tid, &tmp, CloseIdleConnections, this) != 0) {
LOG(FATAL) << "Fail to start bthread";
return -1;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
bthread_t bt;
bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
bthread_attr_set_name(&attr, "RunEndRPC");
_tmp_completion_info = info;
if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) {
LOG(FATAL) << "Fail to start bthread";
Expand Down
1 change: 1 addition & 0 deletions src/brpc/event_dispatcher_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ int EventDispatcher::Start(const bthread_attr_t* thread_attr) {
// Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY.
bthread_attr_t epoll_thread_attr =
_thread_attr | BTHREAD_NEVER_QUIT | BTHREAD_GLOBAL_PRIORITY;
bthread_attr_set_name(&epoll_thread_attr, "EventDispatcher::RunThis");

// Polling thread uses the same attr for consumer threads (NORMAL right
// now). Previously, we used small stack (32KB) which may be overflowed
Expand Down
1 change: 1 addition & 0 deletions src/brpc/event_dispatcher_kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ int EventDispatcher::Start(const bthread_attr_t* thread_attr) {
// Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY.
bthread_attr_t kqueue_thread_attr =
_thread_attr | BTHREAD_NEVER_QUIT | BTHREAD_GLOBAL_PRIORITY;
bthread_attr_set_name(&kqueue_thread_attr, "EventDispatcher::RunThis");

// Polling thread uses the same attr for consumer threads (NORMAL right
// now). Previously, we used small stack (32KB) which may be overflowed
Expand Down
4 changes: 3 additions & 1 deletion src/brpc/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,9 @@ static void GlobalInitializeOrDieImpl() {

// We never join GlobalUpdate, let it quit with the process.
bthread_t th;
CHECK(bthread_start_background(&th, NULL, GlobalUpdate, NULL) == 0)
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "GlobalUpdate");
CHECK(bthread_start_background(&th, &attr, GlobalUpdate, NULL) == 0)
<< "Fail to start GlobalUpdate";
}

Expand Down
17 changes: 9 additions & 8 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ static void QueueMessage(InputMessageBase* to_run_msg,
if (!to_run_msg) {
return;
}

#if BRPC_WITH_RDMA
if (rdma::FLAGS_rdma_disable_bthread) {
ProcessInputMessage(to_run_msg);
return;
}
#endif
// Create bthread for last_msg. The bthread is not scheduled
// until bthread_flush() is called (in the worse case).

Expand All @@ -207,14 +214,8 @@ static void QueueMessage(InputMessageBase* to_run_msg,
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
tmp.keytable_pool = keytable_pool;
tmp.tag = bthread_self_tag();

#if BRPC_WITH_RDMA
if (rdma::FLAGS_rdma_disable_bthread) {
ProcessInputMessage(to_run_msg);
return;
}
#endif

bthread_attr_set_name(&tmp, "ProcessInputMessage");

if (!FLAGS_usercode_in_coroutine && bthread_start_background(
&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
++*num_bthread_created;
Expand Down
4 changes: 3 additions & 1 deletion src/brpc/periodic_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ static void* PeriodicTaskThread(void* arg) {

static void RunPeriodicTaskThread(void* arg) {
bthread_t th = 0;
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "PeriodicTaskThread");
int rc = bthread_start_background(
&th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg);
&th, &attr, PeriodicTaskThread, arg);
if (rc != 0) {
LOG(ERROR) << "Fail to start PeriodicTaskThread";
static_cast<PeriodicTask*>(arg)->OnDestroyingTask();
Expand Down
9 changes: 7 additions & 2 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ void RdmaConnect::StartConnect(const Socket* socket,
_done = done;
_data = data;
bthread_t tid;
if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "RdmaProcessHandshakeAtClient");
if (bthread_start_background(&tid, &attr,
RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) {
LOG(FATAL) << "Fail to start handshake bthread";
} else {
Expand Down Expand Up @@ -309,7 +311,9 @@ void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
ep->_state = S_HELLO_WAIT;
SocketUniquePtr s;
m->ReAddress(&s);
if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "RdmaProcessHandshakeAtServer");
if (bthread_start_background(&tid, &attr,
ProcessHandshakeAtServer, ep) < 0) {
ep->_state = UNINIT;
LOG(FATAL) << "Fail to start handshake bthread";
Expand Down Expand Up @@ -1616,6 +1620,7 @@ int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag,
auto attr = FLAGS_rdma_disable_bthread ? BTHREAD_ATTR_PTHREAD
: BTHREAD_ATTR_NORMAL;
attr.tag = tag;
bthread_attr_set_name(&attr, "RdmaPolling");
pollers[i].callback = callback;
pollers[i].init_fn = init_fn;
pollers[i].release_fn = release_fn;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _options.bthread_tag;
bthread_attr_set_name(&tmp, "UpdateDerivedVars");
if (bthread_start_background(&_derivative_thread, &tmp,
UpdateDerivedVars, this) != 0) {
LOG(ERROR) << "Fail to create _derivative_thread";
Expand Down
13 changes: 10 additions & 3 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1491,8 +1491,10 @@ void Socket::AfterAppConnected(int err, void* data) {
// requests are not setup yet. check the comment on Setup() in Write()
req->Setup(s);
bthread_t th;
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "KeepWrite");
if (bthread_start_background(
&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) {
&th, &attr, KeepWrite, req) != 0) {
PLOG(WARNING) << "Fail to start KeepWrite";
KeepWrite(req);
}
Expand Down Expand Up @@ -1532,7 +1534,9 @@ int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
bthread_t th;
std::unique_ptr<google::protobuf::Closure> thrd_func(brpc::NewCallback(
Socket::CheckConnectedAndKeepWrite, fd, err, data));
if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "CheckConnectedAndKeepWrite");
if ((err = bthread_start_background(&th, &attr,
RunClosure, thrd_func.get())) == 0) {
thrd_func.release();
return 0;
Expand Down Expand Up @@ -1705,6 +1709,8 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {

int saved_errno = 0;
bthread_t th;
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "KeepWrite");
SocketUniquePtr ptr_for_keep_write;
ssize_t nw = 0;
int ret = 0;
Expand Down Expand Up @@ -1779,7 +1785,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
KEEPWRITE_IN_BACKGROUND:
ReAddress(&ptr_for_keep_write);
req->set_socket(ptr_for_keep_write.release());
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
if (bthread_start_background(&th, &attr,
KeepWrite, req) != 0) {
LOG(FATAL) << "Fail to start KeepWrite";
KeepWrite(req);
Expand Down Expand Up @@ -2266,6 +2272,7 @@ int Socket::OnInputEvent(void* user_data, uint32_t events,
bthread_attr_t attr = thread_attr;
attr.keytable_pool = p->_keytable_pool;
attr.tag = bthread_self_tag();
bthread_attr_set_name(&attr, "ProcessEvent");
if (FLAGS_usercode_in_coroutine) {
ProcessEvent(p);
#if BRPC_WITH_RDMA
Expand Down
4 changes: 3 additions & 1 deletion src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ int SocketMap::Init(const SocketMapOptions& options) {
}
if (_options.idle_timeout_second_dynamic != NULL ||
_options.idle_timeout_second > 0) {
if (bthread_start_background(&_close_idle_thread, NULL,
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "RunWatchConnections");
if (bthread_start_background(&_close_idle_thread, &attr,
RunWatchConnections, this) != 0) {
LOG(FATAL) << "Fail to start bthread";
return -1;
Expand Down
7 changes: 7 additions & 0 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,3 +669,10 @@ uint64_t bthread_cpu_clock_ns(void) {
}

} // extern "C"

void bthread_attr_set_name(bthread_attr_t* attr, const char* name) {
if (attr) {
strncpy(attr->name, name, BTHREAD_NAME_MAX_LENGTH);
attr->name[BTHREAD_NAME_MAX_LENGTH] = '\0';
}
}
4 changes: 3 additions & 1 deletion src/bthread/fd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ class EpollThread {
PLOG(FATAL) << "Fail to epoll_create/kqueue";
return -1;
}
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "EpollThread::run_this");
if (bthread_start_background(
&_tid, NULL, EpollThread::run_this, this) != 0) {
&_tid, &attr, EpollThread::run_this, this) != 0) {
close(_epfd);
_epfd = -1;
LOG(FATAL) << "Fail to create epoll bthread";
Expand Down
3 changes: 2 additions & 1 deletion src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
namespace bthread {

static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_INVALID };
BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_INVALID, {0} };

DEFINE_bool(show_bthread_creation_in_vars, false, "When this flags is on, The time "
"from bthread creation to first run will be recorded and shown in /vars");
Expand Down Expand Up @@ -1141,6 +1141,7 @@ void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
<< "\nattr={stack_type=" << attr.stack_type
<< " flags=" << attr.flags
<< " specified_tag=" << attr.tag
<< " name=" << attr.name
<< " keytable_pool=" << attr.keytable_pool
<< "}\nhas_tls=" << has_tls
<< "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
Expand Down
14 changes: 9 additions & 5 deletions src/bthread/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,14 @@ typedef struct {
size_t nfree;
} bthread_keytable_pool_stat_t;

static const size_t BTHREAD_NAME_MAX_LENGTH = 31;
// Attributes for thread creation.
typedef struct bthread_attr_t {
bthread_stacktype_t stack_type;
bthread_attrflags_t flags;
bthread_keytable_pool_t* keytable_pool;
bthread_tag_t tag;
char name[BTHREAD_NAME_MAX_LENGTH + 1]; // do not use std::string to keep POD

#if defined(__cplusplus)
void operator=(unsigned stacktype_and_flags) {
Expand All @@ -120,29 +122,31 @@ typedef struct bthread_attr_t {
#endif // __cplusplus
} bthread_attr_t;

void bthread_attr_set_name(bthread_attr_t* attr, const char* name);

// bthreads started with this attribute will run on stack of worker pthread and
// all bthread functions that would block the bthread will block the pthread.
// The bthread will not allocate its own stack, simply occupying a little meta
// memory. This is required to run JNI code which checks layout of stack. The
// obvious drawback is that you need more worker pthreads when you have a lot
// of such bthreads.
static const bthread_attr_t BTHREAD_ATTR_PTHREAD =
{ BTHREAD_STACKTYPE_PTHREAD, 0, NULL, BTHREAD_TAG_INVALID };
{ BTHREAD_STACKTYPE_PTHREAD, 0, NULL, BTHREAD_TAG_INVALID, {0} };

// bthreads created with following attributes will have different size of
// stacks. Default is BTHREAD_ATTR_NORMAL.
static const bthread_attr_t BTHREAD_ATTR_SMALL = {BTHREAD_STACKTYPE_SMALL, 0, NULL,
BTHREAD_TAG_INVALID};
BTHREAD_TAG_INVALID, {0}};
static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0, NULL,
BTHREAD_TAG_INVALID};
BTHREAD_TAG_INVALID, {0}};
static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, NULL,
BTHREAD_TAG_INVALID};
BTHREAD_TAG_INVALID, {0}};

// bthreads created with this attribute will print log when it's started,
// context-switched, finished.
static const bthread_attr_t BTHREAD_ATTR_DEBUG = {
BTHREAD_STACKTYPE_NORMAL, BTHREAD_LOG_START_AND_FINISH | BTHREAD_LOG_CONTEXT_SWITCH, NULL,
BTHREAD_TAG_INVALID};
BTHREAD_TAG_INVALID, {0}};

static const size_t BTHREAD_EPOLL_THREAD_NUM = 1;
static const bthread_t BTHREAD_ATOMIC_INIT = 0;
Expand Down
Loading