Adds ModbusTCP Testing Client

This commit is contained in:
Patrick McDonagh
2017-03-29 19:11:29 -05:00
parent 60ef141e7e
commit 6682de71d1

115
modbusTCP_Client.py Normal file
View File

@@ -0,0 +1,115 @@
#!/usr/bin/env python
import sys
import traceback
# ---------------------------------------------------------------------------#
# import the various server implementations
# ---------------------------------------------------------------------------#
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
# from pymodbus.client.sync import ModbusUdpClient as ModbusClient
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# ---------------------------------------------------------------------------#
# configure the client logging
# ---------------------------------------------------------------------------#
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# ---------------------------------------------------------------------------#
# choose the client you want
# ---------------------------------------------------------------------------#
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
#
# If you use the UDP or TCP clients, you can override the framer being used
# to use a custom implementation (say RTU over TCP). By default they use the
# socket framer::
#
# client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer)
#
# It should be noted that you can supply an ipv4 or an ipv6 host address for
# both the UDP and TCP clients.
#
# There are also other options that can be set on the client that controls
# how transactions are performed. The current ones are:
#
# * retries - Specify how many retries to allow per transaction (default = 3)
# * retry_on_empty - Is an empty response a retry (default = False)
# * source_address - Specifies the TCP source address to bind to
#
# Here is an example of using these options::
#
# client = ModbusClient('localhost', retries=3, retry_on_empty=True)
# ---------------------------------------------------------------------------#
def main(address, port, commands):
# commands should be in the format code:register or code:register:value
commands = [a.split(":") for a in commands]
resulting_values = []
client = ModbusClient(address, port=int(port))
if not client.connect():
print("Unable to connect")
exit
read_codes = {
1: {'type': 'discrete', 'fn': client.read_coils},
2: {'type': 'discrete', 'fn': client.read_discrete_inputs},
3: {'type': 'register', 'fn': client.read_holding_registers},
4: {'type': 'register', 'fn': client.read_input_registers}
}
write_codes = {
5: {'type': 'discrete', 'fn': client.write_coil},
6: {'type': 'register', 'fn': client.write_register},
15: {'type': 'discrete', 'fn': client.write_coils},
16: {'type': 'register', 'fn': client.write_registers}
}
for c in commands:
if len(c) == 3:
try:
rc_obj = write_codes[int(c[0])]
write_val = rc_obj['fn'](int(c[1]) - 1, int(c[2]))
c.append(write_val.value)
resulting_values.append(c)
except Exception as e:
print("{} - {}".format(c, e))
traceback.print_exc()
if len(c) == 2:
try:
rc_obj = read_codes[int(c[0])]
register = c[1].split("L")
if len(register) == 1:
register.append(1)
read_val = rc_obj['fn'](int(register[0]) - 1, int(register[1]))
if rc_obj['type'] == 'discrete':
c.append(read_val.bits[0:int(register[1])])
else:
c.append(read_val.registers)
resulting_values.append(c)
except Exception as e:
print(e)
traceback.print_exc()
# ---------------------------------------------------------------------------#
# close the client
# ---------------------------------------------------------------------------#
client.close()
return resulting_values
if __name__ == '__main__':
if len(sys.argv) > 3:
address = sys.argv[1]
port = sys.argv[2]
commands = sys.argv[3:]
values = main(address, port, commands)
for x in values:
print(x)