Files
2024-10-04 18:53:54 -05:00

358 lines
12 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Modbus TestKit: Implementation of Modbus protocol in python
(C)2009 - Luc Jean - luc.jean@gmail.com
(C)2009 - Apidev - http://www.apidev.fr
This is distributed under GNU LGPL license, see license.txt
The modbus_tk simulator is a console application which is running a server with TCP and RTU communication
It is possible to interact with the server from the command line or from a RPC (Remote Process Call)
"""
import sys, os, time
import threading
import modbus_tk
import modbus_tk.defines as defines
import modbus_tk.modbus as modbus
import modbus_tk.modbus_tcp as modbus_tcp
import modbus_tk.modbus_rtu as modbus_rtu
import serial
import modbus_tk.hooks as hooks
import Queue
import SocketServer
import ctypes
#add logging capability
LOGGER = modbus_tk.utils.create_logger(name="console", record_format="%(message)s")
#The communication between the server and the user interfaces (console or rpc)
#are done through queues
INPUT_QUEUE = Queue.Queue() #command received from the interfaces
OUTPUT_QUEUE = Queue.Queue() #response to be sent back by the interfaces
#-------------------------------------------------------------------------------
class CompositeServer(modbus.Server):
"""make possible to have several servers sharing the same databank"""
def __init__(self, list_of_server_classes, list_of_server_args, databank=None):
"""Constructor"""
modbus.Server.__init__(self, databank)
self._servers = [the_class(*the_args, **{"databank":self.get_db()})
for the_class, the_args
in zip(list_of_server_classes, list_of_server_args)
if issubclass(the_class, modbus.Server)]
def set_verbose(self, verbose):
"""if verbose is true the sent and received packets will be logged"""
for srv in self._servers:
srv.set_verbose(verbose)
def _make_thread(self):
"""should initialize the main thread of the server. You don't need it here"""
pass
def _make_query(self):
"""Returns an instance of a Query subclass implementing the MAC layer protocol"""
raise NotImplementedError()
def start(self):
"""Start the server. It will handle request"""
for srv in self._servers:
srv.start()
def stop(self):
"""stop the server. It doesn't handle request anymore"""
for srv in self._servers:
srv.stop()
class RpcHandler(SocketServer.BaseRequestHandler):
"""An instance of this class is created every time an RPC call is received by the server"""
def handle(self):
"""This function is called automatically by the SocketServer"""
# self.request is the TCP socket connected to the client
# read the incoming command
request = self.request.recv(1024).strip()
# write to the queue waiting to be processed by the server
INPUT_QUEUE.put(request)
# wait for the server answer in the output queue
response = OUTPUT_QUEUE.get(timeout=5.0)
# send back the answer
self.request.send(response)
class RpcInterface(threading.Thread):
"""Manage RPC call over TCP/IP thanks to the SocketServer module"""
def __init__(self):
"""Constructor"""
threading.Thread.__init__(self)
self.rpc_server = SocketServer.TCPServer(("", 2711), RpcHandler)
def run(self):
"""run the server and wait that it returns"""
self.rpc_server.serve_forever(0.5)
def close(self):
"""force the socket server to exit"""
try:
self.rpc_server.shutdown()
self.join(1.0)
except:
LOGGER.warning("An error occurred while closing RPC interface")
class ConsoleInterface(threading.Thread):
"""Manage user actions from the console"""
def __init__(self):
"""constructor: initialize communication with the console"""
threading.Thread.__init__(self)
self.inq = INPUT_QUEUE
self.outq = OUTPUT_QUEUE
if os.name == "nt":
ctypes.windll.Kernel32.GetStdHandle.restype = ctypes.c_ulong
self.console_handle = ctypes.windll.Kernel32.GetStdHandle(ctypes.c_ulong(0xfffffff5))
ctypes.windll.Kernel32.WaitForSingleObject.restype = ctypes.c_ulong
elif os.name == "posix":
import select
else:
raise Exception("%s platform is not supported yet" % os.name)
self._go = threading.Event()
self._go.set()
def _check_console_input(self):
"""test if there is something to read on the console"""
if os.name == "nt":
if 0 == ctypes.windll.Kernel32.WaitForSingleObject(self.console_handle, 500):
return True
elif os.name == "posix":
(inputready, abcd, efgh) = select.select([sys.stdin], [], [], 0.5)
if len(inputready)>0:
return True
else:
raise Exception("%s platform is not supported yet" % os.name)
return False
def run(self):
"""read from the console, transfer to the server and write the answer"""
while self._go.isSet(): #while app is running
if self._check_console_input(): #if something to read on the console
cmd = sys.stdin.readline() #read it
self.inq.put(cmd) #dispatch it tpo the server
response = self.outq.get(timeout=2.0) #wait for an answer
sys.stdout.write(response) #write the answer on the console
def close(self):
"""terminates the thread"""
self._go.clear()
self.join(1.0)
class Simulator:
"""The main class of the app in charge of running everything"""
def __init__(self, server=None):
"""Constructor"""
if server == None:
self.server = CompositeServer([modbus_rtu.RtuServer, modbus_tcp.TcpServer], [(serial.Serial(0),), ()])
else:
self.server = server
self.rpc = RpcInterface()
self.console = ConsoleInterface()
self.inq, self.outq = INPUT_QUEUE, OUTPUT_QUEUE
self._hooks_fct = {}
self.cmds = {"add_slave":self._do_add_slave,
"has_slave":self._do_has_slave,
"remove_slave":self._do_remove_slave,
"remove_all_slaves":self._do_remove_all_slaves,
"add_block":self._do_add_block,
"remove_block":self._do_remove_block,
"remove_all_blocks":self._do_remove_all_blocks,
"set_values":self._do_set_values,
"get_values":self._do_get_values,
"install_hook":self._do_install_hook,
"uninstall_hook":self._do_uninstall_hook,
"set_verbose":self._do_set_verbose,
}
def add_command(self, name, fct):
"""add a custom command"""
self.cmds[name] = fct
def start(self):
"""run the servers"""
self.server.start()
self.console.start()
self.rpc.start()
LOGGER.info("modbus_tk.simulator is running...")
self._handle()
def declare_hook(self, fct_name, fct):
"""declare a hook function by its name. It must be installed by an install hook command"""
self._hooks_fct[fct_name] = fct
def _tuple_to_str(self, the_tuple):
"""convert a tuple to a string"""
ret = ""
for item in the_tuple:
ret += (" " + str(item))
return ret[1:]
def _do_add_slave(self, args):
"""execute the add_slave command"""
slave_id = int(args[1])
self.server.add_slave(slave_id)
return "%d" % (slave_id)
def _do_has_slave(self, args):
"""execute the has_slave command"""
slave_id = int(args[1])
try:
self.server.get_slave(slave_id)
except:
return "0"
return "1"
def _do_remove_slave(self, args):
"""execute the remove_slave command"""
slave_id = int(args[1])
self.server.remove_slave(slave_id)
return ""
def _do_remove_all_slaves(self, args):
"""execute the remove_slave command"""
self.server.remove_all_slaves()
return ""
def _do_add_slave(self, args):
"""execute the add_slave command"""
slave_id = int(args[1])
self.server.add_slave(slave_id)
return "%d" % (slave_id)
def _do_add_block(self, args):
"""execute the add_block command"""
slave_id = int(args[1])
name = args[2]
block_type = int(args[3])
starting_address = int(args[4])
length = int(args[5])
slave = self.server.get_slave(slave_id)
slave.add_block(name, block_type, starting_address, length)
return name
def _do_remove_block(self, args):
"""execute the remove_block command"""
slave_id = int(args[1])
name = args[2]
slave = self.server.get_slave(slave_id)
slave.remove_block(name)
def _do_remove_all_blocks(self, args):
"""execute the remove_all_blocks command"""
slave_id = int(args[1])
slave = self.server.get_slave(slave_id)
slave.remove_all_blocks()
def _do_set_values(self, args):
"""execute the set_values command"""
slave_id = int(args[1])
name = args[2]
address = int(args[3])
values = []
for val in args[4:]:
values.append(int(val))
slave = self.server.get_slave(slave_id)
slave.set_values(name, address, values)
values = slave.get_values(name, address, len(values))
return self._tuple_to_str(values)
def _do_get_values(self, args):
"""execute the get_values command"""
slave_id = int(args[1])
name = args[2]
address = int(args[3])
length = int(args[4])
slave = self.server.get_slave(slave_id)
values = slave.get_values(name, address, length)
return self._tuple_to_str(values)
def _do_install_hook(self, args):
"""install a function as a hook"""
hook_name = args[1]
fct_name = args[2]
hooks.install_hook(hook_name, self._hooks_fct[fct_name])
def _do_uninstall_hook(self, args):
"""
uninstall a function as a hook.
If no function is given, uninstall all functions
"""
hook_name = args[1]
try:
hooks.uninstall_hook(hook_name)
except KeyError, e:
LOGGER.error(str(e))
def _do_set_verbose(self, args):
"""change the verbosity of the server"""
verbose = int(args[1])
self.server.set_verbose(verbose)
return "%d" % verbose
def _handle(self):
"""almost-for-ever loop in charge of listening for command and executing it"""
while True:
cmd = self.inq.get()
args = cmd.strip('\r\n').split(' ')
if cmd.find('quit') == 0:
self.outq.put('bye-bye\r\n')
break
elif self.cmds.has_key(args[0]):
try:
answer = self.cmds[args[0]](args)
self.outq.put("%s done: %s\r\n" % (args[0], answer))
except Exception, msg:
self.outq.put("%s error: %s\r\n" % (args[0], msg))
else:
self.outq.put("error: unknown command %s\r\n" % (args[0]))
def close(self):
"""close every server"""
self.console.close()
self.rpc.close()
self.server.stop()
def print_me(args):
"""hook function example"""
(server, request) = args
print "print_me: len = ", len(request)
if __name__ == "__main__":
simu = Simulator()
try:
LOGGER.info("'quit' for closing the server")
simu.declare_hook("print_me", print_me)
simu.start()
except Exception, excpt:
print excpt
finally:
simu.close()
LOGGER.info("modbus_tk.simulator has stopped!")
#In python 2.5, the SocketServer shutdown is not working Ok
#The 2 lines below are an ugly temporary workaround
time.sleep(1.0)
sys.exit()