Skip to content

Commit ec98451

Browse files
committed
1. add Comm_Trans::memory_enough()
1 parent ca7f081 commit ec98451

File tree

5 files changed

+90
-17
lines changed

5 files changed

+90
-17
lines changed

include/Comm/Comm_Trans/Comm_Trans.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#pragma once
77

88
#include "../Comm_Tools.h"
9+
#include "../global/Global_Func.h"
910
#include <mpi.h>
1011
#include <functional>
1112
#include <future>
@@ -50,13 +51,14 @@ class Comm_Trans
5051
Tdatas_recv &datas_recv);
5152

5253
private:
53-
void isend_data (const int rank_isend, const Tdatas_isend &datas_isend, std::string &str_isend, MPI_Request &request_isend) const;
54-
void recv_data (Tdatas_recv &datas_recv, const MPI_Status status_recv, MPI_Message message_recv, std::atomic_flag &lock_set_value);
54+
void isend_data (const int rank_isend, const Tdatas_isend &datas_isend, std::string &str_isend, MPI_Request &request_isend, std::atomic<std::size_t> &memory_max_isend) const;
55+
void recv_data (Tdatas_recv &datas_recv, const MPI_Status status_recv, MPI_Message message_recv, std::atomic_flag &lock_set_value, std::atomic<std::size_t> &memory_max_isend);
5556
void post_process(
5657
std::vector<MPI_Request> &requests_isend,
5758
std::vector<std::string> &strs_isend,
5859
std::vector<std::future<void>> &futures_isend,
5960
std::vector<std::future<void>> &futures_recv) const;
61+
// Decides whether another transfer may be started.
// Returns true when the amount of memory reported by
// Global_Func::memory_available() is strictly greater than twice the value
// currently stored in `memory_max`.
// NOTE(review): the factor 2 looks like a safety margin for holding both the
// serialized buffer and the unpacked data at once -- confirm with the author.
// Any exception thrown by Global_Func::memory_available() propagates to the caller.
bool memory_enough(const std::atomic<size_t> &memory_max) const
{
	const std::size_t memory_required = memory_max.load() * 2;
	return memory_required < Global_Func::memory_available();
}
6062

6163
public:
6264
const MPI_Comm &mpi_comm;

