#!/usr/bin/env python
"""Command-line Modbus client built on the pymodbus synchronous client.

Usage::

    <script> ADDRESS PORT COMMAND [COMMAND ...]

Each COMMAND is one of:

* ``code:register``         - read one item             (codes 1, 2, 3, 4)
* ``code:registerLcount``   - read ``count`` items      (codes 1, 2, 3, 4)
* ``code:register:value``   - write the integer value   (codes 5, 6, 15, 16)

Registers are given 1-based on the command line and converted to the 0-based
addresses passed to the client.
"""
import sys
import traceback

# ---------------------------------------------------------------------------#
# import the various client implementations
# ---------------------------------------------------------------------------#
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
# from pymodbus.client.sync import ModbusUdpClient as ModbusClient
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient

# ---------------------------------------------------------------------------#
# configure the client logging
# ---------------------------------------------------------------------------#
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)

# ---------------------------------------------------------------------------#
# choose the client you want
# ---------------------------------------------------------------------------#
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
#
# If you use the UDP or TCP clients, you can override the framer being used
# to use a custom implementation (say RTU over TCP). By default they use the
# socket framer::
#
#    client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer)
#
# It should be noted that you can supply an ipv4 or an ipv6 host address for
# both the UDP and TCP clients.
#
# There are also other options that can be set on the client that control
# how transactions are performed. The current ones are:
#
# * retries - Specify how many retries to allow per transaction (default = 3)
# * retry_on_empty - Is an empty response a retry (default = False)
# * source_address - Specifies the TCP source address to bind to
#
# Here is an example of using these options::
#
#    client = ModbusClient('localhost', retries=3, retry_on_empty=True)
# ---------------------------------------------------------------------------#


def _check_response(response):
    """Raise IOError if *response* reports a Modbus-level failure.

    The ``isError`` lookup is guarded so that response objects which do not
    provide it are simply treated as successful.
    """
    is_error = getattr(response, "isError", None)
    if callable(is_error) and is_error():
        raise IOError("device returned an error: {}".format(response))


def _lookup(codes, raw_code, action):
    """Return the dispatch entry for *raw_code* or raise ValueError."""
    code = int(raw_code)
    if code not in codes:
        raise ValueError(
            "unsupported {} function code {}".format(action, code))
    return codes[code]


def _execute_write(write_codes, fields):
    """Run a ``code:register:value`` command and return the confirmed value."""
    entry = _lookup(write_codes, fields[0], "write")
    value = int(fields[2])
    # Command-line registers are 1-based; the client expects 0-based.
    response = entry['fn'](int(fields[1]) - 1, value)
    _check_response(response)
    # NOTE(review): single-write responses (codes 5/6) echo the written value
    # in ``.value``; multi-write responses (15/16) are expected to carry only
    # address/count, so fall back to the value we asked to write instead of
    # failing with AttributeError after a successful write. Confirm against
    # the installed pymodbus version.
    return getattr(response, "value", value)


def _execute_read(read_codes, fields):
    """Run a ``code:register[Lcount]`` command and return the values read."""
    entry = _lookup(read_codes, fields[0], "read")
    start, has_count, count = fields[1].partition("L")
    count = int(count) if has_count else 1
    # Command-line registers are 1-based; the client expects 0-based.
    response = entry['fn'](int(start) - 1, count)
    _check_response(response)
    if entry['type'] == 'discrete':
        # Trim to the requested count; the bit list may be longer than asked.
        return response.bits[0:count]
    return response.registers


def main(address, port, commands):
    """Execute Modbus read/write commands against a TCP device.

    :param address: Host name or IP address of the Modbus device.
    :param port: TCP port; anything accepted by ``int()``.
    :param commands: Iterable of strings in the form ``code:register``,
        ``code:registerLcount`` or ``code:register:value``.
    :returns: A list with one entry per *successful* command: the command's
        colon-separated fields followed by the value(s) read or written.
        Failed commands are reported on stdout (with a traceback on stderr)
        and omitted. An empty list is returned if the connection fails.
    """
    resulting_values = []
    client = ModbusClient(address, port=int(port))
    try:
        if not client.connect():
            # The original referenced the ``exit`` builtin without calling
            # it, so execution fell through; stop here instead.
            print("Unable to connect")
            return resulting_values

        read_codes = {
            1: {'type': 'discrete', 'fn': client.read_coils},
            2: {'type': 'discrete', 'fn': client.read_discrete_inputs},
            3: {'type': 'register', 'fn': client.read_holding_registers},
            4: {'type': 'register', 'fn': client.read_input_registers},
        }
        write_codes = {
            5: {'type': 'discrete', 'fn': client.write_coil},
            6: {'type': 'register', 'fn': client.write_register},
            15: {'type': 'discrete', 'fn': client.write_coils},
            16: {'type': 'register', 'fn': client.write_registers},
        }

        for command in commands:
            # commands are code:register or code:register:value
            fields = command.split(":")
            try:
                if len(fields) == 3:
                    result = _execute_write(write_codes, fields)
                elif len(fields) == 2:
                    result = _execute_read(read_codes, fields)
                else:
                    raise ValueError(
                        "expected code:register or code:register:value")
            except Exception as e:
                # Best effort: report the failed command and carry on with
                # the remaining ones.
                print("{} - {}".format(fields, e))
                traceback.print_exc()
            else:
                fields.append(result)
                resulting_values.append(fields)
    finally:
        # -------------------------------------------------------------------#
        # close the client
        # -------------------------------------------------------------------#
        client.close()
    return resulting_values


if __name__ == '__main__':
    if len(sys.argv) > 3:
        address = sys.argv[1]
        port = sys.argv[2]
        commands = sys.argv[3:]
        values = main(address, port, commands)
        for x in values:
            print(x)
    else:
        sys.stderr.write(
            "usage: {} ADDRESS PORT COMMAND [COMMAND ...]\n".format(
                sys.argv[0]))
        sys.exit(2)