Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 196 additions & 1 deletion concoredocker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,25 @@
#include <regex>
#include <algorithm>
#include <map>
#include <cstring>

#ifdef __linux__
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
#endif

#include "concore_base.hpp"

class Concore {
private:
int shmId_create = -1;
int shmId_get = -1;
char* sharedData_create = nullptr;
char* sharedData_get = nullptr;
int communication_iport = 0; // iport refers to input port
int communication_oport = 0; // oport refers to output port

public:
std::unordered_map<std::string, std::string> iport;
std::unordered_map<std::string, std::string> oport;
Expand Down Expand Up @@ -55,13 +70,40 @@ class Concore {
oport = safe_literal_eval("concore.oport", {});
default_maxtime(100);
load_params();

#ifdef __linux__
int iport_number = -1;
int oport_number = -1;

if (!iport.empty())
iport_number = ExtractNumeric(iport.begin()->first);
if (!oport.empty())
oport_number = ExtractNumeric(oport.begin()->first);

if (oport_number != -1) {
communication_oport = 1;
createSharedMemory(oport_number);
}
if (iport_number != -1) {
communication_iport = 1;
getSharedMemory(iport_number);
}
#endif
}

~Concore() {
#ifdef CONCORE_USE_ZMQ
for (auto& kv : zmq_ports)
delete kv.second;
zmq_ports.clear();
#endif
#ifdef __linux__
if (communication_oport == 1 && sharedData_create != nullptr)
shmdt(sharedData_create);
if (communication_iport == 1 && sharedData_get != nullptr)
shmdt(sharedData_get);
if (shmId_create != -1)
shmctl(shmId_create, IPC_RMID, nullptr);
#endif
}

Expand All @@ -74,11 +116,20 @@ class Concore {
delay(other.delay), retrycount(other.retrycount),
inpath(std::move(other.inpath)), outpath(std::move(other.outpath)),
simtime(other.simtime), maxtime(other.maxtime),
params(std::move(other.params))
params(std::move(other.params)),
shmId_create(other.shmId_create), shmId_get(other.shmId_get),
sharedData_create(other.sharedData_create), sharedData_get(other.sharedData_get),
communication_iport(other.communication_iport), communication_oport(other.communication_oport)
{
#ifdef CONCORE_USE_ZMQ
zmq_ports = std::move(other.zmq_ports);
#endif
other.shmId_create = -1;
other.shmId_get = -1;
other.sharedData_create = nullptr;
other.sharedData_get = nullptr;
other.communication_iport = 0;
other.communication_oport = 0;
}

Concore& operator=(Concore&& other) noexcept
Expand All @@ -91,6 +142,14 @@ class Concore {
delete kv.second;
zmq_ports = std::move(other.zmq_ports);
#endif
#ifdef __linux__
if (communication_oport == 1 && sharedData_create != nullptr)
shmdt(sharedData_create);
if (communication_iport == 1 && sharedData_get != nullptr)
shmdt(sharedData_get);
if (shmId_create != -1)
shmctl(shmId_create, IPC_RMID, nullptr);
#endif

iport = std::move(other.iport);
oport = std::move(other.oport);
Expand All @@ -103,6 +162,19 @@ class Concore {
simtime = other.simtime;
maxtime = other.maxtime;
params = std::move(other.params);
shmId_create = other.shmId_create;
shmId_get = other.shmId_get;
sharedData_create = other.sharedData_create;
sharedData_get = other.sharedData_get;
communication_iport = other.communication_iport;
communication_oport = other.communication_oport;

other.shmId_create = -1;
other.shmId_get = -1;
other.sharedData_create = nullptr;
other.sharedData_get = nullptr;
other.communication_iport = 0;
other.communication_oport = 0;

return *this;
}
Expand Down Expand Up @@ -131,6 +203,57 @@ class Concore {
inpath + "/1/concore.maxtime", defaultValue);
}

key_t ExtractNumeric(const std::string& str) {
std::string numberString;
size_t numDigits = 0;
while (numDigits < str.length() && std::isdigit(str[numDigits])) {
numberString += str[numDigits];
++numDigits;
}
if (numDigits == 0)
return -1;
if (numDigits == 1 && std::stoi(numberString) <= 0)
return -1;
return std::stoi(numberString);
}

#ifdef __linux__
void createSharedMemory(key_t key) {
shmId_create = shmget(key, 256, IPC_CREAT | 0666);
if (shmId_create == -1) {
std::cerr << "Failed to create shared memory segment.\n";
return;
}
sharedData_create = static_cast<char*>(shmat(shmId_create, NULL, 0));
if (sharedData_create == reinterpret_cast<char*>(-1)) {
std::cerr << "Failed to attach shared memory segment.\n";
sharedData_create = nullptr;
}
}

void getSharedMemory(key_t key) {
int retry = 0;
const int MAX_RETRY = 100;
while (retry < MAX_RETRY) {
shmId_get = shmget(key, 256, 0666);
if (shmId_get != -1)
break;
std::cout << "Shared memory does not exist. Make sure the writer process is running.\n";
sleep(1);
retry++;
}
if (shmId_get == -1) {
std::cerr << "Failed to get shared memory segment after max retries.\n";
return;
}
sharedData_get = static_cast<char*>(shmat(shmId_get, NULL, 0));
if (sharedData_get == reinterpret_cast<char*>(-1)) {
std::cerr << "Failed to attach shared memory segment.\n";
sharedData_get = nullptr;
}
}
#endif

bool unchanged() {
if (olds == s) {
s.clear();
Expand All @@ -141,6 +264,10 @@ class Concore {
}

std::vector<double> read(int port, const std::string& name, const std::string& initstr) {
#ifdef __linux__
if (communication_iport == 1)
return read_SM(port, name, initstr);
#endif
std::this_thread::sleep_for(std::chrono::seconds(delay));
std::string file_path = inpath + "/" + std::to_string(port) + "/" + name;
std::ifstream infile(file_path);
Expand Down Expand Up @@ -178,7 +305,56 @@ class Concore {
return inval;
}

#ifdef __linux__
std::vector<double> read_SM(int port, const std::string& name, const std::string& initstr) {
std::this_thread::sleep_for(std::chrono::seconds(delay));
std::string ins;
try {
if (shmId_get != -1 && sharedData_get && sharedData_get[0] != '\0')
ins = std::string(sharedData_get, strnlen(sharedData_get, 256));
else
throw 505;
} catch (...) {
ins = initstr;
}

int retry = 0;
const int MAX_RETRY = 100;
while ((int)ins.length() == 0 && retry < MAX_RETRY) {
std::this_thread::sleep_for(std::chrono::seconds(delay));
try {
if (shmId_get != -1 && sharedData_get) {
ins = std::string(sharedData_get, strnlen(sharedData_get, 256));
retrycount++;
} else {
retrycount++;
throw 505;
}
} catch (...) {
std::cerr << "Read error\n";
}
retry++;
}

s += ins;
std::vector<double> inval = concore_base::parselist_double(ins);
if (inval.empty())
inval = concore_base::parselist_double(initstr);
if (inval.empty())
return inval;
simtime = simtime > inval[0] ? simtime : inval[0];
inval.erase(inval.begin());
return inval;
}
#endif

void write(int port, const std::string& name, const std::vector<double>& val, int delta = 0) {
#ifdef __linux__
if (communication_oport == 1) {
write_SM(port, name, val, delta);
return;
}
#endif
std::string file_path = outpath + "/" + std::to_string(port) + "/" + name;
std::ofstream outfile(file_path);
if (!outfile) {
Expand All @@ -195,6 +371,25 @@ class Concore {
}
}

#ifdef __linux__
void write_SM(int port, const std::string& name, std::vector<double> val, int delta = 0) {
try {
if (shmId_create == -1)
throw 505;
val.insert(val.begin(), simtime + delta);
std::ostringstream outfile;
outfile << '[';
for (size_t i = 0; i < val.size() - 1; i++)
outfile << val[i] << ',';
outfile << val[val.size() - 1] << ']';
std::string result = outfile.str();
std::strncpy(sharedData_create, result.c_str(), 256 - 1);
} catch (...) {
std::cerr << "skipping +" << outpath << port << "/" << name << "\n";
}
}
#endif

#ifdef CONCORE_USE_ZMQ
void init_zmq_port(const std::string& port_name, const std::string& port_type,
const std::string& address, const std::string& socket_type_str) {
Expand Down