Commit 62e67374 authored by Praetorius, Simon's avatar Praetorius, Simon

some send operations repaired

parent 957cf693
......@@ -2,6 +2,7 @@
#include "Common.hpp"
#include "Communicator.hpp"
#include "Request.hpp"
#include "Type_Traits.hpp"
namespace mpi14
......@@ -19,5 +20,34 @@ namespace mpi14
{
MPI_Allreduce(MPI_IN_PLACE, &inout, 1, type_to_mpi<T>(), op_to_mpi<Operation>(), comm);
}
template <class T, std::size_t N,
REQUIRES( is_mpi_type<T>::value )>
void all_gather(Communicator& comm, std::array<T,N> const& in, std::vector<T>& out)
{
out.resize(comm.size() * N);
MPI_Allgather(to_void_ptr(in.data()), N, type_to_mpi<T>(), to_void_ptr(out.data()), N, type_to_mpi<T>(), comm);
}
template <class T, std::size_t N,
REQUIRES( is_mpi_type<T>::value )>
std::vector<T> all_gather(Communicator& comm, std::array<T,N> const& in)
{
std::vector<T> out(comm.size() * N);
MPI_Allgather(to_void_ptr(in.data()), N, type_to_mpi<T>(), to_void_ptr(out.data()), N, type_to_mpi<T>(), comm);
return out;
}
template <class T, std::size_t N,
REQUIRES( is_mpi_type<T>::value )>
Request iall_gather(Communicator& comm, std::array<T,N> const& in, std::vector<T>& out)
{
out.resize(comm.size() * N);
MPI_Request request;
MPI_Iallgather(to_void_ptr(in.data()), N, type_to_mpi<T>(), to_void_ptr(out.data()), N, type_to_mpi<T>(), comm, &request);
return {request};
}
} // end namespace mpi14
......@@ -56,14 +56,14 @@ namespace mpi14
std::enable_if_t<is_mpi_type<T>::value, void>
send(T const (&data)[N], int to, int tag = 0) const
{
send(to_void_ptr(data), N, to, tag);
send(&data[0], N, to, tag);
}
template <class T, std::size_t N>
std::enable_if_t<is_mpi_type<T>::value, void>
send(std::array<T,N> const& array, int to, int tag = 0) const
{
send(to_void_ptr(array.data()), N, to, tag);
send(array.data(), N, to, tag);
}
template <class T>
......@@ -185,6 +185,20 @@ namespace mpi14
template <class Data>
std::enable_if_t<is_mpi_type<Data>::value, Request>
irecv(Data* data, std::size_t size, int from, int tag = 0) const;
template <class T, std::size_t N>
std::enable_if_t<is_mpi_type<T>::value, Request>
irecv(T (&data)[N], int from, int tag = 0) const
{
return irecv(&data[0], N, from, tag);
}
template <class T, std::size_t N>
std::enable_if_t<is_mpi_type<T>::value, Request>
irecv(std::array<T,N>& data, int from, int tag = 0) const
{
return irecv(data.data(), N, from, tag);
}
template <class Receiver>
std::enable_if_t< concepts::Callable<Receiver(MPI_Status)>, Request>
......
......@@ -38,9 +38,11 @@ namespace mpi14
MPI_Status status;
int flag = 0;
MPI_Test(&request_, &flag, &status);
if (flag)
if (flag) {
status_ = status;
status_initialized_ = true;
return status;
else
} else
return {};
}
......@@ -49,6 +51,9 @@ namespace mpi14
{
assert( !advanced_ );
if (status_initialized_)
return status_;
MPI_Status status;
int flag = 0;
MPI_Request_get_status(request_, &flag, &status);
......@@ -66,8 +71,8 @@ namespace mpi14
while( !(status = test()) ) ;
return status.value();
} else {
MPI_Status status;
MPI_Wait(&request_, &status);
MPI_Wait(&request_, &status_);
status_initialized_ = true;
}
}
......@@ -107,6 +112,9 @@ namespace mpi14
MPI_Request request_;
MPI_Status status_; // the MPI_Status after the request object is destroyed
bool status_initialized_ = false;
std::function<boost::optional<MPI_Status>(void)> test_;
bool advanced_ = false;
};
......
......@@ -24,6 +24,50 @@ namespace mpi14
remaining.erase(remove_it);
}
}
template <class ReqIter>
void wait_all_weak(ReqIter first, ReqIter last)
{
std::list<ReqIter> remaining;
for (ReqIter it = first; it != last; ++it) remaining.push_back(it);
while (!remaining.empty()) {
auto remove_it = remaining.end();
for (auto it = remaining.begin(); it != remaining.end(); ++it) {
if ((*it)->status()) {
remove_it = it;
break;
}
}
if (remove_it != remaining.end())
remaining.erase(remove_it);
}
}
template <class ReqIter>
void wait_all(ReqIter first, ReqIter last, std::vector<MPI_Status>& statuses)
{
statuses.resize(std::distance(first, last));
std::list<ReqIter> remaining;
for (ReqIter it = first; it != last; ++it) remaining.push_back(it);
while (!remaining.empty()) {
auto remove_it = remaining.end();
for (auto it = remaining.begin(); it != remaining.end(); ++it) {
auto status = (*it)->test();
if (status) {
remove_it = it;
statuses[std::distance(first,*it)] = *status;
break;
}
}
if (remove_it != remaining.end())
remaining.erase(remove_it);
}
}
// Tests for completion of either one or none of the operations associated with active handles.
......
......@@ -149,9 +149,12 @@ Communicator::recv(std::vector<T>& vec, int from, int tag) const
int size = 0;
MPI_Get_count(&status, type_to_mpi<T>(), &size);
int min_size = std::max(size,1);
vec.resize(size);
MPI_Recv(vec.data(), size, type_to_mpi<T>(), from, tag, comm_, MPI_STATUS_IGNORE);
vec.resize(min_size);
MPI_Recv(vec.data(), min_size, type_to_mpi<T>(), from, tag, comm_, MPI_STATUS_IGNORE);
if (size != min_size)
vec.resize(size);
return status;
}
......@@ -208,9 +211,12 @@ Communicator::irecv(std::vector<T>& vec, int from, int tag) const
{
int size = 0;
MPI_Get_count(&status, type_to_mpi<T>(), &size);
int min_size = std::max(size,1);
vec.resize(size);
MPI_Recv(vec.data(), size, type_to_mpi<T>(), status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
vec.resize(min_size);
MPI_Recv(vec.data(), min_size, type_to_mpi<T>(), status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
if (size != min_size)
vec.resize(size);
}) };
}
......@@ -224,11 +230,14 @@ Communicator::irecv(Data& data, int from, int tag) const
{
int size = 0;
MPI_Get_count(&status, MPI_BYTE, &size);
int min_size = std::max(size,1);
std::vector<char> buffer(size);
std::vector<char> buffer(min_size);
// Receive the message. ignore the status
MPI_Recv(buffer.data(), size, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
MPI_Recv(buffer.data(), min_size, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE);
if (size != min_size)
buffer.resize(size);
serialization::load(buffer, data);
}) };
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment