@@ -140,28 +140,29 @@ struct MainTestCase {
140140 {
141141 }
142142
143- void CreateTable (const TString& tableDDL) {
144- auto ddl = Sprintf (tableDDL.data (), TableName.data ());
143+ void ExecuteDDL (const TString& ddl) {
145144 auto res = Session.ExecuteQuery (ddl, TTxControl::NoTx ()).GetValueSync ();
146145 UNIT_ASSERT_C (res.IsSuccess (), res.GetIssues ().ToString ());
147146 }
148147
148+ void CreateTable (const TString& tableDDL) {
149+ ExecuteDDL (Sprintf (tableDDL.data (), TableName.data ()));
150+ }
151+
149152 void CreateTopic (size_t partitionCount = 10 ) {
150- auto res = Session. ExecuteQuery (Sprintf (R"(
153+ ExecuteDDL (Sprintf (R"(
151154 CREATE TOPIC `%s`
152155 WITH (
153156 min_active_partitions = %d
154157 );
155- )" , TopicName.data (), partitionCount), TTxControl::NoTx ()).GetValueSync ();
156- UNIT_ASSERT_C (res.IsSuccess (), res.GetIssues ().ToString ());
158+ )" , TopicName.data (), partitionCount));
157159 }
158160
159161 void CreateConsumer (const TString& consumerName) {
160- auto res = Session. ExecuteQuery (Sprintf (R"(
162+ ExecuteDDL (Sprintf (R"(
161163 ALTER TOPIC `%s`
162164 ADD CONSUMER `%s`;
163- )" , TopicName.data (), consumerName.data ()), TTxControl::NoTx ()).GetValueSync ();
164- UNIT_ASSERT_C (res.IsSuccess (), res.GetIssues ().ToString ());
165+ )" , TopicName.data (), consumerName.data ()));
165166 }
166167
167168 struct CreateTransferSettings {
@@ -200,7 +201,7 @@ struct MainTestCase {
200201 sb << " , BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl;
201202 }
202203
203- auto res = Session. ExecuteQuery ( Sprintf (R"(
204+ auto ddl = Sprintf (R"(
204205 %s;
205206
206207 CREATE TRANSFER `%s`
@@ -209,9 +210,9 @@ struct MainTestCase {
209210 CONNECTION_STRING = 'grpc://%s'
210211 %s
211212 );
212- )" , lambda.data (), TransferName.data (), TopicName.data (), TableName.data (), ConnectionString.data (), sb.data ()),
213- TTxControl::NoTx ()). GetValueSync ();
214- UNIT_ASSERT_C (res. IsSuccess (), res. GetIssues (). ToString () );
213+ )" , lambda.data (), TransferName.data (), TopicName.data (), TableName.data (), ConnectionString.data (), sb.data ());
214+
215+ ExecuteDDL (ddl );
215216 }
216217
217218 struct AlterTransferSettings {
@@ -360,7 +361,8 @@ struct MainTestCase {
360361 for (size_t i = 20 ; i--;) {
361362 auto result = DescribeTransfer ().GetReplicationDescription ();
362363 if (TReplicationDescription::EState::Error == result.GetState ()) {
363- Cerr << " >>>>> " << result.GetErrorState ().GetIssues ().ToOneLineString () << Endl << Flush;
364+ Cerr << " >>>>> ACTUAL: " << result.GetErrorState ().GetIssues ().ToOneLineString () << Endl << Flush;
365+ Cerr << " >>>>> EXPECTED: " << expectedMessage << Endl << Flush;
364366 UNIT_ASSERT (result.GetErrorState ().GetIssues ().ToOneLineString ().contains (expectedMessage));
365367 break ;
366368 }
@@ -1242,7 +1244,34 @@ Y_UNIT_TEST_SUITE(Transfer)
12421244 }});
12431245 }
12441246
1245- Y_UNIT_TEST (CreateTransferTopicNotExists)
1247+ Y_UNIT_TEST (CreateTransferSourceNotExists)
1248+ {
1249+ MainTestCase testCase;
1250+ testCase.CreateTable (R"(
1251+ CREATE TABLE `%s` (
1252+ Key Uint64 NOT NULL,
1253+ Message Utf8 NOT NULL,
1254+ PRIMARY KEY (Key)
1255+ ) WITH (
1256+ STORE = COLUMN
1257+ );
1258+ )" );
1259+
1260+ testCase.CreateTransfer (R"(
1261+ $l = ($x) -> {
1262+ return [
1263+ <|
1264+ Key:CAST($x._offset AS Uint64),
1265+ Message:CAST($x._data AS Utf8)
1266+ |>
1267+ ];
1268+ };
1269+ )" );
1270+
1271+ testCase.CheckTransferStateError (" Discovery error: local/Topic_" );
1272+ }
1273+
1274+ Y_UNIT_TEST (CreateTransferSourceIsNotTopic)
12461275 {
12471276 MainTestCase testCase;
12481277 testCase.CreateTable (R"(
@@ -1255,6 +1284,13 @@ Y_UNIT_TEST_SUITE(Transfer)
12551284 );
12561285 )" );
12571286
1287+ testCase.ExecuteDDL (Sprintf (R"(
1288+ CREATE TABLE `%s` (
1289+ Key Uint64 NOT NULL,
1290+ PRIMARY KEY (Key)
1291+ );
1292+ )" , testCase.TopicName .data ()));
1293+
12581294 testCase.CreateTransfer (R"(
12591295 $l = ($x) -> {
12601296 return [
@@ -1292,7 +1328,48 @@ Y_UNIT_TEST_SUITE(Transfer)
12921328 };
12931329 )" );
12941330
1295- testCase.CheckTransferStateError (" Unexpected entry kind at 'writer'" );
1331+ testCase.CheckTransferStateError (" Only column tables are supported as transfer targets" );
1332+ }
1333+
1334+ Y_UNIT_TEST (CreateTransferTargetIsNotTable)
1335+ {
1336+ MainTestCase testCase;
1337+ testCase.CreateTable (R"(
1338+ CREATE TOPIC `%s`;
1339+ )" );
1340+ testCase.CreateTopic ();
1341+
1342+ testCase.CreateTransfer (R"(
1343+ $l = ($x) -> {
1344+ return [
1345+ <|
1346+ Key:CAST($x._offset AS Uint64),
1347+ Message:CAST($x._data AS Utf8)
1348+ |>
1349+ ];
1350+ };
1351+ )" );
1352+
1353+ testCase.CheckTransferStateError (" Only column tables are supported as transfer targets" );
1354+ }
1355+
1356+ Y_UNIT_TEST (CreateTransferTargetNotExists)
1357+ {
1358+ MainTestCase testCase;
1359+ testCase.CreateTopic ();
1360+
1361+ testCase.CreateTransfer (R"(
1362+ $l = ($x) -> {
1363+ return [
1364+ <|
1365+ Key:CAST($x._offset AS Uint64),
1366+ Message:CAST($x._data AS Utf8)
1367+ |>
1368+ ];
1369+ };
1370+ )" );
1371+
1372+ testCase.CheckTransferStateError (TStringBuilder () << " The target table `/local/" << testCase.TableName << " ` does not exist" );
12961373 }
12971374}
12981375
0 commit comments