279 lines
11 KiB
Python
279 lines
11 KiB
Python
#!/usr/bin/env python
|
|
'''
|
|
Pymodbus Server from MongoDB
|
|
--------------------------------------------------------------------------
|
|
|
|
This program takes the values in a mongo database and exposes them over Modbus/TCP
|
|
'''
|
|
# ---------------------------------------------------------------------------#
|
|
# import the modbus libraries we need
|
|
# ---------------------------------------------------------------------------#
|
|
from pymodbus.server.async import StartTcpServer
|
|
from pymodbus.device import ModbusDeviceIdentification
|
|
from pymodbus.datastore import ModbusSparseDataBlock
|
|
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
|
|
from pymodbus.transaction import ModbusRtuFramer, ModbusAsciiFramer
|
|
from pymodbus.constants import Endian
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# import the python libraries we need
|
|
# ---------------------------------------------------------------------------#
|
|
import pymongo
|
|
from pymongo import MongoClient
|
|
from pycomm_helper import utils as plc
|
|
from time import time as now
|
|
import struct
|
|
import threading
|
|
# ---------------------------------------------------------------------------#
|
|
# configure the service logging
|
|
# ---------------------------------------------------------------------------#
|
|
import logging
|
|
logging.basicConfig()
|
|
log = logging.getLogger()
|
|
log.setLevel(logging.INFO)
|
|
|
|
|
|
def float_to_bytes(float_val):
|
|
'''
|
|
Converts a float to little-endian bytes
|
|
'''
|
|
packed_string = struct.pack('f', float_val)
|
|
unpacked_list = list(struct.unpack('HH', packed_string))
|
|
return unpacked_list
|
|
|
|
|
|
def integer_to_byte(integer_val):
|
|
'''
|
|
Converts an integer to its byte
|
|
'''
|
|
packed_string = struct.pack('h', integer_val)
|
|
unpacked = list(struct.unpack('H', packed_string))
|
|
return unpacked
|
|
|
|
|
|
def lebyte_to_float(word_list):
|
|
'''
|
|
Converts list of little-endian bytes to float
|
|
'''
|
|
packed_string = struct.pack("HH", *word_list)
|
|
unpacked_float = struct.unpack("f", packed_string)[0]
|
|
return unpacked_float
|
|
|
|
|
|
def lebyte_to_integer(word_list):
|
|
'''
|
|
Converts list(size = 1) of little-endian bytes to Integer
|
|
'''
|
|
try:
|
|
packed_string = struct.pack("H", *word_list)
|
|
unpacked_int = struct.unpack("h", packed_string)[0]
|
|
except Exception as e:
|
|
print("Unable to convert {} to integer".format(word_list))
|
|
return False
|
|
return unpacked_int
|
|
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# create your custom data block with callbacks
|
|
# ---------------------------------------------------------------------------#
|
|
class DigitalTagDataBlock(ModbusSparseDataBlock):
|
|
''' A datablock that stores the new value in memory
|
|
and passes the operation to a message queue for further
|
|
processing.
|
|
'''
|
|
|
|
def __init__(self, register_type, tag_list):
|
|
'''
|
|
'''
|
|
values = {}
|
|
self.register_type = register_type
|
|
|
|
for t in tag_list:
|
|
try:
|
|
values[t['register_number']] = t['val']
|
|
except KeyError:
|
|
values[t['register_number']] = 911
|
|
# print("Initialized DigitalTagDataBlock for {} with values {}".format(self.register_type, values))
|
|
super(DigitalTagDataBlock, self).__init__(values)
|
|
|
|
def getValues(self, address, count=1):
|
|
client = MongoClient()
|
|
db = client.tag_data
|
|
tags = db.tag_vals
|
|
|
|
for i in range(address, address + count):
|
|
tag_found = tags.find_one({'register_number': i, 'register_type': self.register_type})
|
|
# print("{} = {}".format(tag_found['tag_name'], tag_found['val']))
|
|
super(DigitalTagDataBlock, self).setValues(address, tag_found['val'])
|
|
|
|
return super(DigitalTagDataBlock, self).getValues(address, count=count)
|
|
|
|
def setValues(self, address, value):
|
|
''' Sets the requested values of the datastore
|
|
|
|
:param address: The starting address
|
|
:param values: The new values to be set
|
|
'''
|
|
client = MongoClient()
|
|
db = client.tag_data
|
|
tags = db.tag_vals
|
|
tag_found = tags.find_one({'register_number': address, 'register_type': self.register_type})
|
|
tag_name = tag_found['tag_name']
|
|
if value[0]:
|
|
plc_value = 1
|
|
else:
|
|
plc_value = 0
|
|
print("Writing {} to {}".format(plc_value, tag_name))
|
|
plc.writeTag(PLC_IP_ADDRESS, tag_name, int(plc_value))
|
|
plc_val = plc.readTag(PLC_IP_ADDRESS, tag_found['tag_name'])
|
|
if plc_val:
|
|
tag_found['val'] = plc_val[0]
|
|
tags.update({'tag_name': tag_found['tag_name']}, tag_found)
|
|
super(DigitalTagDataBlock, self).setValues(address, value)
|
|
|
|
|
|
class AnalogTagDataBlock(ModbusSparseDataBlock):
|
|
''' A datablock that stores the new value in memory
|
|
and passes the operation to a message queue for further
|
|
processing.
|
|
'''
|
|
|
|
def __init__(self, register_type, tag_list):
|
|
'''
|
|
'''
|
|
values = {}
|
|
self.register_type = register_type
|
|
register_values = []
|
|
for t in tag_list:
|
|
try:
|
|
if t['tag_type'][-3:] == 'INT':
|
|
register_values.append({"r_num": t['register_number'], "r_vals": integer_to_byte(t['val'])})
|
|
elif t['tag_type'] == 'REAL':
|
|
register_values.append({"r_num": t['register_number'], "r_vals": float_to_bytes(t['val'])})
|
|
elif t['tag_type'] == 'ARRAY':
|
|
arr_vals = []
|
|
for i in range(0, len(t['val'])):
|
|
arr_vals = arr_vals + float_to_bytes(t['val'][i])
|
|
register_values.append({"r_num": t['register_number'], "r_vals": arr_vals})
|
|
except KeyError:
|
|
print("No value for register #{} - {}".format(t['register_number'], t['tag_name']))
|
|
|
|
register_values.sort(key=lambda x: x['r_num'])
|
|
values = [0]
|
|
for x in register_values:
|
|
values = values + x['r_vals']
|
|
|
|
super(AnalogTagDataBlock, self).__init__(values)
|
|
|
|
def getValues(self, address, count=1):
|
|
client = MongoClient()
|
|
db = client.tag_data
|
|
tags = db.tag_vals
|
|
try:
|
|
for i in range(address, address + count):
|
|
tag_found = tags.find_one({'register_number': i, 'register_type': self.register_type})
|
|
val_registers = []
|
|
if tag_found:
|
|
if tag_found['tag_type'][-3:] == 'INT':
|
|
val_registers = integer_to_byte(tag_found['val'])
|
|
elif tag_found['tag_type'] == 'REAL':
|
|
val_registers = float_to_bytes(tag_found['val'])
|
|
elif tag_found['tag_type'] == 'ARRAY':
|
|
val_registers = []
|
|
for k in range(0, len(tag_found['val'])):
|
|
val_registers = val_registers + float_to_bytes(tag_found['val'][k])
|
|
for j in range(0, len(val_registers)):
|
|
super(AnalogTagDataBlock, self).setValues(i + j, val_registers[j])
|
|
print("{} = {}".format(tag_found['tag_name'], tag_found['val']))
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
return super(AnalogTagDataBlock, self).getValues(address, count=count)
|
|
|
|
def setValues(self, address, value):
|
|
'''
|
|
Sets the requested values of the datastore
|
|
|
|
:param address: The starting address
|
|
:param values: The new values to be set
|
|
'''
|
|
print("provided values: {}".format(value))
|
|
try:
|
|
client = MongoClient()
|
|
db = client.tag_data
|
|
tags = db.tag_vals
|
|
tag_found = tags.find_one({'register_number': address, 'register_type': self.register_type})
|
|
tag_name = tag_found['tag_name']
|
|
if tag_found['tag_type'] == "REAL":
|
|
plc_value = lebyte_to_float(value)
|
|
else:
|
|
plc_value = value[0]
|
|
print("Writing {} to {}".format(plc_value, tag_name))
|
|
plc.writeTag(PLC_IP_ADDRESS, tag_name, plc_value)
|
|
plc_val = plc.readTag(PLC_IP_ADDRESS, tag_found['tag_name'])
|
|
if plc_val:
|
|
tag_found['val'] = plc_val[0]
|
|
tags.update({'tag_name': tag_found['tag_name']}, tag_found)
|
|
super(AnalogTagDataBlock, self).setValues(address, value)
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
|
|
def getTagsFromDB():
|
|
client = MongoClient()
|
|
db = client.tag_data
|
|
tags = db.tag_vals
|
|
print("Found {} tags in the database".format(tags.count()))
|
|
di_tags_cur = tags.find({'register_type': 'di'})
|
|
di_tags = list(di_tags_cur)
|
|
di_tags_num = di_tags_cur.count()
|
|
print("{} Digital Inputs".format(di_tags_num))
|
|
|
|
co_tags_cur = tags.find({'register_type': 'co'})
|
|
co_tags = list(co_tags_cur)
|
|
co_tags_num = co_tags_cur.count()
|
|
print("{} Coils".format(co_tags_num))
|
|
|
|
ir_tags_cur = tags.find({'register_type': 'ir'})
|
|
ir_tags = list(ir_tags_cur)
|
|
ir_tags_num = ir_tags_cur.count()
|
|
print("{} Input Registers".format(ir_tags_num))
|
|
|
|
hr_tags_cur = tags.find({'register_type': 'hr'})
|
|
hr_tags = list(hr_tags_cur)
|
|
hr_tags_num = hr_tags_cur.count()
|
|
print("{} Holding Registers".format(hr_tags_num))
|
|
|
|
return {'di': di_tags, 'co': co_tags, 'ir': ir_tags, 'hr': hr_tags}
|
|
|
|
|
|
class ModbusTCPMongoServer(threading.Thread):
|
|
def __init__(self):
|
|
# ---------------------------------------------------------------------------#
|
|
# initialize your data store
|
|
# ---------------------------------------------------------------------------#
|
|
tags_in_db = getTagsFromDB()
|
|
di_block = DigitalTagDataBlock('di', tags_in_db['di'])
|
|
co_block = DigitalTagDataBlock('co', tags_in_db['co'])
|
|
hr_block = AnalogTagDataBlock('hr', tags_in_db['hr'])
|
|
ir_block = AnalogTagDataBlock('ir', tags_in_db['ir'])
|
|
store = ModbusSlaveContext(di=di_block, co=co_block, hr=hr_block, ir=ir_block)
|
|
self.context = ModbusServerContext(slaves=store, single=True)
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# initialize the server information
|
|
# ---------------------------------------------------------------------------#
|
|
self.identity = ModbusDeviceIdentification()
|
|
self.identity.VendorName = 'pymodbus'
|
|
self.identity.ProductCode = 'PM'
|
|
self.identity.VendorUrl = 'http://github.com/bashwork/pymodbus/'
|
|
self.identity.ProductName = 'pymodbus Server'
|
|
self.identity.ModelName = 'pymodbus Server'
|
|
self.identity.MajorMinorRevision = '1.0'
|
|
|
|
def run(self):
|
|
# ---------------------------------------------------------------------------#
|
|
# run the server you want
|
|
# ---------------------------------------------------------------------------#
|
|
StartTcpServer(self.context, identity=self.identity, address=("0.0.0.0", 502))
|