Skip to content

Commit 180095c

Browse files
authored
Merge pull request #8048 from nextcloud/bugfix/slowBulkUpload
Bugfix/slow bulk upload
2 parents faf93b5 + 7974a47 commit 180095c

File tree

10 files changed

+48
-39
lines changed

10 files changed

+48
-39
lines changed

src/common/checksums.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ QByteArray ComputeChecksum::checksumType() const
201201

202202
void ComputeChecksum::start(const QString &filePath)
203203
{
204-
qCInfo(lcChecksums) << "Computing" << checksumType() << "checksum of" << filePath << "in a thread";
204+
qCDebug(lcChecksums) << "Computing" << checksumType() << "checksum of" << filePath << "in a thread";
205205
startImpl(filePath);
206206
}
207207

src/common/syncjournaldb.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,13 +1711,11 @@ static void toDownloadInfo(SqlQuery &query, SyncJournalDb::DownloadInfo *res)
17111711
res->_valid = ok;
17121712
}
17131713

1714-
static bool deleteBatch(SqlQuery &query, const QStringList &entries, const QString &name)
1714+
static bool deleteBatch(SqlQuery &query, const QStringList &entries)
17151715
{
17161716
if (entries.isEmpty())
17171717
return true;
17181718

1719-
qCDebug(lcDb) << "Removing stale" << name << "entries:" << entries.join(QStringLiteral(", "));
1720-
// FIXME: Was ported from execBatch, check if correct!
17211719
for (const auto &entry : entries) {
17221720
query.reset_and_clear_bindings();
17231721
query.bindValue(1, entry);
@@ -1831,7 +1829,7 @@ QVector<SyncJournalDb::DownloadInfo> SyncJournalDb::getAndDeleteStaleDownloadInf
18311829
qCDebug(lcDb) << "database error:" << query->error();
18321830
return empty_result;
18331831
}
1834-
if (!deleteBatch(*query, superfluousPaths, QStringLiteral("downloadinfo"))) {
1832+
if (!deleteBatch(*query, superfluousPaths)) {
18351833
return empty_result;
18361834
}
18371835
}
@@ -1965,7 +1963,7 @@ QVector<uint> SyncJournalDb::deleteStaleUploadInfos(const QSet<QString> &keep)
19651963
}
19661964

19671965
const auto deleteUploadInfoQuery = _queryManager.get(PreparedSqlQueryManager::DeleteUploadInfoQuery);
1968-
deleteBatch(*deleteUploadInfoQuery, superfluousPaths, QStringLiteral("uploadinfo"));
1966+
deleteBatch(*deleteUploadInfoQuery, superfluousPaths);
19691967
return ids;
19701968
}
19711969

@@ -2033,7 +2031,7 @@ bool SyncJournalDb::deleteStaleErrorBlacklistEntries(const QSet<QString> &keep)
20332031

20342032
SqlQuery delQuery(_db);
20352033
delQuery.prepare("DELETE FROM blacklist WHERE path = ?");
2036-
return deleteBatch(delQuery, superfluousPaths, QStringLiteral("blacklist"));
2034+
return deleteBatch(delQuery, superfluousPaths);
20372035
}
20382036

20392037
void SyncJournalDb::deleteStaleFlagsEntries()

src/gui/folderwatcher.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,6 @@ void FolderWatcher::changeDetected(const QStringList &paths)
223223
_lockedFiles.insert(checkResult.path);
224224
}
225225

226-
qCDebug(lcFolderWatcher) << "Locked files:" << _lockedFiles.values();
227-
228226
// ------- handle ignores:
229227
if (pathIsIgnored(path)) {
230228
continue;
@@ -233,9 +231,6 @@ void FolderWatcher::changeDetected(const QStringList &paths)
233231
changedPaths.insert(path);
234232
}
235233

