Commit 99df86e6 authored by Praetorius, Simon's avatar Praetorius, Simon

redesign of async point-to-point communicator

parent 540fc2c8
......@@ -12,6 +12,8 @@ CTestTestfile.cmake
# directories for build and output files
build*/
output*/
must_temp/
MUST_Output.html
# Prerequisites
*.d
......
......@@ -3,13 +3,11 @@ install(FILES
Common.hpp
Communicator.hpp
Environment.hpp
FutureBase.hpp
FutureSerialization.hpp
FutureVector.hpp
Request.hpp
RequestChain.hpp
RequestOperations.hpp
Serialization.hpp
Type_Traits.hpp
mpi14.hpp
DESTINATION include/mpi14/
)
......
......@@ -23,4 +23,35 @@ namespace mpi14
template <std::size_t i>
constexpr index_t<i> index_ = {};
namespace aux
{
  // Workaround for MSVC (problems with alias templates in pack expansion)
  template <class... Args>
  struct VoidType { using type = void; };
}

// C++14 stand-in for std::void_t (C++17): maps any list of types to void.
template <class... Args>
using Void_t = typename aux::VoidType<Args...>::type;

namespace concepts
{
  namespace definition
  {
    // Primary template: F is not callable with the given signature.
    template <class F, class Sig, class = Void_t<>>
    struct Functor
      : std::false_type {};

    // Chosen when `F(Args...)` is a valid call expression; additionally
    // requires the call result to be convertible to R.
    // NOTE: reuses decltype of the actual call expression instead of
    // std::result_of_t, which is deprecated in C++17 and removed in C++20.
    template <class F, class R, class... Args>
    struct Functor<F, R(Args...),
        Void_t< decltype(std::declval<F>()(std::declval<Args>()...)) >>
      : std::is_convertible<decltype(std::declval<F>()(std::declval<Args>()...)), R> {};
  }

