Adaptyst
A comprehensive and architecture-agnostic performance analysis tool
Loading...
Searching...
No Matches
process.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2025 CERN
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4#ifndef ADAPTYST_PROCESS_HPP_
5#define ADAPTYST_PROCESS_HPP_
6
7#include "socket.hpp"
8#include "os_detect.h"
9#include <unordered_map>
10#include <vector>
11#include <string>
12#include <filesystem>
13#include <functional>
14
15#ifdef ADAPTYST_UNIX
16#include <sys/wait.h>
17#endif
18
19namespace adaptyst {
20 namespace fs = std::filesystem;
21
30 class CPUConfig {
31 private:
32 bool valid;
33 int profiler_thread_count;
34#ifdef ADAPTYST_UNIX
35 cpu_set_t cpu_analysis_set;
36 cpu_set_t cpu_workflow_set;
37#endif
38
39 public:
45 this->valid = false;
46 }
47
58 CPUConfig(std::string mask) {
59 this->valid = false;
60 this->profiler_thread_count = 0;
61
62#ifdef ADAPTYST_UNIX
63 CPU_ZERO(&this->cpu_analysis_set);
64 CPU_ZERO(&this->cpu_workflow_set);
65#endif
66
67 if (!mask.empty()) {
68 this->valid = true;
69
70 for (int i = 0; i < mask.length(); i++) {
71 if (mask[i] == 'p') {
72 this->profiler_thread_count++;
73#ifdef ADAPTYST_UNIX
74 CPU_SET(i, &this->cpu_analysis_set);
75#endif
76 } else if (mask[i] == 'c') {
77#ifdef ADAPTYST_UNIX
78 CPU_SET(i, &this->cpu_workflow_set);
79#endif
80 } else if (mask[i] == 'b') {
81 this->profiler_thread_count++;
82#ifdef ADAPTYST_UNIX
83 CPU_SET(i, &this->cpu_analysis_set);
84 CPU_SET(i, &this->cpu_workflow_set);
85#endif
86 } else if (mask[i] != ' ') {
87 this->valid = false;
88 this->profiler_thread_count = 0;
89#ifdef ADAPTYST_UNIX
90 CPU_ZERO(&this->cpu_analysis_set);
91 CPU_ZERO(&this->cpu_workflow_set);
92#endif
93 return;
94 }
95 }
96 }
97 }
98
105 bool is_valid() const {
106 return this->valid;
107 }
108
114 return this->profiler_thread_count;
115 }
116
117#ifdef ADAPTYST_UNIX
122 cpu_set_t get_cpu_analysis_set() const {
123 return this->cpu_analysis_set;
124 }
125
130 cpu_set_t get_cpu_workflow_set() const {
131 return this->cpu_workflow_set;
132 }
133#endif
134 };
135
140 class Process {
141 private:
142 std::variant<std::vector<std::string>,
143 std::function<int()> > command;
144 std::unordered_map<std::string, std::string> env;
145 bool stdout_redirect;
146 bool stdout_terminal;
147 fs::path stdout_path;
148 bool stderr_redirect;
149 fs::path stderr_path;
150 bool notifiable;
151 bool writable;
152 unsigned int buf_size;
153 int exit_code;
154#ifdef ADAPTYST_UNIX
155 int notify_pipe[2];
156 int stdin_pipe[2];
157 int stdout_pipe[2];
158 int *stdout_fd;
159 std::unique_ptr<FileDescriptor> stdout_reader;
160 std::unique_ptr<FileDescriptor> stdin_writer;
161#endif
162 bool started;
163 bool completed;
164 int id;
165
166 inline void close_fd(int fd) {
167 if (fd != -1) {
168 close(fd);
169 }
170 }
171
172 void init(unsigned int buf_size) {
173 this->stdout_redirect = false;
174 this->stdout_terminal = false;
175 this->stderr_redirect = false;
176 this->notifiable = false;
177 this->started = false;
178 this->completed = false;
179 this->writable = true;
180 this->buf_size = buf_size;
181
182#ifdef ADAPTYST_UNIX
183 this->stdout_fd = nullptr;
184 this->notify_pipe[0] = -1;
185 this->notify_pipe[1] = -1;
186 this->stdin_pipe[0] = -1;
187 this->stdin_pipe[1] = -1;
188 this->stdout_pipe[0] = -1;
189 this->stdout_pipe[1] = -1;
190
191 char **cur_existing_env_entry = environ;
192
193 while (*cur_existing_env_entry != nullptr) {
194 char *cur_entry = *cur_existing_env_entry;
195
196 int sep_index = -1;
197 for (int i = 0; cur_entry[i]; i++) {
198 if (cur_entry[i] == '=') {
199 sep_index = i;
200 break;
201 }
202 }
203
204 if (sep_index == -1) {
205 continue;
206 }
207
208 std::string key(cur_entry, sep_index);
209 std::string value(cur_entry + sep_index + 1);
210
211 if (this->env.find(key) == this->env.end()) {
212 this->env[key] = value;
213 }
214
215 cur_existing_env_entry++;
216 }
217#endif
218 }
219
220 public:
225 static const int ERROR_START = 200;
226
231 static const int ERROR_STDOUT = 201;
232
237 static const int ERROR_STDERR = 202;
238
242 static const int ERROR_STDOUT_DUP2 = 203;
243
247 static const int ERROR_STDERR_DUP2 = 204;
248
252 static const int ERROR_AFFINITY = 205;
253
257 static const int ERROR_STDIN_DUP2 = 206;
258
262 static const int ERROR_NOT_FOUND = 207;
263
268 static const int ERROR_NO_ACCESS = 208;
269
273 static const int ERROR_SETENV = 209;
274
279 static const int ERROR_ABNORMAL_EXIT = 210;
280
288 Process(std::function<int()> command,
289 unsigned int buf_size = 1024) {
290 this->command = command;
291 this->init(buf_size);
292 }
293
300 Process(std::vector<std::string> &command,
301 unsigned int buf_size = 1024) {
302 if (command.empty()) {
304 }
305
306 this->command = command;
307 this->init(buf_size);
308 }
309
311 if (this->started) {
312#ifdef ADAPTYST_UNIX
313 if (this->writable &&
314 this->stdin_writer.get() != nullptr) {
315 this->stdin_writer->close();
316 }
317
318 if (this->notifiable) {
319 close_fd(this->notify_pipe[1]);
320 }
321
322 waitpid(this->id, nullptr, 0);
323#endif
324 }
325 }
326
333 void add_env(std::string key, std::string value) {
334 this->env[key] = value;
335 }
336
342 void set_redirect_stdout(fs::path path) {
343 this->stdout_redirect = true;
344 this->stdout_path = path;
345 }
346
351 this->stdout_redirect = true;
352 this->stdout_terminal = true;
353 }
354
361 this->stdout_redirect = true;
362
363#ifdef ADAPTYST_UNIX
364 this->stdout_fd = &process.stdin_pipe[1];
365 process.writable = false;
366#else
367 this->stdout_redirect = false;
369#endif
370 }
371
377 void set_redirect_stderr(fs::path path) {
378 this->stderr_redirect = true;
379 this->stderr_path = path;
380 }
381
398 int start(bool wait_for_notify, const CPUConfig &cpu_config,
399 bool is_profiler, fs::path working_path = fs::current_path()) {
400 if (wait_for_notify) {
401 this->notifiable = true;
402 }
403
404#ifdef ADAPTYST_UNIX
405 if (this->stdout_redirect && this->stdout_fd != nullptr &&
406 *(this->stdout_fd) == -1) {
408 }
409
410 std::vector<std::string> env_entries;
411
412 for (auto &entry : this->env) {
413 env_entries.push_back(entry.first + "=" + entry.second);
414 }
415
416 if (this->notifiable && pipe(this->notify_pipe) == -1) {
418 }
419
420 if (!this->stdout_redirect) {
421 if (pipe(this->stdout_pipe) == -1) {
422 if (this->notifiable) {
423 close_fd(this->notify_pipe[0]);
424 close_fd(this->notify_pipe[1]);
425 this->notifiable = false;
426 }
427
429 }
430
431 this->stdout_reader = std::make_unique<FileDescriptor>(this->stdout_pipe,
432 nullptr,
433 this->buf_size);
434 }
435
436 if (pipe(this->stdin_pipe) == -1) {
437 if (this->notifiable) {
438 close_fd(this->notify_pipe[0]);
439 close_fd(this->notify_pipe[1]);
440 this->notifiable = false;
441 }
442
443 if (!this->stdout_redirect) {
444 close_fd(this->stdout_pipe[0]);
445 close_fd(this->stdout_pipe[1]);
446 }
447
449 }
450
451 if (this->writable) {
452 this->stdin_writer = std::make_unique<FileDescriptor>(nullptr,
453 this->stdin_pipe,
454 this->buf_size);
455 }
456
457 pid_t forked = fork();
458
459 if (forked == 0) {
460 // This executed in a separate process with everything effectively
461 // copied (NOT shared!)
462
463 if (this->notifiable) {
464 close_fd(this->notify_pipe[1]);
465 char buf;
466 int bytes_read = 0;
467 int received = ::read(this->notify_pipe[0], &buf, 1);
468 close_fd(this->notify_pipe[0]);
469
470 if (received <= 0 || buf != 0x03) {
471 std::exit(Process::ERROR_START);
472 }
473 }
474
475 close_fd(this->stdin_pipe[1]);
476 close_fd(this->stdout_pipe[0]);
477
478 fs::current_path(working_path);
479
480 if (this->stderr_redirect) {
481 int stderr_fd = creat(this->stderr_path.c_str(),
482 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
483
484 if (stderr_fd == -1) {
485 std::exit(Process::ERROR_STDERR);
486 }
487
488 if (dup2(stderr_fd, STDERR_FILENO) == -1) {
490 }
491
492 close_fd(stderr_fd);
493 }
494
495 if (this->stdout_redirect) {
496 if (!this->stdout_terminal) {
497 int stdout_fd;
498
499 if (this->stdout_fd == nullptr) {
500 stdout_fd = creat(this->stdout_path.c_str(),
501 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
502
503 if (stdout_fd == -1) {
504 std::exit(Process::ERROR_STDOUT);
505 }
506 } else {
507 stdout_fd = *(this->stdout_fd);
508 }
509
510 if (dup2(stdout_fd, STDOUT_FILENO) == -1) {
512 }
513
514 close_fd(stdout_fd);
515 }
516 } else {
517 if (dup2(this->stdout_pipe[1], STDOUT_FILENO) == -1) {
519 }
520
521 close_fd(this->stdout_pipe[1]);
522 }
523
524 if (dup2(this->stdin_pipe[0], STDIN_FILENO) == -1) {
525 std::exit(Process::ERROR_STDIN_DUP2);
526 }
527
528 close_fd(this->stdin_pipe[0]);
529
530 if (this->command.index() == 0) {
531 std::vector<std::string> elems = std::get<0>(this->command);
532 char *argv[elems.size() + 1];
533
534 for (int i = 0; i < elems.size(); i++) {
535 argv[i] = (char *)elems[i].c_str();
536 }
537
538 argv[elems.size()] = nullptr;
539
540 char *env[env_entries.size() + 1];
541
542 for (int i = 0; i < env_entries.size(); i++) {
543 env[i] = (char *)env_entries[i].c_str();
544 }
545
546 env[env_entries.size()] = nullptr;
547
548 if (cpu_config.is_valid()) {
549 cpu_set_t affinity = is_profiler ? cpu_config.get_cpu_analysis_set() :
550 cpu_config.get_cpu_workflow_set();
551
552 if (sched_setaffinity(0, sizeof(affinity), &affinity) == -1) {
553 std::exit(Process::ERROR_AFFINITY);
554 }
555 }
556
557 execvpe(elems[0].c_str(), argv, env);
558
559 // This is reached only if execvpe fails
560 switch (errno) {
561 case ENOENT:
562 std::exit(Process::ERROR_NOT_FOUND);
563
564 case EACCES:
565 std::exit(Process::ERROR_NO_ACCESS);
566
567 default:
568 std::exit(errno);
569 }
570 } else {
571 std::function<int()> func = std::get<1>(this->command);
572
573 for (auto &entry : this->env) {
574 if (setenv(entry.first.c_str(), entry.second.c_str(), 1) == -1) {
575 std::exit(Process::ERROR_SETENV);
576 }
577 }
578
579 std::exit(func());
580 }
581 }
582
583 if (this->notifiable) {
584 close_fd(this->notify_pipe[0]);
585 }
586
587 close_fd(this->stdin_pipe[0]);
588
589 if (this->stdout_redirect && this->stdout_fd != nullptr) {
590 close_fd(*(this->stdout_fd));
591 } else if (!this->stdout_redirect) {
592 close_fd(this->stdout_pipe[1]);
593 }
594
595 if (forked == -1) {
596 if (this->notifiable) {
597 close_fd(this->notify_pipe[1]);
598 this->notifiable = false;
599 }
600
602 }
603
604 this->started = true;
605 this->id = forked;
606 return forked;
607#else
608 this->notifiable = false;
610#endif
611 }
612
625 int start(fs::path working_path = fs::current_path()) {
626 return start(false, CPUConfig(""), false, working_path);
627 }
628
633 void notify() {
634 if (this->started) {
635 if (this->notifiable) {
636#ifdef ADAPTYST_UNIX
637 FileDescriptor notify_writer(nullptr, this->notify_pipe,
638 this->buf_size);
639 char to_send = 0x03;
640 notify_writer.write(1, &to_send);
641 this->notifiable = false;
642#else
644#endif
645 } else {
647 }
648 } else {
650 }
651 }
652
656 std::string read_line() {
657 if (this->stdout_redirect) {
659 }
660
661#ifdef ADAPTYST_UNIX
662 return this->stdout_reader->read();
663#else
665#endif
666 }
667
674 void write_stdin(char *buf, unsigned int size) {
675 if (this->started) {
676 if (this->writable) {
677#ifdef ADAPTYST_UNIX
678 this->stdin_writer->write(size, buf);
679#else
681#endif
682 } else {
684 }
685 } else {
687 }
688 }
689
695 int join() {
696 if (this->started) {
697#ifdef ADAPTYST_UNIX
698 int status;
699 int result = waitpid(this->id, &status, 0);
700
701 if (result != this->id) {
703 }
704
705 this->started = false;
706 this->notifiable = false;
707 this->completed = true;
708 if (WIFEXITED(status)) {
709 this->exit_code = WEXITSTATUS(status);
710 } else {
711 this->exit_code = ERROR_ABNORMAL_EXIT;
712 }
713
714 return this->exit_code;
715#else
717#endif
718 } else if (this->completed) {
719 return this->exit_code;
720 } else {
722 }
723 }
724
730 bool is_running() {
731 if (!this->started) {
732 return false;
733 }
734
735#ifdef ADAPTYST_UNIX
736 return waitpid(this->id, nullptr, WNOHANG) == 0;
737#else
739#endif
740 }
741
746 void close_stdin() {
747 if (!this->writable) {
749 }
750
751#ifdef ADAPTYST_UNIX
752 this->stdin_writer->close();
753 this->writable = false;
754#else
756#endif
757 }
758
762 void terminate() {
763#ifdef ADAPTYST_UNIX
764 kill(this->id, SIGTERM);
765#else
767#endif
768 }
769
774 class NotReadableException : public std::exception { };
775
779 class NotWritableException : public std::exception { };
780
786 class StartException : public std::exception { };
787
792 class EmptyCommandException : public std::exception { };
793
797 class WaitException : public std::exception { };
798
802 class NotStartedException : public std::exception { };
803
808 class NotNotifiableException : public std::exception { };
809
814 class NotImplementedException : public std::exception { };
815 };
816};
817
818#endif
Definition process.hpp:30
CPUConfig()
Definition process.hpp:44
int get_profiler_thread_count() const
Definition process.hpp:113
CPUConfig(std::string mask)
Definition process.hpp:58
bool is_valid() const
Definition process.hpp:105
Definition process.hpp:774
Definition process.hpp:802
Definition process.hpp:779
Definition process.hpp:786
Definition process.hpp:797
Process(std::vector< std::string > &command, unsigned int buf_size=1024)
Definition process.hpp:300
void set_redirect_stdout(fs::path path)
Definition process.hpp:342
void set_redirect_stdout(Process &process)
Definition process.hpp:360
static const int ERROR_SETENV
Definition process.hpp:273
static const int ERROR_STDERR
Definition process.hpp:237
void set_redirect_stdout_to_terminal()
Definition process.hpp:350
static const int ERROR_AFFINITY
Definition process.hpp:252
static const int ERROR_STDERR_DUP2
Definition process.hpp:247
int join()
Definition process.hpp:695
static const int ERROR_STDOUT
Definition process.hpp:231
int start(fs::path working_path=fs::current_path())
Definition process.hpp:625
static const int ERROR_ABNORMAL_EXIT
Definition process.hpp:279
static const int ERROR_START
Definition process.hpp:225
static const int ERROR_STDOUT_DUP2
Definition process.hpp:242
~Process()
Definition process.hpp:310
static const int ERROR_STDIN_DUP2
Definition process.hpp:257
void set_redirect_stderr(fs::path path)
Definition process.hpp:377
static const int ERROR_NO_ACCESS
Definition process.hpp:268
void write_stdin(char *buf, unsigned int size)
Definition process.hpp:674
std::string read_line()
Definition process.hpp:656
void notify()
Definition process.hpp:633
Process(std::function< int()> command, unsigned int buf_size=1024)
Definition process.hpp:288
void close_stdin()
Definition process.hpp:746
int start(bool wait_for_notify, const CPUConfig &cpu_config, bool is_profiler, fs::path working_path=fs::current_path())
Definition process.hpp:398
void add_env(std::string key, std::string value)
Definition process.hpp:333
void terminate()
Definition process.hpp:762
static const int ERROR_NOT_FOUND
Definition process.hpp:262
bool is_running()
Definition process.hpp:730
Definition output.hpp:12