Skip to content

Commit 646a69f

Browse files
committed
do not capture exception in importer sink
1 parent 61e43cf commit 646a69f

File tree

2 files changed

+14
-30
lines changed

2 files changed

+14
-30
lines changed

src/Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.cpp

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,42 +38,29 @@ String StorageObjectStorageMergeTreePartImporterSink::getName() const
3838

3939
void StorageObjectStorageMergeTreePartImporterSink::consume(Chunk & chunk)
4040
{
41-
if (stats.status.code != 0)
42-
return;
43-
44-
try
45-
{
46-
sink->consume(chunk);
47-
stats.read_bytes += chunk.bytes();
48-
stats.read_rows += chunk.getNumRows();
49-
50-
stats.status = ExecutionStatus(0, "Success");
51-
} catch (...) {
52-
stats.status = ExecutionStatus(-1, "Error importing part");
53-
part_log(stats);
54-
}
41+
sink->consume(chunk);
42+
stats.read_bytes += chunk.bytes();
43+
stats.read_rows += chunk.getNumRows();
5544
}
5645

5746
void StorageObjectStorageMergeTreePartImporterSink::onFinish()
5847
{
59-
if (stats.status.code != 0)
60-
{
61-
sink->cancel();
62-
return;
63-
}
64-
6548
sink->onFinish();
49+
6650
if (const auto object_metadata = object_storage->tryGetObjectMetadata(stats.file_path))
6751
{
6852
stats.bytes_on_disk = object_metadata->size_bytes;
6953
}
54+
7055
part_log(stats);
7156
}
7257

73-
void StorageObjectStorageMergeTreePartImporterSink::onException(std::exception_ptr)
58+
void StorageObjectStorageMergeTreePartImporterSink::onException(std::exception_ptr exception)
7459
{
75-
/// we should not reach here
76-
std::terminate();
60+
sink->onException(exception);
61+
62+
stats.status = ExecutionStatus(-1, "Error importing part");
63+
part_log(stats);
7764
}
7865

7966
}

src/Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.h

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
#include <Interpreters/Context.h>
44
#include <Storages/ObjectStorage/StorageObjectStorageSink.h>
5-
#include "Core/Settings.h"
6-
#include "Disks/ObjectStorages/IObjectStorage.h"
7-
#include "Disks/ObjectStorages/StoredObject.h"
8-
#include "Formats/FormatFactory.h"
9-
#include "IO/CompressionMethod.h"
10-
#include "Processors/Formats/IOutputFormat.h"
11-
#include "Storages/MergeTree/IMergeTreeDataPart.h"
5+
#include <Disks/ObjectStorages/IObjectStorage.h>
6+
#include <Formats/FormatFactory.h>
7+
#include <Processors/Formats/IOutputFormat.h>
8+
#include <Storages/MergeTree/IMergeTreeDataPart.h>
129

1310
namespace DB
1411
{

0 commit comments

Comments
 (0)