OPAL (Object Oriented Parallel Accelerator Library) 2024.2
OPAL
Pilot.h
Go to the documentation of this file.
1//
2// Class Pilot
3// The Optimization Pilot (Master): Coordinates requests by optimizer
4// to workers and reports results back on a given communicator.
5//
6// Every worker thread notifies the master here if idle or not. When
7// available the master dispatches one of the pending simulations to the
8// worker who will run the specified simulation and report results back to
9// the master. The Optimizer class will poll the scheduler to check if some
10// (or all) results are available and continue to optimize and request new
11// simulation results.
12//
13// @see Worker
14// @see Optimizer
15//
16// @tparam Opt_t type of the optimizer
17// @tparam Sim_t type of the simulation
18// @tparam SolPropagationGraph_t strategy to distribute solution between
19// master islands
20// @tparam Comm_t comm splitter strategy
21//
22// Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
23// All rights reserved
24//
25// Implemented as part of the PhD thesis
26// "Toward massively parallel multi-objective optimization with application to
27// particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
28//
29// This file is part of OPAL.
30//
31// OPAL is free software: you can redistribute it and/or modify
32// it under the terms of the GNU General Public License as published by
33// the Free Software Foundation, either version 3 of the License, or
34// (at your option) any later version.
35//
36// You should have received a copy of the GNU General Public License
37// along with OPAL. If not, see <https://www.gnu.org/licenses/>.
38//
39#ifndef __PILOT_H__
40#define __PILOT_H__
41
#include <mpi.h>
#include <unistd.h>

#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "Comm/MasterNode.h"
#include "Comm/CommSplitter.h"

#include "Util/Types.h"
#include "Util/CmdArguments.h"

#include "Pilot/Poller.h"
#include "Pilot/Worker.h"
#include "Optimizer/Optimizer.h"

