From 412190bcadf2d0925894da3968e6cbebf6301aed Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Mon, 18 May 2026 19:24:58 +0900 Subject: [PATCH 01/19] Add options to Modbuzz.TCP.(Client|Server) --- lib/modbuzz/tcp/client.ex | 9 ++++++++- lib/modbuzz/tcp/server.ex | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index 1a5329a..a730157 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -343,7 +343,14 @@ defmodule Modbuzz.TCP.Client do transport.connect( address, port, - [mode: :binary, packet: :raw, active: active], + [ + mode: :binary, + packet: :raw, + active: active, + keepalive: true, + nodelay: true, + reuseaddr: true + ], _timeout = 3000 ) end diff --git a/lib/modbuzz/tcp/server.ex b/lib/modbuzz/tcp/server.ex index 5a75997..aa55513 100644 --- a/lib/modbuzz/tcp/server.ex +++ b/lib/modbuzz/tcp/server.ex @@ -80,6 +80,7 @@ defmodule Modbuzz.TCP.Server do packet: :raw, active: active, backlog: 1024, + keepalive: true, nodelay: true, send_timeout: 30_000, send_timeout_close: true, From 606e74c39a24ed8e94612d3fb04ddf542f01534a Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Tue, 19 May 2026 08:30:21 +0900 Subject: [PATCH 02/19] Change Modbuzz.TCP.ADU pdu type from binary() to Modbuzz.PDU.Protocol.t() | nil --- lib/modbuzz/tcp/adu.ex | 75 ++++++++++++++++-------- lib/modbuzz/tcp/client.ex | 64 +++++++++++--------- lib/modbuzz/tcp/server/socket_handler.ex | 7 +-- test/modbuzz/tcp/client_test.exs | 2 - 4 files changed, 88 insertions(+), 60 deletions(-) diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index 3cd254c..d4f9959 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -1,53 +1,78 @@ defmodule Modbuzz.TCP.ADU do @moduledoc false - @unit_id_byte_size 1 + alias Modbuzz.PDU + @type t :: %__MODULE__{ + transaction_id: 0x0000..0xFFFF, + protocol_id: 0x0000..0xFFFF, + # pdu max is 253 bytes, so length max is 254 (1 byte for unit_id + pdu) + length: 0x0000..0x00FE, + unit_id: 0x00..0xFF, + pdu: Modbuzz.PDU.Protocol.t() | nil + } defstruct transaction_id: 0x0000, protocol_id: 0x0000, length: 0x0000, unit_id: 0x00, pdu: nil - def new(pdu, transaction_id, unit_id) when is_binary(pdu) do + def new(pdu, transaction_id, unit_id) when is_struct(pdu) do %__MODULE__{ transaction_id: transaction_id, - length: byte_size(pdu) + @unit_id_byte_size, unit_id: unit_id, pdu: pdu } end def encode(%__MODULE__{} = adu) do - <> + unit_id_length = 1 + pdu_binary = PDU.encode(adu.pdu) + pdu_binary_length = byte_size(pdu_binary) + + <> end - def decode( - <> + def decode_request( + <>, + acc ) do - %__MODULE__{ + adu = %__MODULE__{ transaction_id: transaction_id, protocol_id: protocol_id, length: length, - unit_id: unit_id, - pdu: pdu + unit_id: unit_id } + + adu_tuple = + case PDU.decode_request(pdu_binary) do + {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} + end + + acc = [adu_tuple | acc] + + if rest == <<>>, do: Enum.reverse(acc), else: decode_request(rest, acc) end - def decode( - <>, + def decode_response( + <>, acc ) do - acc = [ - %__MODULE__{ - transaction_id: transaction_id, - protocol_id: protocol_id, - length: length, - unit_id: unit_id, - pdu: pdu - } - | acc - ] - - if rest == <<>>, do: Enum.reverse(acc), else: decode(rest, acc) + adu = %__MODULE__{ + transaction_id: transaction_id, + protocol_id: protocol_id, + length: length, + unit_id: unit_id + } + + adu_tuple = + case PDU.decode_response(pdu_binary) do + {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} + {:error, pdu} -> {:error, %{adu | pdu: pdu}} + end + + acc = [adu_tuple | acc] + + if rest == <<>>, do: Enum.reverse(acc), else: decode_response(rest, acc) end def increment_transaction_id(transaction_id) do diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index a730157..f98294f 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -10,7 +10,6 @@ defmodule Modbuzz.TCP.Client do defstruct [:unit_id, :request, :from_pid, :sent_time] end - alias Modbuzz.PDU alias Modbuzz.TCP.ADU @doc """ @@ -138,7 +137,7 @@ defmodule Modbuzz.TCP.Client do %{transport: transport, transaction_id: transaction_id} = state transaction_id = ADU.increment_transaction_id(transaction_id) - adu = PDU.encode(request) |> ADU.new(transaction_id, unit_id) |> ADU.encode() + adu = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() state = %{state | transaction_id: transaction_id} @@ -146,9 +145,12 @@ defmodule Modbuzz.TCP.Client do {:send, :ok} <- {:send, transport.send(socket, adu)}, {:recv, {:ok, binary}} <- {:recv, transport.recv(socket, _length = 0, timeout)} do [res_tuple] = - for %ADU{transaction_id: ^transaction_id, pdu: pdu} <- ADU.decode(binary, []) do - PDU.decode_response(pdu) - end + Enum.map( + ADU.decode_response(binary, []), + fn {ok_or_error, %ADU{transaction_id: ^transaction_id, pdu: pdu}} -> + {ok_or_error, pdu} + end + ) GenServer.reply(from, res_tuple) {:noreply, %{state | socket: socket}} @@ -178,7 +180,7 @@ defmodule Modbuzz.TCP.Client do } = state transaction_id = ADU.increment_transaction_id(transaction_id) - adu = PDU.encode(request) |> ADU.new(transaction_id, unit_id) |> ADU.encode() + adu = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() state = %{state | transaction_id: transaction_id} @@ -209,16 +211,19 @@ defmodule Modbuzz.TCP.Client do %{transport: transport, socket: socket, transaction_id: transaction_id} = state transaction_id = ADU.increment_transaction_id(transaction_id) - adu = PDU.encode(request) |> ADU.new(transaction_id, unit_id) |> ADU.encode() + adu = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() state = %{state | transaction_id: transaction_id} with {:send, :ok} <- {:send, transport.send(socket, adu)}, {:recv, {:ok, binary}} <- {:recv, transport.recv(socket, _length = 0, timeout)} do [res_tuple] = - for %ADU{transaction_id: ^transaction_id, pdu: pdu} <- ADU.decode(binary, []) do - PDU.decode_response(pdu) - end + Enum.map( + ADU.decode_response(binary, []), + fn {ok_or_error, %ADU{transaction_id: ^transaction_id, pdu: pdu}} -> + {ok_or_error, pdu} + end + ) {:reply, res_tuple, state} else @@ -252,7 +257,7 @@ defmodule Modbuzz.TCP.Client do } = state transaction_id = ADU.increment_transaction_id(transaction_id) - adu = PDU.encode(request) |> ADU.new(transaction_id, unit_id) |> ADU.encode() + adu = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() state = %{state | transaction_id: transaction_id} @@ -286,23 +291,26 @@ defmodule Modbuzz.TCP.Client do %{transactions: transactions} = state transactions = - ADU.decode(binary, []) - |> Enum.reduce(transactions, fn %ADU{transaction_id: transaction_id, pdu: pdu}, acc -> - {transaction, acc} = Map.pop(acc, transaction_id) - res_tuple = PDU.decode_response(pdu) - - send( - transaction.from_pid, - { - :modbuzz, - transaction.unit_id, - transaction.request, - res_tuple - } - ) - - acc - end) + ADU.decode_response(binary, []) + |> Enum.reduce( + transactions, + fn {ok_or_error, %ADU{transaction_id: transaction_id, pdu: pdu}}, acc -> + {transaction, acc} = Map.pop(acc, transaction_id) + res_tuple = {ok_or_error, pdu} + + send( + transaction.from_pid, + { + :modbuzz, + transaction.unit_id, + transaction.request, + res_tuple + } + ) + + acc + end + ) {:noreply, %{state | transactions: transactions}} end diff --git a/lib/modbuzz/tcp/server/socket_handler.ex b/lib/modbuzz/tcp/server/socket_handler.ex index 9342146..e773e6f 100644 --- a/lib/modbuzz/tcp/server/socket_handler.ex +++ b/lib/modbuzz/tcp/server/socket_handler.ex @@ -38,11 +38,8 @@ defmodule Modbuzz.TCP.Server.SocketHandler do case transport.recv(socket, _length = 0, timeout) do {:ok, binary} -> - for adu <- Modbuzz.TCP.ADU.decode(binary, []) do - {:ok, request} = Modbuzz.PDU.decode_request(adu.pdu) - - request(data_source, adu.unit_id, request, timeout) - |> Modbuzz.PDU.encode() + for {:ok, adu} <- Modbuzz.TCP.ADU.decode_request(binary, []) do + request(data_source, adu.unit_id, adu.pdu, timeout) |> Modbuzz.TCP.ADU.new(adu.transaction_id, adu.unit_id) |> Modbuzz.TCP.ADU.encode() end diff --git a/test/modbuzz/tcp/client_test.exs b/test/modbuzz/tcp/client_test.exs index ba14266..619121e 100644 --- a/test/modbuzz/tcp/client_test.exs +++ b/test/modbuzz/tcp/client_test.exs @@ -513,7 +513,6 @@ defmodule Modbuzz.TCP.ClientTest do else %Modbuzz.PDU.ReadCoils.Res{byte_count: 0x02, coil_status: List.duplicate(false, 16)} end - |> Modbuzz.PDU.encode() |> Modbuzz.TCP.ADU.new(transaction_id, _unit_id = 0) |> Modbuzz.TCP.ADU.encode() end @@ -524,7 +523,6 @@ defmodule Modbuzz.TCP.ClientTest do else %Modbuzz.PDU.WriteSingleCoil.Res{output_address: 0x0016, output_value: true} end - |> Modbuzz.PDU.encode() |> Modbuzz.TCP.ADU.new(transaction_id, _unit_id = 0) |> Modbuzz.TCP.ADU.encode() end From 4e699b9ac108f74fdcd57f413443033343c80ba1 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Tue, 19 May 2026 16:54:52 +0900 Subject: [PATCH 03/19] Change Modbuzz.TCP.Client logic --- lib/modbuzz/tcp/client.ex | 336 ++++++++------------ test/modbuzz/tcp/client_test.exs | 516 ++++++++----------------------- 2 files changed, 265 insertions(+), 587 deletions(-) diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index f98294f..c3a52eb 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -3,15 +3,25 @@ defmodule Modbuzz.TCP.Client do use GenServer + alias Modbuzz.TCP.ADU alias Modbuzz.TCP.Log + @unit_id_max 255 + + defguardp is_valid_unit_id(unit_id) when unit_id >= 0 and unit_id <= @unit_id_max + defmodule Transaction do @moduledoc false - defstruct [:unit_id, :request, :from_pid, :sent_time] + @type t :: %__MODULE__{ + call_or_cast: :call | :cast, + unit_id: 0x00..0xFF, + request: Modbuzz.PDU.Protocol.t(), + from_or_pid: GenServer.from() | pid() | nil, + ref: reference() + } + defstruct [:call_or_cast, :unit_id, :request, :from_or_pid, :ref] end - alias Modbuzz.TCP.ADU - @doc """ Starts a #{__MODULE__} GenServer process linked to the current process. @@ -24,22 +34,20 @@ defmodule Modbuzz.TCP.Client do * `:port` - passed through to `:gen_tcp.connect/4` - * `:active` - passed through to `:gen_tcp.connect/4` - ## Examples - iex> Modbuzz.TCP.Client.start_link([address: {192, 168, 0, 123}, port: 502, active: false]) + iex> Modbuzz.TCP.Client.start_link([name: :client, address: {192, 168, 0, 123}, port: 502]) """ @spec start_link(keyword()) :: GenServer.on_start() def start_link(args) when is_list(args) do name = Keyword.get(args, :name, __MODULE__) + args = Keyword.put_new(args, :name, name) GenServer.start_link(__MODULE__, args, name: name) end @doc """ Makes a synchronous call to the server and waits for a response. - Only available when active: false. The response type is `{:ok, %Res{}}` or `{:error, %Err{} | reason :: term()}`. @@ -56,7 +64,7 @@ defmodule Modbuzz.TCP.Client do GenServer.server(), unit_id :: 0x00..0xFF, request :: Modbuzz.PDU.Protocol.t(), - timeout() + non_neg_integer() ) :: {:ok, response :: term()} | {:error, reason :: term()} def call(name \\ __MODULE__, unit_id \\ 0, request, timeout \\ 5000) @@ -66,7 +74,6 @@ defmodule Modbuzz.TCP.Client do @doc """ Casts a request to the server without waiting for a response. - Only available when active: true. This function always returns :ok regardless of whether the destination server (or node) exists. Therefore it is unknown whether the destination server successfully handled the request. @@ -92,203 +99,135 @@ defmodule Modbuzz.TCP.Client do GenServer.server(), unit_id :: 0x00..0xFF, request :: Modbuzz.PDU.Protocol.t(), - pid() + pid(), + non_neg_integer() ) :: :ok - def cast(name \\ __MODULE__, unit_id \\ 0, request, from_pid \\ self()) - when unit_id in 0x00..0xFF and is_struct(request) and is_pid(from_pid) do - GenServer.cast(name, {:cast, unit_id, request, from_pid}) + def cast(name \\ __MODULE__, unit_id \\ 0, request, pid \\ self(), timeout \\ 5000) + when unit_id in 0x00..0xFF and is_struct(request) and is_pid(pid) and is_integer(timeout) do + GenServer.cast(name, {:cast, unit_id, request, pid, timeout}) end @impl true def init(args) when is_list(args) do + client_name = Keyword.fetch!(args, :name) transport = Keyword.get(args, :transport, :gen_tcp) address = Keyword.get(args, :address, {192, 168, 5, 57}) port = Keyword.get(args, :port, 502) - active = Keyword.get(args, :active, false) state = %{ + client_name: client_name, transport: transport, address: address, port: port, - active: active, socket: nil, transaction_id: 0, transactions: %{} } - {:ok, state, {:continue, :connect}} + {:ok, state} end @impl true - def handle_continue(:connect, %{socket: nil} = state) do - case gen_tcp_connect(state) do - {:ok, socket} -> - Log.debug(":connect succeeded", state) - {:noreply, %{state | socket: socket}} - - {:error, reason} -> - Log.error(":connect failed", reason, state) - {:noreply, state, {:continue, :connect}} - end + def handle_call({:call, unit_id, request, timeout}, from, state) + when is_valid_unit_id(unit_id) do + handle_req({:call, unit_id, request, from, timeout}, state) end - def handle_continue({:recall, unit_id, request, timeout, from}, %{socket: nil} = state) do - %{transport: transport, transaction_id: transaction_id} = state - - transaction_id = ADU.increment_transaction_id(transaction_id) - adu = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() - - state = %{state | transaction_id: transaction_id} - - with {:connect, {:ok, socket}} <- {:connect, gen_tcp_connect(state)}, - {:send, :ok} <- {:send, transport.send(socket, adu)}, - {:recv, {:ok, binary}} <- {:recv, transport.recv(socket, _length = 0, timeout)} do - [res_tuple] = - Enum.map( - ADU.decode_response(binary, []), - fn {ok_or_error, %ADU{transaction_id: ^transaction_id, pdu: pdu}} -> - {ok_or_error, pdu} - end - ) - - GenServer.reply(from, res_tuple) - {:noreply, %{state | socket: socket}} - else - {:connect, {:error, reason} = error} -> - Log.error(":recall connect failed", reason, state) - GenServer.reply(from, error) - {:noreply, state, {:continue, :connect}} - - {:send, {:error, reason} = error} -> - Log.error(":recall send failed", reason, state) - GenServer.reply(from, error) - {:noreply, state, {:continue, :connect}} - - {:recv, {:error, reason} = error} -> - Log.error(":recall recv failed", reason, state) - GenServer.reply(from, error) - {:noreply, state, {:continue, :connect}} - end + @impl true + def handle_cast({:cast, unit_id, request, pid, timeout}, state) + when is_valid_unit_id(unit_id) and is_pid(pid) do + handle_req({:cast, unit_id, request, pid, timeout}, state) end - def handle_continue({:recast, unit_id, request, from_pid}, %{socket: nil} = state) do + defp handle_req({call_or_cast, unit_id, request, from_or_pid, timeout}, state) do %{ + client_name: client_name, transport: transport, transaction_id: transaction_id, transactions: transactions } = state transaction_id = ADU.increment_transaction_id(transaction_id) - adu = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() - - state = %{state | transaction_id: transaction_id} - with {:connect, {:ok, socket}} <- {:connect, gen_tcp_connect(state)}, - {:send, :ok} <- {:send, transport.send(socket, adu)} do - transaction = %Transaction{ - unit_id: unit_id, - request: request, - from_pid: from_pid, - sent_time: System.monotonic_time(:millisecond) - } - - transactions = Map.put(transactions, transaction_id, transaction) - {:noreply, %{state | socket: socket, transactions: transactions}} - else - {:connect, {:error, reason}} -> - Log.error(":recast connect failed", reason, state) - {:noreply, state, {:continue, :connect}} - - {:send, {:error, reason}} -> - Log.error(":recast send failed", reason, state) - {:noreply, state, {:continue, :connect}} - end - end - - @impl true - def handle_call({:call, unit_id, request, timeout}, from, %{active: false} = state) do - %{transport: transport, socket: socket, transaction_id: transaction_id} = state - - transaction_id = ADU.increment_transaction_id(transaction_id) - adu = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() - - state = %{state | transaction_id: transaction_id} - - with {:send, :ok} <- {:send, transport.send(socket, adu)}, - {:recv, {:ok, binary}} <- {:recv, transport.recv(socket, _length = 0, timeout)} do - [res_tuple] = - Enum.map( - ADU.decode_response(binary, []), - fn {ok_or_error, %ADU{transaction_id: ^transaction_id, pdu: pdu}} -> - {ok_or_error, pdu} - end - ) - - {:reply, res_tuple, state} - else - {:send, {:error, reason}} -> - Log.warning(":call send failed, :recall", reason, state) + transaction = %Transaction{ + call_or_cast: call_or_cast, + unit_id: unit_id, + request: request, + from_or_pid: from_or_pid, + ref: make_ref() + } - :ok = transport.close(socket) - state = %{state | socket: nil} - {:noreply, state, {:continue, {:recall, unit_id, request, timeout, from}}} + timer = Process.send_after(self(), {:timeout?, transaction_id, transaction}, timeout) - {:recv, {:error, reason}} -> - Log.warning(":call recv failed, :recall", reason, state) + case connect(state, timeout) do + {:ok, socket} -> + binary = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() + + case transport.send(socket, binary) do + :ok -> + transactions = Map.put(transactions, transaction_id, transaction) + + {:noreply, + %{ + state + | socket: socket, + transaction_id: transaction_id, + transactions: transactions + }} + + {:error, reason} -> + Log.error("#{inspect(transport)} send error", reason, state) + Process.cancel_timer(timer) + :ok = transport.close(socket) + res_tuple = {:error, :tcp_send_error} + maybe_report_response(transaction, client_name, res_tuple) + {:noreply, %{state | socket: nil}} + end - :ok = transport.close(socket) - state = %{state | socket: nil} - {:noreply, state, {:continue, {:recall, unit_id, request, timeout, from}}} + {:error, reason} -> + Log.error("#{inspect(transport)} connect error", reason, state) + Process.cancel_timer(timer) + res_tuple = {:error, :tcp_connect_error} + maybe_report_response(transaction, client_name, res_tuple) + {:noreply, %{state | socket: nil}} end end - def handle_call({:call, _unit_id, _request, _timeout}, _from, %{active: true} = _state) do - raise RuntimeError, message: "call can't be used when active is true." - end - @impl true - def handle_cast({:cast, unit_id, request, from_pid}, %{active: true} = state) do + def handle_info({:timeout?, transaction_id, %Transaction{ref: ref}}, state) do %{ - transport: transport, - socket: socket, - transaction_id: transaction_id, + client_name: client_name, transactions: transactions } = state - transaction_id = ADU.increment_transaction_id(transaction_id) - adu = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() - - state = %{state | transaction_id: transaction_id} + case Map.get(transactions, transaction_id) do + # already responded + nil -> + {:noreply, state} - case transport.send(socket, adu) do - :ok -> - transaction = %Transaction{ - unit_id: unit_id, - request: request, - from_pid: from_pid, - sent_time: System.monotonic_time(:millisecond) - } + # timeout for the current request, report timeout error + %Transaction{request: request, ref: ref_} = transaction when ref_ == ref -> + Log.error("TCP server didn't respond for #{inspect(request)}", nil, state) + res_tuple = {:error, :timeout} - transactions = Map.put(transactions, transaction_id, transaction) - {:noreply, %{state | transactions: transactions}} + maybe_report_response(transaction, client_name, res_tuple) - {:error, reason} -> - Log.warning(":cast send failed, :recast", reason, state) + {:noreply, %{state | transactions: Map.put(transactions, transaction_id, nil)}} - :ok = transport.close(socket) - state = %{state | socket: nil} - {:noreply, state, {:continue, {:recast, unit_id, request, from_pid}}} + # the current request is different, do not report timeout error + # this means the request that triggered this timeout message has already been responded to + # and a new request for the same transaction_id has been sent after that + %Transaction{ref: ref_} when ref_ != ref -> + {:noreply, state} end end - def handle_cast({:cast, _unit_id, _request, _from_pid}, %{active: false} = _state) do - raise RuntimeError, message: "cast can't be used when active is false." - end - - @impl true def handle_info({:tcp, socket, binary}, %{socket: socket} = state) do - %{transactions: transactions} = state + %{ + client_name: client_name, + transactions: transactions + } = state transactions = ADU.decode_response(binary, []) @@ -297,17 +236,7 @@ defmodule Modbuzz.TCP.Client do fn {ok_or_error, %ADU{transaction_id: transaction_id, pdu: pdu}}, acc -> {transaction, acc} = Map.pop(acc, transaction_id) res_tuple = {ok_or_error, pdu} - - send( - transaction.from_pid, - { - :modbuzz, - transaction.unit_id, - transaction.request, - res_tuple - } - ) - + maybe_report_response(transaction, client_name, res_tuple) acc end ) @@ -315,51 +244,54 @@ defmodule Modbuzz.TCP.Client do {:noreply, %{state | transactions: transactions}} end - def handle_info({:tcp_closed, socket}, %{socket: socket, active: true} = state) do - %{transport: transport, transactions: transactions} = state - - Log.warning("transport closed.", nil, state) + def handle_info({:tcp_closed, socket}, %{socket: socket} = state) do + %{transport: transport} = state + Log.warning("#{inspect(transport)} closed", nil, state) :ok = transport.close(socket) - state = %{state | socket: nil} - - transactions - |> Enum.filter(fn {_transaction_id, transaction} -> - System.monotonic_time(:millisecond) - transaction.sent_time < 10 - end) - |> case do - [] -> - {:noreply, state, {:continue, :connect}} - - [{_transaction_id, transaction}] -> - Log.debug("sent successfully but RST ACK received, :recast", state) - - {:noreply, state, - {:continue, {:recast, transaction.unit_id, transaction.request, transaction.from_pid}}} - end + {:noreply, %{state | socket: nil}} end - def handle_info({:tcp_error, socket, reason}, %{socket: socket, active: true} = state) do + def handle_info({:tcp_error, socket, reason}, %{socket: socket} = state) do %{transport: transport} = state Log.error("transport error", reason, state) :ok = transport.close(socket) - {:noreply, %{state | socket: nil}, {:continue, :connect}} + {:noreply, %{state | socket: nil}} end - defp gen_tcp_connect(state) do - %{transport: transport, address: address, port: port, active: active} = state - - transport.connect( - address, - port, - [ - mode: :binary, - packet: :raw, - active: active, - keepalive: true, - nodelay: true, - reuseaddr: true - ], - _timeout = 3000 - ) + defp connect(state, timeout) do + %{transport: transport, address: address, port: port, socket: socket} = state + + if is_nil(socket) do + transport.connect( + address, + port, + [ + mode: :binary, + packet: :raw, + active: true, + keepalive: true, + nodelay: true, + reuseaddr: true + ], + timeout + ) + else + {:ok, socket} + end + end + + defp maybe_report_response(transaction, client_name, res_tuple) do + case transaction do + nil -> + :noop + + %{call_or_cast: :call, unit_id: _unit_id, request: _request, from_or_pid: from} + when is_tuple(from) -> + GenServer.reply(from, res_tuple) + + %{call_or_cast: :cast, unit_id: unit_id, request: request, from_or_pid: pid} + when is_pid(pid) -> + send(pid, {:modbuzz, client_name, unit_id, request, res_tuple}) + end end end diff --git a/test/modbuzz/tcp/client_test.exs b/test/modbuzz/tcp/client_test.exs index 619121e..4b7361f 100644 --- a/test/modbuzz/tcp/client_test.exs +++ b/test/modbuzz/tcp/client_test.exs @@ -9,520 +9,266 @@ defmodule Modbuzz.TCP.ClientTest do setup :set_mox_global describe "start_link/1" do - setup do - %{parent: self(), ref: make_ref()} - end - - test "connect/4 succeeded", %{parent: parent, ref: ref} do - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :connect}) - {:ok, _dummy_port = make_ref()} - end) - - assert {:ok, pid} = Modbuzz.TCP.Client.start_link(transport: Modbuzz.TCP.TransportMock) - assert_receive({^ref, :connect}) - GenServer.stop(pid) - end - - test "connect/4 succeeded after failed", %{parent: parent, ref: ref} do - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:error, :timeout} end) - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :connect}) - {:ok, _dummy_port = make_ref()} - end) - - assert {:ok, pid} = Modbuzz.TCP.Client.start_link(transport: Modbuzz.TCP.TransportMock) - assert_receive({^ref, :connect}) - GenServer.stop(pid) + test "return :ok tuple" do + assert {:ok, _pid} = Modbuzz.TCP.Client.start_link(transport: Modbuzz.TCP.TransportMock) end end describe "call" do setup do - %{parent: self(), ref: make_ref()} - end - - test "raise when active: true" do - Process.flag(:trap_exit, true) - - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - - pid = - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, - restart: :temporary - ) + req = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} + res = %Modbuzz.PDU.ReadCoils.Res{byte_count: 0x02, coil_status: List.duplicate(false, 16)} + res_err = %Modbuzz.PDU.ReadCoils.Err{exception_code: 0x01} - catch_exit( - Modbuzz.TCP.Client.call(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) - ) - - assert_receive({:EXIT, ^pid, {%RuntimeError{message: _message}, _}}) + %{req: req, res: res, res_err: res_err} end - test "return :ok tuple", %{parent: parent, ref: ref} do - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> - send(parent, {ref, :recv}) - {:ok, read_coils_recv_adu(1)} - end) - - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: false}, - restart: :temporary - ) - - assert Modbuzz.TCP.Client.call(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == - {:ok, - %Modbuzz.PDU.ReadCoils.Res{ - byte_count: 0x02, - coil_status: List.duplicate(false, 16) - }} - - assert_receive({^ref, :recv}) - end + test "return :ok tuple", %{req: req, res: res} do + dummy_socket = make_ref() - test "return :ok tuple, 1st send failed", %{parent: parent, ref: ref} do Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> {:error, :closed} end) - |> expect(:close, fn _ -> :ok end) - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> - send(parent, {ref, :recv}) - {:ok, read_coils_recv_adu(2)} - end) - - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: false}, - restart: :temporary - ) - - assert Modbuzz.TCP.Client.call(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == - {:ok, - %Modbuzz.PDU.ReadCoils.Res{ - byte_count: 0x02, - coil_status: List.duplicate(false, 16) - }} - - assert_receive({^ref, :recv}) - end - - test "return :ok tuple, 1st recv failed", %{parent: parent, ref: ref} do - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> {:error, :closed} end) - |> expect(:close, fn _ -> :ok end) - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> - send(parent, {ref, :recv}) - {:ok, read_coils_recv_adu(2)} + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> + send(Modbuzz.TCP.Client, {:tcp, dummy_socket, to_binary(res)}) + :ok end) start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: false}, + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, restart: :temporary ) - assert Modbuzz.TCP.Client.call(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == - {:ok, - %Modbuzz.PDU.ReadCoils.Res{ - byte_count: 0x02, - coil_status: List.duplicate(false, 16) - }} - - assert_receive({^ref, :recv}) + assert Modbuzz.TCP.Client.call(req) == {:ok, res} end - test "return :error tuple, modbus error", %{parent: parent, ref: ref} do + test "return :error tuple by connect error", %{req: req} do Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> - send(parent, {ref, :recv}) - {:ok, read_coils_recv_adu(1, _error = true)} - end) - - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: false}, - restart: :temporary - ) - - assert Modbuzz.TCP.Client.call(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == {:error, %Modbuzz.PDU.ReadCoils.Err{exception_code: 0x01}} - - assert_receive({^ref, :recv}) - end - - test "return :error tuple, 2nd connnect failed", %{parent: parent, ref: ref} do - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> {:error, :closed} end) - |> expect(:close, fn _ -> :ok end) |> expect(:connect, fn _, _, _, _ -> {:error, :timeout} end) - # confirm {:continue, :connect} - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :connect}) - {:ok, _dummy_port = make_ref()} - end) start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: false}, + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, restart: :temporary ) - assert Modbuzz.TCP.Client.call(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == {:error, :timeout} - - assert_receive({^ref, :connect}) + assert Modbuzz.TCP.Client.call(req) == {:error, :tcp_connect_error} end - test "return :error tuple, 2nd send failed", %{parent: parent, ref: ref} do + test "return :error tuple by send error", %{req: req} do Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> {:error, :closed} end) - |> expect(:close, fn _ -> :ok end) - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) + |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_socket = make_ref()} end) |> expect(:send, fn _, _ -> {:error, :closed} end) - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :send}) - {:ok, _dummy_port = make_ref()} - end) + |> expect(:close, fn _ -> :ok end) start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: false}, + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, restart: :temporary ) - assert Modbuzz.TCP.Client.call(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == {:error, :closed} - - assert_receive({^ref, :send}) + assert Modbuzz.TCP.Client.call(req) == {:error, :tcp_send_error} end - test "return :error tuple, 2nd recv failed", %{parent: parent, ref: ref} do + test "return :error tuple, modbus error", %{req: req, res_err: res_err} do + dummy_socket = make_ref() + Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> {:error, :closed} end) - |> expect(:close, fn _ -> :ok end) - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> :ok end) - |> expect(:recv, fn _, _, _ -> {:error, :closed} end) - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :recv}) - {:ok, _dummy_port = make_ref()} + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> + send(Modbuzz.TCP.Client, {:tcp, dummy_socket, to_binary(res_err)}) + :ok end) start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: false}, + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, restart: :temporary ) - assert Modbuzz.TCP.Client.call(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == {:error, :closed} - - assert_receive({^ref, :recv}) + assert Modbuzz.TCP.Client.call(req) == {:error, res_err} end end describe "cast" do setup do - %{parent: self(), ref: make_ref()} - end - - test "raise when active: false" do - Process.flag(:trap_exit, true) - - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - - pid = - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: false}, - restart: :temporary - ) + req = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} + res = %Modbuzz.PDU.ReadCoils.Res{byte_count: 0x02, coil_status: List.duplicate(false, 16)} - Modbuzz.TCP.Client.cast(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) - - assert_receive({:EXIT, ^pid, {%RuntimeError{message: _message}, _}}) + %{parent: self(), ref: make_ref(), req: req, res: res} end - test "return :ok", %{parent: parent, ref: ref} do - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> - send(parent, {ref, :send}) - :ok - end) - - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, - restart: :temporary - ) - - assert Modbuzz.TCP.Client.cast(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == :ok + test "return :ok", %{req: req, res: res} do + dummy_socket = make_ref() - assert_receive({^ref, :send}) - end - - test "return :ok, 1st send failed", %{parent: parent, ref: ref} do Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> {:error, :closed} end) - |> expect(:close, fn _ -> :ok end) - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) |> expect(:send, fn _, _ -> - send(parent, {ref, :send}) + send(Modbuzz.TCP.Client, {:tcp, dummy_socket, to_binary(res)}) :ok end) start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, restart: :temporary ) - assert Modbuzz.TCP.Client.cast(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == :ok - - assert_receive({^ref, :send}) + assert Modbuzz.TCP.Client.cast(req) == :ok + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) end - test "return :error tuple, 2nd connect failed", %{parent: parent, ref: ref} do + test "return :ok, connect error", %{req: req} do Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> {:error, :closed} end) - |> expect(:close, fn _ -> :ok end) |> expect(:connect, fn _, _, _, _ -> {:error, :timeout} end) - # confirm {:continue, :connect} - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :connect}) - {:ok, _dummy_port = make_ref()} - end) start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, restart: :temporary ) - assert Modbuzz.TCP.Client.cast(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == :ok + assert Modbuzz.TCP.Client.cast(req) == :ok - assert_receive({^ref, :connect}) + assert_receive( + {:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_connect_error}} + ) end - test "return :error tuple, 2nd send failed", %{parent: parent, ref: ref} do + test "return :ok, send error", %{req: req} do Modbuzz.TCP.TransportMock |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) |> expect(:send, fn _, _ -> {:error, :closed} end) |> expect(:close, fn _ -> :ok end) - |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) - |> expect(:send, fn _, _ -> {:error, :closed} end) - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :send}) - {:ok, _dummy_port = make_ref()} - end) start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, restart: :temporary ) - assert Modbuzz.TCP.Client.cast(%Modbuzz.PDU.ReadCoils.Req{ - starting_address: 0, - quantity_of_coils: 16 - }) == :ok + assert Modbuzz.TCP.Client.cast(req) == :ok - assert_receive({^ref, :send}) + assert_receive( + {:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_send_error}} + ) end + end - test "message {:tcp, socket, binary}" do - dummy_port = make_ref() - - Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_port} end) - |> expect(:send, fn _, _ -> :ok end) - + describe "tcp messages" do + setup do pid = start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, restart: :temporary ) - request = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} - assert Modbuzz.TCP.Client.cast(request) == :ok - - send(pid, {:tcp, dummy_port, read_coils_recv_adu(1)}) + req = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} + res = %Modbuzz.PDU.ReadCoils.Res{byte_count: 0x02, coil_status: List.duplicate(false, 16)} - assert_receive({:modbuzz, 0, ^request, {:ok, _}}) + %{pid: pid, req: req, res: res} end - test "message {:tcp, socket, binary}, recv two messages at once" do - dummy_port = make_ref() + test "message {:tcp, socket, binary}", %{ + pid: pid, + req: req, + res: res + } do + dummy_socket = make_ref() Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_port} end) - |> expect(:send, fn _, _ -> :ok end) + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) |> expect(:send, fn _, _ -> :ok end) - pid = - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, - restart: :temporary - ) - - request_1 = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} - assert Modbuzz.TCP.Client.cast(request_1) == :ok - request_2 = %Modbuzz.PDU.WriteSingleCoil.Req{output_address: 16, output_value: true} - assert Modbuzz.TCP.Client.cast(request_2) == :ok - - send(pid, {:tcp, dummy_port, read_coils_recv_adu(1) <> write_single_coil_recv_adu(2)}) + :ok = Modbuzz.TCP.Client.cast(req) - assert_receive({:modbuzz, 0, ^request_1, {:ok, _}}) - assert_receive({:modbuzz, 0, ^request_2, {:ok, _}}) + send(pid, {:tcp, dummy_socket, to_binary(res)}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) end - test "message {:tcp, socket, binary}, {:tcp_closed, socket}", %{parent: parent, ref: ref} do - dummy_port = make_ref() + test "message {:tcp, socket, binary}, recv two messages at once", %{ + pid: pid, + req: req, + res: res + } do + dummy_socket = make_ref() Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_port} end) + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) |> expect(:send, fn _, _ -> :ok end) - |> expect(:close, fn _ -> :ok end) - # confirm {:continue, :connect} - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :connect}) - {:ok, dummy_port} - end) - - pid = - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, - restart: :temporary - ) - - request = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} - assert Modbuzz.TCP.Client.cast(request) == :ok - - send(pid, {:tcp, dummy_port, read_coils_recv_adu(1)}) - assert_receive({:modbuzz, 0, ^request, {:ok, _}}) + :ok = Modbuzz.TCP.Client.cast(req) + :ok = Modbuzz.TCP.Client.cast(req) - send(pid, {:tcp_closed, dummy_port}) - assert_receive({^ref, :connect}) + send(pid, {:tcp, dummy_socket, to_binary(res, 1) <> to_binary(res, 2)}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) end - test "message {:tcp_closed, socket}", %{parent: parent, ref: ref} do - dummy_port = make_ref() + test "message {:tcp, socket, binary}, {:tcp_closed, socket}", %{ + pid: pid, + req: req, + res: res + } do + dummy_socket = make_ref() + me = self() Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_port} end) + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) |> expect(:send, fn _, _ -> :ok end) - |> expect(:close, fn _ -> :ok end) - |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_port} end) - |> expect(:send, fn _, _ -> - send(parent, {ref, :send}) + |> expect(:close, fn _ -> + send(me, :closed) :ok end) - pid = - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, - restart: :temporary - ) + :ok = Modbuzz.TCP.Client.cast(req) - request = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} - assert Modbuzz.TCP.Client.cast(request) == :ok + send(pid, {:tcp, dummy_socket, to_binary(res)}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) - send(pid, {:tcp_closed, dummy_port}) - - assert_receive({^ref, :send}) + send(pid, {:tcp_closed, dummy_socket}) + assert_receive(:closed) end - test "message {:tcp_error, socket, reason}", %{parent: parent, ref: ref} do - dummy_port = make_ref() + test "message {:tcp_closed, socket}", %{ + pid: pid, + req: req + } do + dummy_socket = make_ref() + me = self() Modbuzz.TCP.TransportMock - |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_port} end) + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) |> expect(:send, fn _, _ -> :ok end) - |> expect(:close, fn _ -> :ok end) - # confirm {:continue, :connect} - |> expect(:connect, fn _, _, _, _ -> - send(parent, {ref, :connect}) - {:ok, dummy_port} + |> expect(:close, fn _ -> + send(me, :closed) + :ok end) - pid = - start_link_supervised!( - {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock, active: true}, - restart: :temporary - ) + :ok = Modbuzz.TCP.Client.cast(req) - request = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} - assert Modbuzz.TCP.Client.cast(request) == :ok + send(pid, {:tcp_closed, dummy_socket}) + assert_receive(:closed) + end - send(pid, {:tcp_error, dummy_port, :reason}) + test "message {:tcp_error, socket, reason}", %{ + pid: pid, + req: req + } do + dummy_socket = make_ref() + me = self() - assert_receive({^ref, :connect}) - end - end + Modbuzz.TCP.TransportMock + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) + |> expect(:close, fn _ -> + send(me, :closed) + :ok + end) + + :ok = Modbuzz.TCP.Client.cast(req) - defp read_coils_recv_adu(transaction_id, error \\ false) do - if error do - %Modbuzz.PDU.ReadCoils.Err{exception_code: 0x01} - else - %Modbuzz.PDU.ReadCoils.Res{byte_count: 0x02, coil_status: List.duplicate(false, 16)} + send(pid, {:tcp_error, dummy_socket, :reason}) + assert_receive(:closed) end - |> Modbuzz.TCP.ADU.new(transaction_id, _unit_id = 0) - |> Modbuzz.TCP.ADU.encode() end - defp write_single_coil_recv_adu(transaction_id, error \\ false) do - if error do - %Modbuzz.PDU.WriteSingleCoil.Err{exception_code: 0x01} - else - %Modbuzz.PDU.WriteSingleCoil.Res{output_address: 0x0016, output_value: true} - end + defp to_binary(pdu, transaction_id \\ 0x0001) do + pdu |> Modbuzz.TCP.ADU.new(transaction_id, _unit_id = 0) |> Modbuzz.TCP.ADU.encode() end From 39504362b28d0df0794ebe091f862824cd41102f Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Tue, 19 May 2026 16:55:00 +0900 Subject: [PATCH 04/19] Chore --- lib/modbuzz/tcp/adu.ex | 8 ++++---- lib/modbuzz/tcp/client.ex | 4 ++-- lib/modbuzz/tcp/server.ex | 10 ++++------ 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index d4f9959..e2d36ed 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -13,6 +13,10 @@ defmodule Modbuzz.TCP.ADU do } defstruct transaction_id: 0x0000, protocol_id: 0x0000, length: 0x0000, unit_id: 0x00, pdu: nil + def increment_transaction_id(transaction_id) do + if transaction_id >= 0xFFFF, do: 0, else: transaction_id + 1 + end + def new(pdu, transaction_id, unit_id) when is_struct(pdu) do %__MODULE__{ transaction_id: transaction_id, @@ -74,8 +78,4 @@ defmodule Modbuzz.TCP.ADU do if rest == <<>>, do: Enum.reverse(acc), else: decode_response(rest, acc) end - - def increment_transaction_id(transaction_id) do - if transaction_id == 0xFFFF, do: 0, else: transaction_id + 1 - end end diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index c3a52eb..3619e0b 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -81,7 +81,7 @@ defmodule Modbuzz.TCP.Client do Its response is sent as a meessage, looks like ``` - {:modbuzz, unit_id, request, response} + {:modbuzz, client_name, unit_id, request, response_tuple} ``` The response type is `{:ok, %Res{}}` or `{:error, %Err{} | reason :: term()}`. @@ -213,7 +213,7 @@ defmodule Modbuzz.TCP.Client do maybe_report_response(transaction, client_name, res_tuple) - {:noreply, %{state | transactions: Map.put(transactions, transaction_id, nil)}} + {:noreply, %{state | transactions: Map.delete(transactions, transaction_id)}} # the current request is different, do not report timeout error # this means the request that triggered this timeout message has already been responded to diff --git a/lib/modbuzz/tcp/server.ex b/lib/modbuzz/tcp/server.ex index aa55513..8f07fce 100644 --- a/lib/modbuzz/tcp/server.ex +++ b/lib/modbuzz/tcp/server.ex @@ -9,7 +9,6 @@ defmodule Modbuzz.TCP.Server do @spec start_link(keyword()) :: GenServer.on_start() def start_link(args) when is_list(args) do name = Keyword.fetch!(args, :name) - GenServer.start_link(__MODULE__, args, name: name) end @@ -27,7 +26,6 @@ defmodule Modbuzz.TCP.Server do name: name, address: address, port: port, - active: false, listen_socket: nil, data_source: data_source }, {:continue, :listen}} @@ -35,7 +33,7 @@ defmodule Modbuzz.TCP.Server do @doc false def handle_continue(:listen, state) do - case gen_tcp_listen(state) do + case listen(state) do {:ok, socket} -> {:noreply, %{state | listen_socket: socket}, {:continue, :accept}} @@ -71,14 +69,14 @@ defmodule Modbuzz.TCP.Server do {:noreply, state, {:continue, :accept}} end - defp gen_tcp_listen(state) do - %{transport: transport, address: address, port: port, active: active} = state + defp listen(state) do + %{transport: transport, address: address, port: port} = state transport.listen(port, ip: address, mode: :binary, packet: :raw, - active: active, + active: false, backlog: 1024, keepalive: true, nodelay: true, From d8ca92d0e1250f0583a4a7dd281be9ad1f8f5845 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Wed, 20 May 2026 09:55:48 +0900 Subject: [PATCH 05/19] Modify Modbuzz.TCP.Client to handle binary fragment --- lib/modbuzz/tcp/adu.ex | 37 ++++++++++++++++++++++++++----------- lib/modbuzz/tcp/client.ex | 21 +++++++++++++-------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index e2d36ed..d4670ee 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -57,10 +57,30 @@ defmodule Modbuzz.TCP.ADU do end def decode_response( - <>, + <<_transaction_id::16, _protocol_id::16, length::16, _rest::binary>> = binary, acc ) do + adu_frame_size = 2 + 2 + 2 + length + + if byte_size(binary) >= adu_frame_size do + <> = binary + + adu = decode_response(adu_binary) + + decode_response(rest, [adu | acc]) + else + {Enum.reverse(acc), binary} + end + end + + def decode_response(binary, acc) do + {Enum.reverse(acc), binary} + end + + defp decode_response( + <> + ) do adu = %__MODULE__{ transaction_id: transaction_id, protocol_id: protocol_id, @@ -68,14 +88,9 @@ defmodule Modbuzz.TCP.ADU do unit_id: unit_id } - adu_tuple = - case PDU.decode_response(pdu_binary) do - {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} - {:error, pdu} -> {:error, %{adu | pdu: pdu}} - end - - acc = [adu_tuple | acc] - - if rest == <<>>, do: Enum.reverse(acc), else: decode_response(rest, acc) + case PDU.decode_response(pdu_binary) do + {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} + {:error, pdu} -> {:error, %{adu | pdu: pdu}} + end end end diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index 3619e0b..11059f3 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -122,7 +122,8 @@ defmodule Modbuzz.TCP.Client do port: port, socket: nil, transaction_id: 0, - transactions: %{} + transactions: %{}, + binary: <<>> } {:ok, state} @@ -182,7 +183,7 @@ defmodule Modbuzz.TCP.Client do :ok = transport.close(socket) res_tuple = {:error, :tcp_send_error} maybe_report_response(transaction, client_name, res_tuple) - {:noreply, %{state | socket: nil}} + {:noreply, %{state | socket: nil, binary: <<>>}} end {:error, reason} -> @@ -190,7 +191,7 @@ defmodule Modbuzz.TCP.Client do Process.cancel_timer(timer) res_tuple = {:error, :tcp_connect_error} maybe_report_response(transaction, client_name, res_tuple) - {:noreply, %{state | socket: nil}} + {:noreply, %{state | socket: nil, binary: <<>>}} end end @@ -229,9 +230,13 @@ defmodule Modbuzz.TCP.Client do transactions: transactions } = state + new_binary = state.binary <> binary + + {adu_tuples, binary} = ADU.decode_response(new_binary, []) + transactions = - ADU.decode_response(binary, []) - |> Enum.reduce( + Enum.reduce( + adu_tuples, transactions, fn {ok_or_error, %ADU{transaction_id: transaction_id, pdu: pdu}}, acc -> {transaction, acc} = Map.pop(acc, transaction_id) @@ -241,21 +246,21 @@ defmodule Modbuzz.TCP.Client do end ) - {:noreply, %{state | transactions: transactions}} + {:noreply, %{state | transactions: transactions, binary: binary}} end def handle_info({:tcp_closed, socket}, %{socket: socket} = state) do %{transport: transport} = state Log.warning("#{inspect(transport)} closed", nil, state) :ok = transport.close(socket) - {:noreply, %{state | socket: nil}} + {:noreply, %{state | socket: nil, binary: <<>>}} end def handle_info({:tcp_error, socket, reason}, %{socket: socket} = state) do %{transport: transport} = state Log.error("transport error", reason, state) :ok = transport.close(socket) - {:noreply, %{state | socket: nil}} + {:noreply, %{state | socket: nil, binary: <<>>}} end defp connect(state, timeout) do From f59807e1ff6e910e49ba8b435cf7cb9e258c66d7 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Wed, 20 May 2026 10:11:05 +0900 Subject: [PATCH 06/19] Modify Modbuzz.TCP.Server to handle binary fragment --- lib/modbuzz/tcp/adu.ex | 43 ++++++++++++++++-------- lib/modbuzz/tcp/server/socket_handler.ex | 15 ++++++--- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index d4670ee..5cfd164 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -35,25 +35,24 @@ defmodule Modbuzz.TCP.ADU do end def decode_request( - <>, + <<_transaction_id::16, _protocol_id::16, length::16, _rest::binary>> = binary, acc ) do - adu = %__MODULE__{ - transaction_id: transaction_id, - protocol_id: protocol_id, - length: length, - unit_id: unit_id - } + adu_frame_size = 2 + 2 + 2 + length - adu_tuple = - case PDU.decode_request(pdu_binary) do - {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} - end + if byte_size(binary) >= adu_frame_size do + <> = binary - acc = [adu_tuple | acc] + adu = decode_request(adu_binary) + + decode_request(rest, [adu | acc]) + else + {Enum.reverse(acc), binary} + end + end - if rest == <<>>, do: Enum.reverse(acc), else: decode_request(rest, acc) + def decode_request(binary, acc) do + {Enum.reverse(acc), binary} end def decode_response( @@ -77,6 +76,22 @@ defmodule Modbuzz.TCP.ADU do {Enum.reverse(acc), binary} end + defp decode_request( + <> + ) do + adu = %__MODULE__{ + transaction_id: transaction_id, + protocol_id: protocol_id, + length: length, + unit_id: unit_id + } + + case PDU.decode_request(pdu_binary) do + {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} + end + end + defp decode_response( <> diff --git a/lib/modbuzz/tcp/server/socket_handler.ex b/lib/modbuzz/tcp/server/socket_handler.ex index e773e6f..3bdea86 100644 --- a/lib/modbuzz/tcp/server/socket_handler.ex +++ b/lib/modbuzz/tcp/server/socket_handler.ex @@ -3,6 +3,7 @@ defmodule Modbuzz.TCP.Server.SocketHandler do use GenServer, restart: :temporary + alias Modbuzz.TCP.ADU alias Modbuzz.TCP.Log def start_link(args) do @@ -24,7 +25,8 @@ defmodule Modbuzz.TCP.Server.SocketHandler do port: port, socket: socket, data_source: data_source, - timeout: timeout + timeout: timeout, + binary: <<>> }, {:continue, :recv}} end @@ -38,15 +40,18 @@ defmodule Modbuzz.TCP.Server.SocketHandler do case transport.recv(socket, _length = 0, timeout) do {:ok, binary} -> - for {:ok, adu} <- Modbuzz.TCP.ADU.decode_request(binary, []) do + binary = state.binary <> binary + {adu_tuples, binary} = ADU.decode_request(binary, []) + + for {:ok, adu} <- adu_tuples do request(data_source, adu.unit_id, adu.pdu, timeout) - |> Modbuzz.TCP.ADU.new(adu.transaction_id, adu.unit_id) - |> Modbuzz.TCP.ADU.encode() + |> ADU.new(adu.transaction_id, adu.unit_id) + |> ADU.encode() end |> Enum.reduce(<<>>, &(&2 <> &1)) |> then(&transport.send(socket, &1)) - {:noreply, state, {:continue, :recv}} + {:noreply, %{state | binary: binary}, {:continue, :recv}} {:error, :closed = reason} -> :ok = transport.close(socket) From 3c6929db9b9614636aa848fe41c2f341bcc1d70e Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Wed, 20 May 2026 10:35:06 +0900 Subject: [PATCH 07/19] Add legnth guard to Mobbuzz.TCP.ADU --- lib/modbuzz/tcp/adu.ex | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index 5cfd164..bf69c19 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -13,6 +13,8 @@ defmodule Modbuzz.TCP.ADU do } defstruct transaction_id: 0x0000, protocol_id: 0x0000, length: 0x0000, unit_id: 0x00, pdu: nil + defguardp is_valid_length(length) when 1 <= length and length <= 0x00FE + def increment_transaction_id(transaction_id) do if transaction_id >= 0xFFFF, do: 0, else: transaction_id + 1 end @@ -37,7 +39,8 @@ defmodule Modbuzz.TCP.ADU do def decode_request( <<_transaction_id::16, _protocol_id::16, length::16, _rest::binary>> = binary, acc - ) do + ) + when is_valid_length(length) do adu_frame_size = 2 + 2 + 2 + length if byte_size(binary) >= adu_frame_size do @@ -58,7 +61,8 @@ defmodule Modbuzz.TCP.ADU do def decode_response( <<_transaction_id::16, _protocol_id::16, length::16, _rest::binary>> = binary, acc - ) do + ) + when is_valid_length(length) do adu_frame_size = 2 + 2 + 2 + length if byte_size(binary) >= adu_frame_size do @@ -79,7 +83,8 @@ defmodule Modbuzz.TCP.ADU do defp decode_request( <> - ) do + ) + when is_valid_length(length) do adu = %__MODULE__{ transaction_id: transaction_id, protocol_id: protocol_id, @@ -95,7 +100,8 @@ defmodule Modbuzz.TCP.ADU do defp decode_response( <> - ) do + ) + when is_valid_length(length) do adu = %__MODULE__{ transaction_id: transaction_id, protocol_id: protocol_id, From fa4341dabae9a74d3a061f703b9de9de95071b33 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Wed, 20 May 2026 11:23:48 +0900 Subject: [PATCH 08/19] Chore --- lib/modbuzz/pdu.ex | 4 ++++ lib/modbuzz/tcp/adu.ex | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/lib/modbuzz/pdu.ex b/lib/modbuzz/pdu.ex index 8e02b21..98a4bb2 100644 --- a/lib/modbuzz/pdu.ex +++ b/lib/modbuzz/pdu.ex @@ -1,6 +1,10 @@ defmodule Modbuzz.PDU do @moduledoc false + @max_frame_length 253 + + def max_frame_length, do: @max_frame_length + defdelegate encode(struct), to: Modbuzz.PDU.Protocol, as: :encode for {modbus_function_code, modbus_function} <- Modbuzz.MixProject.pdu_seed() do diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index bf69c19..ed32f41 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -15,6 +15,10 @@ defmodule Modbuzz.TCP.ADU do defguardp is_valid_length(length) when 1 <= length and length <= 0x00FE + @mbap_length 7 + + def max_frame_length, do: @mbap_length + PDU.max_frame_length() + def increment_transaction_id(transaction_id) do if transaction_id >= 0xFFFF, do: 0, else: transaction_id + 1 end From c50b3787696ad104200b7da85cb2aa5a7ebd6a9a Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Wed, 20 May 2026 11:36:00 +0900 Subject: [PATCH 09/19] Add timeout test to Modbuzz.TCP.ClientTest --- test/modbuzz/tcp/client_test.exs | 33 ++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/test/modbuzz/tcp/client_test.exs b/test/modbuzz/tcp/client_test.exs index 4b7361f..64cb8bc 100644 --- a/test/modbuzz/tcp/client_test.exs +++ b/test/modbuzz/tcp/client_test.exs @@ -67,6 +67,21 @@ defmodule Modbuzz.TCP.ClientTest do assert Modbuzz.TCP.Client.call(req) == {:error, :tcp_send_error} end + test "return :error tuple by timeout", %{req: req} do + dummy_socket = make_ref() + + Modbuzz.TCP.TransportMock + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) + + start_link_supervised!( + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, + restart: :temporary + ) + + assert Modbuzz.TCP.Client.call(Modbuzz.TCP.Client, 0, req, 10) == {:error, :timeout} + end + test "return :error tuple, modbus error", %{req: req, res_err: res_err} do dummy_socket = make_ref() @@ -146,6 +161,24 @@ defmodule Modbuzz.TCP.ClientTest do {:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_send_error}} ) end + + test "return :ok, receive {:error, :timeout}", %{ + req: req + } do + dummy_socket = make_ref() + + Modbuzz.TCP.TransportMock + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) + + start_link_supervised!( + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, + restart: :temporary + ) + + assert Modbuzz.TCP.Client.cast(Modbuzz.TCP.Client, 0, req, self(), 10) == :ok + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :timeout}}) + end end describe "tcp messages" do From 336333aff1f0032d0da753a79ef6830c9c175206 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Wed, 20 May 2026 11:36:05 +0900 Subject: [PATCH 10/19] Chore --- test/modbuzz/tcp/client_test.exs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/modbuzz/tcp/client_test.exs b/test/modbuzz/tcp/client_test.exs index 64cb8bc..30081be 100644 --- a/test/modbuzz/tcp/client_test.exs +++ b/test/modbuzz/tcp/client_test.exs @@ -82,7 +82,7 @@ defmodule Modbuzz.TCP.ClientTest do assert Modbuzz.TCP.Client.call(Modbuzz.TCP.Client, 0, req, 10) == {:error, :timeout} end - test "return :error tuple, modbus error", %{req: req, res_err: res_err} do + test "return :error tuple by modbus error", %{req: req, res_err: res_err} do dummy_socket = make_ref() Modbuzz.TCP.TransportMock @@ -128,7 +128,7 @@ defmodule Modbuzz.TCP.ClientTest do assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) end - test "return :ok, connect error", %{req: req} do + test "return :ok, receive {:error, :tcp_connect_error}", %{req: req} do Modbuzz.TCP.TransportMock |> expect(:connect, fn _, _, _, _ -> {:error, :timeout} end) @@ -144,7 +144,7 @@ defmodule Modbuzz.TCP.ClientTest do ) end - test "return :ok, send error", %{req: req} do + test "return :ok, receive {:error, :tcp_send_error}", %{req: req} do Modbuzz.TCP.TransportMock |> expect(:connect, fn _, _, _, _ -> {:ok, _dummy_port = make_ref()} end) |> expect(:send, fn _, _ -> {:error, :closed} end) From a59bdc8b39974ce3c72f17e526c0b34d78c1d349 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Thu, 21 May 2026 15:02:48 +0900 Subject: [PATCH 11/19] Chore --- lib/modbuzz/tcp/client.ex | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index 11059f3..9670c33 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -211,9 +211,7 @@ defmodule Modbuzz.TCP.Client do %Transaction{request: request, ref: ref_} = transaction when ref_ == ref -> Log.error("TCP server didn't respond for #{inspect(request)}", nil, state) res_tuple = {:error, :timeout} - maybe_report_response(transaction, client_name, res_tuple) - {:noreply, %{state | transactions: Map.delete(transactions, transaction_id)}} # the current request is different, do not report timeout error @@ -224,7 +222,7 @@ defmodule Modbuzz.TCP.Client do end end - def handle_info({:tcp, socket, binary}, %{socket: socket} = state) do + def handle_info({:tcp, _socket, binary}, state) do %{ client_name: client_name, transactions: transactions @@ -249,17 +247,17 @@ defmodule Modbuzz.TCP.Client do {:noreply, %{state | transactions: transactions, binary: binary}} end - def handle_info({:tcp_closed, socket}, %{socket: socket} = state) do + def handle_info({:tcp_closed, socket}, state) do %{transport: transport} = state Log.warning("#{inspect(transport)} closed", nil, state) - :ok = transport.close(socket) + if not is_nil(socket), do: :ok = transport.close(socket) {:noreply, %{state | socket: nil, binary: <<>>}} end - def handle_info({:tcp_error, socket, reason}, %{socket: socket} = state) do + def handle_info({:tcp_error, socket, reason}, state) do %{transport: transport} = state Log.error("transport error", reason, state) - :ok = transport.close(socket) + if not is_nil(socket), do: :ok = transport.close(socket) {:noreply, %{state | socket: nil, binary: <<>>}} end From 5fdbf89e138fae6b7a7aec9e56f136643839f46d Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Thu, 21 May 2026 15:03:43 +0900 Subject: [PATCH 12/19] Report {:error, :tcp_closed} when got {:tcp_closed, socket} --- lib/modbuzz/tcp/client.ex | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index 9670c33..c3f5af3 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -248,10 +248,20 @@ defmodule Modbuzz.TCP.Client do end def handle_info({:tcp_closed, socket}, state) do - %{transport: transport} = state - Log.warning("#{inspect(transport)} closed", nil, state) + %{ + client_name: client_name, + transport: transport, + transaction_id: transaction_id, + transactions: transactions + } = state + + Log.error("#{inspect(transport)} closed", nil, state) if not is_nil(socket), do: :ok = transport.close(socket) - {:noreply, %{state | socket: nil, binary: <<>>}} + + {transaction, transactions} = Map.pop(transactions, transaction_id) + res_tuple = {:error, :tcp_closed} + maybe_report_response(transaction, client_name, res_tuple) + {:noreply, %{state | socket: nil, binary: <<>>, transactions: transactions}} end def handle_info({:tcp_error, socket, reason}, state) do From 75c31310941f52aff84a8b0d8eb8b5cf7d63d370 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Thu, 21 May 2026 16:14:02 +0900 Subject: [PATCH 13/19] Add error handling to Modbuzz.TCP.ADU --- .credo.exs | 2 +- lib/modbuzz/pdu.ex | 8 ++++ lib/modbuzz/tcp/adu.ex | 52 ++++++++++++++++++++---- lib/modbuzz/tcp/client.ex | 37 ++++++++++------- lib/modbuzz/tcp/server/socket_handler.ex | 25 ++++++++---- test/modbuzz/tcp/client_test.exs | 21 ++++++++++ 6 files changed, 111 insertions(+), 34 deletions(-) diff --git a/.credo.exs b/.credo.exs index 784d43d..0655dad 100644 --- a/.credo.exs +++ b/.credo.exs @@ -129,7 +129,7 @@ {Credo.Check.Refactor.MatchInCondition, []}, {Credo.Check.Refactor.NegatedConditionsInUnless, []}, {Credo.Check.Refactor.NegatedConditionsWithElse, []}, - {Credo.Check.Refactor.Nesting, []}, + {Credo.Check.Refactor.Nesting, [max_nesting: 3]}, {Credo.Check.Refactor.RedundantWithClauseResult, []}, {Credo.Check.Refactor.RejectReject, []}, {Credo.Check.Refactor.UnlessWithElse, []}, diff --git a/lib/modbuzz/pdu.ex b/lib/modbuzz/pdu.ex index 98a4bb2..9b138e5 100644 --- a/lib/modbuzz/pdu.ex +++ b/lib/modbuzz/pdu.ex @@ -38,6 +38,14 @@ defmodule Modbuzz.PDU do end end + def decode_request(<>) do + {:error, {:unknown_function_code, modbus_function_code}} + end + + def decode_response(<>) do + {:error, {:unknown_function_code, modbus_function_code}} + end + # NOTE: We need this fallback, because the binary is not always guaranteed to be correct. def request_length(<<_, _rest::binary>>), do: {:error, :unknown} def response_length(<<_, _rest::binary>>), do: {:error, :unknown} diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index ed32f41..ab325a0 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -9,10 +9,11 @@ defmodule Modbuzz.TCP.ADU do # pdu max is 253 bytes, so length max is 254 (1 byte for unit_id + pdu) length: 0x0000..0x00FE, unit_id: 0x00..0xFF, - pdu: Modbuzz.PDU.Protocol.t() | nil + pdu: Modbuzz.PDU.Protocol.t() | {:unknown_function_code, non_neg_integer()} | nil } defstruct transaction_id: 0x0000, protocol_id: 0x0000, length: 0x0000, unit_id: 0x00, pdu: nil + defguardp is_modbus_protocol(protocol_id) when protocol_id == 0x0000 defguardp is_valid_length(length) when 1 <= length and length <= 0x00FE @mbap_length 7 @@ -41,10 +42,10 @@ defmodule Modbuzz.TCP.ADU do end def decode_request( - <<_transaction_id::16, _protocol_id::16, length::16, _rest::binary>> = binary, + <<_transaction_id::16, protocol_id::16, length::16, _rest::binary>> = binary, acc ) - when is_valid_length(length) do + when is_modbus_protocol(protocol_id) and is_valid_length(length) do adu_frame_size = 2 + 2 + 2 + length if byte_size(binary) >= adu_frame_size do @@ -54,19 +55,35 @@ defmodule Modbuzz.TCP.ADU do decode_request(rest, [adu | acc]) else - {Enum.reverse(acc), binary} + {:ok, {Enum.reverse(acc), binary}} end end + def decode_request( + <<_transaction_id::16, protocol_id::16, _length::16, _rest::binary>>, + _acc + ) + when not is_modbus_protocol(protocol_id) do + {:error, {:invalid_protocol, protocol_id}} + end + + def decode_request( + <<_transaction_id::16, _protocol_id::16, length::16, _rest::binary>>, + _acc + ) + when not is_valid_length(length) do + {:error, {:invalid_length, length}} + end + def decode_request(binary, acc) do - {Enum.reverse(acc), binary} + {:ok, {Enum.reverse(acc), binary}} end def decode_response( - <<_transaction_id::16, _protocol_id::16, length::16, _rest::binary>> = binary, + <<_transaction_id::16, protocol_id::16, length::16, _rest::binary>> = binary, acc ) - when is_valid_length(length) do + when is_modbus_protocol(protocol_id) and is_valid_length(length) do adu_frame_size = 2 + 2 + 2 + length if byte_size(binary) >= adu_frame_size do @@ -76,12 +93,28 @@ defmodule Modbuzz.TCP.ADU do decode_response(rest, [adu | acc]) else - {Enum.reverse(acc), binary} + {:ok, {Enum.reverse(acc), binary}} end end + def decode_response( + <<_transaction_id::16, protocol_id::16, _length::16, _rest::binary>>, + _acc + ) + when not is_modbus_protocol(protocol_id) do + {:error, {:invalid_protocol, protocol_id}} + end + + def decode_response( + <<_transaction_id::16, _protocol_id::16, length::16, _rest::binary>>, + _acc + ) + when not is_valid_length(length) do + {:error, {:invalid_length, length}} + end + def decode_response(binary, acc) do - {Enum.reverse(acc), binary} + {:ok, {Enum.reverse(acc), binary}} end defp decode_request( @@ -98,6 +131,7 @@ defmodule Modbuzz.TCP.ADU do case PDU.decode_request(pdu_binary) do {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} + {:error, pdu} -> {:error, %{adu | pdu: pdu}} end end diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index c3f5af3..c1d3806 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -222,29 +222,36 @@ defmodule Modbuzz.TCP.Client do end end - def handle_info({:tcp, _socket, binary}, state) do + def handle_info({:tcp, socket, binary}, state) do %{ client_name: client_name, + transport: transport, transactions: transactions } = state new_binary = state.binary <> binary - {adu_tuples, binary} = ADU.decode_response(new_binary, []) - - transactions = - Enum.reduce( - adu_tuples, - transactions, - fn {ok_or_error, %ADU{transaction_id: transaction_id, pdu: pdu}}, acc -> - {transaction, acc} = Map.pop(acc, transaction_id) - res_tuple = {ok_or_error, pdu} - maybe_report_response(transaction, client_name, res_tuple) - acc - end - ) + case ADU.decode_response(new_binary, []) do + {:ok, {adu_tuples, binary}} -> + transactions = + Enum.reduce( + adu_tuples, + transactions, + fn {ok_or_error, %ADU{transaction_id: transaction_id, pdu: pdu}}, acc -> + {transaction, acc} = Map.pop(acc, transaction_id) + res_tuple = {ok_or_error, pdu} + maybe_report_response(transaction, client_name, res_tuple) + acc + end + ) + + {:noreply, %{state | transactions: transactions, binary: binary}} - {:noreply, %{state | transactions: transactions, binary: binary}} + {:error, reason} -> + Log.error("invalid response", reason, state) + transport.close(socket) + {:noreply, %{state | socket: nil, binary: <<>>}} + end end def handle_info({:tcp_closed, socket}, state) do diff --git a/lib/modbuzz/tcp/server/socket_handler.ex b/lib/modbuzz/tcp/server/socket_handler.ex index 3bdea86..92eb34b 100644 --- a/lib/modbuzz/tcp/server/socket_handler.ex +++ b/lib/modbuzz/tcp/server/socket_handler.ex @@ -41,17 +41,24 @@ defmodule Modbuzz.TCP.Server.SocketHandler do case transport.recv(socket, _length = 0, timeout) do {:ok, binary} -> binary = state.binary <> binary - {adu_tuples, binary} = ADU.decode_request(binary, []) - for {:ok, adu} <- adu_tuples do - request(data_source, adu.unit_id, adu.pdu, timeout) - |> ADU.new(adu.transaction_id, adu.unit_id) - |> ADU.encode() - end - |> Enum.reduce(<<>>, &(&2 <> &1)) - |> then(&transport.send(socket, &1)) + case ADU.decode_request(binary, []) do + {:ok, {adu_tuples, binary}} -> + for {:ok, adu} <- adu_tuples do + request(data_source, adu.unit_id, adu.pdu, timeout) + |> ADU.new(adu.transaction_id, adu.unit_id) + |> ADU.encode() + end + |> Enum.reduce(<<>>, &(&2 <> &1)) + |> then(&transport.send(socket, &1)) + + {:noreply, %{state | binary: binary}, {:continue, :recv}} - {:noreply, %{state | binary: binary}, {:continue, :recv}} + {:error, reason} -> + Log.error("invalid request", reason, state) + :ok = transport.close(socket) + {:stop, :invalid_length, %{state | binary: <<>>}} + end {:error, :closed = reason} -> :ok = transport.close(socket) diff --git a/test/modbuzz/tcp/client_test.exs b/test/modbuzz/tcp/client_test.exs index 30081be..2aea3b9 100644 --- a/test/modbuzz/tcp/client_test.exs +++ b/test/modbuzz/tcp/client_test.exs @@ -298,6 +298,27 @@ defmodule Modbuzz.TCP.ClientTest do send(pid, {:tcp_error, dummy_socket, :reason}) assert_receive(:closed) end + + test "message {:tcp, socket, binary}, invalid length closes socket", %{ + pid: pid, + req: req + } do + dummy_socket = make_ref() + me = self() + + Modbuzz.TCP.TransportMock + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) + |> expect(:close, fn _ -> + send(me, :closed) + :ok + end) + + :ok = Modbuzz.TCP.Client.cast(req) + + send(pid, {:tcp, dummy_socket, <<0::16, 0::16, 0::16>>}) + assert_receive(:closed) + end end defp to_binary(pdu, transaction_id \\ 0x0001) do From eda3cd78e63a2b5689b46c4f5e2f5b869f39dae8 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Sat, 30 May 2026 17:19:55 +0900 Subject: [PATCH 14/19] Refactor --- lib/modbuzz/pdu.ex | 26 ++++++-- lib/modbuzz/pdu/protocol.ex | 3 + lib/modbuzz/rtu/adu.ex | 72 +++++++++++----------- lib/modbuzz/rtu/client.ex | 45 ++++++-------- lib/modbuzz/rtu/server.ex | 16 ++--- lib/modbuzz/tcp/adu.ex | 76 +++++++++++++++++------- lib/modbuzz/tcp/client.ex | 28 ++++++--- lib/modbuzz/tcp/server/socket_handler.ex | 42 +++++++------ test/modbuzz/rtu/client_test.exs | 8 +-- test/modbuzz/tcp/client_test.exs | 4 +- 10 files changed, 195 insertions(+), 125 deletions(-) diff --git a/lib/modbuzz/pdu.ex b/lib/modbuzz/pdu.ex index 9b138e5..de60a63 100644 --- a/lib/modbuzz/pdu.ex +++ b/lib/modbuzz/pdu.ex @@ -7,6 +7,19 @@ defmodule Modbuzz.PDU do defdelegate encode(struct), to: Modbuzz.PDU.Protocol, as: :encode + @spec decode_request(binary()) :: + {:ok, Modbuzz.PDU.Protocol.t()} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + @spec decode_response(binary()) :: + {:ok, Modbuzz.PDU.Protocol.t()} + | {:error, Modbuzz.PDU.Protocol.t()} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + @spec request_length(binary()) :: + {:ok, non_neg_integer()} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + @spec response_length(binary()) :: + {:ok, non_neg_integer()} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} for {modbus_function_code, modbus_function} <- Modbuzz.MixProject.pdu_seed() do req_module = Module.concat([Modbuzz.PDU, modbus_function, Req]) res_module = Module.concat([Modbuzz.PDU, modbus_function, Res]) @@ -39,16 +52,21 @@ defmodule Modbuzz.PDU do end def decode_request(<>) do - {:error, {:unknown_function_code, modbus_function_code}} + {:error, {:pdu_unknown_function_code, modbus_function_code}} end def decode_response(<>) do - {:error, {:unknown_function_code, modbus_function_code}} + {:error, {:pdu_unknown_function_code, modbus_function_code}} end # NOTE: We need this fallback, because the binary is not always guaranteed to be correct. - def request_length(<<_, _rest::binary>>), do: {:error, :unknown} - def response_length(<<_, _rest::binary>>), do: {:error, :unknown} + def request_length(<>) do + {:error, {:pdu_unknown_function_code, modbus_function_code}} + end + + def response_length(<>) do + {:error, {:pdu_unknown_function_code, modbus_function_code}} + end def to_error(%req{}, exception_code) do exception_code = diff --git a/lib/modbuzz/pdu/protocol.ex b/lib/modbuzz/pdu/protocol.ex index efc6ac7..cd3e521 100644 --- a/lib/modbuzz/pdu/protocol.ex +++ b/lib/modbuzz/pdu/protocol.ex @@ -7,9 +7,12 @@ defprotocol Modbuzz.PDU.Protocol do """ @doc false + @spec encode(t()) :: binary() def encode(struct) @doc false + @spec decode(t(), binary()) :: t() def decode(struct, binary) @doc false + @spec expected_binary_size(t(), binary()) :: non_neg_integer() def expected_binary_size(struct, binary) end diff --git a/lib/modbuzz/rtu/adu.ex b/lib/modbuzz/rtu/adu.ex index c5bd82a..c9e477a 100644 --- a/lib/modbuzz/rtu/adu.ex +++ b/lib/modbuzz/rtu/adu.ex @@ -3,15 +3,22 @@ defmodule Modbuzz.RTU.ADU do alias Modbuzz.PDU - # unit_id: 1, functions_code: 1, crc: 2, so minimum frame length is 4 - @min_frame_length 4 + @unit_id_length 1 + @function_code_length 1 @crc_defn :cerlc.init(:crc16_modbus) @crc_length 2 - @type t :: %__MODULE__{unit_id: 0x00..0xF7, pdu: struct() | nil, crc_valid?: boolean()} + @type t :: %__MODULE__{ + unit_id: 0x00..0xF7, + pdu: Modbuzz.PDU.Protocol.t() | nil, + crc_valid?: boolean() + } defstruct unit_id: 0x00, pdu: nil, crc_valid?: true + @min_frame_length @unit_id_length + @function_code_length + @crc_length + def max_frame_length, do: PDU.max_frame_length() + @crc_length + def new(pdu, unit_id) when is_struct(pdu) do %__MODULE__{ unit_id: unit_id, @@ -25,58 +32,55 @@ defmodule Modbuzz.RTU.ADU do binary <> crc(binary) end + @spec decode_request(binary()) :: + {:ok, t()} + | {:error, :adu_binary_is_short} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + | {:error, :adu_binary_is_long} + | {:error, :adu_crc_error} def decode_request(binary) when is_binary(binary) and byte_size(binary) < @min_frame_length do - {:error, :binary_is_short} + {:error, :adu_binary_is_short} end def decode_request(<>) do with {:ok, pdu_length} <- Modbuzz.PDU.request_length(binary), - true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :binary_is_short}, + true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :adu_binary_is_short}, <> <- binary, - true <- crc(<>) == crc || {:error, :crc_error}, + true <- crc(<>) == crc || {:error, :adu_crc_error}, {:ok, pdu} <- PDU.decode_request(pdu_binary) do {:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}} else - {:error, :binary_is_short} -> - {:error, :binary_is_short} - - {:error, :unknown} -> - {:error, :unknown} - - long_binary when is_binary(long_binary) -> - {:error, :binary_is_long} - - {:error, :crc_error} -> - {:error, %__MODULE__{unit_id: unit_id, crc_valid?: false}} + {:error, {:pdu_unknown_function_code, _}} = error -> error + {:error, :adu_binary_is_short} = error -> error + long_binary when is_binary(long_binary) -> {:error, :adu_binary_is_long} + {:error, :adu_crc_error} = error -> error end end + @spec decode_response(binary()) :: + {:ok, t()} + | {:error, :adu_binary_is_short} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + | {:error, :adu_binary_is_long} + | {:error, :adu_crc_error} + | {:error, t()} def decode_response(binary) when is_binary(binary) and byte_size(binary) < @min_frame_length do - {:error, :binary_is_short} + {:error, :adu_binary_is_short} end def decode_response(<>) do with {:ok, pdu_length} <- Modbuzz.PDU.response_length(binary), - true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :binary_is_short}, + true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :adu_binary_is_short}, <> <- binary, - true <- crc(<>) == crc || {:error, :crc_error}, + true <- crc(<>) == crc || {:error, :adu_crc_error}, {:ok, pdu} <- PDU.decode_response(pdu_binary) do {:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}} else - {:error, :unknown} -> - {:error, :unknown} - - {:error, :binary_is_short} -> - {:error, :binary_is_short} - - long_binary when is_binary(long_binary) -> - {:error, :binary_is_long} - - {:error, :crc_error} -> - {:error, %__MODULE__{unit_id: unit_id, crc_valid?: false}} - - {:error, pdu} when is_struct(pdu) -> - {:error, %__MODULE__{unit_id: unit_id, pdu: pdu}} + {:error, {:pdu_unknown_function_code, _}} = error -> error + {:error, :adu_binary_is_short} = error -> error + long_binary when is_binary(long_binary) -> {:error, :adu_binary_is_long} + {:error, :adu_crc_error} = error -> error + {:error, pdu} when is_struct(pdu) -> {:error, %__MODULE__{unit_id: unit_id, pdu: pdu}} end end diff --git a/lib/modbuzz/rtu/client.ex b/lib/modbuzz/rtu/client.ex index 2142c42..532c70a 100644 --- a/lib/modbuzz/rtu/client.ex +++ b/lib/modbuzz/rtu/client.ex @@ -18,7 +18,7 @@ defmodule Modbuzz.RTU.Client do def init(args) do client_name = Keyword.fetch!(args, :name) transport = Keyword.get(args, :transport, Circuits.UART) - transport_opts = Keyword.get(args, :transport_opts, []) ++ [active: true] + transport_opts = args |> Keyword.get(:transport_opts, []) |> Keyword.put(:active, true) device_name = Keyword.fetch!(args, :device_name) {:ok, transport_pid} = transport.start_link([]) @@ -41,7 +41,7 @@ defmodule Modbuzz.RTU.Client do reqs: reqs } = state - Log.error("RTU client terminated, #{inspect(reason)}.", nil, state) + Log.error("RTU client terminated", reason, state) # All pending requests should be notified of the error Enum.each(reqs, fn {_unit_id, req} -> @@ -67,16 +67,13 @@ defmodule Modbuzz.RTU.Client do reqs: reqs } = state - req = Map.get(reqs, unit_id) - adu = ADU.new(request, unit_id) new_req = {call_or_cast, unit_id, request, from_or_pid, make_ref()} - timer = Process.send_after(self(), {:timeout?, new_req}, timeout) - with true <- is_nil(req) || {:error, :another_request_in_progress}, - binary <- ADU.encode(adu), + with true <- is_nil(Map.get(reqs, unit_id)) || {:error, :another_request_in_progress}, + binary = request |> ADU.new(unit_id) |> ADU.encode(), :ok <- transport.write(transport_pid, binary, timeout) do - {:noreply, %{state | reqs: Map.put(reqs, adu.unit_id, new_req)}} + {:noreply, %{state | reqs: Map.put(reqs, unit_id, new_req)}} else {:error, :another_request_in_progress} = res_tuple -> Process.cancel_timer(timer) @@ -84,7 +81,7 @@ defmodule Modbuzz.RTU.Client do {:noreply, state} {:error, reason} = res_tuple -> - Log.error("#{inspect(transport)} write error, #{inspect(reason)}.", nil, state) + Log.error("#{inspect(transport)} write error", reason, state) Process.cancel_timer(timer) maybe_report_response(new_req, client_name, res_tuple) {:noreply, state} @@ -97,16 +94,14 @@ defmodule Modbuzz.RTU.Client do reqs: reqs } = state - req = Map.get(reqs, unit_id) - - case req do + case Map.get(reqs, unit_id) do # already responded nil -> {:noreply, state} # timeout for the current request, report timeout error {_, unit_id_, request_, _, ref_} = req_ when ref_ == ref -> - Log.error("RTU server didn't respond for #{inspect(request_)}.", nil, state) + Log.error("RTU server didn't respond for #{inspect(request_)}", nil, state) res_tuple = {:error, :timeout} maybe_report_response(req_, client_name, res_tuple) @@ -141,28 +136,24 @@ defmodule Modbuzz.RTU.Client do {:noreply, %{state | reqs: Map.put(reqs, unit_id, nil), binary: <<>>}} - {:error, :binary_is_short} -> + {:error, {:pdu_unknown_function_code, _} = reason} -> + # No response is sent to the requester; the pending request will eventually time out. + Log.error("Decode error", reason, state) + {:noreply, %{state | binary: <<>>}} + + {:error, :adu_binary_is_short} -> {:noreply, %{state | binary: new_binary}} - {:error, :unknown} -> + {:error, :adu_binary_is_long = reason} -> # No response is sent to the requester; the pending request will eventually time out. - Log.error("Failed to decode ADU, unknown binary format.", nil, state) + Log.error("Decode error", reason, state) {:noreply, %{state | binary: <<>>}} - {:error, :binary_is_long} -> + {:error, :adu_crc_error = reason} -> # No response is sent to the requester; the pending request will eventually time out. - Log.error("Failed to decode ADU, binary is too long.", nil, state) + Log.warning("Decode error", reason, state) {:noreply, %{state | binary: <<>>}} - {:error, %ADU{unit_id: unit_id, crc_valid?: false} = adu} -> - Log.warning("CRC error detected, #{inspect(adu)}.", nil, state) - res_tuple = {:error, :crc_error} - req = Map.get(reqs, unit_id) - - maybe_report_response(req, client_name, res_tuple) - - {:noreply, %{state | reqs: Map.put(reqs, unit_id, nil), binary: <<>>}} - {:error, %ADU{unit_id: unit_id, pdu: pdu}} -> res_tuple = {:error, pdu} req = Map.get(reqs, unit_id) diff --git a/lib/modbuzz/rtu/server.ex b/lib/modbuzz/rtu/server.ex index 3900dcf..269105d 100644 --- a/lib/modbuzz/rtu/server.ex +++ b/lib/modbuzz/rtu/server.ex @@ -15,7 +15,7 @@ defmodule Modbuzz.RTU.Server do @doc false def init(args) do transport = Keyword.get(args, :transport, Circuits.UART) - transport_opts = Keyword.get(args, :transport_opts, []) ++ [active: true] + transport_opts = args |> Keyword.get(:transport_opts, []) |> Keyword.put(:active, true) device_name = Keyword.fetch!(args, :device_name) data_source = Keyword.fetch!(args, :data_source) timeout = Keyword.get(args, :timeout, 5000) @@ -53,19 +53,19 @@ defmodule Modbuzz.RTU.Server do {:noreply, %{state | binary: <<>>}} - {:error, :binary_is_short} -> + {:error, :adu_binary_is_short} -> {:noreply, %{state | binary: new_binary}} - {:error, :unknown} -> - Log.error("Failed to decode ADU, unknown binary format.", nil, state) + {:error, {:pdu_unknown_function_code, _} = reason} -> + Log.error("Decode error", reason, state) {:noreply, %{state | binary: <<>>}} - {:error, :binary_is_long} -> - Log.error("Failed to decode ADU, binary is too long.", nil, state) + {:error, :adu_binary_is_long = reason} -> + Log.error("Decode error", reason, state) {:noreply, %{state | binary: <<>>}} - {:error, %ADU{unit_id: _unit_id, crc_valid?: false} = adu} -> - Log.warning("CRC error detected, #{inspect(adu)}.", nil, state) + {:error, :adu_crc_error = reason} -> + Log.error("Decode error", reason, state) {:noreply, %{state | binary: <<>>}} end end diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index ab325a0..79ce8a0 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -13,10 +13,14 @@ defmodule Modbuzz.TCP.ADU do } defstruct transaction_id: 0x0000, protocol_id: 0x0000, length: 0x0000, unit_id: 0x00, pdu: nil + @unit_id_length 1 + @function_code_length 1 + @mbap_length 7 + defguardp is_modbus_protocol(protocol_id) when protocol_id == 0x0000 - defguardp is_valid_length(length) when 1 <= length and length <= 0x00FE - @mbap_length 7 + defguardp is_valid_length(length) + when @unit_id_length + @function_code_length <= length and length <= 0x00FE def max_frame_length, do: @mbap_length + PDU.max_frame_length() @@ -41,21 +45,33 @@ defmodule Modbuzz.TCP.ADU do adu.unit_id, pdu_binary::binary-size(pdu_binary_length)>> end + @spec decode_request(binary :: binary(), list()) :: + {:ok, {[{:ok, t()}], binary()}} + | {:error, {:adu_invalid_protocol, non_neg_integer()}} + | {:error, {:adu_invalid_length, non_neg_integer()}} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} def decode_request( <<_transaction_id::16, protocol_id::16, length::16, _rest::binary>> = binary, acc ) when is_modbus_protocol(protocol_id) and is_valid_length(length) do + # trnsaction_id: 2 bytes + # protcol_id: 2 bytes + # length: 2 bytes adu_frame_size = 2 + 2 + 2 + length - if byte_size(binary) >= adu_frame_size do + if byte_size(binary) < adu_frame_size do + {:ok, {Enum.reverse(acc), binary}} + else <> = binary - adu = decode_request(adu_binary) + case decode_request(adu_binary) do + {:error, {:pdu_unknown_function_code, _}} = error -> + error - decode_request(rest, [adu | acc]) - else - {:ok, {Enum.reverse(acc), binary}} + adu_tuple -> + decode_request(rest, [adu_tuple | acc]) + end end end @@ -64,7 +80,7 @@ defmodule Modbuzz.TCP.ADU do _acc ) when not is_modbus_protocol(protocol_id) do - {:error, {:invalid_protocol, protocol_id}} + {:error, {:adu_invalid_protocol, protocol_id}} end def decode_request( @@ -72,28 +88,40 @@ defmodule Modbuzz.TCP.ADU do _acc ) when not is_valid_length(length) do - {:error, {:invalid_length, length}} + {:error, {:adu_invalid_length, length}} end def decode_request(binary, acc) do {:ok, {Enum.reverse(acc), binary}} end + @spec decode_response(binary :: binary(), list()) :: + {:ok, {[{:ok, t()} | {:error, t()}], binary()}} + | {:error, {:adu_invalid_protocol, non_neg_integer()}} + | {:error, {:adu_invalid_length, non_neg_integer()}} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} def decode_response( <<_transaction_id::16, protocol_id::16, length::16, _rest::binary>> = binary, acc ) when is_modbus_protocol(protocol_id) and is_valid_length(length) do + # trnsaction_id: 2 bytes + # protcol_id: 2 bytes + # length: 2 bytes adu_frame_size = 2 + 2 + 2 + length - if byte_size(binary) >= adu_frame_size do + if byte_size(binary) < adu_frame_size do + {:ok, {Enum.reverse(acc), binary}} + else <> = binary - adu = decode_response(adu_binary) + case decode_response(adu_binary) do + {:error, {:pdu_unknown_function_code, _}} = error -> + error - decode_response(rest, [adu | acc]) - else - {:ok, {Enum.reverse(acc), binary}} + adu_tuple -> + decode_response(rest, [adu_tuple | acc]) + end end end @@ -102,7 +130,7 @@ defmodule Modbuzz.TCP.ADU do _acc ) when not is_modbus_protocol(protocol_id) do - {:error, {:invalid_protocol, protocol_id}} + {:error, {:adu_invalid_protocol, protocol_id}} end def decode_response( @@ -110,13 +138,16 @@ defmodule Modbuzz.TCP.ADU do _acc ) when not is_valid_length(length) do - {:error, {:invalid_length, length}} + {:error, {:adu_invalid_length, length}} end def decode_response(binary, acc) do {:ok, {Enum.reverse(acc), binary}} end + @spec decode_request(binary :: binary()) :: + {:ok, t()} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} defp decode_request( <> @@ -130,11 +161,15 @@ defmodule Modbuzz.TCP.ADU do } case PDU.decode_request(pdu_binary) do - {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} - {:error, pdu} -> {:error, %{adu | pdu: pdu}} + {:ok, pdu} when is_struct(pdu) -> {:ok, %{adu | pdu: pdu}} + {:error, {:pdu_unknown_function_code, _}} = error -> error end end + @spec decode_response(binary :: binary()) :: + {:ok, t()} + | {:error, t()} + | {:error, {:pdu_unknown_function_code, non_neg_integer()}} defp decode_response( <> @@ -148,8 +183,9 @@ defmodule Modbuzz.TCP.ADU do } case PDU.decode_response(pdu_binary) do - {:ok, pdu} -> {:ok, %{adu | pdu: pdu}} - {:error, pdu} -> {:error, %{adu | pdu: pdu}} + {:ok, pdu} when is_struct(pdu) -> {:ok, %{adu | pdu: pdu}} + {:error, pdu} when is_struct(pdu) -> {:error, %{adu | pdu: pdu}} + {:error, {:pdu_unknown_function_code, _}} = error -> error end end end diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index c1d3806..ec9301d 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -78,7 +78,7 @@ defmodule Modbuzz.TCP.Client do This function always returns :ok regardless of whether the destination server (or node) exists. Therefore it is unknown whether the destination server successfully handled the request. - Its response is sent as a meessage, looks like + Its response is sent as a message, looks like ``` {:modbuzz, client_name, unit_id, request, response_tuple} @@ -248,7 +248,7 @@ defmodule Modbuzz.TCP.Client do {:noreply, %{state | transactions: transactions, binary: binary}} {:error, reason} -> - Log.error("invalid response", reason, state) + Log.error("decode error", reason, state) transport.close(socket) {:noreply, %{state | socket: nil, binary: <<>>}} end @@ -258,24 +258,34 @@ defmodule Modbuzz.TCP.Client do %{ client_name: client_name, transport: transport, - transaction_id: transaction_id, transactions: transactions } = state Log.error("#{inspect(transport)} closed", nil, state) if not is_nil(socket), do: :ok = transport.close(socket) - {transaction, transactions} = Map.pop(transactions, transaction_id) - res_tuple = {:error, :tcp_closed} - maybe_report_response(transaction, client_name, res_tuple) - {:noreply, %{state | socket: nil, binary: <<>>, transactions: transactions}} + Enum.each(transactions, fn {_transaction_id, transaction} -> + maybe_report_response(transaction, client_name, {:error, :tcp_closed}) + end) + + {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} end def handle_info({:tcp_error, socket, reason}, state) do - %{transport: transport} = state + %{ + client_name: client_name, + transport: transport, + transactions: transactions + } = state + Log.error("transport error", reason, state) if not is_nil(socket), do: :ok = transport.close(socket) - {:noreply, %{state | socket: nil, binary: <<>>}} + + Enum.each(transactions, fn {_transaction_id, transaction} -> + maybe_report_response(transaction, client_name, {:error, :tcp_error}) + end) + + {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} end defp connect(state, timeout) do diff --git a/lib/modbuzz/tcp/server/socket_handler.ex b/lib/modbuzz/tcp/server/socket_handler.ex index 92eb34b..bff5dfb 100644 --- a/lib/modbuzz/tcp/server/socket_handler.ex +++ b/lib/modbuzz/tcp/server/socket_handler.ex @@ -43,29 +43,24 @@ defmodule Modbuzz.TCP.Server.SocketHandler do binary = state.binary <> binary case ADU.decode_request(binary, []) do - {:ok, {adu_tuples, binary}} -> - for {:ok, adu} <- adu_tuples do - request(data_source, adu.unit_id, adu.pdu, timeout) - |> ADU.new(adu.transaction_id, adu.unit_id) - |> ADU.encode() - end - |> Enum.reduce(<<>>, &(&2 <> &1)) - |> then(&transport.send(socket, &1)) + {:ok, {adu_tuples, rest_binary}} -> + response_binary = + Enum.reduce(adu_tuples, <<>>, fn {:ok, adu}, acc -> + response_pdu_or_nil = request(data_source, adu.unit_id, adu.pdu, timeout) + acc <> to_adu_binary(response_pdu_or_nil, adu.transaction_id, adu.unit_id) + end) - {:noreply, %{state | binary: binary}, {:continue, :recv}} + transport.send(socket, response_binary) + {:noreply, %{state | binary: rest_binary}, {:continue, :recv}} {:error, reason} -> - Log.error("invalid request", reason, state) + Log.error("decode error", reason, state) :ok = transport.close(socket) - {:stop, :invalid_length, %{state | binary: <<>>}} + {:stop, reason, state} end - {:error, :closed = reason} -> - :ok = transport.close(socket) - {:stop, reason, state} - {:error, reason} -> - Log.error(":recv failed", reason, state) + Log.error("#{inspect(transport)} recv error", reason, state) :ok = transport.close(socket) {:stop, reason, state} end @@ -73,8 +68,19 @@ defmodule Modbuzz.TCP.Server.SocketHandler do defp request(data_source, unit_id, request, timeout) do case GenServer.call(data_source, {:call, unit_id, request, timeout}) do - {:ok, response} -> response - {:error, error} -> error + {:ok, pdu} when is_struct(pdu) -> pdu + {:error, pdu} when is_struct(pdu) -> pdu + {:error, _reason} -> nil + end + end + + defp to_adu_binary(pdu, transaction_id, unit_id) do + case pdu do + pdu when is_struct(pdu) -> + ADU.new(pdu, transaction_id, unit_id) |> ADU.encode() + + nil -> + <<>> end end end diff --git a/test/modbuzz/rtu/client_test.exs b/test/modbuzz/rtu/client_test.exs index c9e7fc7..2f2799c 100644 --- a/test/modbuzz/rtu/client_test.exs +++ b/test/modbuzz/rtu/client_test.exs @@ -86,8 +86,8 @@ defmodule Modbuzz.RTU.ClientTest do :ok end) - task = Task.async(fn -> GenServer.call(name, {:call, unit_id, req, 100}) end) - assert Task.await(task) == {:error, :crc_error} + task = Task.async(fn -> GenServer.call(name, {:call, unit_id, req, 10}) end) + assert Task.await(task) == {:error, :timeout} end test "return :error tuple, modbus error response", %{ @@ -227,8 +227,8 @@ defmodule Modbuzz.RTU.ClientTest do :ok end) - GenServer.cast(name, {:cast, unit_id, req, self(), 100}) - assert_receive {:modbuzz, :client, ^unit_id, ^req, {:error, :crc_error}} + GenServer.cast(name, {:cast, unit_id, req, self(), 10}) + assert_receive {:modbuzz, :client, ^unit_id, ^req, {:error, :timeout}} end test "return :error tuple, server device busy", %{ diff --git a/test/modbuzz/tcp/client_test.exs b/test/modbuzz/tcp/client_test.exs index 2aea3b9..61265f3 100644 --- a/test/modbuzz/tcp/client_test.exs +++ b/test/modbuzz/tcp/client_test.exs @@ -106,7 +106,7 @@ defmodule Modbuzz.TCP.ClientTest do req = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 16} res = %Modbuzz.PDU.ReadCoils.Res{byte_count: 0x02, coil_status: List.duplicate(false, 16)} - %{parent: self(), ref: make_ref(), req: req, res: res} + %{req: req, res: res} end test "return :ok", %{req: req, res: res} do @@ -275,6 +275,7 @@ defmodule Modbuzz.TCP.ClientTest do :ok = Modbuzz.TCP.Client.cast(req) send(pid, {:tcp_closed, dummy_socket}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_closed}}) assert_receive(:closed) end @@ -296,6 +297,7 @@ defmodule Modbuzz.TCP.ClientTest do :ok = Modbuzz.TCP.Client.cast(req) send(pid, {:tcp_error, dummy_socket, :reason}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_error}}) assert_receive(:closed) end From 4e046ccb56f6b399c082e5dc4295df5b9bae446a Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Sat, 30 May 2026 19:52:31 +0900 Subject: [PATCH 15/19] Add handle_info to Modbuzz.TCP.Client for stale socket --- lib/modbuzz/tcp/client.ex | 21 ++++++++-- test/modbuzz/tcp/client_test.exs | 72 ++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index ec9301d..e36b663 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -222,7 +222,7 @@ defmodule Modbuzz.TCP.Client do end end - def handle_info({:tcp, socket, binary}, state) do + def handle_info({:tcp, socket, binary}, %{socket: socket} = state) do %{ client_name: client_name, transport: transport, @@ -254,7 +254,12 @@ defmodule Modbuzz.TCP.Client do end end - def handle_info({:tcp_closed, socket}, state) do + def handle_info({:tcp, _socket, _binary}, state) do + # Ignore frames from stale sockets so they cannot affect the active connection state. + {:noreply, state} + end + + def handle_info({:tcp_closed, socket}, %{socket: socket} = state) do %{ client_name: client_name, transport: transport, @@ -271,7 +276,12 @@ defmodule Modbuzz.TCP.Client do {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} end - def handle_info({:tcp_error, socket, reason}, state) do + def handle_info({:tcp_closed, _socket}, state) do + # Ignore close notifications from stale sockets. + {:noreply, state} + end + + def handle_info({:tcp_error, socket, reason}, %{socket: socket} = state) do %{ client_name: client_name, transport: transport, @@ -288,6 +298,11 @@ defmodule Modbuzz.TCP.Client do {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} end + def handle_info({:tcp_error, _socket, _reason}, state) do + # Ignore errors from stale sockets; only the active socket should fail pending work. + {:noreply, state} + end + defp connect(state, timeout) do %{transport: transport, address: address, port: port, socket: socket} = state diff --git a/test/modbuzz/tcp/client_test.exs b/test/modbuzz/tcp/client_test.exs index 61265f3..b282684 100644 --- a/test/modbuzz/tcp/client_test.exs +++ b/test/modbuzz/tcp/client_test.exs @@ -301,6 +301,78 @@ defmodule Modbuzz.TCP.ClientTest do assert_receive(:closed) end + test "message from stale socket {:tcp, socket, binary} is ignored", %{ + pid: pid, + req: req, + res: res + } do + dummy_socket = make_ref() + stale_socket = make_ref() + + Modbuzz.TCP.TransportMock + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) + + :ok = Modbuzz.TCP.Client.cast(req) + + send(pid, {:tcp, stale_socket, <<0::16, 0::16, 0::16>>}) + refute_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_error}}, 10) + + refute_receive( + {:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_closed}}, + 10 + ) + + send(pid, {:tcp, dummy_socket, to_binary(res)}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) + end + + test "message from stale socket {:tcp_closed, socket} is ignored", %{ + pid: pid, + req: req, + res: res + } do + dummy_socket = make_ref() + stale_socket = make_ref() + + Modbuzz.TCP.TransportMock + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) + + :ok = Modbuzz.TCP.Client.cast(req) + + send(pid, {:tcp_closed, stale_socket}) + + refute_receive( + {:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_closed}}, + 10 + ) + + send(pid, {:tcp, dummy_socket, to_binary(res)}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) + end + + test "message from stale socket {:tcp_error, socket, reason} is ignored", %{ + pid: pid, + req: req, + res: res + } do + dummy_socket = make_ref() + stale_socket = make_ref() + + Modbuzz.TCP.TransportMock + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) + + :ok = Modbuzz.TCP.Client.cast(req) + + send(pid, {:tcp_error, stale_socket, :reason}) + refute_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_error}}, 10) + + send(pid, {:tcp, dummy_socket, to_binary(res)}) + assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:ok, ^res}}) + end + test "message {:tcp, socket, binary}, invalid length closes socket", %{ pid: pid, req: req From d4019e16836c67a3255abd68dbf81680f068e080 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Sun, 31 May 2026 10:12:06 +0900 Subject: [PATCH 16/19] Fix TCP.Server.SocketHandler, skip send when response binary is empty --- lib/modbuzz/tcp/server/socket_handler.ex | 3 +- .../tcp/server/socket_handler_test.exs | 49 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 test/modbuzz/tcp/server/socket_handler_test.exs diff --git a/lib/modbuzz/tcp/server/socket_handler.ex b/lib/modbuzz/tcp/server/socket_handler.ex index bff5dfb..6410326 100644 --- a/lib/modbuzz/tcp/server/socket_handler.ex +++ b/lib/modbuzz/tcp/server/socket_handler.ex @@ -50,7 +50,8 @@ defmodule Modbuzz.TCP.Server.SocketHandler do acc <> to_adu_binary(response_pdu_or_nil, adu.transaction_id, adu.unit_id) end) - transport.send(socket, response_binary) + if response_binary != <<>>, do: transport.send(socket, response_binary) + {:noreply, %{state | binary: rest_binary}, {:continue, :recv}} {:error, reason} -> diff --git a/test/modbuzz/tcp/server/socket_handler_test.exs b/test/modbuzz/tcp/server/socket_handler_test.exs new file mode 100644 index 0000000..3ae926f --- /dev/null +++ b/test/modbuzz/tcp/server/socket_handler_test.exs @@ -0,0 +1,49 @@ +defmodule Modbuzz.TCP.Server.SocketHandlerTest do + use ExUnit.Case + + import Mox + + setup :verify_on_exit! + setup :set_mox_global + + test "does not send when recv has only a partial ADU binary" do + socket = make_ref() + me = self() + + request = %Modbuzz.PDU.ReadCoils.Req{starting_address: 0, quantity_of_coils: 1} + + encoded_adu_binary = + request + |> Modbuzz.TCP.ADU.new(_transaction_id = 0x0001, _unit_id = 0x00) + |> Modbuzz.TCP.ADU.encode() + + partial_size = byte_size(encoded_adu_binary) - 1 + # Drop the last byte on purpose so the socket handler receives an incomplete ADU binary. + <> = encoded_adu_binary + + Modbuzz.TCP.TransportMock + |> expect(:recv, fn ^socket, _length = 0, _timeout -> + {:ok, partial_adu_binary} + end) + |> expect(:recv, fn ^socket, _length = 0, _timeout -> {:error, :closed} end) + |> expect(:close, fn ^socket -> + send(me, :closed) + :ok + end) + + start_link_supervised!( + {Modbuzz.TCP.Server.SocketHandler, + [ + transport: Modbuzz.TCP.TransportMock, + address: {127, 0, 0, 1}, + port: 50_200, + socket: socket, + data_source: self(), + timeout: 10 + ]}, + restart: :temporary + ) + + assert_receive(:closed) + end +end From 747f939f137b4adf5bbce1964d0175e2471b2a5a Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Sun, 31 May 2026 10:53:13 +0900 Subject: [PATCH 17/19] Refactor --- lib/modbuzz/tcp/client.ex | 73 ++++++++++++++++++++++---------- test/modbuzz/tcp/client_test.exs | 28 ++++++++++++ 2 files changed, 78 insertions(+), 23 deletions(-) diff --git a/lib/modbuzz/tcp/client.ex b/lib/modbuzz/tcp/client.ex index e36b663..211a873 100644 --- a/lib/modbuzz/tcp/client.ex +++ b/lib/modbuzz/tcp/client.ex @@ -17,9 +17,10 @@ defmodule Modbuzz.TCP.Client do unit_id: 0x00..0xFF, request: Modbuzz.PDU.Protocol.t(), from_or_pid: GenServer.from() | pid() | nil, - ref: reference() + ref: reference(), + timer: reference() } - defstruct [:call_or_cast, :unit_id, :request, :from_or_pid, :ref] + defstruct [:call_or_cast, :unit_id, :request, :from_or_pid, :ref, :timer] end @doc """ @@ -151,16 +152,17 @@ defmodule Modbuzz.TCP.Client do transaction_id = ADU.increment_transaction_id(transaction_id) + ref = make_ref() + transaction = %Transaction{ call_or_cast: call_or_cast, unit_id: unit_id, request: request, from_or_pid: from_or_pid, - ref: make_ref() + ref: ref, + timer: Process.send_after(self(), {:timeout?, transaction_id, ref}, timeout) } - timer = Process.send_after(self(), {:timeout?, transaction_id, transaction}, timeout) - case connect(state, timeout) do {:ok, socket} -> binary = request |> ADU.new(transaction_id, unit_id) |> ADU.encode() @@ -179,24 +181,33 @@ defmodule Modbuzz.TCP.Client do {:error, reason} -> Log.error("#{inspect(transport)} send error", reason, state) - Process.cancel_timer(timer) :ok = transport.close(socket) + + transactions = Map.put(transactions, transaction_id, transaction) res_tuple = {:error, :tcp_send_error} - maybe_report_response(transaction, client_name, res_tuple) - {:noreply, %{state | socket: nil, binary: <<>>}} + + Enum.each(transactions, fn {_transaction_id, transaction} -> + Process.cancel_timer(transaction.timer) + report_response(transaction, client_name, res_tuple) + end) + + {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} end {:error, reason} -> Log.error("#{inspect(transport)} connect error", reason, state) - Process.cancel_timer(timer) + res_tuple = {:error, :tcp_connect_error} - maybe_report_response(transaction, client_name, res_tuple) - {:noreply, %{state | socket: nil, binary: <<>>}} + + Process.cancel_timer(transaction.timer) + report_response(transaction, client_name, res_tuple) + + {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} end end @impl true - def handle_info({:timeout?, transaction_id, %Transaction{ref: ref}}, state) do + def handle_info({:timeout?, transaction_id, ref}, state) do %{ client_name: client_name, transactions: transactions @@ -210,8 +221,11 @@ defmodule Modbuzz.TCP.Client do # timeout for the current request, report timeout error %Transaction{request: request, ref: ref_} = transaction when ref_ == ref -> Log.error("TCP server didn't respond for #{inspect(request)}", nil, state) + res_tuple = {:error, :timeout} - maybe_report_response(transaction, client_name, res_tuple) + + report_response(transaction, client_name, res_tuple) + {:noreply, %{state | transactions: Map.delete(transactions, transaction_id)}} # the current request is different, do not report timeout error @@ -240,7 +254,12 @@ defmodule Modbuzz.TCP.Client do fn {ok_or_error, %ADU{transaction_id: transaction_id, pdu: pdu}}, acc -> {transaction, acc} = Map.pop(acc, transaction_id) res_tuple = {ok_or_error, pdu} - maybe_report_response(transaction, client_name, res_tuple) + + if not is_nil(transaction) do + Process.cancel_timer(transaction.timer) + report_response(transaction, client_name, res_tuple) + end + acc end ) @@ -248,9 +267,16 @@ defmodule Modbuzz.TCP.Client do {:noreply, %{state | transactions: transactions, binary: binary}} {:error, reason} -> - Log.error("decode error", reason, state) - transport.close(socket) - {:noreply, %{state | socket: nil, binary: <<>>}} + Log.error("Decode error", reason, state) + :ok = transport.close(socket) + res_tuple = {:error, :decode_error} + + Enum.each(transactions, fn {_transaction_id, transaction} -> + Process.cancel_timer(transaction.timer) + report_response(transaction, client_name, res_tuple) + end) + + {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} end end @@ -268,9 +294,11 @@ defmodule Modbuzz.TCP.Client do Log.error("#{inspect(transport)} closed", nil, state) if not is_nil(socket), do: :ok = transport.close(socket) + res_tuple = {:error, :tcp_closed} Enum.each(transactions, fn {_transaction_id, transaction} -> - maybe_report_response(transaction, client_name, {:error, :tcp_closed}) + Process.cancel_timer(transaction.timer) + report_response(transaction, client_name, res_tuple) end) {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} @@ -290,9 +318,11 @@ defmodule Modbuzz.TCP.Client do Log.error("transport error", reason, state) if not is_nil(socket), do: :ok = transport.close(socket) + res_tuple = {:error, :tcp_error} Enum.each(transactions, fn {_transaction_id, transaction} -> - maybe_report_response(transaction, client_name, {:error, :tcp_error}) + Process.cancel_timer(transaction.timer) + report_response(transaction, client_name, res_tuple) end) {:noreply, %{state | socket: nil, binary: <<>>, transactions: %{}}} @@ -325,11 +355,8 @@ defmodule Modbuzz.TCP.Client do end end - defp maybe_report_response(transaction, client_name, res_tuple) do + defp report_response(%Transaction{} = transaction, client_name, res_tuple) do case transaction do - nil -> - :noop - %{call_or_cast: :call, unit_id: _unit_id, request: _request, from_or_pid: from} when is_tuple(from) -> GenServer.reply(from, res_tuple) diff --git a/test/modbuzz/tcp/client_test.exs b/test/modbuzz/tcp/client_test.exs index b282684..17cf903 100644 --- a/test/modbuzz/tcp/client_test.exs +++ b/test/modbuzz/tcp/client_test.exs @@ -179,6 +179,34 @@ defmodule Modbuzz.TCP.ClientTest do assert Modbuzz.TCP.Client.cast(Modbuzz.TCP.Client, 0, req, self(), 10) == :ok assert_receive({:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :timeout}}) end + + test "return :ok, send error fails all pending with {:error, :tcp_send_error}", %{ + req: req + } do + dummy_socket = make_ref() + + Modbuzz.TCP.TransportMock + |> expect(:connect, fn _, _, _, _ -> {:ok, dummy_socket} end) + |> expect(:send, fn _, _ -> :ok end) + |> expect(:send, fn _, _ -> {:error, :closed} end) + |> expect(:close, fn _ -> :ok end) + + start_link_supervised!( + {Modbuzz.TCP.Client, transport: Modbuzz.TCP.TransportMock}, + restart: :temporary + ) + + assert Modbuzz.TCP.Client.cast(req) == :ok + assert Modbuzz.TCP.Client.cast(req) == :ok + + assert_receive( + {:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_send_error}} + ) + + assert_receive( + {:modbuzz, Modbuzz.TCP.Client, _unit_id = 0, ^req, {:error, :tcp_send_error}} + ) + end end describe "tcp messages" do From 8f03d3e627b52ee5094b52fb82603d9c9c15c2b4 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Sun, 31 May 2026 12:53:46 +0900 Subject: [PATCH 18/19] Add :pdu_decode_error handling --- lib/modbuzz/rtu/adu.ex | 28 +++++++++++++++++++++------- lib/modbuzz/rtu/client.ex | 4 ++++ lib/modbuzz/rtu/server.ex | 4 ++++ lib/modbuzz/tcp/adu.ex | 33 ++++++++++++++++++++++++--------- 4 files changed, 53 insertions(+), 16 deletions(-) diff --git a/lib/modbuzz/rtu/adu.ex b/lib/modbuzz/rtu/adu.ex index c9e477a..0275d93 100644 --- a/lib/modbuzz/rtu/adu.ex +++ b/lib/modbuzz/rtu/adu.ex @@ -36,6 +36,7 @@ defmodule Modbuzz.RTU.ADU do {:ok, t()} | {:error, :adu_binary_is_short} | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + | {:error, :pdu_decode_error} | {:error, :adu_binary_is_long} | {:error, :adu_crc_error} def decode_request(binary) when is_binary(binary) and byte_size(binary) < @min_frame_length do @@ -46,9 +47,15 @@ defmodule Modbuzz.RTU.ADU do with {:ok, pdu_length} <- Modbuzz.PDU.request_length(binary), true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :adu_binary_is_short}, <> <- binary, - true <- crc(<>) == crc || {:error, :adu_crc_error}, - {:ok, pdu} <- PDU.decode_request(pdu_binary) do - {:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}} + true <- crc(<>) == crc || {:error, :adu_crc_error} do + try do + case PDU.decode_request(pdu_binary) do + {:ok, pdu} -> {:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}} + {:error, {:pdu_unknown_function_code, _}} = error -> error + end + rescue + FunctionClauseError -> {:error, :pdu_decode_error} + end else {:error, {:pdu_unknown_function_code, _}} = error -> error {:error, :adu_binary_is_short} = error -> error @@ -61,6 +68,7 @@ defmodule Modbuzz.RTU.ADU do {:ok, t()} | {:error, :adu_binary_is_short} | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + | {:error, :pdu_decode_error} | {:error, :adu_binary_is_long} | {:error, :adu_crc_error} | {:error, t()} @@ -72,15 +80,21 @@ defmodule Modbuzz.RTU.ADU do with {:ok, pdu_length} <- Modbuzz.PDU.response_length(binary), true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :adu_binary_is_short}, <> <- binary, - true <- crc(<>) == crc || {:error, :adu_crc_error}, - {:ok, pdu} <- PDU.decode_response(pdu_binary) do - {:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}} + true <- crc(<>) == crc || {:error, :adu_crc_error} do + try do + case PDU.decode_response(pdu_binary) do + {:ok, pdu} -> {:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}} + {:error, pdu} when is_struct(pdu) -> {:error, %__MODULE__{unit_id: unit_id, pdu: pdu}} + {:error, {:pdu_unknown_function_code, _}} = error -> error + end + rescue + FunctionClauseError -> {:error, :pdu_decode_error} + end else {:error, {:pdu_unknown_function_code, _}} = error -> error {:error, :adu_binary_is_short} = error -> error long_binary when is_binary(long_binary) -> {:error, :adu_binary_is_long} {:error, :adu_crc_error} = error -> error - {:error, pdu} when is_struct(pdu) -> {:error, %__MODULE__{unit_id: unit_id, pdu: pdu}} end end diff --git a/lib/modbuzz/rtu/client.ex b/lib/modbuzz/rtu/client.ex index 532c70a..2be70fb 100644 --- a/lib/modbuzz/rtu/client.ex +++ b/lib/modbuzz/rtu/client.ex @@ -141,6 +141,10 @@ defmodule Modbuzz.RTU.Client do Log.error("Decode error", reason, state) {:noreply, %{state | binary: <<>>}} + {:error, :pdu_decode_error = reason} -> + Log.error("Decode error", reason, state) + {:noreply, %{state | binary: <<>>}} + {:error, :adu_binary_is_short} -> {:noreply, %{state | binary: new_binary}} diff --git a/lib/modbuzz/rtu/server.ex b/lib/modbuzz/rtu/server.ex index 269105d..8ff2acd 100644 --- a/lib/modbuzz/rtu/server.ex +++ b/lib/modbuzz/rtu/server.ex @@ -60,6 +60,10 @@ defmodule Modbuzz.RTU.Server do Log.error("Decode error", reason, state) {:noreply, %{state | binary: <<>>}} + {:error, :pdu_decode_error = reason} -> + Log.error("Decode error", reason, state) + {:noreply, %{state | binary: <<>>}} + {:error, :adu_binary_is_long = reason} -> Log.error("Decode error", reason, state) {:noreply, %{state | binary: <<>>}} diff --git a/lib/modbuzz/tcp/adu.ex b/lib/modbuzz/tcp/adu.ex index 79ce8a0..37e3305 100644 --- a/lib/modbuzz/tcp/adu.ex +++ b/lib/modbuzz/tcp/adu.ex @@ -50,6 +50,7 @@ defmodule Modbuzz.TCP.ADU do | {:error, {:adu_invalid_protocol, non_neg_integer()}} | {:error, {:adu_invalid_length, non_neg_integer()}} | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + | {:error, :pdu_decode_error} def decode_request( <<_transaction_id::16, protocol_id::16, length::16, _rest::binary>> = binary, acc @@ -66,7 +67,7 @@ defmodule Modbuzz.TCP.ADU do <> = binary case decode_request(adu_binary) do - {:error, {:pdu_unknown_function_code, _}} = error -> + {:error, _reason} = error -> error adu_tuple -> @@ -100,6 +101,7 @@ defmodule Modbuzz.TCP.ADU do | {:error, {:adu_invalid_protocol, non_neg_integer()}} | {:error, {:adu_invalid_length, non_neg_integer()}} | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + | {:error, :pdu_decode_error} def decode_response( <<_transaction_id::16, protocol_id::16, length::16, _rest::binary>> = binary, acc @@ -116,7 +118,10 @@ defmodule Modbuzz.TCP.ADU do <> = binary case decode_response(adu_binary) do - {:error, {:pdu_unknown_function_code, _}} = error -> + {:error, %__MODULE__{}} = adu_tuple -> + decode_response(rest, [adu_tuple | acc]) + + {:error, _reason} = error -> error adu_tuple -> @@ -148,6 +153,7 @@ defmodule Modbuzz.TCP.ADU do @spec decode_request(binary :: binary()) :: {:ok, t()} | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + | {:error, :pdu_decode_error} defp decode_request( <> @@ -160,9 +166,13 @@ defmodule Modbuzz.TCP.ADU do unit_id: unit_id } - case PDU.decode_request(pdu_binary) do - {:ok, pdu} when is_struct(pdu) -> {:ok, %{adu | pdu: pdu}} - {:error, {:pdu_unknown_function_code, _}} = error -> error + try do + case PDU.decode_request(pdu_binary) do + {:ok, pdu} when is_struct(pdu) -> {:ok, %{adu | pdu: pdu}} + {:error, {:pdu_unknown_function_code, _}} = error -> error + end + rescue + FunctionClauseError -> {:error, :pdu_decode_error} end end @@ -170,6 +180,7 @@ defmodule Modbuzz.TCP.ADU do {:ok, t()} | {:error, t()} | {:error, {:pdu_unknown_function_code, non_neg_integer()}} + | {:error, :pdu_decode_error} defp decode_response( <> @@ -182,10 +193,14 @@ defmodule Modbuzz.TCP.ADU do unit_id: unit_id } - case PDU.decode_response(pdu_binary) do - {:ok, pdu} when is_struct(pdu) -> {:ok, %{adu | pdu: pdu}} - {:error, pdu} when is_struct(pdu) -> {:error, %{adu | pdu: pdu}} - {:error, {:pdu_unknown_function_code, _}} = error -> error + try do + case PDU.decode_response(pdu_binary) do + {:ok, pdu} when is_struct(pdu) -> {:ok, %{adu | pdu: pdu}} + {:error, pdu} when is_struct(pdu) -> {:error, %{adu | pdu: pdu}} + {:error, {:pdu_unknown_function_code, _}} = error -> error + end + rescue + FunctionClauseError -> {:error, :pdu_decode_error} end end end From 1993d1e889d4be808dd0c322a8e4b8492ad6f032 Mon Sep 17 00:00:00 2001 From: Ryota Kinukawa Date: Sun, 31 May 2026 13:13:49 +0900 Subject: [PATCH 19/19] Honor configured timeout in data source call --- lib/modbuzz/tcp/server/socket_handler.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/modbuzz/tcp/server/socket_handler.ex b/lib/modbuzz/tcp/server/socket_handler.ex index 6410326..837311c 100644 --- a/lib/modbuzz/tcp/server/socket_handler.ex +++ b/lib/modbuzz/tcp/server/socket_handler.ex @@ -68,7 +68,7 @@ defmodule Modbuzz.TCP.Server.SocketHandler do end defp request(data_source, unit_id, request, timeout) do - case GenServer.call(data_source, {:call, unit_id, request, timeout}) do + case GenServer.call(data_source, {:call, unit_id, request, timeout}, timeout + 50) do {:ok, pdu} when is_struct(pdu) -> pdu {:error, pdu} when is_struct(pdu) -> pdu {:error, _reason} -> nil