OPAL (Object Oriented Parallel Accelerator Library) 2024.2
OPAL
AsyncSendBuffers.h
Go to the documentation of this file.
1//
2// Class AsyncSendBuffer and AsyncSendBuffers
3//
4// Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
5// All rights reserved
6//
7// Implemented as part of the PhD thesis
8// "Toward massively parallel multi-objective optimization with application to
9// particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
10//
11// This file is part of OPAL.
12//
13// OPAL is free software: you can redistribute it and/or modify
14// it under the terms of the GNU General Public License as published by
15// the Free Software Foundation, either version 3 of the License, or
16// (at your option) any later version.
17//
18// You should have received a copy of the GNU General Public License
19// along with OPAL. If not, see <https://www.gnu.org/licenses/>.
20//
21#include <vector>
22#include <algorithm>
23#include <string.h>
24#include "mpi.h"
25#include "boost/bind.hpp"
26
28
29public:
30
31 AsyncSendBuffer(std::ostringstream &os) {
32 this->size_req = new MPI_Request();
33 this->buffer_req = new MPI_Request();
34 this->buf_size = os.str().length();
35 buffer = new char[buf_size];
36 memcpy(buffer, os.str().c_str(), buf_size);
37 }
38
40 delete size_req;
41 delete buffer_req;
42 delete[] buffer;
43 }
44
45 bool hasCompleted() {
46 int bufferflag = 0;
47 MPI_Test(this->buffer_req, &bufferflag, MPI_STATUS_IGNORE);
48 if(bufferflag) {
49 int sizeflag = 0;
50 MPI_Test(this->buffer_req, &sizeflag, MPI_STATUS_IGNORE);
51 if(sizeflag) {
52 return true;
53 }
54 }
55 return false;
56 }
57
58 void send(int recv_rank, int size_tag, int data_tag, MPI_Comm comm) {
59 MPI_Isend(&buf_size, 1, MPI_LONG, recv_rank, size_tag, comm, size_req);
60 MPI_Isend(buffer, buf_size, MPI_CHAR, recv_rank, data_tag, comm, buffer_req);
61 }
62
63
64private:
65
66 // can't use smart pointers because MPI will hold last valid reference to
67 // pointer
68 MPI_Request *size_req;
69 MPI_Request *buffer_req;
70 char *buffer;
71
72 size_t buf_size;
73
74};
75
76
78
79public:
81
82 void insert(std::shared_ptr<AsyncSendBuffer> buf) {
83 collection_.push_back(buf);
84 }
85
86 void cleanup() {
87 collection_.erase(std::remove_if(
88 collection_.begin(), collection_.end(), boost::bind(&AsyncSendBuffer::hasCompleted, _1)),
89 collection_.end());
90 }
91
92 size_t size() {
93 return collection_.size();
94 }
95
96private:
97 std::vector< std::shared_ptr<AsyncSendBuffer> > collection_;
98};
99
MPI_Request * buffer_req
MPI_Request * size_req
AsyncSendBuffer(std::ostringstream &os)
void send(int recv_rank, int size_tag, int data_tag, MPI_Comm comm)
void insert(std::shared_ptr< AsyncSendBuffer > buf)
std::vector< std::shared_ptr< AsyncSendBuffer > > collection_