diff --git a/concoredocker.hpp b/concoredocker.hpp index b4fa69b..651e5b3 100644 --- a/concoredocker.hpp +++ b/concoredocker.hpp @@ -14,10 +14,25 @@ #include #include #include +#include + +#ifdef __linux__ +#include +#include +#include +#endif #include "concore_base.hpp" class Concore { +private: + int shmId_create = -1; + int shmId_get = -1; + char* sharedData_create = nullptr; + char* sharedData_get = nullptr; + int communication_iport = 0; // iport refers to input port + int communication_oport = 0; // oport refers to output port + public: std::unordered_map iport; std::unordered_map oport; @@ -55,6 +70,25 @@ class Concore { oport = safe_literal_eval("concore.oport", {}); default_maxtime(100); load_params(); + +#ifdef __linux__ + int iport_number = -1; + int oport_number = -1; + + if (!iport.empty()) + iport_number = ExtractNumeric(iport.begin()->first); + if (!oport.empty()) + oport_number = ExtractNumeric(oport.begin()->first); + + if (oport_number != -1) { + communication_oport = 1; + createSharedMemory(oport_number); + } + if (iport_number != -1) { + communication_iport = 1; + getSharedMemory(iport_number); + } +#endif } ~Concore() { @@ -62,6 +96,14 @@ class Concore { for (auto& kv : zmq_ports) delete kv.second; zmq_ports.clear(); +#endif +#ifdef __linux__ + if (communication_oport == 1 && sharedData_create != nullptr) + shmdt(sharedData_create); + if (communication_iport == 1 && sharedData_get != nullptr) + shmdt(sharedData_get); + if (shmId_create != -1) + shmctl(shmId_create, IPC_RMID, nullptr); #endif } @@ -74,11 +116,20 @@ class Concore { delay(other.delay), retrycount(other.retrycount), inpath(std::move(other.inpath)), outpath(std::move(other.outpath)), simtime(other.simtime), maxtime(other.maxtime), - params(std::move(other.params)) + params(std::move(other.params)), + shmId_create(other.shmId_create), shmId_get(other.shmId_get), + sharedData_create(other.sharedData_create), sharedData_get(other.sharedData_get), + communication_iport(other.communication_iport), communication_oport(other.communication_oport) { #ifdef CONCORE_USE_ZMQ zmq_ports = std::move(other.zmq_ports); #endif + other.shmId_create = -1; + other.shmId_get = -1; + other.sharedData_create = nullptr; + other.sharedData_get = nullptr; + other.communication_iport = 0; + other.communication_oport = 0; } Concore& operator=(Concore&& other) noexcept @@ -91,6 +142,14 @@ class Concore { delete kv.second; zmq_ports = std::move(other.zmq_ports); #endif +#ifdef __linux__ + if (communication_oport == 1 && sharedData_create != nullptr) + shmdt(sharedData_create); + if (communication_iport == 1 && sharedData_get != nullptr) + shmdt(sharedData_get); + if (shmId_create != -1) + shmctl(shmId_create, IPC_RMID, nullptr); +#endif iport = std::move(other.iport); oport = std::move(other.oport); @@ -103,6 +162,19 @@ class Concore { simtime = other.simtime; maxtime = other.maxtime; params = std::move(other.params); + shmId_create = other.shmId_create; + shmId_get = other.shmId_get; + sharedData_create = other.sharedData_create; + sharedData_get = other.sharedData_get; + communication_iport = other.communication_iport; + communication_oport = other.communication_oport; + + other.shmId_create = -1; + other.shmId_get = -1; + other.sharedData_create = nullptr; + other.sharedData_get = nullptr; + other.communication_iport = 0; + other.communication_oport = 0; return *this; } @@ -131,6 +203,57 @@ class Concore { inpath + "/1/concore.maxtime", defaultValue); } + key_t ExtractNumeric(const std::string& str) { + std::string numberString; + size_t numDigits = 0; + while (numDigits < str.length() && std::isdigit(str[numDigits])) { + numberString += str[numDigits]; + ++numDigits; + } + if (numDigits == 0) + return -1; + if (numDigits == 1 && std::stoi(numberString) <= 0) + return -1; + return std::stoi(numberString); + } + +#ifdef __linux__ + void createSharedMemory(key_t key) { + shmId_create = shmget(key, 256, IPC_CREAT | 0666); + if (shmId_create == -1) { + std::cerr << "Failed to create shared memory segment.\n"; + return; + } + sharedData_create = static_cast(shmat(shmId_create, NULL, 0)); + if (sharedData_create == reinterpret_cast(-1)) { + std::cerr << "Failed to attach shared memory segment.\n"; + sharedData_create = nullptr; + } + } + + void getSharedMemory(key_t key) { + int retry = 0; + const int MAX_RETRY = 100; + while (retry < MAX_RETRY) { + shmId_get = shmget(key, 256, 0666); + if (shmId_get != -1) + break; + std::cout << "Shared memory does not exist. Make sure the writer process is running.\n"; + sleep(1); + retry++; + } + if (shmId_get == -1) { + std::cerr << "Failed to get shared memory segment after max retries.\n"; + return; + } + sharedData_get = static_cast(shmat(shmId_get, NULL, 0)); + if (sharedData_get == reinterpret_cast(-1)) { + std::cerr << "Failed to attach shared memory segment.\n"; + sharedData_get = nullptr; + } + } +#endif + bool unchanged() { if (olds == s) { s.clear(); @@ -141,6 +264,10 @@ class Concore { } std::vector read(int port, const std::string& name, const std::string& initstr) { +#ifdef __linux__ + if (communication_iport == 1) + return read_SM(port, name, initstr); +#endif std::this_thread::sleep_for(std::chrono::seconds(delay)); std::string file_path = inpath + "/" + std::to_string(port) + "/" + name; std::ifstream infile(file_path); @@ -178,7 +305,56 @@ class Concore { return inval; } +#ifdef __linux__ + std::vector read_SM(int port, const std::string& name, const std::string& initstr) { + std::this_thread::sleep_for(std::chrono::seconds(delay)); + std::string ins; + try { + if (shmId_get != -1 && sharedData_get && sharedData_get[0] != '\0') + ins = std::string(sharedData_get, strnlen(sharedData_get, 256)); + else + throw 505; + } catch (...) { + ins = initstr; + } + + int retry = 0; + const int MAX_RETRY = 100; + while ((int)ins.length() == 0 && retry < MAX_RETRY) { + std::this_thread::sleep_for(std::chrono::seconds(delay)); + try { + if (shmId_get != -1 && sharedData_get) { + ins = std::string(sharedData_get, strnlen(sharedData_get, 256)); + retrycount++; + } else { + retrycount++; + throw 505; + } + } catch (...) { + std::cerr << "Read error\n"; + } + retry++; + } + + s += ins; + std::vector inval = concore_base::parselist_double(ins); + if (inval.empty()) + inval = concore_base::parselist_double(initstr); + if (inval.empty()) + return inval; + simtime = simtime > inval[0] ? simtime : inval[0]; + inval.erase(inval.begin()); + return inval; + } +#endif + void write(int port, const std::string& name, const std::vector& val, int delta = 0) { +#ifdef __linux__ + if (communication_oport == 1) { + write_SM(port, name, val, delta); + return; + } +#endif std::string file_path = outpath + "/" + std::to_string(port) + "/" + name; std::ofstream outfile(file_path); if (!outfile) { @@ -195,6 +371,25 @@ class Concore { } } +#ifdef __linux__ + void write_SM(int port, const std::string& name, std::vector val, int delta = 0) { + try { + if (shmId_create == -1) + throw 505; + val.insert(val.begin(), simtime + delta); + std::ostringstream outfile; + outfile << '['; + for (size_t i = 0; i < val.size() - 1; i++) + outfile << val[i] << ','; + outfile << val[val.size() - 1] << ']'; + std::string result = outfile.str(); + std::strncpy(sharedData_create, result.c_str(), 256 - 1); + } catch (...) { + std::cerr << "skipping +" << outpath << port << "/" << name << "\n"; + } + } +#endif + #ifdef CONCORE_USE_ZMQ void init_zmq_port(const std::string& port_name, const std::string& port_type, const std::string& address, const std::string& socket_type_str) {