Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions async_simple/Signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class Signal : public std::enable_shared_from_this<Signal> {
// binding slots, then execute the slot callback functions. It will return
// the signal which success triggered. If no signal success triggger, return
// SignalType::none.
SignalType emit(SignalType state) noexcept;
SignalType emits(SignalType state) noexcept;

// Return now signal type.
SignalType state() const noexcept {
Expand Down Expand Up @@ -320,7 +320,7 @@ class Slot {
[chainedSignal =
chainedSignal->weak_from_this()](SignalType type) {
if (auto signal = chainedSignal.lock(); signal != nullptr) {
signal->emit(type);
signal->emits(type);
}
}),
std::memory_order_release);
Expand Down Expand Up @@ -451,7 +451,7 @@ inline detail::SignalSlotSharedState::~SignalSlotSharedState() {
}
}

inline SignalType Signal::emit(SignalType state) noexcept {
inline SignalType Signal::emits(SignalType state) noexcept {
if (state != SignalType::None) {
SignalType vaildSignal = UpdateState(_state, state);
if (vaildSignal) {
Expand Down
8 changes: 4 additions & 4 deletions async_simple/coro/Collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ struct CollectAnyAwaiter {
_result->_idx = i;
_result->_value = std::move(result);
if (auto ptr = local->getSlot(); ptr) {
ptr->signal()->emit(_SignalType);
ptr->signal()->emits(_SignalType);
}
c.resume();
}
Expand Down Expand Up @@ -294,7 +294,7 @@ struct CollectAnyVariadicAwaiter {
_result = std::make_unique<ResultType>(
std::in_place_index_t<index>(), std::move(res));
if (auto ptr = local->getSlot(); ptr) {
ptr->signal()->emit(_SignalType);
ptr->signal()->emits(_SignalType);
}
c.resume();
}
Expand Down Expand Up @@ -409,7 +409,7 @@ struct CollectAllAwaiter {
auto signalType = _SignalType;
auto awaitingCoro = _event.down(oldCount, 1);
if (oldCount == size) {
signal->emit(signalType);
signal->emits(signalType);
}
if (awaitingCoro) {
awaitingCoro.resume();
Expand Down Expand Up @@ -582,7 +582,7 @@ struct CollectAllVariadicAwaiter {
auto signalType = _SignalType;
auto awaitingCoro = _event.down(oldCount, 1);
if (oldCount == sizeof...(Ts)) {
signal->emit(signalType);
signal->emits(signalType);
}
if (awaitingCoro) {
awaitingCoro.resume();
Expand Down
6 changes: 3 additions & 3 deletions async_simple/coro/test/LazyTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ TEST_F(LazyTest, testYieldCancel) {
EXPECT_EQ(result.hasError(), true);
p.set_value();
});
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
p.get_future().wait();
}

Expand Down Expand Up @@ -2020,7 +2020,7 @@ TEST_F(LazyTest, testForbiddenCancel) {
EXPECT_EQ(slot->signal()->state(), SignalType::Terminate);
};
lazy(p.getFuture()).setLazyLocal(signal.get()).via(&e).detach();
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
p.setValue();
}
{
Expand All @@ -2037,7 +2037,7 @@ TEST_F(LazyTest, testForbiddenCancel) {
EXPECT_EQ(slot, nullptr);
};
lazy(p.getFuture()).setLazyLocal(signal.get()).via(&e).detach();
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
p.setValue();
}
}
Expand Down
2 changes: 1 addition & 1 deletion async_simple/coro/test/SleepTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Lazy<void> cancelSleep() {
EXPECT_TRUE(err.value() == async_simple::Terminate);
}
auto slot = co_await async_simple::coro::CurrentSlot{};
auto ok = slot->signal()->emit(SignalType::Terminate);
auto ok = slot->signal()->emits(SignalType::Terminate);
if (ok) {
std::cout << "Coro " << i << " emit cancel work" << std::endl;
} else {
Expand Down
4 changes: 2 additions & 2 deletions async_simple/coro/test/SpinLockTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ TEST_F(SpinLockTest, testSpinLockCancel) {
auto signal = async_simple::Signal::create();
std::promise<void> p;
if (cancelFirst) {
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
}
std::move(lazy)
.setLazyLocal(signal.get())
Expand All @@ -132,7 +132,7 @@ TEST_F(SpinLockTest, testSpinLockCancel) {
});
if (!cancelFirst) {
std::this_thread::sleep_for(10ms);
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
}
p.get_future().wait();
};
Expand Down
38 changes: 19 additions & 19 deletions async_simple/test/CancellationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ TEST_F(CancellationTest, testSimpleCancellation) {
return slot->signal()->state();
});
EXPECT_EQ(signal->state() != 0, false);
EXPECT_EQ(signal->emit(SignalType::Terminate), true);
EXPECT_EQ(signal->emits(SignalType::Terminate), true);
EXPECT_EQ(signal->state() != 0, true);
EXPECT_EQ(result.get(), SignalType::Terminate);
EXPECT_EQ(slot->canceled(), true);
Expand All @@ -65,7 +65,7 @@ TEST_F(CancellationTest, testCancellationWithNoneType) {
}
return 1;
});
signal->emit(SignalType::None);
signal->emits(SignalType::None);
std::this_thread::sleep_for(10ms);
flag = 1;
EXPECT_EQ(result.get(), 0);
Expand All @@ -88,7 +88,7 @@ TEST_F(CancellationTest, testCancellationWithSlotFilter) {
}
return 1;
});
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
std::this_thread::sleep_for(10ms);
flag = 1;
EXPECT_EQ(result.get(), 0);
Expand All @@ -109,7 +109,7 @@ TEST_F(CancellationTest, testMultiSlotsCancellationCallback) {
j++;
});
}
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
EXPECT_EQ(j, 100);
}

Expand All @@ -122,7 +122,7 @@ TEST_F(CancellationTest, testMultiSlotsCancellation) {
[[maybe_unused]] auto _ = slots.back()->emplace(
SignalType::Terminate, [&j](SignalType, Signal*) mutable { j++; });
}
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
EXPECT_EQ(j, 100);
}

Expand All @@ -143,9 +143,9 @@ TEST_F(CancellationTest, testMultiSlotsCancellationWithFilter) {
EXPECT_EQ(type, expected_type);
});
}
signal->emit(expected_type);
signal->emits(expected_type);
EXPECT_EQ(j, 75);
signal->emit(static_cast<SignalType>(0b100));
signal->emits(static_cast<SignalType>(0b100));
EXPECT_EQ(j, 75);
}
{
Expand All @@ -164,7 +164,7 @@ TEST_F(CancellationTest, testMultiSlotsCancellationWithFilter) {
EXPECT_EQ(type, expected_type & filter);
});
}
signal->emit(expected_type);
signal->emits(expected_type);
EXPECT_EQ(j, 100);
}
}
Expand All @@ -176,7 +176,7 @@ TEST_F(CancellationTest, testScopeFilterGuard) {
[[maybe_unused]] auto _ =
slot->emplace(SignalType::Terminate,
[](SignalType type, Signal*) { EXPECT_TRUE(true); });
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
{
auto guard = slot->setScopedFilter(static_cast<SignalType>(1));
EXPECT_TRUE(slot->canceled());
Expand All @@ -191,7 +191,7 @@ TEST_F(CancellationTest, testScopeFilterGuard) {
[](SignalType type, Signal*) { EXPECT_TRUE(false); });
{
auto guard = slot->setScopedFilter(static_cast<SignalType>(0b10));
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
EXPECT_TRUE(slot->signal()->state() == SignalType::Terminate);
EXPECT_TRUE(slot->canceled() == false);
}
Expand All @@ -205,7 +205,7 @@ TEST_F(CancellationTest, testScopeFilterGuard) {
[](SignalType type, Signal*) { EXPECT_TRUE(true); });
{
auto guard = slot->setScopedFilter(SignalType::All);
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
EXPECT_TRUE(slot->canceled());
}
EXPECT_TRUE(slot->canceled());
Expand All @@ -224,7 +224,7 @@ TEST_F(CancellationTest, testScopeFilterGuardNested) {
{
auto guard =
slot->setScopedFilter(static_cast<SignalType>(0b11100));
signal->emit(static_cast<SignalType>(0b100));
signal->emits(static_cast<SignalType>(0b100));
EXPECT_EQ(slot->getFilter(), static_cast<SignalType>(0b100));
EXPECT_EQ(slot->hasTriggered(static_cast<SignalType>(0b100)),
true);
Expand All @@ -246,7 +246,7 @@ TEST_F(CancellationTest, testScopeFilterGuardNested) {
{
auto guard =
slot->setScopedFilter(static_cast<SignalType>(0b11100));
signal->emit(static_cast<SignalType>(0b011));
signal->emits(static_cast<SignalType>(0b011));
EXPECT_EQ(slot->getFilter(), static_cast<SignalType>(0b100));
EXPECT_EQ(slot->canceled(), false);
}
Expand Down Expand Up @@ -274,7 +274,7 @@ TEST_F(CancellationTest, testMultiThreadEmit) {
res.emplace_back(std::async([&]() {
while (!start_flag)
std::this_thread::yield();
return signal->emit(SignalType::Terminate);
return signal->emits(SignalType::Terminate);
}));
start_flag = true;
int cnt = 0;
Expand Down Expand Up @@ -316,7 +316,7 @@ TEST_F(CancellationTest, testMultiThreadEmitDifferentSignal) {
res.emplace_back(std::async([i, &start_flag, &signal]() {
while (!start_flag)
std::this_thread::yield();
return signal->emit(
return signal->emits(
i % 2
? static_cast<SignalType>(0b101 | (uint64_t{1} << 63))
: static_cast<SignalType>(0b110 | (uint64_t{1} << 63)));
Expand Down Expand Up @@ -378,7 +378,7 @@ TEST_F(CancellationTest, testMultiThreadEmitWhenEmplace) {
res.emplace_back(std::async([&]() {
while (!start_flag)
std::this_thread::yield();
return static_cast<bool>(signal->emit(SignalType::Terminate));
return static_cast<bool>(signal->emits(SignalType::Terminate));
}));
start_flag = true;
int cnt = 0;
Expand Down Expand Up @@ -423,7 +423,7 @@ TEST_F(CancellationTest, testMultiThreadEmitWhenSetScopedFilter) {
while (!start_flag) {
std::this_thread::yield();
}
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
});
}
while (start_flag2 < 200) {
Expand All @@ -441,7 +441,7 @@ TEST_F(CancellationTest, testMultiThreadEmitWhenSetScopedFilter) {
TEST_F(CancellationTest, testRegistSignalAfterCancellation) {
{
auto signal = Signal::create();
signal->emit(SignalType::Terminate);
signal->emits(SignalType::Terminate);
Slot s{signal.get()};
EXPECT_FALSE(
s.emplace(SignalType::Terminate, [](SignalType, Signal*) {}));
Expand Down Expand Up @@ -469,6 +469,6 @@ TEST_F(CancellationTest, testDerivedSignal) {
EXPECT_EQ(mySignal->myState, 1);
});
mySignal->myState = 1;
mySignal->emit(SignalType::Terminate);
mySignal->emits(SignalType::Terminate);
}
} // namespace async_simple
16 changes: 8 additions & 8 deletions docs/docs.cn/信号与任务的取消.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ for (int i=0;i<10;++i) {
}
// ...
// 提交取消信号
signal->emit(SignalType::terminate);
signal->emits(SignalType::terminate);
for (auto &e:works)
e.get();
```

除了直接查询取消状态,我们可以在`Slot`中注册信号处理函数来接受信号。信号处理函数的签名应为`void(SignalType, Signal*)`。第一个参数SignalType代表经过滤后,本次成功触发的信号类型,第二个参数是指向信号的指针。

需要注意的是:
1. 信号处理函数不应该阻塞。调用`emit()`函数并触发信号时,程序会遍历绑定在`Signal`上的信号处理函数并立即执行。
2. 注意线程安全问题:信号处理函数会由调用`emit()`的线程执行,32号以上的信号处理函数还可能由多个线程并发执行。
1. 信号处理函数不应该阻塞。调用`emits()`函数并触发信号时,程序会遍历绑定在`Signal`上的信号处理函数并立即执行。
2. 注意线程安全问题:信号处理函数会由调用`emits()`的线程执行,32号以上的信号处理函数还可能由多个线程并发执行。
3. 信号处理函数禁止持有槽绑定的信号,这会导致信号的内存泄漏。用户应通过`Signal*`参数访问信号。

例如,下面这段代码通过信号回调函数取消睡眠。
Expand All @@ -76,7 +76,7 @@ for (int i=0;i<10;++i) {
}
slot->clear(); // 清除回调函数
if (slot->signal()) { //如果槽被绑定在信号上
slot->signal()->emit(SignalType::terminate); // 触发取消信号。
slot->signal()->emits(SignalType::terminate); // 触发取消信号。
}
return;
});
Expand All @@ -97,7 +97,7 @@ class Signal
: public std::enable_shared_from_this<Signal> {
public:
// 提交信号(允许一次提交多种信号),并返回本次请求成功触发的信号,线程安全。
SignalType emit(SignalType state) noexcept;
SignalType emits(SignalType state) noexcept;
// 获取当前的信号,线程安全。
SignalType state() const noexcept;
// 创建信号的工厂方法,返回信号的shared_ptr,线程安全。
Expand Down Expand Up @@ -140,11 +140,11 @@ std::shared_ptr<Signal> signal = Signal::create();
auto slot = std::make_unique<Slot>(signal.get());
std::shared_ptr<Signal> chainedSignal = Signal::create();
slot->addChainedSignal(chainedSignal);
signal->emit(SignalType::terminate);
signal->emits(SignalType::terminate);
assert(chainedSignal->state()==SignalType::terminate);
// 信号会被转发给chainedSignal
// 然而,chainedSignal触发的信号不会触发给signal
chainedSignal->emit(static_cast<SignalType>(0b10));
chainedSignal->emits(static_cast<SignalType>(0b10));
assert(signal->state()!=static_cast<SignalType>(0b10));
```

Expand All @@ -164,7 +164,7 @@ slot->emplace([](SignalType type, Signal* signal) {
std::cout << "myState:" << mySignal->myState << std::endl;
});
mySignal->myState=1;
mySignal->emit(SignalType::terminate);
mySignal->emits(SignalType::terminate);
```


Expand Down
Loading
Loading