From 6682de71d18b9f956119b42c2165c9bb6ede1b34 Mon Sep 17 00:00:00 2001 From: Patrick McDonagh Date: Wed, 29 Mar 2017 19:11:29 -0500 Subject: [PATCH] Adds ModbusTCP Testing Client --- modbusTCP_Client.py | 115 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 modbusTCP_Client.py diff --git a/modbusTCP_Client.py b/modbusTCP_Client.py new file mode 100644 index 0000000..5ae1c10 --- /dev/null +++ b/modbusTCP_Client.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python + +import sys +import traceback +# ---------------------------------------------------------------------------# +# import the various server implementations +# ---------------------------------------------------------------------------# +from pymodbus.client.sync import ModbusTcpClient as ModbusClient +# from pymodbus.client.sync import ModbusUdpClient as ModbusClient +# from pymodbus.client.sync import ModbusSerialClient as ModbusClient + +# ---------------------------------------------------------------------------# +# configure the client logging +# ---------------------------------------------------------------------------# +import logging +logging.basicConfig() +log = logging.getLogger() +log.setLevel(logging.DEBUG) + +# ---------------------------------------------------------------------------# +# choose the client you want +# ---------------------------------------------------------------------------# +# make sure to start an implementation to hit against. For this +# you can use an existing device, the reference implementation in the tools +# directory, or start a pymodbus server. +# +# If you use the UDP or TCP clients, you can override the framer being used +# to use a custom implementation (say RTU over TCP). By default they use the +# socket framer:: +# +# client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer) +# +# It should be noted that you can supply an ipv4 or an ipv6 host address for +# both the UDP and TCP clients. +# +# There are also other options that can be set on the client that controls +# how transactions are performed. The current ones are: +# +# * retries - Specify how many retries to allow per transaction (default = 3) +# * retry_on_empty - Is an empty response a retry (default = False) +# * source_address - Specifies the TCP source address to bind to +# +# Here is an example of using these options:: +# +# client = ModbusClient('localhost', retries=3, retry_on_empty=True) +# ---------------------------------------------------------------------------# + + +def main(address, port, commands): + # commands should be in the format code:register or code:register:value + + commands = [a.split(":") for a in commands] + resulting_values = [] + client = ModbusClient(address, port=int(port)) + if not client.connect(): + print("Unable to connect") + exit + + read_codes = { + 1: {'type': 'discrete', 'fn': client.read_coils}, + 2: {'type': 'discrete', 'fn': client.read_discrete_inputs}, + 3: {'type': 'register', 'fn': client.read_holding_registers}, + 4: {'type': 'register', 'fn': client.read_input_registers} + } + + write_codes = { + 5: {'type': 'discrete', 'fn': client.write_coil}, + 6: {'type': 'register', 'fn': client.write_register}, + 15: {'type': 'discrete', 'fn': client.write_coils}, + 16: {'type': 'register', 'fn': client.write_registers} + } + + for c in commands: + if len(c) == 3: + try: + rc_obj = write_codes[int(c[0])] + write_val = rc_obj['fn'](int(c[1]) - 1, int(c[2])) + c.append(write_val.value) + resulting_values.append(c) + except Exception as e: + print("{} - {}".format(c, e)) + traceback.print_exc() + + if len(c) == 2: + try: + rc_obj = read_codes[int(c[0])] + register = c[1].split("L") + if len(register) == 1: + register.append(1) + read_val = rc_obj['fn'](int(register[0]) - 1, int(register[1])) + if rc_obj['type'] == 'discrete': + c.append(read_val.bits[0:int(register[1])]) + else: + c.append(read_val.registers) + resulting_values.append(c) + except Exception as e: + print(e) + traceback.print_exc() + + # ---------------------------------------------------------------------------# + # close the client + # ---------------------------------------------------------------------------# + client.close() + return resulting_values + + +if __name__ == '__main__': + + if len(sys.argv) > 3: + address = sys.argv[1] + port = sys.argv[2] + commands = sys.argv[3:] + values = main(address, port, commands) + for x in values: + print(x)