Skip to content

Commit f24c6d5

Browse files
committed
crafted
1 parent dfa8bde commit f24c6d5

File tree

1 file changed

+101
-13
lines changed

1 file changed

+101
-13
lines changed

ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp

Lines changed: 101 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,99 @@ class TDriverMock
1010
: public NTable::IDriver
1111
{
1212
public:
13-
void Touch(NTable::EScan) noexcept {
13+
std::optional<NTable::EScan> LastScan;
1414

15+
void Touch(NTable::EScan scan) noexcept {
16+
LastScan = scan;
1517
}
1618
};
1719

1820
class TCbExecutorActor : public TActorBootstrapped<TCbExecutorActor> {
1921
public:
20-
std::function<void()> Cb;
22+
enum EEv {
23+
EvExec = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
24+
EvBoot,
25+
EvExecuted,
26+
27+
EvEnd
28+
};
29+
30+
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE),
31+
"expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)");
32+
33+
struct TEvExec : public TEventLocal<TEvExec, EvExec> {
34+
std::function<void()> OnHandle;
35+
bool Async;
36+
37+
TEvExec(std::function<void()> onHandle, bool async = true)
38+
: OnHandle(onHandle)
39+
, Async(async)
40+
{}
41+
};
42+
43+
struct TEvBoot : public TEventLocal<TEvBoot, EvBoot> {};
44+
struct TEvExecuted : public TEventLocal<TEvExecuted, EvExecuted> {};
45+
46+
std::function<void()> OnBootstrap;
47+
TActorId ReplyTo;
48+
TActorId ForwardTo;
2149

2250
void Bootstrap() {
23-
Cb();
51+
if (OnBootstrap) {
52+
OnBootstrap();
53+
}
54+
55+
Become(&TThis::Serve);
56+
Send(ReplyTo, new TCbExecutorActor::TEvBoot());
57+
}
58+
59+
void Handle(TEvExec::TPtr& ev) {
60+
ev->Get()->OnHandle();
61+
if (!ev->Get()->Async) {
62+
Send(ReplyTo, new TCbExecutorActor::TEvExecuted());
63+
}
64+
}
65+
66+
STATEFN(Serve) {
67+
switch (ev->GetTypeRewrite()) {
68+
hFunc(TEvExec, Handle);
69+
default: Y_ABORT("unexpected");
70+
}
2471
}
2572
};
2673

74+
class TRuntimeCbExecutor {
75+
public:
76+
TRuntimeCbExecutor(TTestActorRuntime& runtime, std::function<void()> onBootstrap = {}, TActorId forwardTo = {})
77+
: Runtime(runtime)
78+
, Sender(runtime.AllocateEdgeActor())
79+
{
80+
auto* executor = new TCbExecutorActor;
81+
executor->OnBootstrap = onBootstrap;
82+
executor->ForwardTo = forwardTo;
83+
executor->ReplyTo = Sender;
84+
Impl = runtime.Register(executor);
85+
Runtime.EnableScheduleForActor(Impl);
86+
Runtime.GrabEdgeEventRethrow<TCbExecutorActor::TEvBoot>(Sender);
87+
}
88+
89+
void AsyncExecute(std::function<void()> cb) {
90+
Runtime.Send(new IEventHandle(Impl, Sender, new TCbExecutorActor::TEvExec(cb), 0, 0), 0);
91+
}
92+
93+
void Execute(std::function<void()> cb) {
94+
Runtime.Send(new IEventHandle(Impl, Sender, new TCbExecutorActor::TEvExec(cb, false), 0, 0), 0);
95+
Runtime.GrabEdgeEventRethrow<TCbExecutorActor::TEvExecuted>(Sender);
96+
}
97+
98+
private:
99+
TTestActorRuntime& Runtime;
100+
TActorId Sender;
101+
TActorId Impl;
102+
};
103+
27104
Y_UNIT_TEST_SUITE(IncrementalRestoreScan) {
28-
Y_UNIT_TEST(Simple) {
105+
Y_UNIT_TEST(Empty) {
29106
TPortManager pm;
30107
Tests::TServerSettings serverSettings(pm.GetPort(2134));
31108
serverSettings.SetDomainName("Root")
@@ -51,26 +128,37 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) {
51128
TPathId targetPathId{};
52129
ui64 txId = 0;
53130

54-
auto scan = CreateIncrementalRestoreScan(
131+
auto* scan = CreateIncrementalRestoreScan(
55132
sender,
56133
[&](const TActorContext&) {
57134
return sender2;
58135
},
59136
TPathId{} /*sourcePathId*/,
60137
table,
61138
targetPathId,
62-
txId);
139+
txId).Release();
63140

64141
TDriverMock driver;
65-
auto* executor = new TCbExecutorActor;
66-
executor->Cb = [&]() {
142+
143+
// later we can use driver, scan and scheme ONLY with additional sync, e.g. from actorExec to avoid races
144+
TRuntimeCbExecutor actorExec(runtime, [&]() {
67145
scan->Prepare(&driver, scheme);
68-
};
69-
auto executorActor = runtime.Register(executor);
70-
runtime.EnableScheduleForActor(executorActor);
146+
});
147+
148+
actorExec.Execute([&]() {
149+
UNIT_ASSERT_EQUAL(scan->Exhausted(), NTable::EScan::Sleep);
150+
});
151+
152+
auto resp = runtime.GrabEdgeEventRethrow<TEvIncrementalRestoreScan::TEvNoMoreData>(sender2);
153+
154+
runtime.Send(new IEventHandle(resp->Sender, sender2, new TEvIncrementalRestoreScan::TEvFinished(), 0, 0), 0);
155+
156+
actorExec.Execute([&]() {
157+
UNIT_ASSERT(driver.LastScan && *driver.LastScan == NTable::EScan::Final);
158+
scan->Finish(NTable::EAbort::None);
159+
});
71160

72-
auto resp = runtime.GrabEdgeEventRethrow<TEvIncrementalRestoreScan::TEvFinished>(sender);
73-
Y_UNUSED(resp);
161+
runtime.GrabEdgeEventRethrow<TEvIncrementalRestoreScan::TEvFinished>(sender);
74162
}
75163
}
76164

0 commit comments

Comments
 (0)