  // concepts::Functor<F, R(Args...)> is true iff F is callable with Args...
  // and the result is convertible to R.
  template <class F, class Signature>
  constexpr bool Functor = definition::Functor<F,Signature>::value;
}
} // end namespace mpi14
......@@ -9,7 +9,6 @@
#include <mpi.h>
#include "Common.hpp"
#include "Future.hpp"
#include "Request.hpp"
#include "Serialization.hpp"
#include "Type_Traits.hpp"
......@@ -70,6 +69,8 @@ namespace mpi14
REQUIRES( is_mpi_type<T> )>
void send(std::vector<T> const& vec, int to, int tag = 0) const;
void send(std::string const& str, int to, int tag = 0) const;
// send complex datatype:
// 1. create a binary representation of data, store it in a buffer
// 2. send size of buffer (with MPI_Ibsend)
......@@ -103,7 +104,9 @@ namespace mpi14
template <class T,
REQUIRES( is_mpi_type<T> )>
Request isend(std::vector<T> const& data, int to, int tag = 0) const;
Request isend(std::vector<T> const& vec, int to, int tag = 0) const;
Request isend(std::string const& str, int to, int tag = 0) const;
// send complex datatype: (non-blocking)
......@@ -112,7 +115,7 @@ namespace mpi14
// 3. send buffer (with MPI_Ibsend)
template <class Data,
REQUIRES( not is_mpi_type<Data> )>
std::shared_ptr<MPI_Request> isend(Data const& data, int to, int tag = 0) const;
Request isend(Data const& data, int to, int tag = 0) const;
// -------------------------------------------------------------------------------------
......@@ -144,6 +147,8 @@ namespace mpi14
REQUIRES( is_mpi_type<T> )>
MPI_Status recv(std::vector<T>& data, int from, int tag = 0) const;
MPI_Status recv(std::string& str, int from, int tag = 0) const;
// receive complex datatype
template <class Data,
REQUIRES( not is_mpi_type<Data> )>
......@@ -154,12 +159,12 @@ namespace mpi14
// receive mpi datatype
template <class Data,
REQUIRES( is_mpi_type<Data> )>
std::shared_ptr<MPI_Request> irecv(Data& data, int from, int tag = 0) const;
Request irecv(Data& data, int from, int tag = 0) const;
// receive array of mpi datatypes
template <class Data,
REQUIRES( is_mpi_type<Data> )>
std::shared_ptr<MPI_Request> irecv(Data* data, std::size_t size, int from, int tag = 0) const;
Request irecv(Data* data, std::size_t size, int from, int tag = 0) const;
// receive vector of mpi datatypes
// 1. until message received, call MPI_Iprobe to retrieve status and size of message
......@@ -167,23 +172,20 @@ namespace mpi14
// 3. receive data into vector
template <class T,
REQUIRES( is_mpi_type<T> )>
FutureVector<T> irecv(std::vector<T>& data, int from, int tag = 0) const
{
return {data, from, tag, comm_};
}
Request irecv(std::vector<T>& vec, int from, int tag = 0) const;
Request irecv(std::string& vec, int from, int tag = 0) const;
// receive complex datatype asynchronously
// Wait for data, by calling Future::test() or Future::wait().
template <class Data,
REQUIRES( not is_mpi_type<Data> )>
FutureSerialization<Data> irecv(Data& data, int from, int tag = 0) const
{
return {data, from, tag, comm_};
}
Request irecv(Data& data, int from, int tag = 0) const;
protected:
// free unused buffers
void check_buffers() const;
......@@ -195,7 +197,7 @@ namespace mpi14
int size_;
std::vector<char> buffer_;
mutable std::list< std::pair<std::shared_ptr<MPI_Request>, std::string> > buffers_;
mutable std::list< std::pair<MPI_Request*, std::string> > buffers_;
};
} // end namespace mpi14
......
#pragma once
#include <type_traits>
#include <vector>
#include "FutureSerialization.hpp"
#include "FutureVector.hpp"
#include "Request.hpp"
#include "Type_Traits.hpp"
namespace mpi14
{
namespace aux
{
// Trait mapping a received data type T to the future/request type that the
// (old) asynchronous receive operations returned for it.
// NOTE(review): Communicator::irecv now returns Request for std::vector<T>
// and std::string as well (see the redesigned declarations); the vector and
// serialization mappings below may be stale — confirm against the callers.
// Fallback: complex (non-MPI) datatypes are received via serialization.
template <class T, class = void>
struct Future
{
using type = FutureSerialization<T>;
};
// Vectors of plain MPI datatypes use the probe-then-receive future.
template <class T>
struct Future<std::vector<T>, std::enable_if_t<mpi14::is_mpi_type<T>> >
{
using type = FutureVector<T>;
};
// Single plain MPI datatypes map directly to an MPI request wrapper.
template <class T>
struct Future<T, std::enable_if_t<mpi14::is_mpi_type<T>> >
{
using type = Request;
};
// Operations without a received value (e.g. sends) also yield a Request.
template <>
struct Future<void>
{
using type = Request;
};
} // end namespace aux
// Future<T>: the future/request type produced by an asynchronous operation
// delivering a value of type T.
template <class T>
using Future = typename aux::Future<T>::type;
} // end namespace mpi14
#pragma once
#include "FutureBase.hpp"
#include "Type_Traits.hpp"
namespace mpi14
{
// Future for receiving a std::vector of a plain MPI datatype whose length is
// not known in advance. Completion is driven by repeated calls to test():
// phase 1 probes for the message and reads its element count, phase 2 resizes
// the target vector and performs the (blocking) receive.
template <class T>
class FutureVector
: public FutureBase
{
public:
// vec: target vector (held by reference — must outlive this future);
// from/tag: source rank and message tag; comm: communicator to receive on.
FutureVector(std::vector<T>& vec, int from, int tag, MPI_Comm comm)
: vec_(vec)
, from_(from)
, tag_(tag)
, comm_(comm)
{}
// Blocks until the data has arrived, then returns the filled vector.
std::vector<T> const& get()
{
wait();
return vec_;
}
// Advances the two-phase receive; returns true once the data is in vec_.
virtual bool test() override
{
if (!size_received_) {
MPI_Status status;
int flag = 0;
// Wait for a message from rank from_ with tag tag_
MPI_Iprobe(from_, tag_, comm_, &flag, &status); // NOTE: maybe change to MPI_Improbe (for thread-safety)
if (flag != 0) {
// Find out the number of elements in the message
MPI_Get_count(&status, type_to_mpi<T>, &size_);
size_received_ = true;
}
}
if (size_received_ && !data_received_) {
vec_.resize(size_);
// The message is known to be pending (probe succeeded), so this blocking
// receive completes immediately; its status is not needed.
MPI_Recv(vec_.data(), size_, type_to_mpi<T>, from_, tag_, comm_, MPI_STATUS_IGNORE);
data_received_ = true;
}
return data_received_;
}
protected:
std::vector<T>& vec_;      // receive target, owned by the caller
int from_;                 // source rank
int tag_;                  // message tag
MPI_Comm comm_;            // communicator
int size_ = 0;             // element count, valid once size_received_
bool size_received_ = false;  // phase 1 done (probe + count)
bool data_received_ = false;  // phase 2 done (data received)
};
#if 0
// Promise_Size
[from_,tag_,comm_,size_=0,received_=false]() mutable -> std::optional<int>
{
if (!received_) {
MPI_Status status;
int flag = 0;
// Wait for a message from rank from_ with tag tag_
MPI_Iprobe(from_, tag_, comm_, &flag, &status); // NOTE: maybe change to MPI_Improbe (for thread-savety)
if (flag != 0) {
// Find out the number of elements in the message
MPI_Get_count(&status, type_to_mpi<T>, &size_);
received_ = true;
}
}
if (received_)
return size_;
else
return {};
};
[from_,tag_,comm_,&vec_,received_=false](int size) mutable -> std::optional<std::vector<T> const*>
{
if (!received_) {
vec_.resize(size);
// Receive the message. ignore the status
MPI_Recv(vec_.data(), size, type_to_mpi<T>, from_, tag_, comm_, MPI_STATUS_IGNORE);
received_ = true;
}
if (received_)
return &vec_;
else
return {};
};
#endif
} // end namespace mpi14
#pragma once
#include <vector>
#include <functional>
#include <type_traits>
#include <mpi.h>
#include <boost/optional.hpp>
namespace mpi14
{
{
class Request
{
public:
......@@ -13,12 +16,25 @@ namespace mpi14
Request(MPI_Request request)
: request_(request)
{}
// In advanced mode, take a callable that returns an optional<MPI_Status>.
// The functor is invoked from test(), and wait() is implemented in terms of
// test(); the plain MPI_Request-based members are disabled (see the asserts).
template <class F,
REQUIRES( concepts::Functor<F, boost::optional<MPI_Status>(void)> ) >
Request(F&& f)
: test_(std::forward<F>(f))
, advanced_(true)
{}
// Returns an MPI_Status object if the operation identified by the request is complete.
// If the request is an active persistent request, it is marked as inactive. Any other type of
// request is deallocated and the request handle is set to MPI_REQUEST_NULL .
boost::optional<MPI_Status> test()
{
if (advanced_)
return test_();
MPI_Status status;
int flag = 0;
MPI_Test(&request_, &flag, &status);
......@@ -28,9 +44,11 @@ namespace mpi14
return {};
}
// Access the information associated with a request, without freeing the request
// Access the information associated with a request, without freeing the request.
boost::optional<MPI_Status> status() const
{
assert( !advanced_ );
MPI_Status status;
int flag = 0;
MPI_Request_get_status(request_, &flag, &status);
......@@ -43,21 +61,39 @@ namespace mpi14
// Blocks until the operation identified by this request is complete and
// returns its final MPI_Status. In advanced mode this busy-waits on the
// user-supplied test functor; otherwise it forwards to MPI_Wait.
MPI_Status wait()
{
  if (advanced_) {
    boost::optional<MPI_Status> status;
    while( !(status = test()) ) ;
    return status.value();
  } else {
    MPI_Status status;
    MPI_Wait(&request_, &status);
    // BUG FIX: the original fell off the end of this value-returning
    // function without returning `status` (undefined behavior).
    return status;
  }
}
// Returns the underlying MPI_Request handle (by value).
// Only valid in plain mode: an advanced (functor-based) request has no handle.
// (The stale pre-diff one-line definition duplicated this const overload and
// has been removed.)
MPI_Request get() const
{
  assert( !advanced_ );
  return request_;
}

// Mutable access to the underlying MPI_Request handle (plain mode only).
MPI_Request& get()
{
  assert( !advanced_ );
  return request_;
}
// Deallocate a request object without waiting for the associated communication to complete.
// Plain mode only — an advanced (functor-based) request owns no MPI handle.
void free()
{
assert( !advanced_ );
MPI_Request_free(&request_);
}
// Request cancellation of the pending communication (plain mode only).
void cancel()
{
assert( !advanced_ );
MPI_Cancel(&request_);
}
......@@ -70,88 +106,9 @@ namespace mpi14
protected:
MPI_Request request_;
std::function<boost::optional<MPI_Status>(void)> test_;
bool advanced_ = false;
};
#if 0
template <class Data, class F>
class Request
: public DefaultRequest<Data>
{
public:
DefaultRequest(Data const* data, F const& f)
: DefaultRequest(MPI_REQUEST_NULL, data)
, get_request_(f)
{}
std::optional<Data const*> test()
{
if (!initialized_) {
DefaultRequest::request_ = get_request_();
initialized_ = true;
}
return DefaultRequest::test();
}
private:
F get_request_;
bool initialized_ = false;
};
template <class Data, class F>
class DependentRequest
: public DefaultRequest<Data>
{
public:
DependentRequest(Data const* data, F const& f)
: DefaultRequest(MPI_REQUEST_NULL, data)
, get_request_(f)
{}
template <class T>
std::optional<Data const*> test(std::optional<T const*> in)
{
if (!in)
return {}
if (!initialized_) {
DefaultRequest::request_ = get_request_(*in);
initialized_ = true;
}
return DefaultRequest::test();
}
private:
F get_request_;
bool initialized_ = false;
};
#endif
#if 0
int size;
Request<int> req0(&size, [&size]()
{
MPI_Request request;
MPI_Irecv(&size, 1, MPI_INT, from_, tag_, comm_, &request);
return request;
});
std::vector<double> vec;
std::vector<char> buffer;
DependentRequest<std::vector<double>> req(&vec, [&vec,&buffer](int size)
{
assert( size > 0 );
buffer.resize(size);
MPI_Request request;
MPI_Irecv(buffer.data(), size, MPI_BYTE, from_, tag_, comm_, &request);
return request;
});
#endif
} // end namespace mpi14
#pragma once
#include <chrono>
#include <list>
#include <future>
namespace mpi14
{
// Abstract base for polling-style futures. Subclasses implement test(),
// which advances the asynchronous operation and reports completion; the
// waiting helpers below are implemented by busy-polling test().
class FutureBase
{
public:
  virtual ~FutureBase() {}

  // Advances the operation; returns true once it has completed.
  // Implementations may have side effects (e.g. perform the actual receive).
  virtual bool test() = 0;

  // Busy-waits until test() signals completion.
  virtual void wait()
  {
    while( !test() ) ;
  }

  // Busy-waits until completion or until timeout_duration has elapsed.
  // Returns future_status::ready on completion, ::timeout otherwise.
  template <class Rep, class Period>
  std::future_status wait_for(std::chrono::duration<Rep,Period> const& timeout_duration)
  {
    auto const start = std::chrono::system_clock::now();
    for (;;) {
      // FIX: call the (side-effecting) virtual test() exactly once per
      // iteration and reuse the result — the original polled in the loop
      // condition and then called test() a second time after the loop.
      if (test())
        return std::future_status::ready;
      if ((std::chrono::system_clock::now() - start) >= timeout_duration)
        return std::future_status::timeout;
    }
  }

  // Busy-waits until completion or until the given point in time is reached.
  template <class Clock, class Duration>
  std::future_status wait_until(std::chrono::time_point<Clock,Duration> const& timeout_time)
  {
    return wait_for(timeout_time - std::chrono::system_clock::now());
  }
};
// Blocks until all communication operations associated with active handles in the range complete.
template <class FutureIter>
void wait_all(FutureIter first, FutureIter last)
template <class ReqIter>
void wait_all(ReqIter first, ReqIter last)
{
std::list<FutureIter> remaining;
for (FutureIter it = first; it != last; ++it) remaining.push_back(it);
std::list<ReqIter> remaining;
for (ReqIter it = first; it != last; ++it) remaining.push_back(it);
while (!remaining.empty()) {
auto remove_it = remaining.end();
......@@ -68,8 +28,8 @@ namespace mpi14
// Tests for completion of either one or none of the operations associated with active handles.
// In the former case, it returns an iterator to the finished handle.
template <class FutureIter>
FutureIter test_any(FutureIter first, FutureIter last)
template <class ReqIter>
ReqIter test_any(ReqIter first, ReqIter last)
{
for (auto it = first; it != last; ++it) {
if (it->test())
......@@ -80,8 +40,8 @@ namespace mpi14
// Blocks (busy-waiting) until one of the operations associated with the
// active requests in the range [first, last) has completed.
// (The stale pre-diff FutureIter template header duplicated this declaration
// and has been removed.)
template <class ReqIter>
void wait_any(ReqIter first, ReqIter last)
{
  while (test_any(first, last) == last) ;
}
......
install(FILES
Communicator.impl.hpp
RecvDynamicSize.impl.hpp
DESTINATION include/mpi14/impl/
)
#pragma once
#include "RecvDynamicSize.impl.hpp"
namespace mpi14 {
// send mpi datatype
......@@ -28,6 +30,12 @@ void Communicator::send(std::vector<T> const& vec, int to, int tag) const
}
// Blocking send of a std::string as raw characters (MPI_CHAR).
// The matching receiver determines the length via MPI_Probe/MPI_Get_count
// (see recv(std::string&)).
void Communicator::send(std::string const& str, int to, int tag) const
{
MPI_Send(str.data(), int(str.size()), MPI_CHAR, to, tag, comm_);
}
// send complex datatype:
// 1. create a binary representation of data, store it in a buffer
// 2. send size of buffer (with MPI_Ibsend)
......@@ -97,22 +105,30 @@ Request Communicator::isend(std::vector<T> const& vec, int to, int tag) const
}
// Non-blocking send of a std::string as raw characters (MPI_CHAR).
// NOTE(review): the string is not buffered here — the caller must keep `str`
// alive until the returned Request completes; confirm callers honor this.
Request Communicator::isend(std::string const& str, int to, int tag) const
{
MPI_Request request;
MPI_Isend(str.data(), int(str.size()), MPI_CHAR, to, tag, comm_, &request);
return {request};
}
// send complex datatype: (non-blocking)
// 1. create a binary representation of data, store it in a buffer
//    (the buffer is kept alive in buffers_ until the send has completed)
// 2. send the buffer with MPI_Isend
// NOTE(review): the diff leaves the old (shared_ptr-based) and the new
// implementation lines interleaved below. In the NEW version,
// buffers_.emplace_back(&request, ...) stores the address of the stack-local
// `request`, which dangles as soon as this function returns — confirm, and
// store the MPI_Request by value (or heap-allocate it) in buffers_ instead.
template <class Data,
REQUIRES( not is_mpi_type<Data> )>
std::shared_ptr<MPI_Request> Communicator::isend(Data const& data, int to, int tag) const
Request Communicator::isend(Data const& data, int to, int tag) const
{
check_buffers();
buffers_.emplace_back( std::make_pair(std::make_shared<MPI_Request>(), serialization::store(data)) );
MPI_Request request;
buffers_.emplace_back(&request, serialization::store(data));
auto request = buffers_.back().first;
auto const& buffer = buffers_.back().second;
MPI_Isend(buffer.data(), int(buffer.size()), MPI_BYTE, to, tag, comm_, request.get());
return request;
MPI_Isend(buffer.data(), int(buffer.size()), MPI_BYTE, to, tag, comm_, &request);
return {request};
}
// -------------------------------------------------------------------------------------
......@@ -156,6 +172,20 @@ MPI_Status Communicator::recv(std::vector<T>& vec, int from, int tag) const
}
// Blocking receive of a std::string of unknown length:
// probe for the incoming message, read its character count, resize the
// string, then receive the characters in place.
MPI_Status Communicator::recv(std::string& str, int from, int tag) const
{
MPI_Status status;
MPI_Probe(from, tag, comm_, &status);
int size = 0;
MPI_Get_count(&status, MPI_CHAR, &size);
str.resize(size);
MPI_Recv(&str[0], size, MPI_CHAR, from, tag, comm_, MPI_STATUS_IGNORE);
// Return the status from the probe; the receive status itself is ignored.
return status;
}
// receive complex datatype
template <class Data,
REQUIRES( not is_mpi_type<Data> )>
......@@ -180,22 +210,62 @@ MPI_Status Communicator::recv(Data& data, int from, int tag) const
// Non-blocking receive of a single value of a plain MPI datatype.
// Posts an MPI_Irecv and wraps the resulting handle in a Request.
// (The stale pre-diff shared_ptr-based signature and body lines left
// interleaved by the diff have been removed.)
template <class Data,
  REQUIRES( is_mpi_type<Data> )>
Request Communicator::irecv(Data& data, int from, int tag) const
{
  MPI_Request request;
  MPI_Irecv(&data, 1, type_to_mpi<Data>, from, tag, comm_, &request);
  return {request};
}
// Non-blocking receive of an array of `size` values of a plain MPI datatype.
// `data` must point to at least `size` elements and stay valid until the
// returned Request completes.
// (The stale pre-diff shared_ptr-based declaration line has been removed.)
template <class Data,
  REQUIRES( is_mpi_type<Data> )>
Request Communicator::irecv(Data* data, std::size_t size, int from, int tag) const
{
  MPI_Request request;
  // MPI takes an int count — make the narrowing explicit, consistent with
  // the other overloads that pass int(...).
  MPI_Irecv(data, int(size), type_to_mpi<Data>, from, tag, comm_, &request);
  return {request};
}
// Non-blocking receive of a vector of plain MPI datatypes with unknown
// length. Returns a Request driven by RecvDynamicSize: it probes for the
// message, then the callback resizes the vector and receives the elements.
// NOTE(review): `vec` is captured by reference — it must outlive the
// returned Request; confirm callers honor this.
template <class T,
REQUIRES( is_mpi_type<T> )>
Request Communicator::irecv(std::vector<T>& vec, int from, int tag) const
{
return {RecvDynamicSize(type_to_mpi<T>,from,tag,comm_,
[from,tag,comm=comm_,&vec](int size)
{
vec.resize(size);
MPI_Recv(vec.data(), size, type_to_mpi<T>, from, tag, comm, MPI_STATUS_IGNORE);
}) };
}
// Non-blocking receive of a std::string with unknown length, analogous to
// the vector overload above: RecvDynamicSize probes for the message, then
// the callback resizes the string and receives the characters.
// NOTE(review): `str` is captured by reference — it must outlive the
// returned Request; confirm callers honor this.
Request Communicator::irecv(std::string& str, int from, int tag) const
{
return {RecvDynamicSize(MPI_CHAR,from,tag,comm_,
[from,tag,comm=comm_,&str](int size)
{
str.resize(size);
MPI_Recv(&str[0], size, MPI_CHAR, from, tag, comm, MPI_STATUS_IGNORE);
}) };
}
template <class Data,
REQUIRES( not is_mpi_type<Data> )>
Request Communicator::irecv(Data& data, int from, int tag) const
{
auto request = std::make_shared<MPI_Request>();
MPI_Irecv(data, size, type_to_mpi<Data>, from, tag, comm_, request.get());
return request;
return {RecvDynamicSize(MPI_BYTE,from,tag,comm_,
[from,tag,comm=comm_,&data](int size)
{
std::vector<char> buffer(size);
// Receive the message. ignore the status
MPI_Recv(buffer.data(), size, MPI_BYTE, from, tag, comm, MPI_STATUS_IGNORE);