#include "Util/Trace/Trace.h"
#include "Util/Trace/FileSink.h"
61
63
64
90
91
92
93template <
94 class Opt_t
95 , class Sim_t
96 , class SolPropagationGraph_t
97 , class Comm_t
98>
99class Pilot : protected Poller {
100
101public:
102
103 // constructor only for Pilot classes inherited from this class
104 // they have their own setup function
    // constructor only for Pilot classes inherited from this class
    // they have their own setup function
    //
    // Registers the communicator with the Poller base class and stores the
    // command line arguments and design variables; deliberately does NOT run
    // setup(), so the derived class controls when roles are dispatched.
    Pilot(CmdArguments_t args, std::shared_ptr<Comm_t> comm,
          const DVarContainer_t &dvar)
        : Poller(comm->mpiComm())
        , comm_(comm)
        , cmd_args_(args)
        , dvars_(dvar)
    {
        // do nothing
    }
114
    // Constructor that immediately runs the full setup (banner, input file
    // parsing, role dispatch into optimizer / worker / pilot).
    // NOTE(review): setup() as visible in this file takes two parameters
    // (known_expr_funcs, userVariables) but is called here with one — an
    // overload or a default argument appears to be elided from this view;
    // confirm against the upstream header.
    Pilot(CmdArguments_t args, std::shared_ptr<Comm_t> comm,
          functionDictionary_t known_expr_funcs)
        : Poller(comm->mpiComm())
        , comm_(comm)
        , cmd_args_(args)
    {
        setup(known_expr_funcs);
    }
123
    // Main constructor: stores objectives, constraints, design variables and
    // the hypervolume reference point, then (for optimizer runs) performs the
    // full setup, which dispatches this rank into its role and only returns
    // when the run is finished.
    //
    // \param hypervolRef    hypervolume reference point (empty = default)
    // \param isOptimizerRun when false, setup() is skipped entirely
    // \param userVariables  extra user variables handed to the workers
    Pilot(CmdArguments_t args, std::shared_ptr<Comm_t> comm,
          functionDictionary_t known_expr_funcs,
          const DVarContainer_t &dvar,
          const Expressions::Named_t &obj,
          const Expressions::Named_t &cons,
          std::vector<double> hypervolRef = {},
          bool isOptimizerRun = true,
          const std::map<std::string, std::string> &userVariables = {})
        : Poller(comm->mpiComm())
        , comm_(comm)
        , cmd_args_(args)
        , objectives_(obj)
        , constraints_(cons)
        , dvars_(dvar)
        , hypervolRef_(hypervolRef)
    {
        if (isOptimizerRun)
            setup(known_expr_funcs, userVariables);
    }
143
144 virtual ~Pilot()
145 {
146 for (auto itr = objectives_.begin(); itr != objectives_.end(); ++ itr)
147 delete itr->second;
148
149 for (auto itr = constraints_.begin(); itr != constraints_.end(); ++ itr)
150 delete itr->second;
151 }
152
153
154protected:
155
157 MPI_Comm worker_comm_;
159 MPI_Comm opt_comm_;
162
163 std::shared_ptr<Comm_t> comm_;
165
169
171
172 typedef MasterNode< typename Opt_t::SolutionState_t,
173 SolPropagationGraph_t > MasterNode_t;
174 std::unique_ptr< MasterNode_t > master_node_;
175
177 std::string input_file_;
178
182
186 std::vector<double> hypervolRef_;
187
188
189 // keep track of state of all workers
190 std::vector<bool> is_worker_idle_;
191
193 typedef std::map<size_t, std::pair<Param_t, reqVarContainer_t> > Jobs_t;
194 typedef Jobs_t::iterator JobIter_t;
197
198 //DEBUG
199 std::unique_ptr<Trace> job_trace_;
200
201private:
202 void setup(functionDictionary_t known_expr_funcs,
203 const std::map<std::string, std::string> &userVariables) {
204 global_rank_ = comm_->globalRank();
205
206 if(global_rank_ == 0) {
207 std::cout << "\033[01;35m";
208 std::cout << " _ _ _ _ " << std::endl;
209 std::cout << " | | (_) | | | " << std::endl;
210 std::cout << " ___ _ __ | |_ ______ _ __ _| | ___ | |_ " << std::endl;
211 std::cout << " / _ \\| '_ \\| __|______| '_ \\| | |/ _ \\| __|" << std::endl;
212 std::cout << "| (_) | |_) | |_ | |_) | | | (_) | |_ " << std::endl;
213 std::cout << " \\___/| .__/ \\__| | .__/|_|_|\\___/ \\__|" << std::endl;
214 std::cout << " | | | | " << std::endl;
215 std::cout << " |_| |_| " << std::endl;
216 // ADA std::cout << "☷ Version: \t" << PACKAGE_VERSION << std::endl;
217 //std::cout << "☷ Git: \t\t" << GIT_VERSION << std::endl;
218 //std::cout << "☷ Build Date: \t" << BUILD_DATE << std::endl;
219 std::cout << "\e[0m";
220 std::cout << std::endl;
221 }
222
223 MPI_Barrier(MPI_COMM_WORLD);
224 parseInputFile(known_expr_funcs, true);
225
226 // here the control flow starts to diverge
227 if ( comm_->isOptimizer() ) { startOptimizer(); }
228 else if ( comm_->isWorker() ) { startWorker(userVariables); }
229 else if ( comm_->isPilot() ) { startPilot(); }
230 }
231
232protected:
233
234 void parseInputFile(functionDictionary_t /*known_expr_funcs*/, bool isOptimizationRun) {
235
236 try {
237 input_file_ = cmd_args_->getArg<std::string>("inputfile", true);
238 } catch (OptPilotException &e) {
239 std::cout << "Could not find 'inputfile' in arguments.. Aborting."
240 << std::endl;
241 MPI_Abort(comm_m, -101);
242 }
243
244 if((isOptimizationRun && objectives_.size() == 0) || dvars_.size() == 0) {
245 throw OptPilotException("Pilot::Pilot()",
246 "No objectives or dvars specified");
247 }
248
249 if(global_rank_ == 0) {
250 std::ostringstream os;
251 os << "\033[01;35m";
252 os << " ✔ " << objectives_.size()
253 << " objectives" << std::endl;
254 if (isOptimizationRun) {
255 os << " ✔ " << constraints_.size()
256 << " constraints" << std::endl;
257 }
258 os << " ✔ " << dvars_.size()
259 << " dvars" << std::endl;
260 os << "\e[0m";
261 os << std::endl;
262 std::cout << os.str() << std::flush;
263 }
264
265 MPI_Barrier(MPI_COMM_WORLD);
266 }
267
268 virtual
270
271 std::ostringstream os;
272 os << "\033[01;35m" << " " << global_rank_ << " (PID: " << getpid() << ") ▶ Opt"
273 << "\e[0m" << std::endl;
274 std::cout << os.str() << std::flush;
275
276 const std::unique_ptr<Opt_t> opt(
277 new Opt_t(objectives_, constraints_, dvars_, objectives_.size(),
278 comm_->getBundle(), cmd_args_, hypervolRef_, comm_->getNrWorkerGroups()));
279 opt->initialize();
280
281 std::cout << "Stop Opt.." << std::endl;
282 }
283
284 virtual
285 void startWorker(const std::map<std::string, std::string> &userVariables) {
286
287 std::ostringstream os;
288 os << "\033[01;35m" << " " << global_rank_ << " (PID: " << getpid() << ") ▶ Worker"
289 << "\e[0m" << std::endl;
290 std::cout << os.str() << std::flush;
291
292 size_t pos = input_file_.find_last_of("/");
293 std::string tmplfile = input_file_;
294 if(pos != std::string::npos)
295 tmplfile = input_file_.substr(pos+1);
296 pos = tmplfile.find(".");
297 std::string simName = tmplfile.substr(0,pos);
298
299 const std::unique_ptr< Worker<Sim_t> > w(
301 comm_->getBundle(), cmd_args_, userVariables));
302
303 std::cout << "Stop Worker.." << std::endl;
304 }
305
306 virtual
307 void startPilot() {
308
309 std::ostringstream os;
310 os << "\033[01;35m" << " " << global_rank_ << " (PID: " << getpid() << ") ▶ Pilot"
311 << "\e[0m" << std::endl;
312 std::cout << os.str() << std::flush;
313
314 // Traces
315 std::ostringstream trace_filename;
316 trace_filename << "pilot.trace." << comm_->getBundle().island_id;
317 job_trace_.reset(new Trace("Optimizer Job Trace"));
318 job_trace_->registerComponent( "sink",
319 std::shared_ptr<TraceComponent>(new FileSink(trace_filename.str())));
320
321 worker_comm_ = comm_->getBundle().worker;
322 opt_comm_ = comm_->getBundle().opt;
323 coworker_comm_ = comm_->getBundle().world;
324
326 MPI_Comm_rank(worker_comm_, &my_rank_in_worker_comm_);
328 MPI_Comm_rank(opt_comm_, &my_rank_in_opt_comm_);
329
331 MPI_Comm_size(worker_comm_, &total_available_workers_);
334
335 // setup master network
336 num_coworkers_ = 0;
337 MPI_Comm_size(coworker_comm_, &num_coworkers_);
338 if(num_coworkers_ > 1) {
339 //FIXME: proper upper bound for window size
340 int alpha = cmd_args_->getArg<int>("initialPopulation", false);
341 int opt_size = objectives_.size() + constraints_.size();
342 int overhead = 10;
343 size_t upperbound_buffer_size =
344 sizeof(double) * alpha * (1 + opt_size) * 1000
345 + overhead;
346 master_node_.reset(
347 new MasterNode< typename Opt_t::SolutionState_t,
348 SolPropagationGraph_t >(
349 coworker_comm_, upperbound_buffer_size, objectives_.size(),
350 comm_->getBundle().island_id));
351 }
352
353 has_opt_converged_ = false;
354 continue_polling_ = true;
355 run();
356
357 std::cout << "Stop Pilot.." << std::endl;
358 }
359
360 virtual
362 {}
363
    /// executed at the top of every polling iteration, before checking for
    /// new requests; no-op by default, hook for subclasses
    virtual
    void prePoll()
    {}
