Commit 93b51a11 authored by Praetorius, Simon

some cleanup

parent 46f5d887
......@@ -14,6 +14,7 @@ build*/
output*/
must_temp/
MUST_Output.html
logs/
# Prerequisites
*.d
......
......@@ -218,18 +218,20 @@ namespace mpi14
Request irecv(std::string& str, int from, int tag = 0) const
{
return {RecvDynamicSize(from,tag,comm_,
[comm=comm_,&str](MPI_Status status)
[comm=comm_,&str](MPI_Status status) -> MPI_Request
{
int size = 0;
MPI_Get_count(&status, MPI_CHAR, &size);
str.resize(size);
MPI_Recv(&str[0], size, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
MPI_Request req;
MPI_Irecv(&str[0], size, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG, comm, &req);
return req;
}) };
}
// receive complex datatype asynchronously
// Wait for data by calling Future::test() or Future::wait().
template <class Data>
std::enable_if_t<!is_mpi_type<Data>::value, Request>
irecv(Data& data, int from, int tag = 0) const;
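// A minimal usage sketch (illustrative only; it assumes the test()/wait()
// members referenced in the comment above, and the rank/tag values are made up):
//
//   std::string msg;
//   auto req = world.irecv(msg, /*from=*/0, /*tag=*/42);
//   while (!req.test()) { /* overlap communication with other work */ }
//   // msg now holds the received characters, already resized to fit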
......@@ -254,6 +256,19 @@ namespace mpi14
buffers_.erase(it);
}
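/// Create a persistent (request, buffer) pair for the message identified by the
/// (source, tag) pair of `status`, and remember its iterator for later lookup.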
std::pair<MPI_Request, std::string>& make_buffer(MPI_Status status, std::size_t len) const
{
auto it = buffers_.emplace(buffers_.end(), MPI_Request{}, std::string(len,' '));
buffers_iterators_[{status.MPI_SOURCE, status.MPI_TAG}] = it;
return buffers_.back();
}
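/// Look up the (request, buffer) pair previously created by \ref make_buffer
/// for the (source, tag) pair of `status`.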
std::pair<MPI_Request, std::string>& get_buffer(MPI_Status status) const
{
auto it = buffers_iterators_[{status.MPI_SOURCE, status.MPI_TAG}];
return *it;
}
protected:
......@@ -263,7 +278,12 @@ namespace mpi14
int size_;
std::vector<char> buffer_;
mutable std::list< std::pair<MPI_Request, std::string> > buffers_;
using BufferList = std::list< std::pair<MPI_Request, std::string> >;
mutable BufferList buffers_;
using BufferIter = BufferList::iterator;
mutable std::map<std::pair<int,int>, BufferIter> buffers_iterators_;
};
} // end namespace mpi14
......
......@@ -9,55 +9,97 @@
namespace mpi14 {
/// \brief Test-functor for dynamic-size data, to be used in the \ref Request class.
/**
* Implements the Iprobe - Irecv - Test workflow to receive data of unknown size.
* First, when the header of the message has arrived, the size is fetched from the
* MPI_Status provided by MPI_Iprobe. This is done in a user functor `recv_` that
* receives an MPI_Status and returns an MPI_Request handle. Typically, the recv_
* functor grows an internal receive buffer and calls MPI_Irecv.
* When an MPI_Test on the MPI_Request handle indicates a completed communication,
* a finalize user functor `finish_` is called to perform some postprocessing on the
* received data. Afterwards, the MPI_Status of the MPI_Test call is returned.
**/
class RecvDynamicSize
{
enum Progress {
STARTED,
INITIALIZED,
RECEIVING,
FINISHED
};
public:
/// Constructor with user receive-functor `R`.
template <class F,
REQUIRES( concepts::Callable<F(MPI_Status)> )>
RecvDynamicSize(int from, int tag, MPI_Comm comm, F&& f)
template <class R,
REQUIRES( concepts::Callable<R(MPI_Status)> )>
RecvDynamicSize(int from, int tag, MPI_Comm comm, R&& r)
: from_(from)
, tag_(tag)
, comm_(comm)
, recv_(std::forward<R>(r))
, finish_([](MPI_Status){})
{}
/// Constructor with user receive-functor `R` and user finalize-functor `F`.
template <class R, class F,
REQUIRES( concepts::Callable<R(MPI_Status)> && concepts::Callable<F(MPI_Status)> )>
RecvDynamicSize(int from, int tag, MPI_Comm comm, R&& r, F&& f)
: from_(from)
, tag_(tag)
, comm_(comm)
, recv_(std::forward<F>(f))
, recv_(std::forward<R>(r))
, finish_(std::forward<F>(f))
{}
/// Operator called as test function in the \ref Request class.
boost::optional<MPI_Status> operator()()
{
if (!size_received_) {
if (progress_ == STARTED) {
int flag = 0;
// Wait for a message from rank from_ with tag tag_
MPI_Iprobe(from_, tag_, comm_, &flag, &status_); // NOTE: maybe change to MPI_Improbe (for thread-safety)
MPI_Iprobe(from_, tag_, comm_, &flag, &status_);
if (flag != 0)
size_received_ = true;
progress_ = INITIALIZED;
}
if (size_received_ && !data_received_) {
recv_(status_);
data_received_ = true;
if (progress_ == INITIALIZED) {
req_ = recv_(status_);
progress_ = RECEIVING;
}
if (data_received_)
if (progress_ == RECEIVING) {
int flag = 0;
MPI_Test(&req_, &flag, &status_);
if (flag != 0)
progress_ = FINISHED;
}
if (progress_ == FINISHED) {
finish_(status_);
return status_;
else
} else
return {};
}
private:
int from_;
int tag_;
MPI_Comm comm_;
std::function<void(MPI_Status)> recv_;
bool size_received_ = false;
bool data_received_ = false;
int size_ = 0;
MPI_Status status_;
int from_; //< source rank
int tag_; //< communication tag
MPI_Comm comm_; //< communicator
std::function<MPI_Request(MPI_Status)> recv_; //< user receive-functor
std::function<void(MPI_Status)> finish_; //< user finalize-functor
Progress progress_ = STARTED; //< internal progress flag
MPI_Status status_; //< the status information, filled by MPI_Iprobe and MPI_Test
MPI_Request req_; //< the request handle, filled by the user receive-functor \ref recv_
};
} // end namespace mpi14
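A minimal sketch of how the staged test-functor could be driven by polling, assuming
a user-managed buffer (`buf` is illustrative; the Request class normally performs
this polling internally via test()/wait()):

    std::vector<char> buf;
    mpi14::RecvDynamicSize rds(/*from=*/0, /*tag=*/7, MPI_COMM_WORLD,
        [&buf](MPI_Status s) -> MPI_Request {   // recv_: start the actual receive
          int n = 0;
          MPI_Get_count(&s, MPI_BYTE, &n);
          buf.resize(std::max(n, 1));
          MPI_Request req;
          MPI_Irecv(buf.data(), n, MPI_BYTE, s.MPI_SOURCE, s.MPI_TAG, MPI_COMM_WORLD, &req);
          return req;
        },
        [](MPI_Status) { /* finish_: postprocess the received bytes */ });
    boost::optional<MPI_Status> st;
    while (!(st = rds()))   // advances STARTED -> INITIALIZED -> RECEIVING -> FINISHED
      ;                     // useful work could be overlapped here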
......@@ -37,9 +37,6 @@ namespace mpi14
{
template <class T>
struct type_to_mpi;
// {
// static_assert( is_mpi_type<T>::value, "Type is not an MPI Datatype!" );
// };
template <> struct type_to_mpi<char> { static MPI_Datatype value() { return MPI_CHAR; } };
template <> struct type_to_mpi<short> { static MPI_Datatype value() { return MPI_SHORT; } };
......@@ -71,9 +68,6 @@ namespace mpi14
template <class Op>
struct op_to_mpi;
// {
// static_assert( always_false<Op>::value, "Op is not an MPI operation!" );
// };
template <> struct op_to_mpi<minimum> { static MPI_Op value() { return MPI_MIN; } };
template <> struct op_to_mpi<maximum> { static MPI_Op value() { return MPI_MAX; } };
......
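These traits map C++ types and reduction functors to MPI handles at compile time;
a send wrapper might use them as in this sketch (`send_one` is a hypothetical
helper for illustration, not part of the library):

    template <class T>
    void send_one(T const& x, int to, int tag, MPI_Comm comm)
    {
      // resolve the matching MPI datatype for T via the type_to_mpi trait
      MPI_Send(&x, 1, type_to_mpi<T>::value(), to, tag, comm);
    }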
......@@ -207,16 +207,22 @@ template <class T>
Communicator::irecv(std::vector<T>& vec, int from, int tag) const
{
return {RecvDynamicSize(from,tag,comm_,
[comm=comm_,&vec](MPI_Status status)
[comm=comm_,&vec](MPI_Status status) -> MPI_Request
{
int size = 0;
MPI_Get_count(&status, type_to_mpi<T>::value(), &size);
int min_size = std::max(size,1);
vec.resize(min_size);
MPI_Recv(vec.data(), min_size, type_to_mpi<T>::value(), status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
if (size != min_size)
vec.resize(size);
MPI_Request req;
MPI_Irecv(vec.data(), min_size, type_to_mpi<T>::value(), status.MPI_SOURCE, status.MPI_TAG, comm, &req);
return req;
},
[&vec](MPI_Status status)
{
int size = 0;
MPI_Get_count(&status, type_to_mpi<T>::value(), &size);
vec.resize(size);
}) };
}
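A sketch of the intended call pattern for the vector overload (assuming the wait()
member referenced in the header comment; rank and tag values are illustrative):

    std::vector<double> out;
    auto req = world.irecv(out, /*from=*/1, /*tag=*/2);
    req.wait();   // drives the Iprobe/Irecv/Test stages to completion
    // the finish-functor has trimmed `out` to the exact received element count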
......@@ -224,20 +230,33 @@ Communicator::irecv(std::vector<T>& vec, int from, int tag) const
template <class Data>
std::enable_if_t<!is_mpi_type<Data>::value, Request>
Communicator::irecv(Data& data, int from, int tag) const
{
check_buffers();
return {RecvDynamicSize(from,tag,comm_,
[comm=comm_,&data](MPI_Status status)
[comm=comm_,&data,this](MPI_Status status) -> MPI_Request
{
int size = 0;
MPI_Get_count(&status, MPI_BYTE, &size);
int min_size = std::max(size,1);
std::vector<char> buffer(min_size);
auto& b = this->make_buffer(status, min_size);
auto& req = b.first;
auto& buffer = b.second;
// Receive the message, ignoring the status
MPI_Recv(buffer.data(), min_size, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
if (size != min_size)
buffer.resize(size);
MPI_Irecv(&buffer[0], min_size, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, comm, &req);
return req;
},
[&data,this](MPI_Status status)
{
int size = 0;
MPI_Get_count(&status, MPI_BYTE, &size);
auto& buffer = this->get_buffer(status).second;
buffer.resize(size);
serialization::load(buffer, data);
}) };
}
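For non-MPI types the received bytes are staged in one of the communicator's
internal buffers and then deserialized; a sketch of the call pattern (the tag
value is illustrative, wait() as assumed above):

    std::list<double> data;                 // not a native MPI datatype
    auto req = world.irecv(data, /*from=*/0, /*tag=*/1);
    req.wait();                             // after completion, serialization::load()
                                            // has filled `data` from the byte buffer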
......
#include <iostream>
#include <vector>
#include <list>
#include <cassert>
#include <boost/serialization/vector.hpp>
#include <boost/serialization/list.hpp>
......@@ -13,40 +14,56 @@ int main(int argc, char** argv)
mpi14::Communicator world;
int rank = world.rank();
int size = world.size();
int next = (rank + 1)%size;
int prev = (rank + size - 1)%size;
std::vector<double> in(rank+5, double(rank));
std::vector<double> out;
{ // test 1: communicate vector to next
std::vector<double> in(rank+5, double(rank));
std::vector<double> out;
int next = (rank+1)%world.size();
auto req = world.isend(in, next, 2);
req.free();
int prev = (rank+world.size()-1)%world.size();
auto stat = world.recv(out, prev, 2);
auto req = world.isend(in, next, 2);
req.free();
std::cout << "[" << rank << "]: in.size = " << in.size() << ", out.size = " << out.size() << "\n";
auto stat = world.recv(out, prev, 2);
int result = rank;
mpi14::all_reduce(world, result, mpi14::minimum{});
assert( out.size() == prev+5 );
}
std::list<double> in2;
for (int i = 0; i < rank+5; ++i)
in2.push_back(rank);
for (int i = 0; i < 3; ++i) {
auto req = world.isend(in2, next, 1);
req.free();
next = (next + 1) % world.size();
{ // test 2: sum up integers
int result = rank;
mpi14::all_reduce(world, result, mpi14::minimum{});
assert( result == 0 );
}
std::vector< std::list<double> > lists(3);
{ // test 3: send a std::list in circular way
std::list<double> in2;
for (int i = 0; i < rank+5; ++i)
in2.push_back(rank);
std::vector<mpi14::Request> requests;
for (int i = 0; i < 3; ++i) {
requests.emplace_back( world.irecv(lists[i], prev, 1) );
prev = (prev + world.size() - 1) % world.size();
}
for (int i = 0; i < 3; ++i) {
auto req = world.isend(in2, next, 1);
req.free();
next = (next + 1) % size;
}
std::vector< std::list<double> > lists(3);
mpi14::wait_all(requests.begin(), requests.end());
std::cout << "[" << rank << "]: out[0].size = " << lists[0].size() << ", out[1].size = " << lists[1].size() << ", out[2].size = " << lists[2].size() << "\n";
std::vector<mpi14::Request> requests;
for (int i = 0; i < 3; ++i) {
requests.emplace_back( world.irecv(lists[i], prev, 1) );
prev = (prev + size - 1) % size;
}
mpi14::wait_all(requests.begin(), requests.end());
assert(( lists[0].size() == 5 + (rank + size - 1) % size ));
assert(( lists[1].size() == 5 + (rank + size - 2) % size ));
assert(( lists[2].size() == 5 + (rank + size - 3) % size ));
}
}
......@@ -3,6 +3,7 @@
#include <list>
#include <algorithm>
#include <numeric>
#include <cassert>
#include <boost/serialization/vector.hpp>
#include <boost/serialization/list.hpp>
......@@ -26,22 +27,25 @@ int main(int argc, char** argv)
mpi14::Communicator world;
int rank = world.rank();
std::vector<int> in;
in.resize(10);
std::fill(in.begin(), in.end(), 42);
{ // test: broadcast a vector from rank 0 to all others
std::vector<int> out;
if (rank == 0) {
out = in;
std::vector<int> in;
in.resize(10);
std::fill(in.begin(), in.end(), 42);
std::vector<int> out;
if (rank == 0) {
out = in;
std::vector<mpi14::Request> requests;
for (int r = 1; r < world.size(); ++r)
requests.emplace_back( world.isend(in, r) );
mpi14::wait_all(requests.begin(), requests.end());
} else {
world.recv(out, 0);
}
std::vector<mpi14::Request> requests;
for (int r = 1; r < world.size(); ++r)
requests.emplace_back( world.isend(in, r) );
mpi14::wait_all(requests.begin(), requests.end());
} else {
world.recv(out, 0);
assert( compare(in,out) );
}
std::cout << "[" << rank << "] in == out ? ---> " << compare(in,out) << "\n";
}
......@@ -13,28 +13,28 @@ int main(int argc, char** argv)
mpi14::Communicator world;
int rank = world.rank();
std::vector<int> in;
in.resize(10);
std::fill(in.begin(), in.end(), rank + 42);
{ // test: collect vectors from all processors on rank 0
std::vector<std::vector<int>> outs(world.size());
std::vector<int> in;
in.resize(10);
std::fill(in.begin(), in.end(), rank + 42);
std::vector<std::vector<int>> outs(world.size());
if (rank == 0) {
outs[0] = in;
std::vector<mpi14::Request> requests;
for (int r = 1; r < world.size(); ++r)
requests.emplace_back( world.irecv(outs[r], r) );
mpi14::wait_all(requests.begin(), requests.end());
} else {
world.send(in, 0);
}
if (rank == 0) {
std::cout << "[" << rank << "] sizes = {" << outs[0].size();
for (std::size_t i = 1; i < outs.size(); ++i)
std::cout << ", " << outs[i].size();
std::cout << "}\n";
if (rank == 0) {
outs[0] = in;
std::vector<mpi14::Request> requests;
for (int r = 1; r < world.size(); ++r)
requests.emplace_back( world.irecv(outs[r], r) );
mpi14::wait_all(requests.begin(), requests.end());
} else {
world.send(in, 0);
}
if (rank == 0) {
assert(( std::all_of(outs.begin(), outs.end(), [](auto const& v) { return v.size() == 10; }) ));
}
}
}