|
31 | 31 | #include "tscore/hugepages.h" |
32 | 32 | #include "tscore/Random.h" |
33 | 33 | #include "ts/ats_probe.h" |
| 34 | +#include "iocore/eventsystem/Tasks.h" |
| 35 | + |
| 36 | +#include <unordered_map> |
34 | 37 |
|
35 | 38 | #ifdef LOOP_CHECK_MODE |
36 | 39 | #define DIR_LOOP_THRESHOLD 1000 |
@@ -862,14 +865,51 @@ dir_lookaside_remove(const CacheKey *key, StripeSM *stripe) |
862 | 865 | return; |
863 | 866 | } |
864 | 867 |
|
865 | | -// Cache Sync |
866 | | -// |
| 868 | +// Cache Dir Sync |
867 | 869 |
|
868 | 870 | void |
869 | 871 | dir_sync_init() |
870 | 872 | { |
871 | | - cacheDirSync = new CacheSync; |
872 | | - cacheDirSync->trigger = eventProcessor.schedule_in(cacheDirSync, HRTIME_SECONDS(cache_config_dir_sync_frequency)); |
| 873 | + static std::vector<std::unique_ptr<CacheSync>> cache_syncs; |
| 874 | + static bool initialized = false; |
| 875 | + std::unordered_map<CacheDisk *, std::vector<int>> drive_stripe_map; |
| 876 | + |
| 877 | + if (initialized) { |
| 878 | + Warning("dir_sync_init() called multiple times - ignoring"); |
| 879 | + return; |
| 880 | + } |
| 881 | + initialized = true; |
| 882 | + |
| 883 | + for (int i = 0; i < gnstripes; i++) { |
| 884 | + drive_stripe_map[gstripes[i]->disk].push_back(i); |
| 885 | + } |
| 886 | + |
| 887 | + int num_tasks = |
| 888 | + (cache_config_dir_sync_parallel_tasks == -1) ? drive_stripe_map.size() : std::max(1, cache_config_dir_sync_parallel_tasks); |
| 889 | + |
| 890 | + cache_syncs.resize(num_tasks); |
| 891 | + for (int i = 0; i < num_tasks; i++) { |
| 892 | + cache_syncs[i] = std::make_unique<CacheSync>(); |
| 893 | + } |
| 894 | + |
| 895 | + int task_idx = 0; |
| 896 | + |
| 897 | + for (auto &[disk, indices] : drive_stripe_map) { |
| 898 | + int target_task = task_idx % num_tasks; |
| 899 | + |
| 900 | + Dbg(dbg_ctl_cache_dir_sync, "Disk %s: %zu stripe(s) assigned to task %d", disk->path, indices.size(), target_task); |
| 901 | + for (int stripe_idx : indices) { |
| 902 | + cache_syncs[target_task]->stripe_indices.push_back(stripe_idx); |
| 903 | + } |
| 904 | + task_idx++; |
| 905 | + } |
| 906 | + |
| 907 | + for (int i = 0; i < num_tasks; i++) { |
| 908 | + Dbg(dbg_ctl_cache_dir_sync, "Task %d: syncing %zu stripe(s)", i, cache_syncs[i]->stripe_indices.size()); |
| 909 | + cache_syncs[i]->current_index = 0; |
| 910 | + cache_syncs[i]->trigger = |
| 911 | + eventProcessor.schedule_in(cache_syncs[i].get(), HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK); |
| 912 | + } |
873 | 913 | } |
874 | 914 |
|
875 | 915 | void |
@@ -930,34 +970,35 @@ sync_cache_dir_on_shutdown() |
930 | 970 | } |
931 | 971 |
|
932 | 972 | int |
933 | | -CacheSync::mainEvent(int event, Event *e) |
| 973 | +CacheSync::mainEvent(int event, Event * /* e ATS_UNUSED */) |
934 | 974 | { |
935 | 975 | if (trigger) { |
936 | 976 | trigger->cancel_action(); |
937 | 977 | trigger = nullptr; |
938 | 978 | } |
939 | 979 |
|
940 | 980 | Lrestart: |
941 | | - if (stripe_index >= gnstripes) { |
942 | | - stripe_index = 0; |
| 981 | + if (current_index >= static_cast<int>(stripe_indices.size())) { |
| 982 | + current_index = 0; |
| 983 | +#if FREE_BUF_BETWEEN_CYCLES |
| 984 | + // Free buffer between sync cycles to avoid holding large amounts of memory |
943 | 985 | if (buf) { |
944 | 986 | if (buf_huge) { |
945 | 987 | ats_free_hugepage(buf, buflen); |
946 | 988 | } else { |
947 | 989 | ats_free(buf); |
948 | 990 | } |
949 | | - buflen = 0; |
950 | 991 | buf = nullptr; |
| 992 | + buflen = 0; |
951 | 993 | buf_huge = false; |
952 | 994 | } |
953 | | - Dbg(dbg_ctl_cache_dir_sync, "sync done"); |
954 | | - if (event == EVENT_INTERVAL) { |
955 | | - trigger = e->ethread->schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency)); |
956 | | - } else { |
957 | | - trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency)); |
958 | | - } |
| 995 | +#endif |
| 996 | + Dbg(dbg_ctl_cache_dir_sync, "sync cycle done"); |
| 997 | + trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK); |
959 | 998 | return EVENT_CONT; |
960 | 999 | } |
| 1000 | + stripe_index = stripe_indices[current_index]; |
| 1001 | + current_index++; |
961 | 1002 |
|
962 | 1003 | StripeSM *stripe = gstripes[stripe_index]; // must be named "vol" to make STAT macros work. |
963 | 1004 |
|
@@ -1007,6 +1048,7 @@ CacheSync::mainEvent(int event, Event *e) |
1007 | 1048 | if (stripe->is_io_in_progress() || stripe->get_agg_buf_pos()) { |
1008 | 1049 | Dbg(dbg_ctl_cache_dir_sync, "Dir %s: waiting for agg buffer", stripe->hash_text.get()); |
1009 | 1050 | stripe->dir_sync_waiting = true; |
| 1051 | + stripe->waiting_dir_sync = this; |
1010 | 1052 | if (!stripe->is_io_in_progress()) { |
1011 | 1053 | stripe->aggWrite(EVENT_IMMEDIATE, nullptr); |
1012 | 1054 | } |
@@ -1072,9 +1114,7 @@ CacheSync::mainEvent(int event, Event *e) |
1072 | 1114 | return EVENT_CONT; |
1073 | 1115 | } |
1074 | 1116 | Ldone: |
1075 | | - // done |
1076 | 1117 | writepos = 0; |
1077 | | - ++stripe_index; |
1078 | 1118 | goto Lrestart; |
1079 | 1119 | } |
1080 | 1120 |
|
|
0 commit comments