Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
{Credo.Check.Refactor.MatchInCondition, []},
{Credo.Check.Refactor.NegatedConditionsInUnless, []},
{Credo.Check.Refactor.NegatedConditionsWithElse, []},
{Credo.Check.Refactor.Nesting, []},
{Credo.Check.Refactor.Nesting, [max_nesting: 3]},
{Credo.Check.Refactor.RedundantWithClauseResult, []},
{Credo.Check.Refactor.RejectReject, []},
{Credo.Check.Refactor.UnlessWithElse, []},
Expand Down
34 changes: 32 additions & 2 deletions lib/modbuzz/pdu.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
defmodule Modbuzz.PDU do
@moduledoc false

@max_frame_length 253

def max_frame_length, do: @max_frame_length

defdelegate encode(struct), to: Modbuzz.PDU.Protocol, as: :encode

@spec decode_request(binary()) ::
{:ok, Modbuzz.PDU.Protocol.t()}
| {:error, {:pdu_unknown_function_code, non_neg_integer()}}
@spec decode_response(binary()) ::
{:ok, Modbuzz.PDU.Protocol.t()}
| {:error, Modbuzz.PDU.Protocol.t()}
| {:error, {:pdu_unknown_function_code, non_neg_integer()}}
@spec request_length(binary()) ::
{:ok, non_neg_integer()}
| {:error, {:pdu_unknown_function_code, non_neg_integer()}}
@spec response_length(binary()) ::
{:ok, non_neg_integer()}
| {:error, {:pdu_unknown_function_code, non_neg_integer()}}
for {modbus_function_code, modbus_function} <- Modbuzz.MixProject.pdu_seed() do
req_module = Module.concat([Modbuzz.PDU, modbus_function, Req])
res_module = Module.concat([Modbuzz.PDU, modbus_function, Res])
Expand Down Expand Up @@ -34,9 +51,22 @@ defmodule Modbuzz.PDU do
end
end

def decode_request(<<modbus_function_code, _rest::binary>>) do
{:error, {:pdu_unknown_function_code, modbus_function_code}}
end

def decode_response(<<modbus_function_code, _rest::binary>>) do
{:error, {:pdu_unknown_function_code, modbus_function_code}}
end

# NOTE: We need this fallback, because the binary is not always guaranteed to be correct.
def request_length(<<_, _rest::binary>>), do: {:error, :unknown}
def response_length(<<_, _rest::binary>>), do: {:error, :unknown}
def request_length(<<modbus_function_code, _rest::binary>>) do
{:error, {:pdu_unknown_function_code, modbus_function_code}}
end

def response_length(<<modbus_function_code, _rest::binary>>) do
{:error, {:pdu_unknown_function_code, modbus_function_code}}
end

