116 lines
4.3 KiB
Python
116 lines
4.3 KiB
Python
#!/usr/bin/env python
|
|
|
|
import sys
|
|
import traceback
|
|
# ---------------------------------------------------------------------------#
|
|
# import the various client implementations
|
|
# ---------------------------------------------------------------------------#
|
|
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
|
|
# from pymodbus.client.sync import ModbusUdpClient as ModbusClient
|
|
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# configure the client logging
|
|
# ---------------------------------------------------------------------------#
|
|
import logging
|
|
# Grab the root logger, make sure it has a default handler, and lower its
# threshold to DEBUG so that records of every severity are emitted.
log = logging.getLogger()
logging.basicConfig()
log.setLevel(logging.DEBUG)
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# choose the client you want
|
|
# ---------------------------------------------------------------------------#
|
|
# make sure to start an implementation to hit against. For this
|
|
# you can use an existing device, the reference implementation in the tools
|
|
# directory, or start a pymodbus server.
|
|
#
|
|
# If you use the UDP or TCP clients, you can override the framer being used
|
|
# to use a custom implementation (say RTU over TCP). By default they use the
|
|
# socket framer::
|
|
#
|
|
# client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer)
|
|
#
|
|
# It should be noted that you can supply an ipv4 or an ipv6 host address for
|
|
# both the UDP and TCP clients.
|
|
#
|
|
# There are also other options that can be set on the client that control
|
|
# how transactions are performed. The current ones are:
|
|
#
|
|
# * retries - Specify how many retries to allow per transaction (default = 3)
|
|
# * retry_on_empty - Whether an empty response triggers a retry (default = False)
|
|
# * source_address - Specifies the TCP source address to bind to
|
|
#
|
|
# Here is an example of using these options::
|
|
#
|
|
# client = ModbusClient('localhost', retries=3, retry_on_empty=True)
|
|
# ---------------------------------------------------------------------------#
|
|
|
|
|
|
def main(address, port, commands):
    """Run a sequence of Modbus read/write commands against one device.

    Each command is a colon-separated string:

    * ``code:register`` or ``code:registerLcount`` -- a read using function
      code 1, 2, 3 or 4.  ``count`` defaults to 1 when the ``L`` suffix is
      absent.
    * ``code:register:value`` -- a write using function code 5, 6, 15 or 16.

    Register numbers given on the command line are decremented by one
    before being handed to the client.

    :param address: host to connect to, passed straight to ``ModbusClient``
    :param port: port to connect to; converted with ``int()``
    :param commands: iterable of command strings in the format above
    :returns: a list with one entry per command that succeeded.  Each entry
        is the command split on ``":"`` with the result appended: the
        response's ``value`` for writes, the list of bits (discrete reads)
        or registers (register reads) for reads.  Commands that fail are
        reported on stdout together with a traceback and are left out.
        Commands that do not split into two or three fields are ignored.
        An empty list is returned when the connection cannot be opened.
    """
    commands = [a.split(":") for a in commands]
    resulting_values = []
    client = ModbusClient(address, port=int(port))
    if not client.connect():
        print("Unable to connect")
        # The original had a bare ``exit`` here, which is a no-op expression;
        # stop for real instead of driving an unconnected client.
        return resulting_values

    read_codes = {
        1: {'type': 'discrete', 'fn': client.read_coils},
        2: {'type': 'discrete', 'fn': client.read_discrete_inputs},
        3: {'type': 'register', 'fn': client.read_holding_registers},
        4: {'type': 'register', 'fn': client.read_input_registers},
    }

    write_codes = {
        5: {'type': 'discrete', 'fn': client.write_coil},
        6: {'type': 'register', 'fn': client.write_register},
        15: {'type': 'discrete', 'fn': client.write_coils},
        16: {'type': 'register', 'fn': client.write_registers},
    }

    try:
        for c in commands:
            if len(c) == 3:
                # code:register:value -> write
                try:
                    rc_obj = write_codes[int(c[0])]
                    write_val = rc_obj['fn'](int(c[1]) - 1, int(c[2]))
                    # NOTE(review): responses to the multi-write codes (15/16)
                    # may not expose ``.value`` -- confirm against the
                    # pymodbus version in use.
                    c.append(write_val.value)
                    resulting_values.append(c)
                except Exception as e:
                    print("{} - {}".format(c, e))
                    traceback.print_exc()
            elif len(c) == 2:
                # code:register[Lcount] -> read
                try:
                    rc_obj = read_codes[int(c[0])]
                    spec = c[1].split("L")
                    start = int(spec[0]) - 1
                    count = int(spec[1]) if len(spec) > 1 else 1
                    read_val = rc_obj['fn'](start, count)
                    if rc_obj['type'] == 'discrete':
                        # Keep only the number of bits that was asked for.
                        c.append(read_val.bits[0:count])
                    else:
                        c.append(read_val.registers)
                    resulting_values.append(c)
                except Exception as e:
                    print(e)
                    traceback.print_exc()
    finally:
        # -------------------------------------------------------------------#
        # close the client, even if the loop is interrupted
        # -------------------------------------------------------------------#
        client.close()
    return resulting_values
|
|
|
|
|
|
if __name__ == '__main__':
    # Expected invocation: <script> ADDRESS PORT COMMAND [COMMAND ...]
    # where each COMMAND is code:register, code:registerLcount or
    # code:register:value.
    if len(sys.argv) > 3:
        address = sys.argv[1]
        port = sys.argv[2]
        commands = sys.argv[3:]
        values = main(address, port, commands)
        for x in values:
            print(x)
    else:
        # Previously the script exited silently when arguments were missing;
        # tell the user what is expected and signal the error to the shell.
        sys.stderr.write(
            "usage: {} ADDRESS PORT COMMAND [COMMAND ...]\n".format(sys.argv[0])
        )
        sys.exit(2)
|