Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions libwaterlinked/include/libwaterlinked/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ class WaterLinkedClient
/// - The IP address of the DVL (defaults to 192.168.194.95)
/// - The port of the DVL (defaults to 16171)
/// - a connection timeout (s, defaults to 5s)
/// - a command response timeout (s, defaults to 5s)
explicit WaterLinkedClient(
const std::string & addr = "192.168.194.95",
std::uint16_t port = 16171,
std::chrono::seconds connection_timeout = std::chrono::seconds(5));
std::chrono::seconds connection_timeout = std::chrono::seconds(5),
std::chrono::seconds command_timeout = std::chrono::seconds(5));

~WaterLinkedClient();

Expand Down Expand Up @@ -127,11 +129,19 @@ class WaterLinkedClient

std::atomic<bool> running_{false};

std::unordered_map<std::string, std::deque<std::promise<CommandResponse>>> pending_requests_;
struct PendingRequest
{
std::promise<CommandResponse> response;
std::chrono::steady_clock::time_point deadline;
};

std::chrono::steady_clock::duration command_timeout_;
std::unordered_map<std::string, std::deque<PendingRequest>> pending_requests_;
std::mutex request_mutex_;

std::thread polling_thread_;

std::mutex callback_mutex_;
std::vector<std::function<void(const VelocityReport &)>> velocity_report_callbacks_;
std::vector<std::function<void(const DeadReckoningReport &)>> dead_reckoning_report_callbacks_;
};
Expand Down
111 changes: 99 additions & 12 deletions libwaterlinked/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ auto connect(int socket, const struct sockaddr * addr, socklen_t addrlen, std::c
WaterLinkedClient::WaterLinkedClient(
const std::string & addr,
std::uint16_t port,
std::chrono::seconds connection_timeout)
std::chrono::seconds connection_timeout,
std::chrono::seconds command_timeout)
: command_timeout_(command_timeout)
{
// Open a TCP socket and connect to the DVL
socket_ = socket(AF_INET, SOCK_STREAM, 0);
Expand All @@ -167,10 +169,12 @@ WaterLinkedClient::WaterLinkedClient(
sockaddr.sin_addr.s_addr = inet_addr(addr.c_str());

if (sockaddr.sin_addr.s_addr == INADDR_NONE) {
close(socket_);
throw std::runtime_error("Invalid socket address " + addr);
}

if (connect(socket_, reinterpret_cast<struct sockaddr *>(&sockaddr), sizeof(sockaddr), connection_timeout) < 0) {
close(socket_);
throw std::runtime_error(
"An error occurred while attempting to connect to the DVL. Error: " + std::string(strerror(errno)));
}
Expand All @@ -192,16 +196,20 @@ WaterLinkedClient::~WaterLinkedClient()

auto WaterLinkedClient::send_command(const nlohmann::json & command) -> std::future<CommandResponse>
{
const std::string command_name = command.at("command").get<std::string>();
std::promise<CommandResponse> response;
auto future = response.get_future();

const std::string command_str{command.dump()};
std::lock_guard lock(request_mutex_);
auto & requests = pending_requests_[command_name];
requests.emplace_back(std::move(response), std::chrono::steady_clock::now() + command_timeout_);

if (send(socket_, command_str.c_str(), command_str.size(), 0) < 0) {
requests.pop_back();
throw std::runtime_error("Failed to send command to DVL");
}

std::promise<CommandResponse> response;
auto future = response.get_future();

pending_requests_[command.at("command")].emplace_back(std::move(response));

return future;
}

Expand Down Expand Up @@ -263,11 +271,13 @@ auto WaterLinkedClient::reset_dead_reckoning() -> std::future<CommandResponse>

auto WaterLinkedClient::register_callback(std::function<void(const VelocityReport &)> && callback) -> void
{
std::lock_guard lock(callback_mutex_);
velocity_report_callbacks_.emplace_back(std::move(callback));
}

auto WaterLinkedClient::register_callback(std::function<void(const DeadReckoningReport &)> && callback) -> void
{
std::lock_guard lock(callback_mutex_);
dead_reckoning_report_callbacks_.emplace_back(std::move(callback));
}

Expand All @@ -277,19 +287,34 @@ auto WaterLinkedClient::process_json_object(const nlohmann::json & json_object)
// responses.
if (json_object.at("type") == "velocity") {
const auto report = json_object.get<VelocityReport>();
for (const auto & callback : velocity_report_callbacks_) {
std::vector<std::function<void(const VelocityReport &)>> callbacks;
{
std::lock_guard lock(callback_mutex_);
callbacks = velocity_report_callbacks_;
}
for (const auto & callback : callbacks) {
callback(report);
}
} else if (json_object.at("type") == "position_local") {
const auto report = json_object.get<DeadReckoningReport>();
for (const auto & callback : dead_reckoning_report_callbacks_) {
std::vector<std::function<void(const DeadReckoningReport &)>> callbacks;
{
std::lock_guard lock(callback_mutex_);
callbacks = dead_reckoning_report_callbacks_;
}
for (const auto & callback : callbacks) {
callback(report);
}
} else if (json_object.at("type") == "response") {
const auto response = json_object.get<CommandResponse>();
if (pending_requests_.contains(response.response_to) && !pending_requests_[response.response_to].empty()) {
pending_requests_[response.response_to].front().set_value(response);
pending_requests_[response.response_to].pop_front();
std::lock_guard lock(request_mutex_);
const auto pending_request = pending_requests_.find(response.response_to);
if (pending_request != pending_requests_.end() && !pending_request->second.empty()) {
pending_request->second.front().response.set_value(response);
pending_request->second.pop_front();
if (pending_request->second.empty()) {
pending_requests_.erase(pending_request);
}
}
} else {
throw std::runtime_error("Received an unknown message type from the DVL: " + json_object.dump());
Expand All @@ -304,8 +329,60 @@ auto WaterLinkedClient::poll_connection() -> void
std::size_t n_bytes_to_read = max_bytes_to_read;

while (running_.load()) {
if (read_from_socket(socket_, buffer, n_bytes_to_read) < 0) {
{
const auto now = std::chrono::steady_clock::now();
std::lock_guard lock(request_mutex_);
for (auto request = pending_requests_.begin(); request != pending_requests_.end();) {
auto & pending_responses = request->second;
while (!pending_responses.empty() && pending_responses.front().deadline <= now) {
pending_responses.front().response.set_value(
{request->first, false, "Timed out waiting for response to DVL command: " + request->first, {}});
pending_responses.pop_front();
}

if (pending_responses.empty()) {
request = pending_requests_.erase(request);
} else {
++request;
}
}
}

struct pollfd pfds[] = {{.fd = socket_, .events = POLLIN, .revents = 0}}; // NOLINT
const int poll_result = poll(pfds, 1, 100);
if (poll_result < 0) {
if (errno == EINTR) {
continue;
}
std::cout << "Failed to poll the DVL socket; the connection was likely lost.\n";
running_.store(false);
break;
}

if (poll_result == 0) {
continue;
}

if ((pfds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
std::cout << "The DVL socket was closed or entered an error state.\n";
running_.store(false);
break;
}

const ssize_t n_read = read_from_socket(socket_, buffer, n_bytes_to_read);
if (n_read < 0) {
if (errno == EINTR) {
continue;
}
std::cout << "Failed to read from the DVL; the connection was likely lost.\n";
running_.store(false);
break;
}

if (n_read == 0) {
std::cout << "The DVL connection was closed.\n";
running_.store(false);
break;
}

auto last_delim = std::ranges::find(buffer | std::views::reverse, protocol::DELIMITER);
Expand All @@ -329,6 +406,16 @@ auto WaterLinkedClient::poll_connection() -> void

n_bytes_to_read = max_bytes_to_read - buffer.size();
}

std::lock_guard lock(request_mutex_);
for (auto & [command, pending_responses] : pending_requests_) {
while (!pending_responses.empty()) {
pending_responses.front().response.set_value(
{command, false, "DVL connection closed while waiting for response to command: " + command, {}});
pending_responses.pop_front();
}
}
pending_requests_.clear();
}

} // namespace waterlinked
4 changes: 2 additions & 2 deletions waterlinked_dvl_driver/src/waterlinked_dvl_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ auto WaterLinkedDvlDriver::on_configure(const rclcpp_lifecycle::State & /*previo
}

try {
client_ =
std::make_unique<WaterLinkedClient>(params_.ip_address, params_.port, std::chrono::seconds(params_.timeout));
const auto timeout = std::chrono::seconds(params_.timeout);
client_ = std::make_unique<WaterLinkedClient>(params_.ip_address, params_.port, timeout, timeout);
}
catch (const std::exception & e) {
RCLCPP_ERROR(get_logger(), "Failed to create WaterLinkedClient. %s", e.what());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ waterlinked_dvl_driver:
timeout:
type: int
default_value: 5
description: "The timeout for the DVL connection (s)."
description: "The timeout for the DVL connection and command responses (s)."
read_only: true

frame_id:
Expand Down