236-
qCDebug(lcFolderWatcher) << "Unlocked files:" << _unlockedFiles.values();
237-
qCDebug(lcFolderWatcher) << "Locked files:" << _lockedFiles;
238-
239234
if (!_lockedFiles.isEmpty() || !_unlockedFiles.isEmpty()) {
240235
if (_lockChangeDebouncingTimer.isActive()) {
241236
_lockChangeDebouncingTimer.stop();

src/libsync/bulkpropagatorjob.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ QByteArray getHeaderFromJsonReply(const QJsonObject &reply, const QByteArray &he
5858
return reply.value(headerName).toString().toLatin1();
5959
}
6060

61-
constexpr auto batchSize = 100;
6261
constexpr auto parallelJobsMaximumCount = 1;
6362

6463
}
@@ -70,10 +69,10 @@ Q_LOGGING_CATEGORY(lcBulkPropagatorJob, "nextcloud.sync.propagator.bulkupload",
7069
BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator, const std::deque<SyncFileItemPtr> &items)
7170
: PropagatorJob(propagator)
7271
, _items(items)
73-
, _currentBatchSize(batchSize)
72+
, _currentBatchSize(_items.size())
7473
{
75-
_filesToUpload.reserve(batchSize);
76-
_pendingChecksumFiles.reserve(batchSize);
74+
_filesToUpload.reserve(_items.size());
75+
_pendingChecksumFiles.reserve(_items.size());
7776
}
7877

7978
bool BulkPropagatorJob::scheduleSelfOrChild()
@@ -84,11 +83,15 @@ bool BulkPropagatorJob::scheduleSelfOrChild()
8483

8584
_state = Running;
8685

87-
for(auto i = 0; i < _currentBatchSize && !_items.empty(); ++i) {
86+
qCDebug(lcBulkPropagatorJob()) << "max chunk size" << PropagatorJob::propagator()->syncOptions().maxChunkSize();
87+
88+
for(auto batchDataSize = 0; batchDataSize <= PropagatorJob::propagator()->syncOptions().maxChunkSize() && !_items.empty(); ) {
8889
const auto currentItem = _items.front();
8990
_items.pop_front();
9091
_pendingChecksumFiles.insert(currentItem->_file);
9192

93+
batchDataSize += currentItem->_size;
94+
9295
QMetaObject::invokeMethod(this, [this, currentItem] {
9396
UploadFileInfo fileToUpload;
9497
fileToUpload._file = currentItem->_file;
@@ -117,7 +120,7 @@ bool BulkPropagatorJob::handleBatchSize()
117120
}
118121

119122
// change batch size before trying it again
120-
const auto halfBatchSize = batchSize / 2;
123+
const auto halfBatchSize = static_cast<int>(_items.size() / 2);
121124

122125
// we already tried to upload with half of the batch size
123126
if(_currentBatchSize == halfBatchSize) {
@@ -221,7 +224,6 @@ void BulkPropagatorJob::doStartUpload(SyncFileItemPtr item,
221224
remotePath, fileToUpload._path,
222225
fileToUpload._size, currentHeaders};
223226

224-
qCInfo(lcBulkPropagatorJob) << remotePath << "transmission checksum" << transmissionChecksumHeader << fileToUpload._path;
225227
_filesToUpload.push_back(std::move(newUploadFile));
226228
_pendingChecksumFiles.remove(item->_file);
227229

@@ -583,7 +585,7 @@ void BulkPropagatorJob::finalizeOneFile(const BulkUploadItem &oneFile)
583585

584586
void BulkPropagatorJob::finalize(const QJsonObject &fullReply)
585587
{
586-
qCDebug(lcBulkPropagatorJob) << "Received a full reply" << fullReply;
588+
qCDebug(lcBulkPropagatorJob) << "Received a full reply" << QJsonDocument::fromVariant(fullReply).toJson();
587589

588590
for(auto singleFileIt = std::begin(_filesToUpload); singleFileIt != std::end(_filesToUpload); ) {
589591
const auto &singleFile = *singleFileIt;

src/libsync/configfile.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,13 +272,13 @@ qint64 ConfigFile::chunkSize() const
272272
qint64 ConfigFile::maxChunkSize() const
273273
{
274274
QSettings settings(configFile(), QSettings::IniFormat);
275-
return settings.value(QLatin1String(maxChunkSizeC), 5LL * 1000LL * 1000LL * 1000LL).toLongLong(); // default to 5000 MB
275+
return settings.value(QLatin1String(maxChunkSizeC), 100LL * 1024LL * 1024LL).toLongLong(); // default to 100 MiB
276276
}
277277

278278
qint64 ConfigFile::minChunkSize() const
279279
{
280280
QSettings settings(configFile(), QSettings::IniFormat);
281-
return settings.value(QLatin1String(minChunkSizeC), 5LL * 1000LL * 1000LL).toLongLong(); // default to 5 MB
281+
return settings.value(QLatin1String(minChunkSizeC), 5LL * 1024LL * 1024LL).toLongLong(); // default to 5 MiB
282282
}
283283

284284
chrono::milliseconds ConfigFile::targetChunkUploadDuration() const

src/libsync/owncloudpropagator.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1267,7 +1267,9 @@ bool PropagatorCompositeJob::scheduleSelfOrChild()
12671267
_tasksToDo.remove(0);
12681268
PropagatorJob *job = propagator()->createJob(nextTask);
12691269
if (!job) {
1270-
qCWarning(lcDirectory) << "Useless task found for file" << nextTask->destination() << "instruction" << nextTask->_instruction;
1270+
if (!propagator()->isDelayedUploadItem(nextTask)) {
1271+
qCWarning(lcDirectory) << "Useless task found for file" << nextTask->destination() << "instruction" << nextTask->_instruction;
1272+
}
12711273
continue;
12721274
}
12731275
appendJob(job);

src/libsync/propagateupload.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,27 @@ void PUTFileJob::start()
6464

6565
req.setPriority(QNetworkRequest::LowPriority); // Long uploads must not block non-propagation jobs.
6666

67+
auto requestID = QByteArray{};
68+
6769
if (_url.isValid()) {
68-
sendRequest("PUT", _url, req, _device);
70+
const auto reply = sendRequest("PUT", _url, req, _device);
71+
requestID = reply->request().rawHeader("X-Request-ID");
6972
} else {
70-
sendRequest("PUT", makeDavUrl(path()), req, _device);
73+
const auto reply = sendRequest("PUT", makeDavUrl(path()), req, _device);
74+
requestID = reply->request().rawHeader("X-Request-ID");
7175
}
7276

7377
if (reply()->error() != QNetworkReply::NoError) {
7478
qCWarning(lcPutJob) << " Network error: " << reply()->errorString();
7579
}
7680

81+
connect(reply(), &QNetworkReply::uploadProgress, this, [requestID] (qint64 bytesSent, qint64 bytesTotal) {
82+
qCDebug(lcPutJob()) << requestID << "upload progress" << bytesSent << bytesTotal;
83+
});
84+
7785
connect(reply(), &QNetworkReply::uploadProgress, this, &PUTFileJob::uploadProgress);
7886
connect(this, &AbstractNetworkJob::networkActivity, account().data(), &Account::propagatorNetworkActivity);
87+
7988
_requestTimer.start();
8089
AbstractNetworkJob::start();
8190
}

src/libsync/putmultifilejob.cpp

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ PutMultiFileJob::PutMultiFileJob(AccountPtr account,
3232

3333
for(const auto &singleDevice : _devices) {
3434
singleDevice._device->setParent(this);
35-
connect(this, &PutMultiFileJob::uploadProgress,
36-
singleDevice._device.get(), &UploadDevice::slotJobUploadProgress);
3735
}
3836
}
3937

@@ -56,7 +54,12 @@ void PutMultiFileJob::start()
5654
if (oneDevice._device->size() == 0) {
5755
onePart.setBody({});
5856
} else {
59-
onePart.setBodyDevice(oneDevice._device.get());
57+
const auto allData = oneDevice._device->readAll();
58+
onePart.setBody(allData);
59+
}
60+
61+
if (oneDevice._device->isOpen()) {
62+
oneDevice._device->close();
6063
}
6164

6265
for (auto it = oneDevice._headers.begin(); it != oneDevice._headers.end(); ++it) {
@@ -68,13 +71,17 @@ void PutMultiFileJob::start()
6871
_body.append(onePart);
6972
}
7073

71-
sendRequest("POST", _url, req, &_body);
74+
const auto newReply = sendRequest("POST", _url, req, &_body);
75+
const auto &requestID = newReply->request().rawHeader("X-Request-ID");
7276

7377
if (reply()->error() != QNetworkReply::NoError) {
7478
qCWarning(lcPutMultiFileJob) << " Network error: " << reply()->errorString();
7579
}
7680

7781
connect(reply(), &QNetworkReply::uploadProgress, this, &PutMultiFileJob::uploadProgress);
82+
connect(reply(), &QNetworkReply::uploadProgress, this, [requestID] (qint64 bytesSent, qint64 bytesTotal) {
83+
qCDebug(lcPutMultiFileJob()) << requestID << "upload progress" << bytesSent << bytesTotal;
84+
});
7885
connect(this, &AbstractNetworkJob::networkActivity, account().data(), &Account::propagatorNetworkActivity);
7986
_requestTimer.start();
8087
AbstractNetworkJob::start();
@@ -90,15 +97,12 @@ bool PutMultiFileJob::finished()
9097
for(const auto &oneDevice : _devices) {
9198
Q_ASSERT(oneDevice._device);
9299

93-
if (!oneDevice._device->errorString().isEmpty()) {
94-
qCWarning(lcPutMultiFileJob) << "oneDevice has error:" << oneDevice._device->errorString();
95-
}
96-
97100
if (oneDevice._device->isOpen()) {
101+
if (!oneDevice._device->errorString().isEmpty()) {
102+
qCWarning(lcPutMultiFileJob) << "oneDevice has error:" << oneDevice._device->errorString();
103+
}
104+
98105
oneDevice._device->close();
99-
} else {
100-
qCWarning(lcPutMultiFileJob) << "Did not close device" << oneDevice._device.get()
101-
<< "as it was not open";
102106
}
103107
}
104108

src/libsync/syncengine.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,6 @@ void SyncEngine::deleteStaleDownloadInfos(const SyncFileItemVector &syncItems)
228228
_journal->getAndDeleteStaleDownloadInfos(download_file_paths);
229229
for (const SyncJournalDb::DownloadInfo &deleted_info : deleted_infos) {
230230
const QString tmppath = _propagator->fullLocalPath(deleted_info._tmpfile);
231-
qCInfo(lcEngine) << "Deleting stale temporary file: " << tmppath;
232231
FileSystem::remove(tmppath);
233232
}
234233
}

test/testsyncengine.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,7 +1208,7 @@ private slots:
12081208

12091209
QVERIFY(fakeFolder.syncOnce());
12101210
QCOMPARE(nPUT, 0);
1211-
QCOMPARE(nPOST, 2);
1211+
QCOMPARE(nPOST, 1);
12121212
nPUT = 0;
12131213
nPOST = 0;
12141214

@@ -1219,7 +1219,7 @@ private slots:
12191219

12201220
QVERIFY(!fakeFolder.syncOnce());
12211221
QCOMPARE(nPUT, 120);
1222-
QCOMPARE(nPOST, 2);
1222+
QCOMPARE(nPOST, 1);
12231223
nPUT = 0;
12241224
nPOST = 0;
12251225

0 commit comments

Comments
 (0)