From 92190d1e2fcc077fd606b4814a6ec420d7185d4d Mon Sep 17 00:00:00 2001 From: uveyskurt Date: Tue, 9 Jun 2026 08:18:23 +0300 Subject: [PATCH 1/5] fix(libwaterlinked): make pending command responses thread-safe --- libwaterlinked/src/client.cpp | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/libwaterlinked/src/client.cpp b/libwaterlinked/src/client.cpp index 1c613e0..e2c0d8a 100644 --- a/libwaterlinked/src/client.cpp +++ b/libwaterlinked/src/client.cpp @@ -192,16 +192,20 @@ WaterLinkedClient::~WaterLinkedClient() auto WaterLinkedClient::send_command(const nlohmann::json & command) -> std::future { + const std::string command_name = command.at("command").get(); + std::promise response; + auto future = response.get_future(); + const std::string command_str{command.dump()}; + std::lock_guard lock(request_mutex_); + auto & requests = pending_requests_[command_name]; + requests.emplace_back(std::move(response)); + if (send(socket_, command_str.c_str(), command_str.size(), 0) < 0) { + requests.pop_back(); throw std::runtime_error("Failed to send command to DVL"); } - std::promise response; - auto future = response.get_future(); - - pending_requests_[command.at("command")].emplace_back(std::move(response)); - return future; } @@ -287,9 +291,11 @@ auto WaterLinkedClient::process_json_object(const nlohmann::json & json_object) } } else if (json_object.at("type") == "response") { const auto response = json_object.get(); - if (pending_requests_.contains(response.response_to) && !pending_requests_[response.response_to].empty()) { - pending_requests_[response.response_to].front().set_value(response); - pending_requests_[response.response_to].pop_front(); + std::lock_guard lock(request_mutex_); + const auto pending_request = pending_requests_.find(response.response_to); + if (pending_request != pending_requests_.end() && !pending_request->second.empty()) { + pending_request->second.front().set_value(response); + pending_request->second.pop_front(); } } else { throw std::runtime_error("Received an unknown message type from the DVL: " + json_object.dump()); From bdd7da487deceb510dae04edb05f8e471e23809a Mon Sep 17 00:00:00 2001 From: uveyskurt Date: Tue, 9 Jun 2026 10:14:28 +0300 Subject: [PATCH 2/5] fix(libwaterlinked): timeout pending command responses --- .../include/libwaterlinked/client.hpp | 13 +++++- libwaterlinked/src/client.cpp | 44 +++++++++++++++++-- .../src/waterlinked_dvl_driver.cpp | 4 +- .../waterlinked_dvl_driver_parameters.yaml | 2 +- 4 files changed, 55 insertions(+), 8 deletions(-) diff --git a/libwaterlinked/include/libwaterlinked/client.hpp b/libwaterlinked/include/libwaterlinked/client.hpp index 7f72b2e..d0d94bb 100644 --- a/libwaterlinked/include/libwaterlinked/client.hpp +++ b/libwaterlinked/include/libwaterlinked/client.hpp @@ -45,10 +45,12 @@ class WaterLinkedClient /// - The IP address of the DVL (defaults to 192.168.194.95) /// - The port of the DVL (defaults to 16171) /// - a connection timeout (s, defaults to 5s) + /// - a command response timeout (s, defaults to 5s) explicit WaterLinkedClient( const std::string & addr = "192.168.194.95", std::uint16_t port = 16171, - std::chrono::seconds connection_timeout = std::chrono::seconds(5)); + std::chrono::seconds connection_timeout = std::chrono::seconds(5), + std::chrono::seconds command_timeout = std::chrono::seconds(5)); ~WaterLinkedClient(); @@ -127,7 +129,14 @@ class WaterLinkedClient std::atomic running_{false}; - std::unordered_map>> pending_requests_; + struct PendingRequest + { + std::promise response; + std::chrono::steady_clock::time_point deadline; + }; + + std::chrono::steady_clock::duration command_timeout_; + std::unordered_map> pending_requests_; std::mutex request_mutex_; std::thread polling_thread_; diff --git a/libwaterlinked/src/client.cpp b/libwaterlinked/src/client.cpp index e2c0d8a..dbba009 100644 --- a/libwaterlinked/src/client.cpp +++ b/libwaterlinked/src/client.cpp @@ -152,7 +152,9 @@ auto connect(int socket, const struct sockaddr * addr, socklen_t addrlen, std::c WaterLinkedClient::WaterLinkedClient( const std::string & addr, std::uint16_t port, - std::chrono::seconds connection_timeout) + std::chrono::seconds connection_timeout, + std::chrono::seconds command_timeout) +: command_timeout_(command_timeout) { // Open a TCP socket and connect to the DVL socket_ = socket(AF_INET, SOCK_STREAM, 0); @@ -199,7 +201,7 @@ auto WaterLinkedClient::send_command(const nlohmann::json & command) -> std::fut const std::string command_str{command.dump()}; std::lock_guard lock(request_mutex_); auto & requests = pending_requests_[command_name]; - requests.emplace_back(std::move(response)); + requests.emplace_back(std::move(response), std::chrono::steady_clock::now() + command_timeout_); if (send(socket_, command_str.c_str(), command_str.size(), 0) < 0) { requests.pop_back(); @@ -294,8 +296,11 @@ auto WaterLinkedClient::process_json_object(const nlohmann::json & json_object) std::lock_guard lock(request_mutex_); const auto pending_request = pending_requests_.find(response.response_to); if (pending_request != pending_requests_.end() && !pending_request->second.empty()) { - pending_request->second.front().set_value(response); + pending_request->second.front().response.set_value(response); pending_request->second.pop_front(); + if (pending_request->second.empty()) { + pending_requests_.erase(pending_request); + } } } else { throw std::runtime_error("Received an unknown message type from the DVL: " + json_object.dump()); @@ -310,6 +315,39 @@ auto WaterLinkedClient::poll_connection() -> void std::size_t n_bytes_to_read = max_bytes_to_read; while (running_.load()) { + { + const auto now = std::chrono::steady_clock::now(); + std::lock_guard lock(request_mutex_); + for (auto request = pending_requests_.begin(); request != pending_requests_.end();) { + auto & pending_responses = request->second; + while (!pending_responses.empty() && pending_responses.front().deadline <= now) { + pending_responses.front().response.set_value( + {request->first, false, "Timed out waiting for response to DVL command: " + request->first, {}}); + pending_responses.pop_front(); + } + + if (pending_responses.empty()) { + request = pending_requests_.erase(request); + } else { + ++request; + } + } + } + + struct pollfd pfds[] = {{.fd = socket_, .events = POLLIN, .revents = 0}}; // NOLINT + const int poll_result = poll(pfds, 1, 100); + if (poll_result < 0) { + if (errno == EINTR) { + continue; + } + std::cout << "Failed to poll the DVL socket; the connection was likely lost.\n"; + continue; + } + + if (poll_result == 0) { + continue; + } + if (read_from_socket(socket_, buffer, n_bytes_to_read) < 0) { std::cout << "Failed to read from the DVL; the connection was likely lost.\n"; } diff --git a/waterlinked_dvl_driver/src/waterlinked_dvl_driver.cpp b/waterlinked_dvl_driver/src/waterlinked_dvl_driver.cpp index acce73a..4deae3f 100644 --- a/waterlinked_dvl_driver/src/waterlinked_dvl_driver.cpp +++ b/waterlinked_dvl_driver/src/waterlinked_dvl_driver.cpp @@ -70,8 +70,8 @@ auto WaterLinkedDvlDriver::on_configure(const rclcpp_lifecycle::State & /*previo } try { - client_ = - std::make_unique(params_.ip_address, params_.port, std::chrono::seconds(params_.timeout)); + const auto timeout = std::chrono::seconds(params_.timeout); + client_ = std::make_unique(params_.ip_address, params_.port, timeout, timeout); } catch (const std::exception & e) { RCLCPP_ERROR(get_logger(), "Failed to create WaterLinkedClient. %s", e.what()); diff --git a/waterlinked_dvl_driver/src/waterlinked_dvl_driver_parameters.yaml b/waterlinked_dvl_driver/src/waterlinked_dvl_driver_parameters.yaml index 9a30248..071f247 100644 --- a/waterlinked_dvl_driver/src/waterlinked_dvl_driver_parameters.yaml +++ b/waterlinked_dvl_driver/src/waterlinked_dvl_driver_parameters.yaml @@ -14,7 +14,7 @@ waterlinked_dvl_driver: timeout: type: int default_value: 5 - description: "The timeout for the DVL connection (s)." + description: "The timeout for the DVL connection and command responses (s)." read_only: true frame_id: From 72ee74380796659baee4239b903d32a5bffeae5f Mon Sep 17 00:00:00 2001 From: uveyskurt Date: Tue, 9 Jun 2026 10:18:20 +0300 Subject: [PATCH 3/5] fix(libwaterlinked): fail pending commands on connection loss --- libwaterlinked/src/client.cpp | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/libwaterlinked/src/client.cpp b/libwaterlinked/src/client.cpp index dbba009..433c223 100644 --- a/libwaterlinked/src/client.cpp +++ b/libwaterlinked/src/client.cpp @@ -341,15 +341,34 @@ auto WaterLinkedClient::poll_connection() -> void continue; } std::cout << "Failed to poll the DVL socket; the connection was likely lost.\n"; - continue; + running_.store(false); + break; } if (poll_result == 0) { continue; } - if (read_from_socket(socket_, buffer, n_bytes_to_read) < 0) { + if ((pfds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) != 0) { + std::cout << "The DVL socket was closed or entered an error state.\n"; + running_.store(false); + break; + } + + const ssize_t n_read = read_from_socket(socket_, buffer, n_bytes_to_read); + if (n_read < 0) { + if (errno == EINTR) { + continue; + } std::cout << "Failed to read from the DVL; the connection was likely lost.\n"; + running_.store(false); + break; + } + + if (n_read == 0) { + std::cout << "The DVL connection was closed.\n"; + running_.store(false); + break; } auto last_delim = std::ranges::find(buffer | std::views::reverse, protocol::DELIMITER); @@ -373,6 +392,16 @@ auto WaterLinkedClient::poll_connection() -> void n_bytes_to_read = max_bytes_to_read - buffer.size(); } + + std::lock_guard lock(request_mutex_); + for (auto & [command, pending_responses] : pending_requests_) { + while (!pending_responses.empty()) { + pending_responses.front().response.set_value( + {command, false, "DVL connection closed while waiting for response to command: " + command, {}}); + pending_responses.pop_front(); + } + } + pending_requests_.clear(); } } // namespace waterlinked From a3a11a818f931deb89b280cb04ad5d579b85332f Mon Sep 17 00:00:00 2001 From: uveyskurt Date: Tue, 9 Jun 2026 10:35:01 +0300 Subject: [PATCH 4/5] fix(libwaterlinked): close socket on connection failure --- libwaterlinked/src/client.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libwaterlinked/src/client.cpp b/libwaterlinked/src/client.cpp index 433c223..a088c9f 100644 --- a/libwaterlinked/src/client.cpp +++ b/libwaterlinked/src/client.cpp @@ -169,10 +169,12 @@ WaterLinkedClient::WaterLinkedClient( sockaddr.sin_addr.s_addr = inet_addr(addr.c_str()); if (sockaddr.sin_addr.s_addr == INADDR_NONE) { + close(socket_); throw std::runtime_error("Invalid socket address " + addr); } if (connect(socket_, reinterpret_cast(&sockaddr), sizeof(sockaddr), connection_timeout) < 0) { + close(socket_); throw std::runtime_error( "An error occurred while attempting to connect to the DVL. Error: " + std::string(strerror(errno))); } From 73f7484772ba223f5312e47bc41fd0cfa810e5f1 Mon Sep 17 00:00:00 2001 From: uveyskurt Date: Tue, 9 Jun 2026 10:48:32 +0300 Subject: [PATCH 5/5] fix(libwaterlinked): protect report callback registration --- libwaterlinked/include/libwaterlinked/client.hpp | 1 + libwaterlinked/src/client.cpp | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/libwaterlinked/include/libwaterlinked/client.hpp b/libwaterlinked/include/libwaterlinked/client.hpp index d0d94bb..123412e 100644 --- a/libwaterlinked/include/libwaterlinked/client.hpp +++ b/libwaterlinked/include/libwaterlinked/client.hpp @@ -141,6 +141,7 @@ class WaterLinkedClient std::thread polling_thread_; + std::mutex callback_mutex_; std::vector> velocity_report_callbacks_; std::vector> dead_reckoning_report_callbacks_; }; diff --git a/libwaterlinked/src/client.cpp b/libwaterlinked/src/client.cpp index a088c9f..9de555f 100644 --- a/libwaterlinked/src/client.cpp +++ b/libwaterlinked/src/client.cpp @@ -271,11 +271,13 @@ auto WaterLinkedClient::reset_dead_reckoning() -> std::future auto WaterLinkedClient::register_callback(std::function && callback) -> void { + std::lock_guard lock(callback_mutex_); velocity_report_callbacks_.emplace_back(std::move(callback)); } auto WaterLinkedClient::register_callback(std::function && callback) -> void { + std::lock_guard lock(callback_mutex_); dead_reckoning_report_callbacks_.emplace_back(std::move(callback)); } @@ -285,12 +287,22 @@ auto WaterLinkedClient::process_json_object(const nlohmann::json & json_object) // responses. if (json_object.at("type") == "velocity") { const auto report = json_object.get(); - for (const auto & callback : velocity_report_callbacks_) { + std::vector> callbacks; + { + std::lock_guard lock(callback_mutex_); + callbacks = velocity_report_callbacks_; + } + for (const auto & callback : callbacks) { callback(report); } } else if (json_object.at("type") == "position_local") { const auto report = json_object.get(); - for (const auto & callback : dead_reckoning_report_callbacks_) { + std::vector> callbacks; + { + std::lock_guard lock(callback_mutex_); + callbacks = dead_reckoning_report_callbacks_; + } + for (const auto & callback : callbacks) { callback(report); } } else if (json_object.at("type") == "response") {