4#ifndef ADAPTYST_SOCKET_HPP_
5#define ADAPTYST_SOCKET_HPP_
13#include <Poco/Net/ServerSocket.h>
18#include <Poco/Buffer.h>
19#include <Poco/Net/NetException.h>
20#include <Poco/StreamCopier.h>
21#include <Poco/FileStream.h>
22#include <Poco/Net/SocketStream.h>
24#define UNLIMITED_ACCEPTED -1
27#ifndef FILE_BUFFER_SIZE
28#define FILE_BUFFER_SIZE 1048576
32 namespace net = Poco::Net;
33 namespace fs = std::filesystem;
// Points the stream buffer's get area at the caller's array: the beginning and the
// current position are both begin.get(), and the end pointer is
// begin.get() + length - 1, so the byte at index length - 1 is not readable
// through this buffer.
// NOTE(review): the line-splitting code in this listing inspects that last byte
// separately (last_is_newline), so the "- 1" is presumably deliberate - confirm.
38 this->setg(begin.get(), begin.get(), begin.get() + length - 1);
// Pure-virtual raw read: takes a caller-supplied buffer, its length and a timeout
// in seconds, and returns an int (the pipe-based implementation in this listing
// returns the result of ::read()).
94 virtual int read(
char *buf,
unsigned int len,
long timeout_seconds) = 0;
// Pure-virtual text write; new_line defaults to true.
// NOTE(review): the effect of new_line is not visible in this listing -
// presumably it appends '\n' to msg; confirm against the full header.
117 virtual void write(std::string msg,
bool new_line =
true) = 0;
// Pure-virtual write taking a filesystem path (the implementations in this
// listing send the contents of that file).
126 virtual void write(fs::path file) = 0;
// Pure-virtual raw write of len bytes starting at buf.
136 virtual void write(
unsigned int len,
char *buf) = 0;
// Same four pure-virtual I/O signatures as declared earlier in this listing
// (raw read, text write, file write, raw write), declared again here.
// Raw read: caller-supplied buffer, its length, timeout in seconds; returns an int.
165 virtual int read(
char *buf,
unsigned int len,
long timeout_seconds) = 0;
// Text write; new_line defaults to true.
167 virtual void write(std::string msg,
bool new_line =
true) = 0;
// Write taking a filesystem path.
168 virtual void write(fs::path file) = 0;
// Raw write of len bytes starting at buf.
169 virtual void write(
unsigned int len,
char *buf) = 0;
// Stores the constructor argument as the acceptor's connection limit; accept()
// compares this->accepted against it.
189 this->max_accepted = max_accepted;
// accept(): returns an owning pointer to a Connection.
// Only part of the guard is present in this listing: when the number of accepted
// connections has reached max_accepted, a std::runtime_error is thrown.
// NOTE(review): the first half of the condition (listing line 253) is missing -
// presumably it exempts UNLIMITED_ACCEPTED; confirm against the full header.
251 std::unique_ptr<Connection>
accept(
unsigned int buf_size,
254 this->accepted >= this->max_accepted) {
255 throw std::runtime_error(
"Maximum accepted connections reached.");
// Underlying Poco stream socket used for all sends and receives.
289 net::StreamSocket socket;
// Receive buffer (allocated in the constructor) and its size in chars.
290 std::unique_ptr<char[]> buf;
291 unsigned int buf_size;
// Messages already split out of the receive buffer but not yet handed out by read().
293 std::queue<std::string> buffered_msgs;
// Builds a TCPSocket from a Poco stream socket passed by reference.
// Allocates a receive buffer of buf_size chars and remembers that size.
// NOTE(review): listing line 307 is not shown - presumably it stores sock in
// this->socket; confirm.
306 TCPSocket(net::StreamSocket &sock,
unsigned int buf_size) {
308 this->buf.reset(
new char[buf_size]);
309 this->buf_size = buf_size;
// get_address(): host part of the address reported by socket.address(), as a string.
318 return this->socket.address().host().toString();
// get_port(): port part of the same address.
322 return this->socket.address().port();
// get_buf_size(): the size the receive buffer was created with.
326 return this->buf_size;
// Raw read with a timeout: sets the socket's receive timeout to timeout_seconds,
// receives up to len bytes into buf, then resets the receive timeout to a
// default-constructed Poco::Timespan.
// NOTE(review): the try line, the return statement and what each handler throws
// are not part of this listing.
329 int read(
char *buf,
unsigned int len,
long timeout_seconds) {
331 this->socket.setReceiveTimeout(Poco::Timespan(timeout_seconds, 0));
332 int bytes = this->socket.receiveBytes(buf, len);
333 this->socket.setReceiveTimeout(Poco::Timespan());
335 }
catch (net::NetException &e) {
// The timeout is reset on the network-error path as well.
336 this->socket.setReceiveTimeout(Poco::Timespan());
338 }
catch (Poco::TimeoutException &e) {
// ...and on the timeout path.
339 this->socket.setReceiveTimeout(Poco::Timespan());
// Line-oriented read: hands out one '\n'-delimited message per call.
// Many lines of this body are absent from the listing; the comments below describe
// only the statements that are shown.
// A message queued by an earlier call is taken from the queue first.
346 if (!this->buffered_msgs.empty()) {
347 std::string msg = this->buffered_msgs.front();
348 this->buffered_msgs.pop();
// Accumulates a message that is longer than the receive buffer.
352 std::string cur_msg =
"";
// Two receive paths are visible: a direct receiveBytes() call and a call to the
// timed read() overload. Both write after the start_pos bytes already held at the
// front of the buffer.
// NOTE(review): the condition choosing between them is not shown - presumably
// timeout_seconds == NO_TIMEOUT; confirm.
359 this->socket.receiveBytes(this->buf.get() + this->start_pos,
360 this->buf_size - this->start_pos);
363 this->
read(this->buf.get() + this->start_pos,
364 this->buf_size - this->start_pos, timeout_seconds);
// Nothing was received: return whatever was carried over from earlier calls.
367 if (bytes_received == 0) {
368 return std::string(this->buf.get(), this->start_pos);
371 bool first_msg_to_receive =
true;
372 std::string first_msg;
// Wrap the filled part of the buffer in an istream so std::getline can split it.
374 charstreambuf buf(this->buf, bytes_received + this->start_pos);
375 std::istream in(&buf);
// Whether the very last received byte is a line terminator.
378 bool last_is_newline = this->buf.get()[bytes_received + this->start_pos - 1] ==
'\n';
382 std::getline(in, msg);
// getline ran into the end of the data and the data does not end with '\n':
// what remains from cur_pos onwards is an incomplete message.
384 if (in.eof() && !last_is_newline) {
// Number of bytes not yet consumed as complete messages.
385 int size = bytes_received + this->start_pos - cur_pos;
// The incomplete message fills the whole buffer: move it into cur_msg.
387 if (size == this->buf_size) {
388 cur_msg += std::string(this->buf.get(), this->buf_size);
// Slide the unconsumed tail to the front of the buffer and record its length so
// that the next receive appends after it.
// NOTE(review): presumably this is the else branch of the check above - the
// line carrying the else is not in this listing.
391 std::memmove(this->buf.get(), this->buf.get() + cur_pos, size);
392 this->start_pos = size;
// Non-empty message: the first one found is kept in first_msg; the push below is
// presumably the branch for later ones (the else line is not shown).
395 if (!cur_msg.empty() || !msg.empty()) {
396 if (first_msg_to_receive) {
397 first_msg = cur_msg + msg;
398 first_msg_to_receive =
false;
400 this->buffered_msgs.push(cur_msg + msg);
// Advance past the message and its one-character delimiter.
406 cur_pos += msg.length() + 1;
410 if (last_is_newline) {
414 if (!first_msg_to_receive) {
421 }
catch (net::NetException &e) {
// Sends msg over the socket with a single sendBytes() call.
// NOTE(review): the handling of new_line (listing lines 427-431) is not shown.
426 void write(std::string msg,
bool new_line) {
432 const char *buf = msg.c_str();
434 int bytes_written = this->socket.sendBytes(buf, msg.size());
// A short send is treated as an error and described in a std::runtime_error.
// NOTE(review): int is compared with size_t here; what is done with err is not
// in this listing (the pipe-based write wraps it in ConnectionException).
436 if (bytes_written != msg.size()) {
437 std::runtime_error err(
"Wrote " +
438 std::to_string(bytes_written) +
439 " bytes instead of " +
440 std::to_string(msg.size()) +
442 this->socket.address().toString());
445 }
catch (net::NetException &e) {
// File write: opens the file as a binary input stream and copies it into a stream
// wrapped around the socket using Poco::StreamCopier.
// NOTE(review): the body of the NetException handler is not in this listing.
452 net::SocketStream socket_stream(this->socket);
453 Poco::FileInputStream stream(file, std::ios::in | std::ios::binary);
454 Poco::StreamCopier::copyStream(stream, socket_stream);
455 }
catch (net::NetException &e) {
// Raw write: sends len bytes starting at buf with a single sendBytes() call.
460 void write(
unsigned int len,
char *buf) {
462 int bytes_written = this->socket.sendBytes(buf, len);
// A short send is treated as an error and described in a std::runtime_error.
// NOTE(review): what is done with err is not in this listing.
463 if (bytes_written != len) {
464 std::runtime_error err(
"Wrote " +
465 std::to_string(bytes_written) +
466 " bytes instead of " +
467 std::to_string(len) +
469 this->socket.address().toString());
472 }
catch (net::NetException &e) {
// Listening socket owned by the acceptor.
483 net::ServerSocket acceptor;
// Binds the listening socket to address:port and starts listening.
// max_accepted is forwarded to the Acceptor base class.
// NOTE(review): the parameter line declaring max_accepted (listing line 486) is
// not shown.
485 TCPAcceptor(std::string address,
unsigned short port,
487 bool try_subsequent_ports) :
Acceptor(max_accepted) {
// With try_subsequent_ports set, binding is attempted under a success flag.
// NOTE(review): the loop and the port increment are not in this listing -
// presumably the port is advanced until a bind succeeds; confirm.
488 if (try_subsequent_ports) {
489 bool success =
false;
492 this->acceptor.bind(net::SocketAddress(address, port),
false);
494 }
catch (net::NetException &e) {
// A busy port is recognised by a substring match on the exception message.
495 if (e.message().find(
"already in use") != std::string::npos) {
// Second bind call, with the same "already in use" check in its handler.
504 this->acceptor.bind(net::SocketAddress(address, port),
false);
505 }
catch (net::NetException &e) {
506 if (e.message().find(
"already in use") != std::string::npos) {
// Start listening for incoming connections.
515 this->acceptor.listen();
516 }
catch (net::NetException &e) {
// Accepts one connection on the listening socket and wraps it in a TCPSocket
// that uses a receive buffer of buf_size.
// NOTE(review): the body of the NetException handler is not in this listing.
525 net::StreamSocket socket = this->acceptor.acceptConnection();
526 return std::make_unique<TCPSocket>(socket, buf_size);
527 }
catch (net::NetException &e) {
// Closes the listening socket.
532 void close() { this->acceptor.close(); }
// Stored constructor argument, later passed on to TCPAcceptor.
542 bool try_subsequent_ports;
// Remembers the parameters used for the acceptors this factory creates;
// try_subsequent_ports defaults to false.
// NOTE(review): listing line 559 (presumably the copy of port) is not shown.
556 Factory(std::string address,
unsigned short port,
557 bool try_subsequent_ports =
false) {
558 this->address = address;
560 this->try_subsequent_ports = try_subsequent_ports;
// make_acceptor(): creates a TCPAcceptor from the stored settings and returns it
// as an owning pointer to the Acceptor base.
564 return std::unique_ptr<Acceptor>(
new TCPAcceptor(this->address,
567 this->try_subsequent_ports));
// Connection instructions for the TCP acceptor: "<host>_<port>", taken from the
// address reported by the listening socket.
583 return this->acceptor.address().host().toString() +
"_" + std::to_string(this->acceptor.address().port());
// Connection implementation that talks over raw file descriptors (reads use
// read_fd[0], writes use write_fd[1]).
594 class FileDescriptor :
public Connection {
// Receive buffer size, queue of already-split messages and the buffer itself.
598 unsigned int buf_size;
599 std::queue<std::string> buffered_msgs;
600 std::unique_ptr<char[]> buf;
// Takes two descriptor pairs and a buffer size.
// NOTE(review): the write_fd parameter line (listing line 616) is not shown.
615 FileDescriptor(
int read_fd[2],
617 unsigned int buf_size) {
618 this->buf.reset(
new char[buf_size]);
619 this->buf_size = buf_size;
// Copy the read pair when one was supplied; the -1 assignments that follow mark
// both ends as absent (presumably the else branch - that line is not shown).
622 if (read_fd !=
nullptr) {
623 this->read_fd[0] = read_fd[0];
624 this->read_fd[1] = read_fd[1];
626 this->read_fd[0] = -1;
627 this->read_fd[1] = -1;
// Same treatment for the write pair.
630 if (write_fd !=
nullptr) {
631 this->write_fd[0] = write_fd[0];
632 this->write_fd[1] = write_fd[1];
634 this->write_fd[0] = -1;
635 this->write_fd[1] = -1;
// Raw read with a timeout: polls read_fd[0] for readable data, then performs one
// ::read() of up to len bytes into buf and returns its result.
643 int read(
char *buf,
unsigned int len,
long timeout_seconds) {
644 struct pollfd poll_struct;
645 poll_struct.fd = this->read_fd[0];
646 poll_struct.events = POLLIN;
// poll() takes milliseconds, hence the multiplication by 1000.
// NOTE(review): the long product is narrowed to poll()'s int argument.
648 int code = ::poll(&poll_struct, 1, 1000 * timeout_seconds);
// NOTE(review): the condition guarding this throw (listing line 650) is not
// shown - presumably code == -1.
651 throw ConnectionException();
652 }
// poll() returning 0 means the timeout expired with nothing to read.
else if (code == 0) {
653 throw TimeoutException();
656 return ::read(this->read_fd[0], buf, len);
// Line-oriented read: hands out one '\n'-delimited message per call;
// timeout_seconds defaults to NO_TIMEOUT.
// Many lines of this body are absent from the listing; the comments below describe
// only the statements that are shown.
659 std::string read(
long timeout_seconds =
NO_TIMEOUT) {
// A message queued by an earlier call is taken from the queue first.
660 if (!this->buffered_msgs.empty()) {
661 std::string msg = this->buffered_msgs.front();
662 this->buffered_msgs.pop();
// Accumulates a message that is longer than the receive buffer.
666 std::string cur_msg =
"";
// Two receive paths are visible: a direct ::read() whose -1 result raises
// ConnectionException, and a call to the timed read() overload. Both write after
// the start_pos bytes already held at the front of the buffer.
// NOTE(review): the condition choosing between them is not shown - presumably
// timeout_seconds == NO_TIMEOUT; confirm.
673 ::read(this->read_fd[0], this->buf.get() + this->start_pos,
674 this->buf_size - this->start_pos);
676 if (bytes_received == -1) {
677 throw ConnectionException();
680 bytes_received = this->read(this->buf.get() + this->start_pos,
681 this->buf_size - this->start_pos,
// Nothing was read: return whatever was carried over from earlier calls.
685 if (bytes_received == 0) {
686 return std::string(this->buf.get(), this->start_pos);
689 bool first_msg_to_receive =
true;
690 std::string first_msg;
// Wrap the filled part of the buffer in an istream so std::getline can split it.
692 charstreambuf buf(this->buf, bytes_received + this->start_pos);
693 std::istream in(&buf);
// Whether the very last byte read is a line terminator.
696 bool last_is_newline = this->buf.get()[bytes_received + this->start_pos - 1] ==
'\n';
700 std::getline(in, msg);
// getline ran into the end of the data and the data does not end with '\n':
// what remains from cur_pos onwards is an incomplete message.
702 if (in.eof() && !last_is_newline) {
// Number of bytes not yet consumed as complete messages.
703 int size = bytes_received + this->start_pos - cur_pos;
// The incomplete message fills the whole buffer: move it into cur_msg.
705 if (size == this->buf_size) {
706 cur_msg += std::string(this->buf.get(), this->buf_size);
// Slide the unconsumed tail to the front of the buffer and record its length so
// that the next read appends after it.
// NOTE(review): presumably this is the else branch of the check above - the
// line carrying the else is not in this listing.
709 std::memmove(this->buf.get(), this->buf.get() + cur_pos, size);
710 this->start_pos = size;
// Non-empty message: the first one found is kept in first_msg; the push below is
// presumably the branch for later ones (the else line is not shown).
713 if (!cur_msg.empty() || !msg.empty()) {
714 if (first_msg_to_receive) {
715 first_msg = cur_msg + msg;
716 first_msg_to_receive =
false;
718 this->buffered_msgs.push(cur_msg + msg);
// Advance past the message and its one-character delimiter.
724 cur_pos += msg.length() + 1;
728 if (last_is_newline) {
732 if (!first_msg_to_receive) {
// Writes msg to write_fd[1] with a single ::write() call.
// NOTE(review): the handling of new_line (listing lines 742-745) is not shown.
741 void write(std::string msg,
bool new_line) {
746 const char *buf = msg.c_str();
747 int written = ::write(this->write_fd[1], buf, msg.size());
// Anything other than a full write (including a -1 error result) raises a
// ConnectionException carrying the byte counts and the descriptor number.
749 if (written != msg.size()) {
750 std::runtime_error err(
"Wrote " +
751 std::to_string(written) +
752 " bytes instead of " +
753 std::to_string(msg.size()) +
755 std::to_string(this->write_fd[1]));
756 throw ConnectionException(err);
// File write: opens the file in binary mode and writes it to write_fd[1] in chunks.
760 void write(fs::path file) {
762 std::ifstream file_stream(file, std::ios_base::in |
763 std::ios_base::binary);
// Failure to open is reported as a ConnectionException.
// NOTE(review): the condition guarding this (listing lines 764-765) is not shown.
766 std::runtime_error err(
"Could not open the file " +
767 file.string() +
"!");
768 throw ConnectionException(err);
// Loops while the stream is in a good state; the chunk-read call itself
// (listing line 772) is not shown.
771 while (file_stream) {
// gcount() is the number of bytes the last read actually delivered.
773 int bytes_read = file_stream.gcount();
774 int bytes_written = ::write(this->write_fd[1], buf.get(),
// A write that does not cover the whole chunk raises a ConnectionException.
777 if (bytes_written != bytes_read) {
778 std::runtime_error err(
"Wrote " +
779 std::to_string(bytes_written) +
780 " bytes instead of " +
781 std::to_string(bytes_read) +
783 std::to_string(this->write_fd[1]));
784 throw ConnectionException(err);
// Raw write: writes len bytes starting at buf to write_fd[1] with a single
// ::write() call.
789 void write(
unsigned int len,
char *buf) {
790 int bytes_written = ::write(this->write_fd[1], buf, len);
// Anything other than a full write raises a ConnectionException carrying the
// byte counts and the descriptor number.
792 if (bytes_written != len) {
793 std::runtime_error err(
"Wrote " +
794 std::to_string(bytes_written) +
795 " bytes instead of " +
796 std::to_string(len) +
798 std::to_string(this->write_fd[1]));
799 throw ConnectionException(err);
// Returns the size the receive buffer was created with.
803 unsigned int get_buf_size() {
804 return this->buf_size;
// Closes the two descriptors this object uses itself - the read end read_fd[0] and
// the write end write_fd[1] - and marks each as -1 so it is not closed twice.
// NOTE(review): the enclosing function's signature (listing line 807) is not
// shown; read_fd[1] and write_fd[0] are not touched by the visible lines.
808 if (this->read_fd[0] != -1) {
809 ::close(this->read_fd[0]);
810 this->read_fd[0] = -1;
813 if (this->write_fd[1] != -1) {
814 ::close(this->write_fd[1]);
815 this->write_fd[1] = -1;
// Acceptor implementation backed by a pair of pipes; it accepts exactly one
// connection (the base class is constructed with a limit of 1).
824 class PipeAcceptor :
public Acceptor {
834 PipeAcceptor() : Acceptor(1) {
835 if (pipe(this->read_fd) != 0) {
836 std::runtime_error err(
"Could not open read pipe for FileDescriptor, "
837 "code " + std::to_string(errno));
838 throw ConnectionException(err);
841 if (pipe(this->write_fd) != 0) {
842 std::runtime_error err(
"Could not open write pipe for FileDescriptor, "
843 "code " + std::to_string(errno));
844 throw ConnectionException(err);
// Waits for the peer to send the literal handshake string "connect" over the read
// pipe and, once it has arrived, returns a FileDescriptor connection.
// NOTE(review): the timeout parameter line (listing line 850) and the declaration
// of the local buf are not shown.
849 std::unique_ptr<Connection> accept_connection(
unsigned int buf_size,
851 std::string expected =
"connect";
852 const int size = expected.size();
855 int bytes_received = 0;
// Keep reading until as many bytes as the handshake string has have arrived.
857 while (bytes_received < size) {
859 struct pollfd poll_struct;
860 poll_struct.fd = this->read_fd[0];
861 poll_struct.events = POLLIN;
// poll() takes milliseconds, hence the multiplication by 1000.
863 int code = ::poll(&poll_struct, 1, 1000 * timeout);
// NOTE(review): the condition guarding this throw (listing line 865) is not
// shown - presumably code == -1.
866 throw ConnectionException();
867 }
// poll() returning 0 means the timeout expired with nothing to read.
else if (code == 0) {
868 throw TimeoutException();
// Read only the part of the handshake that is still missing.
872 int received = ::read(this->read_fd[0], buf + bytes_received,
873 size - bytes_received);
// NOTE(review): listing lines 874-878 are not shown - presumably they reject a
// non-positive result before it is added below; confirm.
879 bytes_received += received;
882 std::string msg(buf, size);
// Any other handshake content is rejected with a ConnectionException.
884 if (msg != expected) {
885 std::runtime_error err(
"Message received from pipe when establishing connection "
886 "is \"" + msg +
"\" instead of \"" + expected +
"\".");
887 throw ConnectionException(err);
890 return std::unique_ptr<Connection>(
new FileDescriptor(this->read_fd,
// Factory producing PipeAcceptor instances.
901 class Factory :
public Acceptor::Factory {
// Creates a PipeAcceptor; any max_accepted other than 1 is rejected with a
// std::runtime_error.
911 std::unique_ptr<Acceptor> make_acceptor(
int max_accepted) {
912 if (max_accepted != 1) {
913 throw std::runtime_error(
"max_accepted can only be 1 for FileDescriptor");
916 return std::unique_ptr<Acceptor>(
new PipeAcceptor());
// NOTE(review): the body of this get_type() is not part of this listing.
919 std::string get_type() {
// Connection instructions for the pipe acceptor: "<write_fd[0]>_<read_fd[1]>".
// These are the opposite ends of the ones the FileDescriptor connection uses
// itself (it reads from read_fd[0] and writes to write_fd[1]).
927 std::string get_connection_instructions() {
928 return std::to_string(this->write_fd[0]) +
"_" + std::to_string(this->read_fd[1]);
// Type identifier of this acceptor: the literal string "pipe".
931 std::string get_type() {
return "pipe"; }
Definition socket.hpp:217
virtual std::string get_type()=0
virtual std::unique_ptr< Acceptor > make_acceptor(int max_accepted)=0
Definition socket.hpp:175
virtual std::string get_type()=0
virtual std::string get_connection_instructions()=0
Acceptor(int max_accepted)
Definition socket.hpp:188
virtual std::unique_ptr< Connection > accept_connection(unsigned int buf_size, long timeout)=0
std::unique_ptr< Connection > accept(unsigned int buf_size, long timeout=NO_TIMEOUT)
Definition socket.hpp:251
virtual ~Acceptor()
Definition socket.hpp:265
ConnectionException(std::exception &other)
Definition socket.hpp:51
ConnectionException()
Definition socket.hpp:50
virtual void write(unsigned int len, char *buf)=0
virtual std::string read(long timeout_seconds=NO_TIMEOUT)=0
virtual void write(std::string msg, bool new_line=true)=0
virtual unsigned int get_buf_size()=0
virtual void write(fs::path file)=0
virtual int read(char *buf, unsigned int len, long timeout_seconds)=0
virtual ~Connection()
Definition socket.hpp:80
Definition socket.hpp:147
virtual std::string read(long timeout_seconds=NO_TIMEOUT)=0
virtual std::string get_address()=0
virtual unsigned int get_buf_size()=0
virtual unsigned short get_port()=0
virtual int read(char *buf, unsigned int len, long timeout_seconds)=0
virtual void write(fs::path file)=0
virtual void write(std::string msg, bool new_line=true)=0
virtual void write(unsigned int len, char *buf)=0
virtual ~Socket()
Definition socket.hpp:152
std::unique_ptr< Acceptor > make_acceptor(int max_accepted)
Definition socket.hpp:563
std::string get_type()
Definition socket.hpp:570
Factory(std::string address, unsigned short port, bool try_subsequent_ports=false)
Definition socket.hpp:556
void close()
Definition socket.hpp:532
std::string get_type()
Definition socket.hpp:586
~TCPAcceptor()
Definition socket.hpp:575
std::unique_ptr< Connection > accept_connection(unsigned int buf_size, long timeout)
Definition socket.hpp:522
std::string get_connection_instructions()
Definition socket.hpp:582
std::string read(long timeout_seconds=NO_TIMEOUT)
Definition socket.hpp:344
void write(unsigned int len, char *buf)
Definition socket.hpp:460
void write(std::string msg, bool new_line)
Definition socket.hpp:426
TCPSocket(net::StreamSocket &sock, unsigned int buf_size)
Definition socket.hpp:306
unsigned int get_buf_size()
Definition socket.hpp:325
unsigned short get_port()
Definition socket.hpp:321
std::string get_address()
Definition socket.hpp:317
void write(fs::path file)
Definition socket.hpp:450
~TCPSocket()
Definition socket.hpp:313
int read(char *buf, unsigned int len, long timeout_seconds)
Definition socket.hpp:329
charstreambuf(std::unique_ptr< char[]> &begin, unsigned int length)
Definition socket.hpp:37
#define FILE_BUFFER_SIZE
Definition socket.hpp:28
#define NO_TIMEOUT
Definition socket.hpp:25
#define UNLIMITED_ACCEPTED
Definition socket.hpp:24