Adaptyst
A comprehensive and architecture-agnostic performance analysis tool
Loading...
Searching...
No Matches
process.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2026 CERN
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4#ifndef ADAPTYST_PROCESS_HPP_
5#define ADAPTYST_PROCESS_HPP_
6
7#include "socket.hpp"
8#include "os_detect.h"
9#include <variant>
10#include <unordered_map>
11#include <vector>
12#include <string>
13#include <filesystem>
14#include <functional>
15
16#ifdef ADAPTYST_UNIX
17#include <sys/wait.h>
18#endif
19
20namespace adaptyst {
21 namespace fs = std::filesystem;
22
/**
 * Describes how CPU cores are split between analysis (profiler) work and
 * the workflow, based on a mask with one character per core index.
 *
 * The two CPU sets exist only when ADAPTYST_UNIX is defined; validity and
 * the profiler thread count are always available.
 */
class CPUConfig {
private:
  bool valid;
  int profiler_thread_count;
#ifdef ADAPTYST_UNIX
  cpu_set_t cpu_analysis_set;
  cpu_set_t cpu_workflow_set;
#endif

  // Puts the object into the "invalid, no cores assigned" state so that
  // every getter returns a well-defined value.
  void reset() {
    this->valid = false;
    this->profiler_thread_count = 0;
#ifdef ADAPTYST_UNIX
    CPU_ZERO(&this->cpu_analysis_set);
    CPU_ZERO(&this->cpu_workflow_set);
#endif
  }

public:
  /**
   * Constructs an invalid configuration with a profiler thread count of 0
   * and (on Unix) empty CPU sets.
   */
  CPUConfig() {
    this->reset();
  }

  /**
   * Constructs a configuration from a mask where the character at
   * position i describes core i:
   *
   * - 'p': counted as a profiler thread and added to the analysis set,
   * - 'c': added to the workflow set,
   * - 'b': counted as a profiler thread and added to both sets,
   * - ' ': core is left out of both sets.
   *
   * An empty mask or a mask containing any other character yields an
   * invalid configuration with a count of 0 and empty sets.
   *
   * @param mask CPU mask string as described above.
   */
  CPUConfig(std::string mask) {
    this->reset();

    if (mask.empty()) {
      return;
    }

    this->valid = true;

    for (std::string::size_type i = 0; i < mask.length(); i++) {
      switch (mask[i]) {
      case 'p':
        this->profiler_thread_count++;
#ifdef ADAPTYST_UNIX
        CPU_SET(i, &this->cpu_analysis_set);
#endif
        break;

      case 'c':
#ifdef ADAPTYST_UNIX
        CPU_SET(i, &this->cpu_workflow_set);
#endif
        break;

      case 'b':
        this->profiler_thread_count++;
#ifdef ADAPTYST_UNIX
        CPU_SET(i, &this->cpu_analysis_set);
        CPU_SET(i, &this->cpu_workflow_set);
#endif
        break;

      case ' ':
        break;

      default:
        // Unknown character: discard everything parsed so far.
        this->reset();
        return;
      }
    }
  }

  /**
   * @return true if the object was built from a non-empty mask made only
   *         of recognised characters, false otherwise.
   */
  bool is_valid() const {
    return this->valid;
  }

  /**
   * @return Number of 'p' and 'b' entries in the mask (0 for an invalid
   *         configuration).
   */
  int get_profiler_thread_count() const {
    return this->profiler_thread_count;
  }

#ifdef ADAPTYST_UNIX
  /**
   * @return Copy of the CPU set built from the 'p' and 'b' mask entries.
   */
  cpu_set_t get_cpu_analysis_set() const {
    return this->cpu_analysis_set;
  }

  /**
   * @return Copy of the CPU set built from the 'c' and 'b' mask entries.
   */
  cpu_set_t get_cpu_workflow_set() const {
    return this->cpu_workflow_set;
  }
#endif
};
136
141 class Process {
142 private:
143 std::variant<std::vector<std::string>,
144 std::function<int()> > command;
145 std::unordered_map<std::string, std::string> env;
146 bool stdout_redirect;
147 bool stdout_terminal;
148 fs::path stdout_path;
149 bool stderr_redirect;
150 fs::path stderr_path;
151 bool notifiable;
152 bool writable;
153 unsigned int buf_size;
154 int exit_code;
155 std::function<void()> notify_callback;
156#ifdef ADAPTYST_UNIX
157 int notify_pipe[2];
158 int stdin_pipe[2];
159 int stdout_pipe[2];
160 int *stdout_fd;
161 std::unique_ptr<FileDescriptor> stdout_reader;
162 std::unique_ptr<FileDescriptor> stdin_writer;
163#endif
164 bool started;
165 bool completed;
166 int id;
167
// Closes the given file descriptor; a value of -1 is treated as
// "nothing to close" and ignored.
inline void close_fd(int fd) {
  if (fd == -1) {
    return;
  }

  close(fd);
}
173
174 void init(unsigned int buf_size) {
175 this->stdout_redirect = false;
176 this->stdout_terminal = false;
177 this->stderr_redirect = false;
178 this->notifiable = false;
179 this->started = false;
180 this->completed = false;
181 this->writable = true;
182 this->buf_size = buf_size;
183
184#ifdef ADAPTYST_UNIX
185 this->stdout_fd = nullptr;
186 this->notify_pipe[0] = -1;
187 this->notify_pipe[1] = -1;
188 this->stdin_pipe[0] = -1;
189 this->stdin_pipe[1] = -1;
190 this->stdout_pipe[0] = -1;
191 this->stdout_pipe[1] = -1;
192
193 char **cur_existing_env_entry = environ;
194
195 while (*cur_existing_env_entry != nullptr) {
196 char *cur_entry = *cur_existing_env_entry;
197
198 int sep_index = -1;
199 for (int i = 0; cur_entry[i]; i++) {
200 if (cur_entry[i] == '=') {
201 sep_index = i;
202 break;
203 }
204 }
205
206 if (sep_index == -1) {
207 continue;
208 }
209
210 std::string key(cur_entry, sep_index);
211 std::string value(cur_entry + sep_index + 1);
212
213 if (this->env.find(key) == this->env.end()) {
214 this->env[key] = value;
215 }
216
217 cur_existing_env_entry++;
218 }
219#endif
220 }
221
222 public:
// Exit codes used by the forked child (or reported by join()) to signal
// a failure inside Process itself.

// Child exit code when reading the notification byte fails or the byte
// received is not 0x03.
227 static const int ERROR_START = 200;
228
// Child exit code when creat() on the stdout redirection path fails.
233 static const int ERROR_STDOUT = 201;
234
// Child exit code when creat() on the stderr redirection path fails.
239 static const int ERROR_STDERR = 202;
240
// NOTE(review): presumably the child exit code when dup2() onto stdout
// fails; the statement using it is not visible in this listing - confirm.
244 static const int ERROR_STDOUT_DUP2 = 203;
245
// NOTE(review): presumably the child exit code when dup2() onto stderr
// fails; the statement using it is not visible in this listing - confirm.
249 static const int ERROR_STDERR_DUP2 = 204;
250
// Child exit code when sched_setaffinity() fails.
254 static const int ERROR_AFFINITY = 205;
255
// Child exit code when dup2() onto stdin fails.
259 static const int ERROR_STDIN_DUP2 = 206;
260
// Child exit code when execvpe() fails with ENOENT.
264 static const int ERROR_NOT_FOUND = 207;
265
// Child exit code when execvpe() fails with EACCES.
270 static const int ERROR_NO_ACCESS = 208;
271
// Child exit code when setenv() fails before a function-type command
// is run.
275 static const int ERROR_SETENV = 209;
276
// Value stored as the exit code by join() when the child did not exit
// normally (WIFEXITED is false).
281 static const int ERROR_ABNORMAL_EXIT = 210;
282
292 Process(std::function<int()> command,
293 std::function<void()> notify_callback = [](){},
294 unsigned int buf_size = 1024) {
295 this->command = command;
296 this->notify_callback = notify_callback;
297 this->init(buf_size);
298 }
299
308 Process(std::vector<std::string> &command,
309 std::function<void()> notify_callback = [](){},
310 unsigned int buf_size = 1024) {
311 if (command.empty()) {
313 }
314
315 this->command = command;
316 this->notify_callback = notify_callback;
317 this->init(buf_size);
318 }
319
321 if (this->started) {
322#ifdef ADAPTYST_UNIX
323 if (this->writable &&
324 this->stdin_writer.get() != nullptr) {
325 this->stdin_writer->close();
326 }
327
328 if (this->notifiable) {
329 close_fd(this->notify_pipe[1]);
330 }
331
332 waitpid(this->id, nullptr, 0);
333#endif
334 }
335 }
336
343 void add_env(std::string key, std::string value) {
344 this->env[key] = value;
345 }
346
352 void set_redirect_stdout(fs::path path) {
353 this->stdout_redirect = true;
354 this->stdout_path = path;
355 }
356
361 this->stdout_redirect = true;
362 this->stdout_terminal = true;
363 }
364
371 this->stdout_redirect = true;
372
373#ifdef ADAPTYST_UNIX
374 this->stdout_fd = &process.stdin_pipe[1];
375 process.writable = false;
376#else
377 this->stdout_redirect = false;
379#endif
380 }
381
387 void set_redirect_stderr(fs::path path) {
388 this->stderr_redirect = true;
389 this->stderr_path = path;
390 }
391
408 int start(bool wait_for_notify, const CPUConfig &cpu_config,
409 bool is_analysis, fs::path working_path = fs::current_path()) {
410 if (wait_for_notify) {
411 this->notifiable = true;
412 }
413
414#ifdef ADAPTYST_UNIX
415 if (this->stdout_redirect && this->stdout_fd != nullptr &&
416 *(this->stdout_fd) == -1) {
418 }
419
420 std::vector<std::string> env_entries;
421
422 for (auto &entry : this->env) {
423 env_entries.push_back(entry.first + "=" + entry.second);
424 }
425
426 if (this->notifiable && pipe(this->notify_pipe) == -1) {
428 }
429
430 if (!this->stdout_redirect) {
431 if (pipe(this->stdout_pipe) == -1) {
432 if (this->notifiable) {
433 close_fd(this->notify_pipe[0]);
434 close_fd(this->notify_pipe[1]);
435 this->notifiable = false;
436 }
437
439 }
440
441 this->stdout_reader = std::make_unique<FileDescriptor>(this->stdout_pipe,
442 nullptr,
443 this->buf_size);
444 }
445
446 if (pipe(this->stdin_pipe) == -1) {
447 if (this->notifiable) {
448 close_fd(this->notify_pipe[0]);
449 close_fd(this->notify_pipe[1]);
450 this->notifiable = false;
451 }
452
453 if (!this->stdout_redirect) {
454 close_fd(this->stdout_pipe[0]);
455 close_fd(this->stdout_pipe[1]);
456 }
457
459 }
460
461 if (this->writable) {
462 this->stdin_writer = std::make_unique<FileDescriptor>(nullptr,
463 this->stdin_pipe,
464 this->buf_size);
465 }
466
467 pid_t forked = fork();
468
469 if (forked == 0) {
470 // This executed in a separate process with everything effectively
471 // copied (NOT shared!)
472
473 if (this->notifiable) {
474 close_fd(this->notify_pipe[1]);
475 char buf;
476 int bytes_read = 0;
477 int received = ::read(this->notify_pipe[0], &buf, 1);
478 close_fd(this->notify_pipe[0]);
479
480 if (received <= 0 || buf != 0x03) {
481 std::exit(Process::ERROR_START);
482 }
483 }
484
485 close_fd(this->stdin_pipe[1]);
486 close_fd(this->stdout_pipe[0]);
487
488 fs::current_path(working_path);
489
490 if (this->stderr_redirect) {
491 int stderr_fd = creat(this->stderr_path.c_str(),
492 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
493
494 if (stderr_fd == -1) {
495 std::exit(Process::ERROR_STDERR);
496 }
497
498 if (dup2(stderr_fd, STDERR_FILENO) == -1) {
500 }
501
502 close_fd(stderr_fd);
503 }
504
505 if (this->stdout_redirect) {
506 if (!this->stdout_terminal) {
507 int stdout_fd;
508
509 if (this->stdout_fd == nullptr) {
510 stdout_fd = creat(this->stdout_path.c_str(),
511 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
512
513 if (stdout_fd == -1) {
514 std::exit(Process::ERROR_STDOUT);
515 }
516 } else {
517 stdout_fd = *(this->stdout_fd);
518 }
519
520 if (dup2(stdout_fd, STDOUT_FILENO) == -1) {
522 }
523
524 close_fd(stdout_fd);
525 }
526 } else {
527 if (dup2(this->stdout_pipe[1], STDOUT_FILENO) == -1) {
529 }
530
531 close_fd(this->stdout_pipe[1]);
532 }
533
534 if (dup2(this->stdin_pipe[0], STDIN_FILENO) == -1) {
535 std::exit(Process::ERROR_STDIN_DUP2);
536 }
537
538 close_fd(this->stdin_pipe[0]);
539
540 if (this->command.index() == 0) {
541 std::vector<std::string> elems = std::get<0>(this->command);
542 char *argv[elems.size() + 1];
543
544 for (int i = 0; i < elems.size(); i++) {
545 argv[i] = (char *)elems[i].c_str();
546 }
547
548 argv[elems.size()] = nullptr;
549
550 char *env[env_entries.size() + 1];
551
552 for (int i = 0; i < env_entries.size(); i++) {
553 env[i] = (char *)env_entries[i].c_str();
554 }
555
556 env[env_entries.size()] = nullptr;
557
558 if (cpu_config.is_valid()) {
559 cpu_set_t affinity = is_analysis ? cpu_config.get_cpu_analysis_set() :
560 cpu_config.get_cpu_workflow_set();
561
562 if (sched_setaffinity(0, sizeof(affinity), &affinity) == -1) {
563 std::exit(Process::ERROR_AFFINITY);
564 }
565 }
566
567 execvpe(elems[0].c_str(), argv, env);
568
569 // This is reached only if execvpe fails
570 switch (errno) {
571 case ENOENT:
572 std::exit(Process::ERROR_NOT_FOUND);
573
574 case EACCES:
575 std::exit(Process::ERROR_NO_ACCESS);
576
577 default:
578 std::exit(errno);
579 }
580 } else {
581 std::function<int()> func = std::get<1>(this->command);
582
583 for (auto &entry : this->env) {
584 if (setenv(entry.first.c_str(), entry.second.c_str(), 1) == -1) {
585 std::exit(Process::ERROR_SETENV);
586 }
587 }
588
589 std::exit(func());
590 }
591 }
592
593 if (this->notifiable) {
594 close_fd(this->notify_pipe[0]);
595 }
596
597 close_fd(this->stdin_pipe[0]);
598
599 if (this->stdout_redirect && this->stdout_fd != nullptr) {
600 close_fd(*(this->stdout_fd));
601 } else if (!this->stdout_redirect) {
602 close_fd(this->stdout_pipe[1]);
603 }
604
605 if (forked == -1) {
606 if (this->notifiable) {
607 close_fd(this->notify_pipe[1]);
608 this->notifiable = false;
609 }
610
612 }
613
614 this->started = true;
615 this->id = forked;
616 return forked;
617#else
618 this->notifiable = false;
620#endif
621 }
622
635 int start(fs::path working_path = fs::current_path()) {
636 return start(false, CPUConfig(""), false, working_path);
637 }
638
643 void notify() {
644 if (this->started) {
645 if (this->notifiable) {
646 this->notify_callback();
647#ifdef ADAPTYST_UNIX
648 FileDescriptor notify_writer(nullptr, this->notify_pipe,
649 this->buf_size);
650 char to_send = 0x03;
651 notify_writer.write(1, &to_send);
652 this->notifiable = false;
653#else
655#endif
656 } else {
658 }
659 } else {
661 }
662 }
663
667 std::string read_line() {
668 if (this->stdout_redirect) {
670 }
671
672#ifdef ADAPTYST_UNIX
673 return this->stdout_reader->read();
674#else
676#endif
677 }
678
685 void write_stdin(char *buf, unsigned int size) {
686 if (this->started) {
687 if (this->writable) {
688#ifdef ADAPTYST_UNIX
689 this->stdin_writer->write(size, buf);
690#else
692#endif
693 } else {
695 }
696 } else {
698 }
699 }
700
706 int join() {
707 if (this->started) {
708#ifdef ADAPTYST_UNIX
709 int status;
710 int result = waitpid(this->id, &status, 0);
711
712 if (result != this->id) {
714 }
715
716 this->started = false;
717 this->notifiable = false;
718 this->completed = true;
719 if (WIFEXITED(status)) {
720 this->exit_code = WEXITSTATUS(status);
721 } else {
722 this->exit_code = ERROR_ABNORMAL_EXIT;
723 }
724
725 return this->exit_code;
726#else
728#endif
729 } else if (this->completed) {
730 return this->exit_code;
731 } else {
733 }
734 }
735
741 bool is_running() {
742 if (!this->started) {
743 return false;
744 }
745
746#ifdef ADAPTYST_UNIX
747 return waitpid(this->id, nullptr, WNOHANG) == 0;
748#else
750#endif
751 }
752
757 void close_stdin() {
758 if (!this->writable) {
760 }
761
762#ifdef ADAPTYST_UNIX
763 this->stdin_writer->close();
764 this->writable = false;
765#else
767#endif
768 }
769
773 void terminate() {
774#ifdef ADAPTYST_UNIX
775 kill(this->id, SIGTERM);
776#else
778#endif
779 }
780
// Exception types declared by Process. Each derives publicly from
// std::exception and adds no members of its own.
// NOTE(review): the throw sites are not visible in this listing, so the
// conditions named below are inferred from the class names - confirm
// against the real header.

// Presumably thrown when stdout cannot be read through this object.
785 class NotReadableException : public std::exception { };
786
// Presumably thrown when stdin cannot be written through this object.
790 class NotWritableException : public std::exception { };
791
// Presumably thrown when the process cannot be started.
797 class StartException : public std::exception { };
798
// Presumably thrown when the command vector is empty.
803 class EmptyCommandException : public std::exception { };
804
// Presumably thrown when waiting for the process fails.
808 class WaitException : public std::exception { };
809
// Presumably thrown when an operation needs a started process.
813 class NotStartedException : public std::exception { };
814
// Presumably thrown when notify() is called on a non-notifiable process.
819 class NotNotifiableException : public std::exception { };
820
// Presumably thrown on platforms where the operation is unsupported.
825 class NotImplementedException : public std::exception { };
826 };
827};
828
829#endif
adaptyst::CPUConfig
Definition process.hpp:31
CPUConfig()
Definition process.hpp:45
int get_profiler_thread_count() const
Definition process.hpp:114
CPUConfig(std::string mask)
Definition process.hpp:59
bool is_valid() const
Definition process.hpp:106
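adaptyst::Process::NotReadableException
Definition process.hpp:785
adaptyst::Process::NotStartedException
Definition process.hpp:813
adaptyst::Process::NotWritableException
Definition process.hpp:790
adaptyst::Process::StartException
Definition process.hpp:797
adaptyst::Process::WaitException
Definition process.hpp:808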
void set_redirect_stdout(fs::path path)
Definition process.hpp:352
void set_redirect_stdout(Process &process)
Definition process.hpp:370
static const int ERROR_SETENV
Definition process.hpp:275
static const int ERROR_STDERR
Definition process.hpp:239
void set_redirect_stdout_to_terminal()
Definition process.hpp:360
static const int ERROR_AFFINITY
Definition process.hpp:254
Process(std::vector< std::string > &command, std::function< void()> notify_callback=[](){}, unsigned int buf_size=1024)
Definition process.hpp:308
static const int ERROR_STDERR_DUP2
Definition process.hpp:249
Process(std::function< int()> command, std::function< void()> notify_callback=[](){}, unsigned int buf_size=1024)
Definition process.hpp:292
int join()
Definition process.hpp:706
static const int ERROR_STDOUT
Definition process.hpp:233
int start(fs::path working_path=fs::current_path())
Definition process.hpp:635
static const int ERROR_ABNORMAL_EXIT
Definition process.hpp:281
static const int ERROR_START
Definition process.hpp:227
int start(bool wait_for_notify, const CPUConfig &cpu_config, bool is_analysis, fs::path working_path=fs::current_path())
Definition process.hpp:408
static const int ERROR_STDOUT_DUP2
Definition process.hpp:244
~Process()
Definition process.hpp:320
static const int ERROR_STDIN_DUP2
Definition process.hpp:259
void set_redirect_stderr(fs::path path)
Definition process.hpp:387
static const int ERROR_NO_ACCESS
Definition process.hpp:270
void write_stdin(char *buf, unsigned int size)
Definition process.hpp:685
std::string read_line()
Definition process.hpp:667
void notify()
Definition process.hpp:643
void close_stdin()
Definition process.hpp:757
void add_env(std::string key, std::string value)
Definition process.hpp:343
void terminate()
Definition process.hpp:773
static const int ERROR_NOT_FOUND
Definition process.hpp:264
bool is_running()
Definition process.hpp:741
Definition archive.cpp:7