22
33#include " read_write_range_lock.h"
44
5+ #include < cloud/filestore/libs/diagnostics/critical_events.h>
56#include < cloud/filestore/libs/service/context.h>
67
78#include < cloud/storage/core/libs/common/file_ring_buffer.h>
89
10+ #include < library/cpp/digest/crc32c/crc32c.h>
911#include < library/cpp/threading/future/subscription/wait_all.h>
1012
1113#include < util/generic/hash_set.h>
@@ -292,6 +294,8 @@ class TWriteBackCache::TImpl final
292294 const ITimerPtr Timer;
293295 const IWriteBackCacheStatsPtr Stats;
294296 const TFlushConfig FlushConfig;
297+ const TLog Log;
298+ const TString LogTag;
295299
296300 // All fields below should be protected by this lock
297301 TMutex Lock;
@@ -329,6 +333,9 @@ class TWriteBackCache::TImpl final
329333 ISchedulerPtr scheduler,
330334 ITimerPtr timer,
331335 IWriteBackCacheStatsPtr stats,
336+ TLog log,
337+ const TString& fileSystemId,
338+ const TString& clientId,
332339 const TString& filePath,
333340 ui64 capacityBytes,
334341 TFlushConfig flushConfig)
@@ -337,6 +344,9 @@ class TWriteBackCache::TImpl final
337344 , Timer(std::move(timer))
338345 , Stats(std::move(stats))
339346 , FlushConfig(flushConfig)
347+ , Log(std::move(log))
348+ , LogTag(
349+ Sprintf (" [f:%s][c:%s]" , fileSystemId.c_str(), clientId.c_str()))
340350 , CachedEntriesPersistentQueue(filePath, capacityBytes)
341351 {
342352 // File ring buffer should be able to store any valid TWriteDataRequest.
@@ -349,27 +359,58 @@ class TWriteBackCache::TImpl final
349359
350360 Stats->ResetNonDerivativeCounters ();
351361
362+ TWriteDataEntryDeserializationStats deserializationStats;
363+
352364 CachedEntriesPersistentQueue.Visit (
353365 [&](auto checksum, auto serializedRequest)
354366 {
355367 auto entry = std::make_unique<TWriteDataEntry>(
356368 checksum,
357369 serializedRequest,
370+ deserializationStats,
358371 this );
359372
360373 if (entry->IsCorrupted ()) {
361374 // This may happen when a buffer was corrupted.
362375 // We should add this entry to a queue like a normal entry
363376 // because there is 1-by-1 correspondence between
364377 // CachedEntriesPersistentQueue and CachedEntries.
365- // TODO(nasonov): report it
366378 CachedEntries.push_back (std::move (entry));
367379 } else {
368380 auto * nodeState = GetOrCreateNodeState (entry->GetNodeId ());
369381 AddCachedEntry (nodeState, std::move (entry));
370382 }
371383 });
372384
385+ STORAGE_INFO (
386+ LogTag << " WriteBackCache has been initialized "
387+ << " {\" FilePath\" : " << filePath.Quote ()
388+ << " , \" RawCapacityBytes\" : "
389+ << CachedEntriesPersistentQueue.GetRawCapacity ()
390+ << " , \" RawUsedBytesCount\" : "
391+ << CachedEntriesPersistentQueue.GetRawUsedBytesCount ()
392+ << " , \" WriteDataRequestCount\" : "
393+ << CachedEntriesPersistentQueue.Size () << " }" );
394+
395+ if (deserializationStats.HasFailed ()) {
396+ // Each deserialization failure event has been already reported
397+ // as a critical error - just write statistics to the log
398+ STORAGE_ERROR (
399+ LogTag << " WriteBackCache request deserialization failure "
400+ << " {\" ChecksumMismatchCount\" : "
401+ << deserializationStats.ChecksumMismatchCount
402+ << " , \" EntrySizeMismatchCount\" : "
403+ << deserializationStats.EntrySizeMismatchCount
404+ << " , \" ProtobufDeserializationErrorCount\" : "
405+ << deserializationStats.ProtobufDeserializationErrorCount
406+ << " }" );
407+ }
408+
409+ if (CachedEntriesPersistentQueue.IsCorrupted ()) {
410+ ReportWriteBackCacheCorruptionError (
411+ LogTag + " WriteBackCache persistent queue is corrupted" );
412+ }
413+
373414 UpdatePersistentQueueStats ();
374415 }
375416
@@ -1044,8 +1085,18 @@ class TWriteBackCache::TImpl final
10441085 FlushConfig.MaxSumWriteRequestsSize );
10451086
10461087 if (entryCount == 0 ) {
1047- // Even a single entry is too large to flush
1048- // TODO(nasonov): report and try to flush it anyway
1088+ STORAGE_WARN (
1089+ LogTag << " WriteBackCache WriteData request size exceeds "
1090+ " flush limits, flushing anyway "
1091+ << " {\" MaxWriteRequestSize\" : "
1092+ << FlushConfig.MaxWriteRequestSize
1093+ << " , \" MaxWriteRequestsCount\" : "
1094+ << FlushConfig.MaxWriteRequestsCount
1095+ << " , \" MaxSumWriteRequestsSize\" : "
1096+ << FlushConfig.MaxSumWriteRequestsSize
1097+ << " , \" WriteDataRequestSize\" : "
1098+ << nodeState->CachedEntries .front ()->GetBuffer ().size ()
1099+ << " }" );
10491100 entryCount = 1 ;
10501101 }
10511102
@@ -1256,6 +1307,9 @@ TWriteBackCache::TWriteBackCache(
12561307 ISchedulerPtr scheduler,
12571308 ITimerPtr timer,
12581309 IWriteBackCacheStatsPtr stats,
1310+ TLog log,
1311+ const TString& fileSystemId,
1312+ const TString& clientId,
12591313 const TString& filePath,
12601314 ui64 capacityBytes,
12611315 TDuration automaticFlushPeriod,
@@ -1269,6 +1323,9 @@ TWriteBackCache::TWriteBackCache(
12691323 std::move(scheduler),
12701324 std::move(timer),
12711325 std::move(stats),
1326+ std::move(log),
1327+ fileSystemId,
1328+ clientId,
12721329 filePath,
12731330 capacityBytes,
12741331 {.AutomaticFlushPeriod = automaticFlushPeriod,
@@ -1321,10 +1378,24 @@ TWriteBackCache::TWriteDataEntry::TWriteDataEntry(
13211378TWriteBackCache::TWriteDataEntry::TWriteDataEntry (
13221379 ui32 checksum,
13231380 TStringBuf serializedRequest,
1381+ TWriteDataEntryDeserializationStats& deserializationStats,
13241382 TImpl* impl)
13251383{
1326- // TODO(nasonov): validate checksum
1384+ deserializationStats.EntryCount ++;
1385+
13271386 Y_UNUSED (checksum);
1387+ // TODO(nasonov): validate checksum when TFileRingBuffer supports
1388+ // in-place allocation. Currently, data is written directly to the
1389+ // TFileRingBuffer without Crc32 calculation
1390+ //
1391+ // auto expectedChecksum =
1392+ // Crc32c(serializedRequest.data(), serializedRequest.size());
1393+ //
1394+ // if (expectedChecksum != checksum) {
1395+ // stats.ChecksumMismatchCount++;
1396+ // SetStatus(EWriteDataRequestStatus::Corrupted, impl);
1397+ // return;
1398+ // }
13281399
13291400 TMemoryInput mi (serializedRequest);
13301401
@@ -1336,23 +1407,29 @@ TWriteBackCache::TWriteDataEntry::TWriteDataEntry(
13361407 // Currently this may happen when execution stopped between allocation
13371408 // and Serialization. In future, this can happen only as a result of
13381409 // corruption
1410+ deserializationStats.EntrySizeMismatchCount ++;
1411+ ReportWriteBackCacheCorruptionError (
1412+ " TWriteDataEntry deserialization error: entry is empty" );
13391413 SetStatus (EWriteDataRequestStatus::Corrupted, impl);
13401414 return ;
13411415 }
13421416
13431417 const char * bufferPtr = mi.Buf ();
13441418
13451419 if (mi.Skip (bufferSize) != bufferSize) {
1346- // Buffer corruption
1347- // TODO(nasonov): report and handle
1420+ deserializationStats.EntrySizeMismatchCount ++;
1421+ ReportWriteBackCacheCorruptionError (
1422+ " TWriteDataEntry deserialization error: invalid entry size" );
13481423 SetStatus (EWriteDataRequestStatus::Corrupted, impl);
13491424 return ;
13501425 }
13511426
13521427 auto parsedRequest = std::make_shared<NProto::TWriteDataRequest>();
1353- if (!parsedRequest->ParseFromArray (mi.Buf (), static_cast <int >(mi.Avail ()))) {
1354- // Buffer corruption
1355- // TODO(nasonov): report and handle
1428+ if (!parsedRequest->ParseFromArray (mi.Buf (), static_cast <int >(mi.Avail ())))
1429+ {
1430+ deserializationStats.ProtobufDeserializationErrorCount ++;
1431+ ReportWriteBackCacheCorruptionError (
1432+ " TWriteDataEntry deserialization error: ParseFromArray has failed" );
13561433 SetStatus (EWriteDataRequestStatus::Corrupted, impl);
13571434 return ;
13581435 }
0 commit comments