OPAL (Object Oriented Parallel Accelerator Library) 2024.2
OPAL
Worker.h
Go to the documentation of this file.
1//
2// Class Worker
// A worker MPI entity consists of a processor group that runs a
// simulation of type Sim_t. The main loop in run() accepts new jobs from the
// master process, runs the simulation and reports the results back.
6//
7// @see Pilot
8// @see Poller
9// @see MPIHelper.h
10//
// @tparam Sim_t type of simulation to run
12//
13// Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
14// All rights reserved
15//
16// Implemented as part of the PhD thesis
17// "Toward massively parallel multi-objective optimization with application to
18// particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
19//
20// This file is part of OPAL.
21//
22// OPAL is free software: you can redistribute it and/or modify
23// it under the terms of the GNU General Public License as published by
24// the Free Software Foundation, either version 3 of the License, or
25// (at your option) any later version.
26//
27// You should have received a copy of the GNU General Public License
28// along with OPAL. If not, see <https://www.gnu.org/licenses/>.
29//
30#ifndef __WORKER_H__
31#define __WORKER_H__
32
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <utility>

#include "Pilot/Poller.h"

#include "Comm/types.h"
#include "Util/Types.h"
#include "Util/MPIHelper.h"
#include "Util/CmdArguments.h"
41
42template <class Sim_t>
43class Worker : protected Poller {
44
45public:
46
48 std::string simName,
49 Comm::Bundle_t comms,
50 CmdArguments_t args)
51 : Poller(comms.worker)
52 , cmd_args_(args)
53 {
54 constraints_ = constraints;
55 simulation_name_ = simName;
57 is_idle_ = true;
59
60 leader_pid_ = 0;
61 MPI_Comm_size(coworker_comm_, &num_coworkers_);
62 }
63
65 Expressions::Named_t constraints,
66 std::string simName,
67 Comm::Bundle_t comms,
68 CmdArguments_t args,
69 const std::map<std::string, std::string> &userVariables,
70 bool isOptimizer = true)
71 : Poller(comms.worker)
72 , cmd_args_(args)
73 , userVariables_(userVariables)
74 {
75 objectives_ = objectives;
76 constraints_ = constraints;
77 simulation_name_ = simName;
79 is_idle_ = true;
81
82 leader_pid_ = 0;
83 MPI_Comm_size(coworker_comm_, &num_coworkers_);
84
85 if (!isOptimizer) return;
86 int my_local_pid = 0;
87 MPI_Comm_rank(coworker_comm_, &my_local_pid);
88
89 // distinction between leader and coworkers
90 if(my_local_pid == leader_pid_)
91 run();
92 else
94 }
95
97 {}
98
99
100protected:
101 typedef const std::unique_ptr<Sim_t> SimPtr_t;
102
105
108
109
112 void runCoWorker() {
113
114 MPI_Request stop_req;
115 size_t stop_value = 0;
116
117 MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
118 MPI_ANY_TAG, coworker_comm_, &stop_req);
119 is_running_ = true;
120
121 while(is_running_) {
122
123 //FIXME: bcast blocks after our leader stopped working
124 // Either we create a new class implementing a coworker in the
125 // same manner as the worker (poll loop). Anyway there is no way
126 // around removing the Bcast and adding another tag in the poll
127 // loop above in order to be able to exit cleanly.
128 if(stop_req != MPI_REQUEST_NULL) {
129 MPI_Status status;
130 int flag = 0;
131 MPI_Test(&stop_req, &flag, &status);
132
133 if(flag) {
134
135 if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
136 Param_t params;
138
139 try {
140 SimPtr_t sim(new Sim_t(objectives_, constraints_,
143
144 sim->run();
145 } catch(OptPilotException &ex) {
146 std::cout << "Exception while running simulation: "
147 << ex.what() << std::endl;
148 }
149 MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
150 MPI_ANY_TAG, coworker_comm_, &stop_req);
151 }
152
153 if(status.MPI_TAG == MPI_STOP_TAG) {
154 is_running_ = false;
155 break;
156 }
157 }
158 }
159 }
160 }
161
162
163protected:
164
168 std::string simulation_name_;
170
171 const std::map<std::string, std::string> userVariables_;
172
174 void notifyCoWorkers(int tag) {
175
176 for(int i=0; i < num_coworkers_; i++) {
177 if(i == leader_pid_) continue;
178
179 size_t dummy = 0;
180 MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, i, tag, coworker_comm_);
181 }
182 }
183
184 void setupPoll() {
185 size_t dummy = 1;
186 MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
188 }
189
190 void prePoll()
191 {}
192
193 void postPoll()
194 {}
195
196 void onStop() {
197 if(num_coworkers_ > 1)
199 }
200
201 virtual bool onMessage(MPI_Status status, size_t recv_value) {
202
203 if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
204
205 is_idle_ = false;
206 size_t job_id = recv_value;
207
208 // get new job
209 Param_t params;
210 MPI_Recv_params(params, (size_t)pilot_rank_, comm_m);
211
212 // and forward to coworkers (if any)
213 if(num_coworkers_ > 1) {
216 }
217
218 //XXX we need to know if we want EVAL or DERIVATIVE
219 //reqVarContainer_t reqVars;
220 //MPI_Recv_reqvars(reqVars, (size_t)pilot_rank_, comm_m);
221
222 reqVarContainer_t requested_results;
223 try {
224 SimPtr_t sim(new Sim_t(objectives_, constraints_,
227
228 // run simulation in a "blocking" fashion
229 sim->run();
230 sim->collectResults();
231 sim->cleanUp();
232 requested_results = sim->getResults();
233 } catch(OptPilotException &ex) {
234 std::cout << "Exception while running simulation: "
235 << ex.what() << std::endl;
236 }
237
238 MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, pilot_rank_,
240
241 size_t dummy = 0;
242 MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
244
245 MPI_Send_reqvars(requested_results, (size_t)pilot_rank_, comm_m);
246
247 is_idle_ = true;
248 return true;
249
250 } else {
251 std::stringstream os;
252 os << "Unexpected MPI_TAG: " << status.MPI_TAG;
253 std::cout << "(Worker) Error: " << os.str() << std::endl;
254 throw OptPilotException("Worker::onMessage", os.str());
255 }
256 }
257};
258
259#endif
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition MPIHelper.h:52
#define MPI_WORKER_FINISHED_TAG
notify pilot that work has been finished and results are ready to collect
Definition MPIHelper.h:35
#define MPI_COWORKER_NEW_JOB_TAG
notifies coworkers about new jobs
Definition MPIHelper.h:40
#define MPI_WORKER_STATUSUPDATE_TAG
notify pilot about worker status
Definition MPIHelper.h:33
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies the worker that it is ready to collect the results
Definition MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit the poll loop
Definition MPIHelper.h:64
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
void MPI_Bcast_params(Param_t &params, size_t root, MPI_Comm comm)
broadcast params to all entities in comm
Definition MPIHelper.cpp:60
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition Types.h:79
namedVariableCollection_t Param_t
Definition Types.h:48
std::shared_ptr< CmdArguments > CmdArguments_t
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expressions with a name
Definition Expression.h:74
MPI_Comm coworkers
Definition types.h:39
int master_local_pid
Definition types.h:36
bundles all communicators for a specific role/pid
Definition types.h:32
Poller(MPI_Comm comm, double delay=0.1)
Definition Poller.h:39
bool is_running_
Definition Poller.h:54
MPI_Comm comm_m
communicator the poller listens to requests
Definition Poller.h:52
virtual void run()
Definition Poller.h:79
Expressions::Named_t objectives_
Definition Worker.h:106
~Worker()
Definition Worker.h:96
Worker(Expressions::Named_t objectives, Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args, const std::map< std::string, std::string > &userVariables, bool isOptimizer=true)
Definition Worker.h:64
const std::unique_ptr< Sim_t > SimPtr_t
Definition Worker.h:101
std::string simulation_name_
Definition Worker.h:168
void runCoWorker()
Definition Worker.h:112
int pilot_rank_
Definition Worker.h:167
int num_coworkers_
Definition Worker.h:166
void postPoll()
executed after handling (if any) new request
Definition Worker.h:193
void onStop()
enable implementation to react to STOP tag
Definition Worker.h:196
MPI_Comm coworker_comm_
Definition Worker.h:104
void notifyCoWorkers(int tag)
notify coworkers of incoming broadcast
Definition Worker.h:174
void prePoll()
executed before checking for new request
Definition Worker.h:190
CmdArguments_t cmd_args_
Definition Worker.h:169
int leader_pid_
Definition Worker.h:165
const std::map< std::string, std::string > userVariables_
Definition Worker.h:171
void setupPoll()
executed before starting polling loop
Definition Worker.h:184
Worker(Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args)
Definition Worker.h:47
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition Worker.h:201
Expressions::Named_t constraints_
Definition Worker.h:107
bool is_idle_
Definition Worker.h:103
virtual const char * what() const