Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions tasks/cheremkhin_a_radix_sort_batcher/common/include/common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <tuple>
#include <vector>

#include "task/include/task.hpp"

namespace cheremkhin_a_radix_sort_batcher {

using InType = std::vector<int>;
using OutType = std::vector<int>;
using TestType = std::tuple<InType, OutType>;
using BaseTask = ppc::task::Task<InType, OutType>;

} // namespace cheremkhin_a_radix_sort_batcher
9 changes: 9 additions & 0 deletions tasks/cheremkhin_a_radix_sort_batcher/info.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"student": {
"first_name": "Андрей",
"last_name": "Черемхин",
"middle_name": "Александрович",
"group_number": "3823Б1ПР3",
"task_number": "3"
}
}
21 changes: 21 additions & 0 deletions tasks/cheremkhin_a_radix_sort_batcher/mpi/include/ops_mpi.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include "cheremkhin_a_radix_sort_batcher/common/include/common.hpp"

namespace cheremkhin_a_radix_sort_batcher {

class CheremkhinARadixSortBatcherMPI : public BaseTask {
public:
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
return ppc::task::TypeOfTask::kMPI;
}
explicit CheremkhinARadixSortBatcherMPI(const InType &in);

private:
bool ValidationImpl() override;
bool PreProcessingImpl() override;
bool RunImpl() override;
bool PostProcessingImpl() override;
};

} // namespace cheremkhin_a_radix_sort_batcher
210 changes: 210 additions & 0 deletions tasks/cheremkhin_a_radix_sort_batcher/mpi/src/ops_mpi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#include "cheremkhin_a_radix_sort_batcher/mpi/include/ops_mpi.hpp"

#include <mpi.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <utility>
#include <vector>

#include "cheremkhin_a_radix_sort_batcher/common/include/common.hpp"

namespace cheremkhin_a_radix_sort_batcher {

namespace {

constexpr std::uint32_t kSignMask = 0x80000000U;
constexpr std::size_t kRadix = 256;

inline bool IsPowerOfTwo(int x) {
return x > 0 && (x & (x - 1)) == 0;
}

inline std::uint8_t GetByteForRadixSort(int v, int byte_idx) {
const std::uint32_t key = static_cast<std::uint32_t>(v) ^ kSignMask;
return static_cast<std::uint8_t>((key >> (static_cast<std::uint32_t>(byte_idx) * 8U)) & 0xFFU);
}

std::vector<int> RadixSortSigned32(const std::vector<int> &in) {
if (in.size() <= 1) {
return in;
}

std::vector<int> a;
a.reserve(in.size());
a.assign(in.begin(), in.end());
std::vector<int> tmp(in.size());

for (int byte_idx = 0; byte_idx < 4; ++byte_idx) {
std::vector<std::size_t> cnt(kRadix, 0);
for (int v : a) {
++cnt[GetByteForRadixSort(v, byte_idx)];
}

std::vector<std::size_t> pos(kRadix);
std::size_t sum = 0;
for (std::size_t i = 0; i < kRadix; ++i) {
pos[i] = sum;
sum += cnt[i];
}

for (int v : a) {
const std::uint8_t b = GetByteForRadixSort(v, byte_idx);
tmp[pos[b]++] = v;
}

a.swap(tmp);
}

return a;
}

std::vector<int> MergeSorted(const std::vector<int> &a, const std::vector<int> &b) {
std::vector<int> out;
out.resize(a.size() + b.size());
std::ranges::merge(a, b, out.begin());
return out;
}

void CompareSplit(int rank, int partner, int keep_cnt, std::vector<int> *local) {
std::vector<int> recv(static_cast<std::size_t>(keep_cnt));

MPI_Sendrecv(local->data(), keep_cnt, MPI_INT, partner, 0, recv.data(), keep_cnt, MPI_INT, partner, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);

std::vector<int> merged = MergeSorted(*local, recv);

std::vector<int> new_local(static_cast<std::size_t>(keep_cnt));
if (rank < partner) {
std::copy_n(merged.begin(), static_cast<std::size_t>(keep_cnt), new_local.begin());
} else {
std::copy_n(merged.end() - static_cast<std::ptrdiff_t>(keep_cnt), static_cast<std::size_t>(keep_cnt),
new_local.begin());
}
local->swap(new_local);
}

// NOLINTNEXTLINE(misc-no-recursion)
void OddEvenMerge(int lo, int n, int r, std::vector<std::pair<int, int>> *comps) {
const int m = r * 2;
if (m < n) {
OddEvenMerge(lo, n, m, comps);
OddEvenMerge(lo + r, n, m, comps);
for (int i = lo + r; i + r < lo + n; i += m) {
comps->emplace_back(i, i + r);
}
} else {
comps->emplace_back(lo, lo + r);
}
}

// NOLINTNEXTLINE(misc-no-recursion)
void OddEvenMergeSort(int lo, int n, std::vector<std::pair<int, int>> *comps) {
if (n > 1) {
const int m = n / 2;
OddEvenMergeSort(lo, m, comps);
OddEvenMergeSort(lo + m, m, comps);
OddEvenMerge(lo, n, 1, comps);
}
}

} // namespace

CheremkhinARadixSortBatcherMPI::CheremkhinARadixSortBatcherMPI(const InType &in) {
SetTypeOfTask(GetStaticTypeOfTask());
GetInput() = in;
GetOutput() = {};
}

bool CheremkhinARadixSortBatcherMPI::ValidationImpl() {
int rank = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0) {
return !GetInput().empty();
}
return true;
}

bool CheremkhinARadixSortBatcherMPI::PreProcessingImpl() {
return true;
}

// NOLINTNEXTLINE(readability-function-cognitive-complexity)
bool CheremkhinARadixSortBatcherMPI::RunImpl() {
int rank = 0;
int size = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

int n = 0;
if (rank == 0) {
n = static_cast<int>(GetInput().size());
}
MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);

const int block = (n + size - 1) / size;
const int padded_n = block * size;

std::vector<int> padded;
if (rank == 0) {
padded.assign(static_cast<std::size_t>(padded_n), std::numeric_limits<int>::max());
std::copy(GetInput().begin(), GetInput().end(), padded.begin());
}

std::vector<int> local(static_cast<std::size_t>(block));
MPI_Scatter(padded.data(), block, MPI_INT, local.data(), block, MPI_INT, 0, MPI_COMM_WORLD);

local = RadixSortSigned32(local);

if (size > 1) {
if (IsPowerOfTwo(size)) {
std::vector<std::pair<int, int>> comps;
OddEvenMergeSort(0, size, &comps);

for (const auto &[a, b] : comps) {
if (rank == a || rank == b) {
const int partner = (rank == a) ? b : a;
CompareSplit(rank, partner, block, &local);
}
MPI_Barrier(MPI_COMM_WORLD);
}
} else {
// Fallback for non power-of-two number of ranks.
for (int phase = 0; phase < size; ++phase) {
int partner = -1;
if ((phase % 2) == 0) {
partner = ((rank % 2) == 0) ? rank + 1 : rank - 1;
} else {
partner = ((rank % 2) == 0) ? rank - 1 : rank + 1;
}
if (partner >= 0 && partner < size) {
CompareSplit(rank, partner, block, &local);
}
MPI_Barrier(MPI_COMM_WORLD);
}
}
}

std::vector<int> gathered;
if (rank == 0) {
gathered.resize(static_cast<std::size_t>(padded_n));
}
MPI_Gather(local.data(), block, MPI_INT, gathered.data(), block, MPI_INT, 0, MPI_COMM_WORLD);

std::vector<int> out(static_cast<std::size_t>(n));
if (rank == 0) {
std::copy_n(gathered.begin(), static_cast<std::size_t>(n), out.begin());
}
MPI_Bcast(out.data(), n, MPI_INT, 0, MPI_COMM_WORLD);

GetOutput() = out;
return true;
}

bool CheremkhinARadixSortBatcherMPI::PostProcessingImpl() {
return !GetOutput().empty();
}

} // namespace cheremkhin_a_radix_sort_batcher
125 changes: 125 additions & 0 deletions tasks/cheremkhin_a_radix_sort_batcher/report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Поразрядная сортировка целых чисел с чётно-нечётным слиянием Бэтчера (SEQ | MPI)

- Студент: Черемхин Андрей Александрович, группа 3823Б1ПР3
- Технология: SEQ | MPI
- Вариант: 1

## Введение

Цель задачи — реализовать сортировку целых чисел (int) поразрядным алгоритмом (radix sort) и параллельную MPI-версию,
в которой локально отсортированные блоки объединяются с помощью чётно-нечётного слияния Бэтчера (Batcher odd-even merge)
по процессам.

## Постановка задачи

На вход подаётся вектор целых чисел. Требуется вернуть этот вектор, отсортированный по неубыванию.

Тип входных данных:

```cpp
using InType = std::vector<int>;
```

Тип выходных данных:

```cpp
using OutType = std::vector<int>;
```

## Базовый алгоритм (Sequential)

Используется LSD radix sort по байтам (4 прохода для 32-битного int):

- на каждом проходе выполняется стабильная сортировка подсчётом по текущему байту
- для корректной обработки отрицательных чисел используется преобразование ключа: `key = (uint32_t)value ^ 0x80000000`
(это переводит порядок signed в порядок unsigned)

Сложность:

- O(4 * n) по времени
- O(n) по памяти

## Схема распараллеливания (MPI): Бэтчер (odd-even merge) по процессам

1. Rank 0 дополняет вход до кратности числу процессов (padding значением `INT_MAX`) и распределяет блоки одинакового размера по процессам (`MPI_Scatter`).
2. Каждый процесс сортирует свой блок локально тем же radix sort.
3. Далее выполняется сеть компараторов Бэтчера по процессам (для количества процессов = степень двойки):
- на каждом компараторе два процесса обмениваются своими отсортированными блоками (`MPI_Sendrecv`)
- выполняют слияние двух отсортированных массивов (merge)
- процесс с меньшим rank оставляет “меньшую половину” своего размера, процесс с большим rank — “большую половину”
4. Rank 0 собирает итог (`MPI_Gather`), отбрасывает padding и рассылает результат всем (`MPI_Bcast`), чтобы `GetOutput()` был валиден на каждом процессе.

Примечание: если число процессов не является степенью двойки, используется безопасный fallback на чётно-нечётные соседние обмены
(odd-even transposition), чтобы задача корректно работала в любом запуске.

## Детали реализации

Структура задачи:

```
tasks/cheremkhin_a_radix_sort_batcher/
├── common/include/common.hpp
├── seq/include/ops_seq.hpp
├── seq/src/ops_seq.cpp
├── mpi/include/ops_mpi.hpp
├── mpi/src/ops_mpi.cpp
├── tests/functional/main.cpp
├── tests/performance/main.cpp
├── settings.json
├── info.json
└── report.md
```

Классы:

- `CheremkhinARadixSortBatcherSEQ`
- `CheremkhinARadixSortBatcherMPI`

## Проверка корректности

Functional-тесты (`tests/functional/main.cpp`) проверяют:

- сортировку уже отсортированного массива
- сортировку в обратном порядке
- наличие отрицательных чисел и повторов
- тривиальные случаи (1 элемент)

Ожидаемый результат вычисляется как `std::sort` над копией входа.

## Производительность

Ниже приведены результаты одного прогона. Базовая точка для расчёта ускорения — SEQ (1 запуск).

pipeline:

| Mode | Count | Time, s | Speedup | Efficiency |
|------|------:|--------:|--------:|-----------:|
| seq | 1 | 0.1738711160 | 1.00 | N/A |
| mpi | 1 | 0.2733182060 | 0.64 | 63.6% |
| mpi | 2 | 0.2901480690 | 0.60 | 30.0% |
| mpi | 3 | 0.3290196940 | 0.53 | 17.6% |
| mpi | 4 | 0.3667764640 | 0.47 | 11.9% |
| mpi | 5 | 0.3904221110 | 0.45 | 8.9% |
| mpi | 12 | 0.7389914400 | 0.24 | 2.0% |

task_run:

| Mode | Count | Time, s | Speedup | Efficiency |
|------|------:|--------:|--------:|-----------:|
| seq | 1 | 0.1557295400 | 1.00 | N/A |
| mpi | 1 | 0.2596250240 | 0.60 | 60.0% |
| mpi | 2 | 0.2384910110 | 0.65 | 32.6% |
| mpi | 3 | 0.3234444180 | 0.48 | 16.0% |
| mpi | 4 | 0.3261312520 | 0.48 | 11.9% |
| mpi | 5 | 0.2590720800 | 0.60 | 12.0% |
| mpi | 12 | 0.6011320870 | 0.26 | 2.2% |


- В данной реализации основное время при увеличении числа процессов уходит в коммуникации и многократные слияния в сети Бэтчера.
- Для числа процессов, не являющегося степенью двойки (например, 3, 5, 12), используется fallback (odd-even transposition), который добавляет дополнительные фазы обмена и ухудшает масштабируемость.


## Источники

1. [Материалы курса: отчёт (требования и структура)](https://learning-process.github.io/parallel_programming_course/ru/common_information/report.html#overview-and-placement)

Loading
Loading