def to_error(%req{}, exception_code) do
exception_code =
Expand Down
3 changes: 3 additions & 0 deletions lib/modbuzz/pdu/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ defprotocol Modbuzz.PDU.Protocol do
"""

@doc false
@spec encode(t()) :: binary()
def encode(struct)
@doc false
@spec decode(t(), binary()) :: t()
def decode(struct, binary)
@doc false
@spec expected_binary_size(t(), binary()) :: non_neg_integer()
def expected_binary_size(struct, binary)
end
94 changes: 56 additions & 38 deletions lib/modbuzz/rtu/adu.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ defmodule Modbuzz.RTU.ADU do

alias Modbuzz.PDU

# unit_id: 1, functions_code: 1, crc: 2, so minimum frame length is 4
@min_frame_length 4
@unit_id_length 1
@function_code_length 1

@crc_defn :cerlc.init(:crc16_modbus)
@crc_length 2

@type t :: %__MODULE__{unit_id: 0x00..0xF7, pdu: struct() | nil, crc_valid?: boolean()}
@type t :: %__MODULE__{
unit_id: 0x00..0xF7,
pdu: Modbuzz.PDU.Protocol.t() | nil,
crc_valid?: boolean()
}
defstruct unit_id: 0x00, pdu: nil, crc_valid?: true

@min_frame_length @unit_id_length + @function_code_length + @crc_length
def max_frame_length, do: PDU.max_frame_length() + @crc_length

def new(pdu, unit_id) when is_struct(pdu) do
%__MODULE__{
unit_id: unit_id,
Expand All @@ -25,58 +32,69 @@ defmodule Modbuzz.RTU.ADU do
binary <> crc(binary)
end

@spec decode_request(binary()) ::
{:ok, t()}
| {:error, :adu_binary_is_short}
| {:error, {:pdu_unknown_function_code, non_neg_integer()}}
| {:error, :pdu_decode_error}
| {:error, :adu_binary_is_long}
| {:error, :adu_crc_error}
def decode_request(binary) when is_binary(binary) and byte_size(binary) < @min_frame_length do
{:error, :binary_is_short}
{:error, :adu_binary_is_short}
end

def decode_request(<<unit_id, binary::binary>>) do
with {:ok, pdu_length} <- Modbuzz.PDU.request_length(binary),
true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :binary_is_short},
true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :adu_binary_is_short},
<<pdu_binary::binary-size(pdu_length), crc::binary-size(@crc_length)>> <- binary,
true <- crc(<<unit_id, pdu_binary::binary>>) == crc || {:error, :crc_error},
{:ok, pdu} <- PDU.decode_request(pdu_binary) do
{:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}}
true <- crc(<<unit_id, pdu_binary::binary>>) == crc || {:error, :adu_crc_error} do
try do
case PDU.decode_request(pdu_binary) do
{:ok, pdu} -> {:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}}
{:error, {:pdu_unknown_function_code, _}} = error -> error
end
rescue
FunctionClauseError -> {:error, :pdu_decode_error}
end
else
{:error, :binary_is_short} ->
{:error, :binary_is_short}

{:error, :unknown} ->
{:error, :unknown}

long_binary when is_binary(long_binary) ->
{:error, :binary_is_long}

{:error, :crc_error} ->
{:error, %__MODULE__{unit_id: unit_id, crc_valid?: false}}
{:error, {:pdu_unknown_function_code, _}} = error -> error
{:error, :adu_binary_is_short} = error -> error
long_binary when is_binary(long_binary) -> {:error, :adu_binary_is_long}
{:error, :adu_crc_error} = error -> error
end
end

@spec decode_response(binary()) ::
{:ok, t()}
| {:error, :adu_binary_is_short}
| {:error, {:pdu_unknown_function_code, non_neg_integer()}}
| {:error, :pdu_decode_error}
| {:error, :adu_binary_is_long}
| {:error, :adu_crc_error}
| {:error, t()}
def decode_response(binary) when is_binary(binary) and byte_size(binary) < @min_frame_length do
{:error, :binary_is_short}
{:error, :adu_binary_is_short}
end

def decode_response(<<unit_id, binary::binary>>) do
with {:ok, pdu_length} <- Modbuzz.PDU.response_length(binary),
true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :binary_is_short},
true <- byte_size(binary) >= pdu_length + @crc_length || {:error, :adu_binary_is_short},
<<pdu_binary::binary-size(pdu_length), crc::binary-size(@crc_length)>> <- binary,
true <- crc(<<unit_id, pdu_binary::binary>>) == crc || {:error, :crc_error},
{:ok, pdu} <- PDU.decode_response(pdu_binary) do
{:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}}
true <- crc(<<unit_id, pdu_binary::binary>>) == crc || {:error, :adu_crc_error} do
try do
case PDU.decode_response(pdu_binary) do
{:ok, pdu} -> {:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}}
{:error, pdu} when is_struct(pdu) -> {:error, %__MODULE__{unit_id: unit_id, pdu: pdu}}
{:error, {:pdu_unknown_function_code, _}} = error -> error
end
rescue
FunctionClauseError -> {:error, :pdu_decode_error}
end
else
{:error, :unknown} ->
{:error, :unknown}

{:error, :binary_is_short} ->
{:error, :binary_is_short}

long_binary when is_binary(long_binary) ->
{:error, :binary_is_long}

{:error, :crc_error} ->
{:error, %__MODULE__{unit_id: unit_id, crc_valid?: false}}

{:error, pdu} when is_struct(pdu) ->
{:error, %__MODULE__{unit_id: unit_id, pdu: pdu}}
{:error, {:pdu_unknown_function_code, _}} = error -> error
{:error, :adu_binary_is_short} = error -> error
long_binary when is_binary(long_binary) -> {:error, :adu_binary_is_long}
{:error, :adu_crc_error} = error -> error
end
end

Expand Down
49 changes: 22 additions & 27 deletions lib/modbuzz/rtu/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Modbuzz.RTU.Client do
def init(args) do
client_name = Keyword.fetch!(args, :name)
transport = Keyword.get(args, :transport, Circuits.UART)
transport_opts = Keyword.get(args, :transport_opts, []) ++ [active: true]
transport_opts = args |> Keyword.get(:transport_opts, []) |> Keyword.put(:active, true)
device_name = Keyword.fetch!(args, :device_name)

{:ok, transport_pid} = transport.start_link([])
Expand All @@ -41,7 +41,7 @@ defmodule Modbuzz.RTU.Client do
reqs: reqs
} = state

Log.error("RTU client terminated, #{inspect(reason)}.", nil, state)
Log.error("RTU client terminated", reason, state)

# All pending requests should be notified of the error
Enum.each(reqs, fn {_unit_id, req} ->
Expand All @@ -67,24 +67,21 @@ defmodule Modbuzz.RTU.Client do
reqs: reqs
} = state

req = Map.get(reqs, unit_id)
adu = ADU.new(request, unit_id)
new_req = {call_or_cast, unit_id, request, from_or_pid, make_ref()}

timer = Process.send_after(self(), {:timeout?, new_req}, timeout)

with true <- is_nil(req) || {:error, :another_request_in_progress},
binary <- ADU.encode(adu),
with true <- is_nil(Map.get(reqs, unit_id)) || {:error, :another_request_in_progress},
binary = request |> ADU.new(unit_id) |> ADU.encode(),
:ok <- transport.write(transport_pid, binary, timeout) do
{:noreply, %{state | reqs: Map.put(reqs, adu.unit_id, new_req)}}
{:noreply, %{state | reqs: Map.put(reqs, unit_id, new_req)}}
else
{:error, :another_request_in_progress} = res_tuple ->
Process.cancel_timer(timer)
maybe_report_response(new_req, client_name, res_tuple)
{:noreply, state}

{:error, reason} = res_tuple ->
Log.error("#{inspect(transport)} write error, #{inspect(reason)}.", nil, state)
Log.error("#{inspect(transport)} write error", reason, state)
Process.cancel_timer(timer)
maybe_report_response(new_req, client_name, res_tuple)
{:noreply, state}
Expand All @@ -97,16 +94,14 @@ defmodule Modbuzz.RTU.Client do
reqs: reqs
} = state

req = Map.get(reqs, unit_id)

case req do
case Map.get(reqs, unit_id) do
# already responded
nil ->
{:noreply, state}

# timeout for the current request, report timeout error
{_, unit_id_, request_, _, ref_} = req_ when ref_ == ref ->
Log.error("RTU server didn't respond for #{inspect(request_)}.", nil, state)
Log.error("RTU server didn't respond for #{inspect(request_)}", nil, state)
res_tuple = {:error, :timeout}

maybe_report_response(req_, client_name, res_tuple)
Expand Down Expand Up @@ -141,27 +136,27 @@ defmodule Modbuzz.RTU.Client do

{:noreply, %{state | reqs: Map.put(reqs, unit_id, nil), binary: <<>>}}

{:error, :binary_is_short} ->
{:noreply, %{state | binary: new_binary}}

{:error, :unknown} ->
{:error, {:pdu_unknown_function_code, _} = reason} ->
# No response is sent to the requester; the pending request will eventually time out.
Log.error("Failed to decode ADU, unknown binary format.", nil, state)
Log.error("Decode error", reason, state)
{:noreply, %{state | binary: <<>>}}

{:error, :binary_is_long} ->
# No response is sent to the requester; the pending request will eventually time out.
Log.error("Failed to decode ADU, binary is too long.", nil, state)
{:error, :pdu_decode_error = reason} ->
Log.error("Decode error", reason, state)
{:noreply, %{state | binary: <<>>}}

{:error, %ADU{unit_id: unit_id, crc_valid?: false} = adu} ->
Log.warning("CRC error detected, #{inspect(adu)}.", nil, state)
res_tuple = {:error, :crc_error}
req = Map.get(reqs, unit_id)
{:error, :adu_binary_is_short} ->
{:noreply, %{state | binary: new_binary}}

maybe_report_response(req, client_name, res_tuple)
{:error, :adu_binary_is_long = reason} ->
# No response is sent to the requester; the pending request will eventually time out.
Log.error("Decode error", reason, state)
{:noreply, %{state | binary: <<>>}}

{:noreply, %{state | reqs: Map.put(reqs, unit_id, nil), binary: <<>>}}
{:error, :adu_crc_error = reason} ->
# No response is sent to the requester; the pending request will eventually time out.
Log.warning("Decode error", reason, state)
{:noreply, %{state | binary: <<>>}}

{:error, %ADU{unit_id: unit_id, pdu: pdu}} ->
res_tuple = {:error, pdu}
Expand Down
20 changes: 12 additions & 8 deletions lib/modbuzz/rtu/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Modbuzz.RTU.Server do
@doc false
def init(args) do
transport = Keyword.get(args, :transport, Circuits.UART)
transport_opts = Keyword.get(args, :transport_opts, []) ++ [active: true]
transport_opts = args |> Keyword.get(:transport_opts, []) |> Keyword.put(:active, true)
device_name = Keyword.fetch!(args, :device_name)
data_source = Keyword.fetch!(args, :data_source)
timeout = Keyword.get(args, :timeout, 5000)
Expand Down Expand Up @@ -53,19 +53,23 @@ defmodule Modbuzz.RTU.Server do

{:noreply, %{state | binary: <<>>}}

{:error, :binary_is_short} ->
{:error, :adu_binary_is_short} ->
{:noreply, %{state | binary: new_binary}}

{:error, :unknown} ->
Log.error("Failed to decode ADU, unknown binary format.", nil, state)
{:error, {:pdu_unknown_function_code, _} = reason} ->
Log.error("Decode error", reason, state)
{:noreply, %{state | binary: <<>>}}

{:error, :binary_is_long} ->
Log.error("Failed to decode ADU, binary is too long.", nil, state)
{:error, :pdu_decode_error = reason} ->
Log.error("Decode error", reason, state)
{:noreply, %{state | binary: <<>>}}

{:error, %ADU{unit_id: _unit_id, crc_valid?: false} = adu} ->
Log.warning("CRC error detected, #{inspect(adu)}.", nil, state)
{:error, :adu_binary_is_long = reason} ->
Log.error("Decode error", reason, state)
{:noreply, %{state | binary: <<>>}}

{:error, :adu_crc_error = reason} ->
Log.error("Decode error", reason, state)
{:noreply, %{state | binary: <<>>}}
end
end
Expand Down
Loading