|
28 | 28 | namespace fdsdump { |
29 | 29 | namespace aggregator { |
30 | 30 |
|
| 31 | +// Merge two hash tables containing records defined by view |
| 32 | +void merge_hash_tables(const View &view, HashTable &dst_table, HashTable &src_table) |
| 33 | +{ |
| 34 | + uint8_t *dst_record = nullptr; |
| 35 | + for (uint8_t *src_record : src_table.items()) { |
| 36 | + bool found = dst_table.find_or_create(src_record, dst_record); |
| 37 | + if (found) { |
| 38 | + // If found - merge |
| 39 | + for (auto x : view.iter_values(dst_record, src_record)) { |
| 40 | + x.field.merge(x.value1, x.value2); |
| 41 | + } |
| 42 | + } else { |
| 43 | + // If not found - copy over |
| 44 | + // Key is already copied by find_or_create, so only value needs to be copied |
| 45 | + unsigned int key_size = view.key_size(src_record); |
| 46 | + unsigned int value_size = view.value_size(); |
| 47 | + //TODO: Possible optimalization: Can we just move pointers instead of memcpy? |
| 48 | + std::memcpy(dst_record + key_size, src_record + key_size, value_size); |
| 49 | + } |
| 50 | + } |
| 51 | +} |
| 52 | + |
| 53 | +// Sort records |
| 54 | +void sort_records(const View &view, std::vector<uint8_t *>& records) |
| 55 | +{ |
| 56 | + if (view.order_fields().empty()) { |
| 57 | + return; |
| 58 | + } |
| 59 | + std::sort(records.begin(), records.end(), |
| 60 | + [&view](uint8_t *a, uint8_t *b) { return view.ordered_before(a, b); }); |
| 61 | +} |
| 62 | + |
31 | 63 | Aggregator::Aggregator(const View &view) : |
32 | 64 | m_table(view), |
33 | 65 | m_key_buffer(65535), |
@@ -104,45 +136,14 @@ Aggregator::aggregate(FlowContext &ctx) |
104 | 136 | void |
105 | 137 | Aggregator::sort_items() |
106 | 138 | { |
107 | | - if (!m_view.order_fields().empty()) { |
108 | | - std::sort(items().begin(), items().end(), |
109 | | - [this](uint8_t *a, uint8_t *b) { return m_view.ordered_before(a, b); }); |
110 | | - } |
| 139 | + sort_records(m_view, items()); |
111 | 140 | } |
112 | 141 |
|
113 | | - |
114 | | -// uint8_t *record; |
115 | | - |
116 | | -// if (!m_table.find_or_create(&m_key_buffer[0], record)) { |
117 | | -// init_values(*m_view_def.get(), record + m_view_def->keys_size); |
118 | | -// } |
119 | | - |
120 | | -// for (const auto &iter : m_view_def->iter_values(record + m_view_def->keys_size)) { |
121 | | -// aggregate_value(iter.field, drec, &iter.value, direction, drec_find_flags); |
122 | | -// } |
123 | | -// } |
124 | | - |
125 | | -// void |
126 | | -// Aggregator::merge(Aggregator &other, unsigned int max_num_items) |
127 | | -// { |
128 | | -// unsigned int n = 0; |
129 | | -// for (uint8_t *other_record : other.items()) { |
130 | | -// if (max_num_items != 0 && n == max_num_items) { |
131 | | -// break; |
132 | | -// } |
133 | | - |
134 | | -// uint8_t *record; |
135 | | - |
136 | | -// if (!m_table.find_or_create(other_record, record)) { |
137 | | -// //TODO: this copy is unnecessary, we could just take the already allocated record from the other table instead |
138 | | -// memcpy(record, other_record, m_view_def->keys_size + m_view_def->values_size); |
139 | | -// } else { |
140 | | -// merge_records(*m_view_def.get(), record, other_record); |
141 | | -// } |
142 | | - |
143 | | -// n++; |
144 | | -// } |
145 | | -// } |
| 142 | +void |
| 143 | +Aggregator::merge(Aggregator &other) |
| 144 | +{ |
| 145 | + merge_hash_tables(m_view, m_table, other.m_table); |
| 146 | +} |
146 | 147 |
|
147 | 148 | } // aggregator |
148 | 149 | } // fdsdump |
0 commit comments