367
    /// enables implementations to react to a STOP tag; no-op by default,
    /// hook for subclasses
    virtual
    void onStop()
    {}
371
372 virtual
373 void postPoll() {
374 // terminating all workers is tricky since we do not know their state.
375 // All workers are notified (to terminate) when opt has converged and
376 // all workers are idle.
377 bool all_worker_idle = true;
378
379 // in the case where new requests became available after worker
380 // delivered last results (and switched to idle state).
381 for(int i = 0; i < total_available_workers_; i++) {
382
383 if(i == my_rank_in_worker_comm_) continue;
384
385 all_worker_idle = all_worker_idle && is_worker_idle_[i];
386
387 if(is_worker_idle_[i] && !request_queue_.empty())
389 }
390
391 // when all workers have been notified we can stop polling
392 if(all_worker_idle && has_opt_converged_) {
393 continue_polling_ = false;
394 int dummy = 0;
395 for(int worker = 0; worker < total_available_workers_; worker++) {
396 MPI_Request req;
397 MPI_Isend(&dummy, 1, MPI_INT, worker,
399 }
400 }
401 }
402
403
404 virtual
405 void sendNewJobToWorker(int worker) {
406
407 // no new jobs once our opt has converged
408 if(has_opt_converged_) return;
409
410 JobIter_t job = request_queue_.begin();
411 size_t jid = job->first;
412
413 Param_t job_params = job->second.first;
414 MPI_Send(&jid, 1, MPI_UNSIGNED_LONG, worker, MPI_WORK_JOBID_TAG, worker_comm_);
415 MPI_Send_params(job_params, worker, worker_comm_);
416
417 //reqVarContainer_t job_reqvars = job->second.second;
418 //MPI_Send_reqvars(job_reqvars, worker, worker_comm_);
419
420 running_job_list_.insert(std::pair<size_t,
421 std::pair<Param_t, reqVarContainer_t> >(job->first, job->second));
422 request_queue_.erase(jid);
423 is_worker_idle_[worker] = false;
424
425 std::ostringstream dump;
426 dump << "sent job with ID " << jid << " to worker " << worker
427 << std::endl;
428 job_trace_->log(dump);
429
430 }
431
432
433 virtual
434 bool onMessage(MPI_Status status, size_t recv_value){
435
436 MPITag_t tag = MPITag_t(status.MPI_TAG);
437 switch(tag) {
438
439 case WORKER_FINISHED_TAG: {
440
441 size_t job_id = recv_value;
442
443 size_t dummy = 1;
444 MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, status.MPI_SOURCE,
446
448 MPI_Recv_reqvars(res, status.MPI_SOURCE, worker_comm_);
449
450 running_job_list_.erase(job_id);
451 is_worker_idle_[status.MPI_SOURCE] = true;
452
453 std::ostringstream dump;
454 dump << "worker finished job with ID " << job_id << std::endl;
455 job_trace_->log(dump);
456
457
458 // optimizer already terminated, cannot accept new messages
459 if(has_opt_converged_) return true;
460
461 int opt_master_rank = comm_->getLeader();
462 MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, opt_master_rank,
464
465 MPI_Send_reqvars(res, opt_master_rank, opt_comm_);
466
467 // we keep worker busy _after_ results have been sent to optimizer
468 if(!request_queue_.empty())
469 sendNewJobToWorker(status.MPI_SOURCE);
470
471 return true;
472 }
473
474 case OPT_NEW_JOB_TAG: {
475
476 size_t job_id = recv_value;
477 int opt_master_rank = comm_->getLeader();
478
479 Param_t job_params;
480 MPI_Recv_params(job_params, (size_t)opt_master_rank, opt_comm_);
481
482 reqVarContainer_t reqVars;
483 //MPI_Recv_reqvars(reqVars, (size_t)opt_master_rank, job_size, opt_comm_);
484
485 std::pair<Param_t, reqVarContainer_t> job =
486 std::pair<Param_t, reqVarContainer_t>(job_params, reqVars);
487 request_queue_.insert(
488 std::pair<size_t, std::pair<Param_t, reqVarContainer_t> >(
489 job_id, job));
490
491 std::ostringstream dump;
492 dump << "new opt job with ID " << job_id << std::endl;
493 job_trace_->log(dump);
494
495 return true;
496 }
497
499
500 if(num_coworkers_ <= 1) return true;
501
502 std::ostringstream dump;
503 dump << "starting solution exchange.. " << status.MPI_SOURCE << std::endl;
504 job_trace_->log(dump);
505
506 // we start by storing or local solution state
507 size_t buffer_size = recv_value;
508 int opt_master_rank = status.MPI_SOURCE; //comm_->getLeader();
509
510 char *buffer = new char[buffer_size];
511 MPI_Recv(buffer, buffer_size, MPI_CHAR, opt_master_rank,
513 master_node_->store(buffer, buffer_size);
514 delete[] buffer;
515
516 dump.clear();
517 dump.str(std::string());
518 dump << "getting " << buffer_size << " bytes from OPT "
519 << opt_master_rank << std::endl;
520 job_trace_->log(dump);
521
522 // and then continue collecting all other solution states
523 std::ostringstream states;
524 master_node_->collect(states);
525 buffer_size = states.str().length();
526
527 dump.clear();
528 dump.str(std::string());
529 dump << "collected solution states of other PILOTS: "
530 << buffer_size << " bytes" << std::endl;
531 job_trace_->log(dump);
532
533 // send collected solution states to optimizer;
534 MPI_Send(&buffer_size, 1, MPI_UNSIGNED_LONG, opt_master_rank,
536
537 buffer = new char[buffer_size];
538 memcpy(buffer, states.str().c_str(), buffer_size);
539 MPI_Send(buffer, buffer_size, MPI_CHAR, opt_master_rank,
541
542 dump.clear();
543 dump.str(std::string());
544 dump << "sent set of new solutions to OPT" << std::endl;
545 job_trace_->log(dump);
546
547 delete[] buffer;
548
549 return true;
550 }
551
552 case OPT_CONVERGED_TAG: {
553 return stop();
554 }
555
557 is_worker_idle_[status.MPI_SOURCE] = true;
558 return true;
559 }
560
561 default: {
562 std::string msg = "(Pilot) Error: unexpected MPI_TAG: ";
563 msg += status.MPI_TAG;
564 throw OptPilotException("Pilot::onMessage", msg);
565 }
566 }
567 }
568
569 bool stop(bool isOpt = true) {
570
571 if(has_opt_converged_) return true;
572
573 has_opt_converged_ = true;
574 request_queue_.clear();
575 size_t dummy = 0;
576 MPI_Request req;
577 MPI_Isend(&dummy, 1, MPI_UNSIGNED_LONG, comm_->getLeader(), MPI_STOP_TAG, opt_comm_, &req);
578
579 if(! isOpt) return true;
580 if(num_coworkers_ <= 1) return true;
581
582 if(! cmd_args_->getArg<bool>("one-pilot-converge", false, false))
583 return true;
584
585 // propagate converged message to other pilots
586 // FIXME what happens if two island converge at the same time?
587 int my_rank = 0;
588 MPI_Comm_rank(coworker_comm_, &my_rank);
589 for(int i=0; i < num_coworkers_; i++) {
590 if(i == my_rank) continue;
591 MPI_Request req;
592 MPI_Isend(&dummy, 1, MPI_UNSIGNED_LONG, i, OPT_CONVERGED_TAG, coworker_comm_, &req);
593 }
594
595 return true;
596 }
597
598
599 // we overwrite run here to handle polling on two different communicators
600 //XXX: would be nice to give the poller interface an array of comms and
601 // listeners to be called..
    // we overwrite run here to handle polling on two different communicators
    //XXX: would be nice to give the poller interface an array of comms and
    //     listeners to be called..
    //
    // Main polling loop of the pilot. One non-blocking receive is posted per
    // communicator (optimizer, worker, and — when "one-pilot-converge" is
    // set — the coworker/pilot communicator); each receive is re-posted
    // after its message has been handled by onMessage(). The loop ends when
    // continue_polling_ is cleared (see postPoll()), a MPI_STOP_TAG arrives,
    // or onMessage() returns false; any still-pending receives are cancelled
    // on exit. The order of the MPI_Test/Irecv pairs is significant — do not
    // reorder.
    void run() {

        MPI_Request opt_request;
        MPI_Request worker_request;
        MPI_Status status;
        int flag = 0;
        size_t recv_value_worker = 0;
        size_t recv_value_opt = 0;

        setupPoll();

        // post the initial receive on each communicator
        MPI_Irecv(&recv_value_opt, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
                  MPI_ANY_TAG, opt_comm_, &opt_request);
        MPI_Irecv(&recv_value_worker, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
                  MPI_ANY_TAG, worker_comm_, &worker_request);

        bool pending_opt_request = true;
        bool pending_worker_request = true;
        bool pending_pilot_request = false;

        // the pilot-to-pilot receive exists only in one-pilot-converge mode;
        // pilot_request stays untouched otherwise (the test below is guarded
        // by the same command line flag)
        MPI_Request pilot_request;
        size_t recv_value_pilot = 0;
        if(cmd_args_->getArg<bool>("one-pilot-converge", false, false)) {
            MPI_Irecv(&recv_value_pilot, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
                      MPI_ANY_TAG, coworker_comm_, &pilot_request);
            pending_pilot_request = true;
        }

        while(continue_polling_) {

            prePoll();

            // 1) messages from the optimizer
            if(opt_request != MPI_REQUEST_NULL) {
                MPI_Test(&opt_request, &flag, &status);
                if(flag) {
                    pending_opt_request = false;
                    if(status.MPI_TAG == MPI_STOP_TAG) {
                        return;
                    } else {
                        if(onMessage(status, recv_value_opt)) {
                            // re-post the receive for the next message
                            MPI_Irecv(&recv_value_opt, 1, MPI_UNSIGNED_LONG,
                                      MPI_ANY_SOURCE, MPI_ANY_TAG, opt_comm_,
                                      &opt_request);
                            pending_opt_request = true;
                        } else
                            return;
                    }
                }
            }

            // 2) messages from the workers
            if(worker_request != MPI_REQUEST_NULL) {
                MPI_Test(&worker_request, &flag, &status);
                if(flag) {
                    pending_worker_request = false;
                    if(status.MPI_TAG == MPI_STOP_TAG) {
                        return;
                    } else {
                        if(onMessage(status, recv_value_worker)) {
                            MPI_Irecv(&recv_value_worker, 1,
                                      MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG,
                                      worker_comm_, &worker_request);
                            pending_worker_request = true;
                        } else
                            return;
                    }
                }
            }

            // 3) messages from other pilots (convergence propagation only)
            if(cmd_args_->getArg<bool>("one-pilot-converge", false, false)) {
                if(pilot_request != MPI_REQUEST_NULL) {
                    MPI_Test(&pilot_request, &flag, &status);
                    if(flag) {
                        pending_pilot_request = false;
                        if(status.MPI_TAG == OPT_CONVERGED_TAG) {
                            // another island converged: stop, but do not
                            // re-notify the optimizer network (isOpt = false)
                            stop(false);
                        } else {
                            MPI_Irecv(&recv_value_pilot, 1,
                                      MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG,
                                      coworker_comm_, &pilot_request);
                            pending_pilot_request = true;
                        }
                    }
                }
            }

            postPoll();
        }

        // cancel any receive that is still outstanding before leaving
        if(pending_opt_request) MPI_Cancel( &opt_request );
        if(pending_worker_request) MPI_Cancel( &worker_request );
        if(pending_pilot_request) MPI_Cancel( &pilot_request );
    }
694
695};
696
697#endif
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition MPIHelper.h:52
#define MPI_EXCHANGE_SOL_STATE_DATA_TAG
Definition MPIHelper.h:59
#define MPI_OPT_JOB_FINISHED_TAG
pilot tells optimizer that results are ready to collect
Definition MPIHelper.h:46
#define MPI_EXCHANGE_SOL_STATE_RES_SIZE_TAG
Definition MPIHelper.h:60
#define MPI_EXCHANGE_SOL_STATE_RES_TAG
Definition MPIHelper.h:61
MPITag_t
Definition MPIHelper.h:71
@ WORKER_FINISHED_TAG
Definition MPIHelper.h:72
@ OPT_CONVERGED_TAG
Definition MPIHelper.h:74
@ OPT_NEW_JOB_TAG
Definition MPIHelper.h:73
@ WORKER_STATUSUPDATE_TAG
Definition MPIHelper.h:75
@ EXCHANGE_SOL_STATE_TAG
Definition MPIHelper.h:77
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that he is ready to collect the results
Definition MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit poll loop (
Definition MPIHelper.h:64
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
void MPI_Send_params(Param_t params, size_t pid, MPI_Comm comm)
Definition MPIHelper.cpp:87
void MPI_Recv_reqvars(reqVarContainer_t &reqvars, size_t pid, MPI_Comm comm)
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
std::map< std::string, DVar_t > DVarContainer_t
Definition Types.h:92
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition Types.h:79
namedVariableCollection_t Param_t
Definition Types.h:48
std::shared_ptr< CmdArguments > CmdArguments_t
std::map< std::string, client::function::type > functionDictionary_t
Definition Expression.h:56
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expressions with a name
Definition Expression.h:74
bool has_opt_converged_
Definition Pilot.h:180
CmdArguments_t cmd_args_
Definition Pilot.h:164
int total_available_workers_
Definition Pilot.h:179
MPI_Comm coworker_comm_
MPI communicator used for messages between all pilots.
Definition Pilot.h:161
void setup(functionDictionary_t known_expr_funcs, const std::map< std::string, std::string > &userVariables)
Definition Pilot.h:202
std::string input_file_
input file for simulation with embedded optimization problem
Definition Pilot.h:177
std::map< size_t, std::pair< Param_t, reqVarContainer_t > > Jobs_t
keep track of requests and running jobs
Definition Pilot.h:193
virtual void setupPoll()
executed before starting polling loop
Definition Pilot.h:361
Jobs_t request_queue_
Definition Pilot.h:196
std::vector< bool > is_worker_idle_
Definition Pilot.h:190
virtual void startPilot()
Definition Pilot.h:307
int global_rank_
Definition Pilot.h:166
virtual void startWorker(const std::map< std::string, std::string > &userVariables)
Definition Pilot.h:285
virtual ~Pilot()
Definition Pilot.h:144
virtual void startOptimizer()
Definition Pilot.h:269
bool stop(bool isOpt=true)
Definition Pilot.h:569
virtual void prePoll()
executed before checking for new request
Definition Pilot.h:365
Jobs_t::iterator JobIter_t
Definition Pilot.h:194
int my_rank_in_worker_comm_
Definition Pilot.h:167
std::unique_ptr< Trace > job_trace_
Definition Pilot.h:199
Expressions::Named_t constraints_
constraints
Definition Pilot.h:184
virtual void onStop()
enable implementation to react to STOP tag
Definition Pilot.h:369
int my_rank_in_opt_comm_
Definition Pilot.h:168
virtual void sendNewJobToWorker(int worker)
Definition Pilot.h:405
std::unique_ptr< MasterNode_t > master_node_
Definition Pilot.h:174
MasterNode< typename Opt_t::SolutionState_t, SolPropagationGraph_t > MasterNode_t
Definition Pilot.h:173
Pilot(CmdArguments_t args, std::shared_ptr< Comm_t > comm, const DVarContainer_t &dvar)
Definition Pilot.h:105
int num_coworkers_
Definition Pilot.h:170
bool continue_polling_
Definition Pilot.h:181
std::vector< double > hypervolRef_
hypervolume reference point
Definition Pilot.h:186
MPI_Comm opt_comm_
MPI communicator used for messages to/from optimizer.
Definition Pilot.h:159
Expressions::Named_t objectives_
objectives
Definition Pilot.h:183
std::shared_ptr< Comm_t > comm_
Definition Pilot.h:163
MPI_Comm worker_comm_
MPI communicator used for messages to/from worker.
Definition Pilot.h:157
virtual void postPoll()
executed after handling (if any) new request
Definition Pilot.h:373
DVarContainer_t dvars_
design variables
Definition Pilot.h:185
void run()
Definition Pilot.h:602
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition Pilot.h:434
Pilot(CmdArguments_t args, std::shared_ptr< Comm_t > comm, functionDictionary_t known_expr_funcs)
Definition Pilot.h:115
Jobs_t running_job_list_
Definition Pilot.h:195
void parseInputFile(functionDictionary_t, bool isOptimizationRun)
Definition Pilot.h:234
Pilot(CmdArguments_t args, std::shared_ptr< Comm_t > comm, functionDictionary_t known_expr_funcs, const DVarContainer_t &dvar, const Expressions::Named_t &obj, const Expressions::Named_t &cons, std::vector< double > hypervolRef={}, bool isOptimizerRun=true, const std::map< std::string, std::string > &userVariables={})
Definition Pilot.h:124
Poller(MPI_Comm comm, double delay=0.1)
Definition Poller.h:39
MPI_Comm comm_m
communicator the poller listens to requests
Definition Poller.h:52
Definition Trace.h:31