Commit 92c31401 authored by Praetorius, Simon's avatar Praetorius, Simon

added MPI_Probe for retrieving the size

parent c53bb6c3
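The probe-based receive introduced here (Communicator::recv(std::vector<T>&, ...) below) roughly follows this pattern; the sketch uses illustrative names and MPI_DOUBLE as an example element type, it is not library code:

MPI_Status status;
MPI_Probe(from, tag, comm, &status);         // block until a matching message is pending
int count = 0;
MPI_Get_count(&status, MPI_DOUBLE, &count);  // number of elements in the pending message
std::vector<double> data(count);             // size the receive buffer accordingly
MPI_Recv(data.data(), count, MPI_DOUBLE, from, tag, comm, MPI_STATUS_IGNORE);

This makes the separate size message (previously sent via MPI_Ibsend before each payload) unnecessary; the corresponding size sends are removed in this commit.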
......@@ -9,7 +9,8 @@
#include <mpi.h>
#include "Common.hpp"
#include "Future.hpp"
#include "FutureSerialization.hpp"
#include "FutureVector.hpp"
#include "Serialization.hpp"
#include "Type_Traits.hpp"
......@@ -17,8 +18,6 @@ namespace mpi14
{
class Communicator
{
template <class> friend class Future;
public:
/// Constructor, stores an MPI communicator, e.g. MPI_COMM_WORLD
......@@ -60,6 +59,13 @@ namespace mpi14
send(data, N, to, tag);
}
template <class T, std::size_t N,
REQUIRES( is_mpi_type<T> )>
MPI_Status send(std::array<T,N> const& array, int to, int tag = 0) const
{
return send(array.data(), N, to, tag);
}
template <class T,
REQUIRES( not is_mpi_type<T> )>
MPI_Status send(std::vector<T> const& data, int to, int tag = 0) const;
......@@ -95,11 +101,10 @@ namespace mpi14
REQUIRES( is_mpi_type<Data> )>
std::shared_ptr<MPI_Request> ibsend(Data const* data, std::size_t size, int to, int tag = 0) const;
#if 0
template <class T,
REQUIRES( is_mpi_type<T> )>
std::shared_ptr<MPI_Request> isend(std::vector<T> const& data, int to, int tag = 0) const;
#endif
// send complex datatype (non-blocking):
// 1. create a binary representation of data, store it in a buffer
......@@ -128,6 +133,17 @@ namespace mpi14
return recv(data, N, from, tag);
}
template <class T, std::size_t N,
REQUIRES( is_mpi_type<T> )>
MPI_Status recv(std::array<T,N>& data, int from, int tag = 0) const
{
return recv(data.data(), N, from, tag);
}
template <class T,
REQUIRES( is_mpi_type<T> )>
MPI_Status recv(std::vector<T>& data, int from, int tag = 0) const;
// receive complex datatype
template <class Data,
REQUIRES( not is_mpi_type<Data> )>
......@@ -145,11 +161,25 @@ namespace mpi14
REQUIRES( is_mpi_type<Data> )>
std::shared_ptr<MPI_Request> irecv(Data* data, std::size_t size, int from, int tag = 0) const;
// receive vector of mpi datatypes (a usage sketch follows the function below)
// 1. until the message has arrived, call MPI_Iprobe to retrieve its status and size
// 2. resize the data vector accordingly
// 3. receive the data into the vector
template <class T,
REQUIRES( is_mpi_type<T> )>
FutureVector<T> irecv(std::vector<T>& data, int from, int tag) const
{
return {data, from, tag, comm_};
}
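// Usage sketch (assumptions: a Communicator `world` and a matching vector send posted
// by the source rank; names and values are illustrative, not part of the library):
//   std::vector<double> vec;
//   auto future = world.irecv(vec, /*from=*/0, /*tag=*/1);  // FutureVector<double>
//   future.wait();   // polls MPI_Iprobe, resizes vec, then receives the elements into it
//   std::size_t n = future.get().size();  // get() waits and returns a reference to vec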
// receive complex datatype asynchronously (a usage sketch follows the function below)
// NOTE: communication of the data is started only when Future::test() or Future::wait() is called.
// Wait for the data by calling Future::test() or Future::wait().
template <class Data,
std::enable_if_t<not is_mpi_type<Data>, int> = 0>
Future<Data> irecv(Data& data, int from, int tag = 0) const;
REQUIRES( not is_mpi_type<Data> )>
FutureSerialization<Data> irecv(Data& data, int from, int tag) const
{
return {data, from, tag, comm_};
}
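// Usage sketch (assumptions: Data is serializable via Serialization.hpp, e.g. a
// std::list<double> with the matching Boost.Serialization header included, as in the
// test program below; names are illustrative):
//   std::list<double> values;
//   auto future = world.irecv(values, /*from=*/0, /*tag=*/1);  // FutureSerialization<std::list<double>>
//   future.wait();  // probes for the byte message, receives it, and deserializes into `values`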
protected:
......
#pragma once
#include <vector>
#include "Serialization.hpp"
namespace mpi14
{
class FutureBase
{
friend class Communicator;
public:
FutureBase(int from, int tag, MPI_Comm comm)
: from_(from)
, tag_(tag)
, comm_(comm)
, size_(new int(-1))
, buffer_(new std::vector<char>{})
{}
virtual ~FutureBase()
{}
FutureBase(FutureBase const&) = delete;
FutureBase(FutureBase&&) = default;
FutureBase& operator=(FutureBase const&) = delete;
FutureBase& operator=(FutureBase&&) = default;
virtual void finish() {}
void wait()
{
while( !test() ) ;
}
bool test()
{
if (!size_received_) {
int flag;
MPI_Test(&size_request_, &flag, MPI_STATUS_IGNORE);
if (flag != 0) {
size_received_ = true;
assert( *size_ > 0 );
buffer_->resize(*size_);
MPI_Irecv(buffer_->data(), *size_, MPI_BYTE, from_, tag_, comm_, &data_request_);
}
} else if (!data_received_) {
int flag;
MPI_Test(&data_request_, &flag, MPI_STATUS_IGNORE);
if (flag != 0) {
data_received_ = true;
finish();
}
}
return data_received_;
}
protected:
int& size() { return *size_; }
MPI_Request& request() { return size_request_; }
protected:
int from_;
int tag_;
MPI_Comm comm_;
std::unique_ptr<int> size_;
MPI_Request size_request_;
MPI_Request data_request_;
std::unique_ptr<std::vector<char>> buffer_;
bool size_received_ = false;
bool data_received_ = false;
};
template <class Data>
class Future
: public FutureBase
{
public:
Future(Data& data, int from, int tag, MPI_Comm comm)
: FutureBase(from, tag, comm)
, data_(data)
{}
virtual void finish() override
{
serialization::load(*buffer_, data_);
}
Data& get()
{
wait();
return data_;
}
private:
Data& data_;
};
} // end namespace mpi14
#pragma once
#include <chrono>
#include <future>
namespace mpi14
{
class FutureBase
{
public:
virtual ~FutureBase() {}
virtual bool test() = 0;
void wait()
{
while( !test() ) ;
}
template <class Rep, class Period>
std::future_status wait_for(std::chrono::duration<Rep,Period> const& timeout_duration)
{
std::chrono::system_clock::time_point t = std::chrono::system_clock::now();
while( !test() ) {
if ((std::chrono::system_clock::now() - t) >= timeout_duration)
break;
}
if (test())
return std::future_status::ready;
else
return std::future_status::timeout;
}
template <class Clock, class Duration>
std::future_status wait_until(std::chrono::time_point<Clock,Duration> const& timeout_time)
{
return wait_for(timeout_time - std::chrono::system_clock::now());
}
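// Usage sketch (assuming a future `f` obtained from Communicator::irecv):
//   using namespace std::chrono_literals;
//   if (f.wait_for(10ms) == std::future_status::ready) {
//     // the message has arrived and has been received
//   } else {
//     // timed out; call test(), wait_for() or wait() again later
//   }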
};
} // end namespace mpi14
#pragma once
#include <vector>
#include "FutureBase.hpp"
#include "Serialization.hpp"
namespace mpi14
{
template <class Data>
class FutureSerialization
: public FutureBase
{
public:
FutureSerialization(Data& data, int from, int tag, MPI_Comm comm)
: data_(data)
, from_(from)
, tag_(tag)
, comm_(comm)
{}
Data const& get()
{
wait();
return data_;
}
virtual bool test() override
{
if (!size_received_) {
MPI_Status status;
int flag = 0;
// Check (non-blocking) whether a message from rank from_ with tag tag_ has arrived
MPI_Iprobe(from_, tag_, comm_, &flag, &status);
if (flag != 0) {
// Find out the number of elements in the message
MPI_Get_count(&status, MPI_BYTE, &size_);
size_received_ = true;
}
}
if (size_received_ && !data_received_) {
std::vector<char> buffer(size_);
// Receive the message; ignore the status
MPI_Recv(buffer.data(), size_, MPI_BYTE, from_, tag_, comm_, MPI_STATUS_IGNORE);
serialization::load(buffer, data_);
data_received_ = true;
}
return data_received_;
}
protected:
Data& data_;
int from_;
int tag_;
MPI_Comm comm_;
int size_ = 0;
bool size_received_ = false;
bool data_received_ = false;
};
} // end namespace mpi14
#pragma once
#include "FutureBase.hpp"
#include "Type_Traits.hpp"
namespace mpi14
{
template <class T>
class FutureVector
: public FutureBase
{
public:
FutureVector(std::vector<T>& vec, int from, int tag, MPI_Comm comm)
: vec_(vec)
, from_(from)
, tag_(tag)
, comm_(comm)
{}
std::vector<T> const& get()
{
wait();
return vec_;
}
virtual bool test() override
{
if (!size_received_) {
MPI_Status status;
int flag = 0;
// Check (non-blocking) whether a message from rank from_ with tag tag_ has arrived
MPI_Iprobe(from_, tag_, comm_, &flag, &status); // NOTE: maybe change to MPI_Improbe (for thread-safety)
if (flag != 0) {
// Find out the number of elements in the message
MPI_Get_count(&status, type_to_mpi<T>, &size_);
size_received_ = true;
}
}
if (size_received_ && !data_received_) {
vec_.resize(size_);
// Receive the message; ignore the status
MPI_Recv(vec_.data(), size_, type_to_mpi<T>, from_, tag_, comm_, MPI_STATUS_IGNORE);
data_received_ = true;
}
return data_received_;
}
protected:
std::vector<T>& vec_;
int from_;
int tag_;
MPI_Comm comm_;
int size_ = 0;
bool size_received_ = false;
bool data_received_ = false;
};
#if 0
// Promise_Size
[from_,tag_,comm_,size_=0,received_=false]() mutable -> std::optional<int>
{
if (!received_) {
MPI_Status status;
int flag = 0;
// Check (non-blocking) whether a message from rank from_ with tag tag_ has arrived
MPI_Iprobe(from_, tag_, comm_, &flag, &status); // NOTE: maybe change to MPI_Improbe (for thread-safety)
if (flag != 0) {
// Find out the number of elements in the message
MPI_Get_count(&status, type_to_mpi<T>, &size_);
received_ = true;
}
}
if (received_)
return size_;
else
return {};
};
[from_,tag_,comm_,&vec_,received_=false](int size) mutable -> std::optional<std::vector<T> const*>
{
if (!received_) {
vec_.resize(size);
// Receive the message; ignore the status
MPI_Recv(vec_.data(), size, type_to_mpi<T>, from_, tag_, comm_, MPI_STATUS_IGNORE);
received_ = true;
}
if (received_)
return &vec_;
else
return {};
};
#endif
} // end namespace mpi14
#pragma conce
#pragma once
#include "Common.hpp"
......
......@@ -26,16 +26,10 @@ MPI_Status Communicator::send(Data const* data, std::size_t size, int to, int ta
template <class T,
REQUIRES( not is_mpi_type<T> )>
MPI_Status Communicator::send(std::vector<T> const& data, int to, int tag) const
MPI_Status Communicator::send(std::vector<T> const& vec, int to, int tag) const
{
MPI_Status status;
MPI_Request request;
int size = data.size();
MPI_Ibsend(&size, 1, MPI_INT, to, tag, comm_, &request);
MPI_Request_free(&request);
MPI_Send(data.data(), size, type_to_mpi<T>, to, tag, comm_, &status);
MPI_Send(vec.data(), int(vec.size()), type_to_mpi<T>, to, tag, comm_, &status);
return status;
}
......@@ -48,16 +42,10 @@ template <class Data,
REQUIRES( not is_mpi_type<Data> )>
MPI_Status Communicator::send(Data const& data, int to, int tag) const
{
MPI_Status status;
MPI_Request request;
auto buffer = store(data);
int size = buffer.size();
MPI_Ibsend(&size, 1, MPI_INT, to, tag, comm_, &request);
MPI_Request_free(&request);
MPI_Send(buffer.data(), size, MPI_CHAR, to, tag, comm_, &status);
MPI_Status status;
MPI_Send(buffer.data(), int(buffer.size()), MPI_BYTE, to, tag, comm_, &status);
return status;
}
......@@ -108,21 +96,14 @@ std::shared_ptr<MPI_Request> Communicator::ibsend(Data const* data, std::size_t
}
#if 0
template <class T,
REQUIRES( is_mpi_type<T> )>
std::shared_ptr<MPI_Request> Communicator::isend(std::vector<T> const& data, int to, int tag) const
std::shared_ptr<MPI_Request> Communicator::isend(std::vector<T> const& vec, int to, int tag) const
{
MPI_Request request_size, request;
int size = data.size();
MPI_Ibsend(&size, 1, MPI_INT, to, tag, comm_, &request_size);
MPI_Request_free(&request_size);
MPI_Isend(data.data(), size, type_to_mpi<T>, to, tag, comm_, &request);
auto request = std::make_shared<MPI_Request>();
MPI_Isend(vec.data(), int(vec.size()), type_to_mpi<T>, to, tag, comm_, request.get());
return request;
}
#endif
// send complex datatype (non-blocking):
......@@ -141,11 +122,7 @@ std::shared_ptr<MPI_Request> Communicator::isend(Data const& data, int to, int t
auto request = buffers_.back().first;
auto const& buffer = buffers_.back().second;
int size = buffer.size();
MPI_Ibsend(&size, 1, MPI_INT, to, tag, comm_, &request_size);
MPI_Request_free(&request_size);
MPI_Isend(buffer.data(), size, MPI_BYTE, to, tag, comm_, request.get());
MPI_Isend(buffer.data(), int(buffer.size()), MPI_BYTE, to, tag, comm_, request.get());
return request;
}
......@@ -173,6 +150,23 @@ MPI_Status Communicator::recv(Data* data, std::size_t size, int from, int tag) c
}
// receive a vector of mpi datatypes (size determined via MPI_Probe)
template <class T,
REQUIRES( is_mpi_type<T> )>
MPI_Status Communicator::recv(std::vector<T>& data, int from, int tag) const
{
MPI_Status status;
MPI_Probe(from, tag, comm_, &status);
int size = 0;
MPI_Get_count(&status, type_to_mpi<T>, &size);
data.resize(size);
MPI_Recv(data.data(), size, type_to_mpi<T>, from, tag, comm_, MPI_STATUS_IGNORE);
return status;
}
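// Usage sketch (illustrative ranks and values, `world` being a Communicator as in the
// test program): the receiver no longer needs to know the message size in advance,
// since MPI_Probe/MPI_Get_count determine it.
//   // on rank 0:
//   std::vector<double> out(42, 1.0);
//   world.send(out, /*to=*/1);
//   // on rank 1:
//   std::vector<double> in;                      // left empty; recv resizes it to fit
//   MPI_Status st = world.recv(in, /*from=*/0);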
// receive complex datatype
template <class Data,
REQUIRES( not is_mpi_type<Data> )>
......@@ -187,9 +181,8 @@ MPI_Status Communicator::recv(Data& data, int from, int tag) const
if (status.MPI_ERROR == MPI_SUCCESS)
serialization::load(buffer, data);
else {
else
std::cout << "recv was not successful!\n";
}
return status;
}
......@@ -216,19 +209,6 @@ std::shared_ptr<MPI_Request> Communicator::irecv(Data* data, std::size_t size, i
return request;
}
// receive complex datatype asynchronously
// NOTE: communication of data is started when Future::test() or Future::wait() is called.
template <class Data,
std::enable_if_t<not is_mpi_type<Data>, int> = 0>
Future<Data> Communicator::irecv(Data& data, int from, int tag) const
{
Future<Data> future(data, from, tag, comm_);
MPI_Irecv(&future.size(), 1, MPI_INT, from, tag, comm_, &future.request());
return future;
}
// -------------------------------------------------------------------------------------
void Communicator::check_buffers() const
......
#include <iostream>
#include <vector>
#include <list>
#include <boost/serialization/vector.hpp>
#include <boost/serialization/list.hpp>
#include "Environment.hpp"
#include "Communicator.hpp"
......@@ -27,21 +29,25 @@ int main(int argc, char** argv)
std::cout << "[" << rank << "]: in.size = " << in.size() << ", out.size = " << out.size() << "\n";
std::list<double> in2;
for (int i = 0; i < rank+5; ++i)
in2.push_back(rank);
for (int i = 0; i < 3; ++i) {
auto req = world.isend(in, next, 1);
auto req = world.isend(in2, next, 1);
MPI_Request_free(req.get());
next = (next + 1) % world.size();
}
std::vector< std::vector<double> > vecs(3);
std::vector< std::list<double> > lists(3);
using Future = mpi14::Future<std::vector<double>>;
using Future = mpi14::FutureSerialization<std::list<double>>;
std::vector<Future> futures;
for (int i = 0; i < 3; ++i) {
futures.emplace_back( world.irecv(vecs[i], prev, 1) );
futures.emplace_back( world.irecv(lists[i], prev, 1) );
prev = (prev + world.size() - 1) % world.size();
}
mpi14::wait_all(futures.begin(), futures.end());
std::cout << "[" << rank << "]: out[0].size = " << vecs[0].size() << ", out[1].size = " << vecs[1].size() << ", out[2].size = " << vecs[2].size() << "\n";
std::cout << "[" << rank << "]: out[0].size = " << lists[0].size() << ", out[1].size = " << lists[1].size() << ", out[2].size = " << lists[2].size() << "\n";
}