include/Comm/Comm_Trans/Comm_Trans.hpp

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,34 +71,35 @@ void Comm_Trans<Tkey,Tvalue,Tdatas_isend,Tdatas_recv>::communicate(
7171
std::vector<std::future<void>> futures_isend(comm_size);
7272
std::vector<std::future<void>> futures_recv(comm_size);
7373
std::atomic_flag lock_set_value = ATOMIC_FLAG_INIT;
74+
std::atomic<std::size_t> memory_max_isend(0);
75+
std::atomic<std::size_t> memory_max_recv(0);
7476

7577
std::future<void> future_post_process = std::async (std::launch::async,
7678
&Comm_Trans::post_process, this,
7779
std::ref(requests_isend), std::ref(strs_isend), std::ref(futures_isend), std::ref(futures_recv));
7880

7981
while (future_post_process.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
8082
{
81-
if (rank_isend_tmp < this->comm_size) // && enough_memory())
82-
{
83-
const int rank_isend = (rank_isend_tmp + this->rank_mine) % this->comm_size;
84-
futures_isend[rank_isend] = std::async (std::launch::async,
85-
&Comm_Trans::isend_data, this,
86-
rank_isend, std::cref(datas_isend), std::ref(strs_isend[rank_isend]), std::ref(requests_isend[rank_isend]));
87-
++rank_isend_tmp;
88-
}
89-
90-
9183
int flag_iprobe=0;
9284
MPI_Status status_recv;
9385
MPI_Message message_recv;
9486
MPI_CHECK (MPI_Improbe(MPI_ANY_SOURCE, Comm_Trans::tag_data, this->mpi_comm, &flag_iprobe, &message_recv, &status_recv));
95-
if (flag_iprobe && rank_recv_working!=status_recv.MPI_SOURCE)
87+
if (flag_iprobe && rank_recv_working!=status_recv.MPI_SOURCE && memory_enough(memory_max_recv))
9688
{
9789
futures_recv[status_recv.MPI_SOURCE] = std::async (std::launch::async,
9890
&Comm_Trans::recv_data, this,
99-
std::ref(datas_recv), status_recv, message_recv, std::ref(lock_set_value));
91+
std::ref(datas_recv), status_recv, message_recv, std::ref(lock_set_value), std::ref(memory_max_recv));
10092
rank_recv_working = status_recv.MPI_SOURCE;
10193
}
94+
95+
if (rank_isend_tmp < this->comm_size && memory_enough(memory_max_isend))
96+
{
97+
const int rank_isend = (rank_isend_tmp + this->rank_mine) % this->comm_size;
98+
futures_isend[rank_isend] = std::async (std::launch::async,
99+
&Comm_Trans::isend_data, this,
100+
rank_isend, std::cref(datas_isend), std::ref(strs_isend[rank_isend]), std::ref(requests_isend[rank_isend]), std::ref(memory_max_isend));
101+
++rank_isend_tmp;
102+
}
102103
}
103104
future_post_process.get();
104105
}
@@ -109,7 +110,8 @@ void Comm_Trans<Tkey,Tvalue,Tdatas_isend,Tdatas_recv>::isend_data(
109110
const int rank_isend,
110111
const Tdatas_isend &datas_isend,
111112
std::string &str_isend,
112-
MPI_Request &request_isend) const
113+
MPI_Request &request_isend,
114+
std::atomic<std::size_t> &memory_max_isend) const
113115
{
114116
std::stringstream ss_isend;
115117
{
@@ -130,6 +132,7 @@ void Comm_Trans<Tkey,Tvalue,Tdatas_isend,Tdatas_recv>::isend_data(
130132
oar(size_item);
131133
} // end cereal::BinaryOutputArchive
132134
str_isend = std::move(ss_isend.str());
135+
memory_max_isend.store( std::max(str_isend.size()*sizeof(char), memory_max_isend.load()) );
133136
#if MPI_VERSION>=4
134137
MPI_CHECK (MPI_Isend_c (str_isend.c_str(), str_isend.size(), MPI_CHAR, rank_isend, Comm_Trans::tag_data, this->mpi_comm, &request_isend));
135138
#else
@@ -144,7 +147,8 @@ void Comm_Trans<Tkey,Tvalue,Tdatas_isend,Tdatas_recv>::recv_data (
144147
Tdatas_recv &datas_recv,
145148
const MPI_Status status_recv,
146149
MPI_Message message_recv,
147-
std::atomic_flag &lock_set_value)
150+
std::atomic_flag &lock_set_value,
151+
std::atomic<std::size_t> &memory_max_recv)
148152
{
149153
#if MPI_VERSION>=4
150154
MPI_Count size_mpi; MPI_CHECK( MPI_Get_count_c(&status_recv, MPI_CHAR, &size_mpi) );
@@ -158,6 +162,7 @@ void Comm_Trans<Tkey,Tvalue,Tdatas_isend,Tdatas_recv>::recv_data (
158162

159163
std::stringstream ss_recv;
160164
ss_recv.rdbuf()->pubsetbuf(buffer_recv.data(), size_mpi);
165+
memory_max_recv.store( std::max(buffer_recv.size()*sizeof(char), memory_max_recv.load()) );
161166

162167
{
163168
cereal::BinaryInputArchive iar(ss_recv);

include/Comm/global/Cereal_Func.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace Cereal_Func
2424

2525
template<typename... Ts>
2626
MPI_Status mpi_recv(const MPI_Comm &mpi_comm,
27-
Ts&... data);
27+
Ts&... data);
2828
}
2929

3030
}

include/Comm/global/Global_Func.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
//=======================
2+
// AUTHOR : Peize Lin
3+
// DATE : 2023-02-15
4+
//=======================
5+
6+
#pragma once
7+
8+
namespace Comm
9+
{
10+
11+
namespace Global_Func
12+
{
13+
static std::size_t memory_available();
14+
}
15+
16+
}
17+
18+
#include "Global_Func.hpp"
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
//=======================
2+
// AUTHOR : Peize Lin
3+
// DATE : 2023-02-15
4+
//=======================
5+
6+
#pragma once
7+
8+
#include "Global_Func.h"
9+
10+
#include <fstream>
11+
#include <string>
12+
#include <stdexcept>
13+
14+
namespace Comm
15+
{
16+
17+
namespace Global_Func
18+
{
19+
// Extracts the available memory, in bytes, from text in the /proc/meminfo
// format, i.e. one "Label:   <number> [kB]" entry per line.
//
// The value of "MemAvailable:" is returned as soon as it is found. If the
// three entries "MemFree:", "Buffers:" and "Cached:" have all been read before
// any "MemAvailable:" entry shows up (kernels that do not report it), their
// sum is returned instead.
//
// The input is parsed line by line, so entries without a unit suffix
// (e.g. "HugePages_Total:   0") cannot shift the parsing of later entries.
//
// Throws std::runtime_error if neither result can be obtained, which includes
// the case of a stream that could not be opened. std::stoull may throw
// std::invalid_argument / std::out_of_range on a malformed value of one of
// the entries listed above.
static std::size_t memory_available_from(std::istream &is)
{
	constexpr std::size_t kB_to_B = 1024;
	constexpr int num_fallback_needed = 3;
	int num = 0;
	std::size_t mem_sum = 0;

	std::string line;
	while (std::getline(is, line))
	{
		const std::size_t pos_colon = line.find(':');
		if (pos_colon == std::string::npos)
			continue;
		const std::string label = line.substr(0, pos_colon + 1);

		const bool is_available = (label == "MemAvailable:");
		const bool is_fallback = (label == "MemFree:" || label == "Buffers:" || label == "Cached:");
		if (!is_available && !is_fallback)
			continue;

		// Values are listed in kB; std::stoull skips the padding blanks and
		// stops at the unit suffix.
		const std::size_t size_kB = static_cast<std::size_t>(std::stoull(line.substr(pos_colon + 1)));

		if (is_available)
			return size_kB * kB_to_B;

		mem_sum += size_kB;
		if (++num == num_fallback_needed)
			return mem_sum * kB_to_B;
	}
	throw std::runtime_error("read /proc/meminfo error in " + std::string(__FILE__) + " line " + std::to_string(__LINE__));
}

// Returns the memory currently available on this machine, in bytes, as
// reported by /proc/meminfo (see memory_available_from() for the rules).
// Throws std::runtime_error if the file cannot be read or lacks the entries.
static std::size_t memory_available()
{
	std::ifstream ifs("/proc/meminfo");
	return memory_available_from(ifs);
}
46+
}
47+
48+
}

0 commit comments

Comments
 (0)