1158 lines
48 KiB
Python
1158 lines
48 KiB
Python
# uncompyle6 version 3.9.2
|
|
# Python bytecode version base 3.7.0 (3394)
|
|
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
|
|
# [Clang 14.0.6 ]
|
|
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/OpenOPC.py
|
|
# Compiled at: 2024-04-18 03:12:57
|
|
# Size of source mod 2**32: 42815 bytes
|
|
import os, sys, time, types, string, socket, re, Pyro4.core
|
|
from multiprocessing import Queue
|
|
__version__ = "1.2.0"
|
|
current_client = None
|
|
if os.name == "nt":
|
|
try:
|
|
import win32com.client, win32com.server.util, win32event, pythoncom, pywintypes, SystemHealth
|
|
pywintypes.datetime = pywintypes.TimeType
|
|
vt = dict([(pythoncom.__dict__[vtype], vtype) for vtype in pythoncom.__dict__.keys() if vtype[None[:2]] == "VT"])
|
|
win32com.client.gencache.is_readonly = False
|
|
win32com.client.gencache.Rebuild(verbose=0)
|
|
except ImportError:
|
|
win32com_found = False
|
|
else:
|
|
win32com_found = True
|
|
else:
|
|
win32com_found = False
|
|
SOURCE_CACHE = 1
|
|
SOURCE_DEVICE = 2
|
|
OPC_STATUS = (0, 'Running', 'Failed', 'NoConfig', 'Suspended', 'Test')
|
|
BROWSER_TYPE = (0, 'Hierarchical', 'Flat')
|
|
ACCESS_RIGHTS = (0, 'Read', 'Write', 'Read/Write')
|
|
OPC_QUALITY = ('Bad', 'Uncertain', 'Unknown', 'Good')
|
|
OPC_CLASS = "Matrikon.OPC.Automation;Graybox.OPC.DAWrapper;HSCOPC.Automation;RSI.OPCAutomation;OPC.Automation"
|
|
OPC_SERVER = "Hci.TPNServer;HwHsc.OPCServer;opc.deltav.1;AIM.OPC.1;Yokogawa.ExaopcDAEXQ.1;OSI.DA.1;OPC.PHDServerDA.1;Aspen.Infoplus21_DA.1;National Instruments.OPCLabVIEW;RSLinx OPC Server;KEPware.KEPServerEx.V4;Matrikon.OPC.Simulation;Prosys.OPC.Simulation;CCOPC.XMLWrapper.1;OPC.SimaticHMI.CoRtHmiRTm.1"
|
|
OPC_CLIENT = "OpenOPC"
|
|
|
|
def quality_str(quality_bits):
|
|
"""Convert OPC quality bits to a descriptive string"""
|
|
quality = quality_bits >> 6 & 3
|
|
return OPC_QUALITY[quality]
|
|
|
|
|
|
def type_check(tags):
|
|
"""Perform a type check on a list of tags"""
|
|
if type(tags) in (list, tuple):
|
|
single = False
|
|
else:
|
|
if tags == None:
|
|
tags = []
|
|
single = False
|
|
else:
|
|
tags = [
|
|
tags]
|
|
single = True
|
|
if len([t for t in tags if type(t) not in (str, bytes)]) == 0:
|
|
valid = True
|
|
else:
|
|
valid = False
|
|
return (tags, single, valid)
|
|
|
|
|
|
def wild2regex(string):
|
|
"""Convert a Unix wildcard glob into a regular expression"""
|
|
return string.replace(".", "\\.").replace("*", ".*").replace("?", ".").replace("!", "^")
|
|
|
|
|
|
def tags2trace(tags):
|
|
"""Convert a list tags into a formatted string suitable for the trace callback log"""
|
|
arg_str = ""
|
|
for i, t in enumerate(tags[1[:None]]):
|
|
if i > 0:
|
|
arg_str += ","
|
|
arg_str += "%s" % t
|
|
|
|
return arg_str
|
|
|
|
|
|
def exceptional(func, alt_return=None, alt_exceptions=(Exception,), final=None, catch=None):
|
|
"""Turns exceptions into an alternative return value"""
|
|
|
|
def _exceptional(*args, **kwargs):
|
|
try:
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except alt_exceptions:
|
|
return alt_return
|
|
except:
|
|
if catch:
|
|
return catch(sys.exc_info(), (lambda: func(*args, **kwargs)))
|
|
raise
|
|
|
|
finally:
|
|
if final:
|
|
final()
|
|
|
|
return _exceptional
|
|
|
|
|
|
def get_sessions(host='localhost', port=7766):
|
|
"""Return sessions in OpenOPC Gateway Service as GUID:host hash"""
|
|
import Pyro4.core
|
|
server_obj = Pyro4.Proxy("PYRO:opc@{0}:{1}".format(host, port))
|
|
return server_obj.get_clients()
|
|
|
|
|
|
def open_client(host='localhost', port=7766, debug_log=None):
|
|
"""Connect to the specified OpenOPC Gateway Service"""
|
|
import Pyro4.core
|
|
server_obj = Pyro4.Proxy("PYRO:opc@{0}:{1}".format(host, port))
|
|
server_obj.opcda_set_debug_log(debug_log)
|
|
return server_obj.create_client()
|
|
|
|
|
|
class TimeoutError(Exception):
|
|
|
|
def __init__(self, txt):
|
|
Exception.__init__(self, txt)
|
|
|
|
|
|
class OPCError(Exception):
|
|
|
|
def __init__(self, txt):
|
|
Exception.__init__(self, txt)
|
|
|
|
|
|
class GroupEvents:
|
|
|
|
def __init__(self):
|
|
global current_client
|
|
self.client = current_client
|
|
|
|
def OnDataChange(self, TransactionID, NumItems, ClientHandles, ItemValues, Qualities, TimeStamps):
|
|
self.client.callback_queue.put((TransactionID, ClientHandles, ItemValues, Qualities, TimeStamps))
|
|
|
|
|
|
@Pyro4.expose
|
|
class client:
|
|
|
|
def __init__(self, opc_class=None, client_name=None):
|
|
"""Instantiate OPC automation class"""
|
|
self.callback_queue = Queue()
|
|
pythoncom.CoInitialize()
|
|
if opc_class == None:
|
|
if "OPC_CLASS" in os.environ:
|
|
opc_class = os.environ["OPC_CLASS"]
|
|
else:
|
|
opc_class = OPC_CLASS
|
|
opc_class_list = opc_class.split(";")
|
|
for i, c in enumerate(opc_class_list):
|
|
try:
|
|
self._opc = win32com.client.gencache.EnsureDispatch(c, 0)
|
|
self.opc_class = c
|
|
break
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
if i == len(opc_class_list) - 1:
|
|
error_msg = "Dispatch: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
self._event = win32event.CreateEvent(None, 0, 0, None)
|
|
self.opc_server = None
|
|
self.opc_host = None
|
|
self.client_name = client_name
|
|
self._groups = {}
|
|
self._group_tags = {}
|
|
self._group_valid_tags = {}
|
|
self._group_server_handles = {}
|
|
self._group_handles_tag = {}
|
|
self._group_hooks = {}
|
|
self._open_serv = None
|
|
self._open_self = None
|
|
self._open_host = None
|
|
self._open_port = None
|
|
self._open_guid = None
|
|
self._prev_serv_time = None
|
|
self._tx_id = 0
|
|
self.trace = None
|
|
self.cpu = None
|
|
|
|
def set_trace(self, trace):
|
|
if self._open_serv == None:
|
|
self.trace = trace
|
|
|
|
def connect(self, opc_server=None, opc_host='localhost'):
|
|
"""Connect to the specified OPC server"""
|
|
pythoncom.CoInitialize()
|
|
if opc_server == None:
|
|
if self.opc_server == None:
|
|
if "OPC_SERVER" in os.environ:
|
|
opc_server = os.environ["OPC_SERVER"]
|
|
else:
|
|
opc_server = OPC_SERVER
|
|
else:
|
|
opc_server = self.opc_server
|
|
opc_host = self.opc_host
|
|
opc_server_list = opc_server.split(";")
|
|
connected = False
|
|
for s in opc_server_list:
|
|
try:
|
|
if self.trace:
|
|
self.trace("Connect(%s,%s)" % (s, opc_host))
|
|
self._opc.Connect(s, opc_host)
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
if len(opc_server_list) == 1:
|
|
error_msg = "Connect: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
else:
|
|
try:
|
|
if self.client_name == None:
|
|
if "OPC_CLIENT" in os.environ:
|
|
self._opc.ClientName = os.environ["OPC_CLIENT"]
|
|
else:
|
|
self._opc.ClientName = OPC_CLIENT
|
|
else:
|
|
self._opc.ClientName = self.client_name
|
|
except:
|
|
pass
|
|
|
|
connected = True
|
|
break
|
|
|
|
if not connected:
|
|
raise OPCError("Connect: Cannot connect to any of the servers in the OPC_SERVER list")
|
|
time.sleep(0.01)
|
|
self.opc_server = opc_server
|
|
if opc_host == "localhost":
|
|
opc_host = socket.gethostname()
|
|
self.opc_host = opc_host
|
|
self._groups = {}
|
|
self._group_tags = {}
|
|
self._group_valid_tags = {}
|
|
self._group_server_handles = {}
|
|
self._group_handles_tag = {}
|
|
self._group_hooks = {}
|
|
|
|
def GUID(self):
|
|
return self._open_guid
|
|
|
|
def close(self, del_object=True):
|
|
"""Disconnect from the currently connected OPC server"""
|
|
try:
|
|
try:
|
|
pythoncom.CoInitialize()
|
|
self.remove(self.groups())
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "Disconnect: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
except OPCError:
|
|
pass
|
|
|
|
finally:
|
|
if self.trace:
|
|
self.trace("Disconnect()")
|
|
self._opc.Disconnect()
|
|
if self._open_serv:
|
|
if del_object:
|
|
self._open_serv.release_client(self._open_self)
|
|
|
|
def iread(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False):
|
|
"""Iterable version of read()"""
|
|
global current_client
|
|
|
|
def add_items(tags):
|
|
names = list(tags)
|
|
names.insert(0, 0)
|
|
errors = []
|
|
if self.trace:
|
|
self.trace("Validate(%s)" % tags2trace(names))
|
|
else:
|
|
try:
|
|
errors = opc_items.Validate(len(names) - 1, names)
|
|
except:
|
|
pass
|
|
|
|
valid_tags = []
|
|
valid_values = []
|
|
client_handles = []
|
|
if sub_group not in self._group_handles_tag:
|
|
self._group_handles_tag[sub_group] = {}
|
|
n = 0
|
|
else:
|
|
if len(self._group_handles_tag[sub_group]) > 0:
|
|
n = max(self._group_handles_tag[sub_group]) + 1
|
|
else:
|
|
n = 0
|
|
for i, tag in enumerate(tags):
|
|
if errors[i] == 0:
|
|
valid_tags.append(tag)
|
|
client_handles.append(n)
|
|
self._group_handles_tag[sub_group][n] = tag
|
|
n += 1
|
|
else:
|
|
if include_error:
|
|
error_msgs[tag] = self._opc.GetErrorString(errors[i])
|
|
if self.trace and errors[i] != 0:
|
|
self.trace("%s failed validation" % tag)
|
|
|
|
client_handles.insert(0, 0)
|
|
valid_tags.insert(0, 0)
|
|
server_handles = []
|
|
errors = []
|
|
if self.trace:
|
|
self.trace("AddItems(%s)" % tags2trace(valid_tags))
|
|
try:
|
|
server_handles, errors = opc_items.AddItems(len(client_handles) - 1, valid_tags, client_handles)
|
|
except:
|
|
pass
|
|
|
|
valid_tags_tmp = []
|
|
server_handles_tmp = []
|
|
valid_tags.pop(0)
|
|
if sub_group not in self._group_server_handles:
|
|
self._group_server_handles[sub_group] = {}
|
|
for i, tag in enumerate(valid_tags):
|
|
if errors[i] == 0:
|
|
valid_tags_tmp.append(tag)
|
|
server_handles_tmp.append(server_handles[i])
|
|
self._group_server_handles[sub_group][tag] = server_handles[i]
|
|
|
|
valid_tags = valid_tags_tmp
|
|
server_handles = server_handles_tmp
|
|
return (
|
|
valid_tags, server_handles)
|
|
|
|
def remove_items(tags):
|
|
if self.trace:
|
|
self.trace("RemoveItems(%s)" % tags2trace([""] + tags))
|
|
server_handles = [self._group_server_handles[sub_group][tag] for tag in tags]
|
|
server_handles.insert(0, 0)
|
|
errors = []
|
|
try:
|
|
errors = opc_items.Remove(len(server_handles) - 1, server_handles)
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "RemoveItems: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
try:
|
|
self._update_tx_time()
|
|
pythoncom.CoInitialize()
|
|
if include_error:
|
|
sync = True
|
|
if sync:
|
|
update = -1
|
|
tags, single, valid = type_check(tags)
|
|
if not valid:
|
|
raise TypeError("iread(): 'tags' parameter must be a string or a list of strings")
|
|
elif group in self._groups:
|
|
num_groups = rebuild or self._groups[group]
|
|
data_source = SOURCE_CACHE
|
|
else:
|
|
if size:
|
|
tag_groups = [tags[i[:i + size]] for i in range(0, len(tags), size)]
|
|
else:
|
|
tag_groups = [
|
|
tags]
|
|
num_groups = len(tag_groups)
|
|
data_source = SOURCE_DEVICE
|
|
results = []
|
|
for gid in range(num_groups):
|
|
if gid > 0:
|
|
if pause > 0:
|
|
time.sleep(pause / 1000.0)
|
|
else:
|
|
error_msgs = {}
|
|
opc_groups = self._opc.OPCGroups
|
|
opc_groups.DefaultGroupUpdateRate = update
|
|
if group == None:
|
|
try:
|
|
if self.trace:
|
|
self.trace("AddGroup()")
|
|
opc_group = opc_groups.Add()
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "AddGroup: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
sub_group = group
|
|
new_group = True
|
|
else:
|
|
sub_group = "%s.%d" % (group, gid)
|
|
try:
|
|
if self.trace:
|
|
self.trace("GetOPCGroup(%s)" % sub_group)
|
|
opc_group = opc_groups.GetOPCGroup(sub_group)
|
|
new_group = False
|
|
except:
|
|
try:
|
|
if self.trace:
|
|
self.trace("AddGroup(%s)" % sub_group)
|
|
opc_group = opc_groups.Add(sub_group)
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "AddGroup: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
self._groups[str(group)] = len(tag_groups)
|
|
new_group = True
|
|
|
|
opc_items = opc_group.OPCItems
|
|
if new_group:
|
|
opc_group.IsSubscribed = 1
|
|
opc_group.IsActive = 1
|
|
if not sync:
|
|
if self.trace:
|
|
self.trace("WithEvents(%s)" % opc_group.Name)
|
|
current_client = self
|
|
self._group_hooks[opc_group.Name] = win32com.client.WithEvents(opc_group, GroupEvents)
|
|
tags = tag_groups[gid]
|
|
valid_tags, server_handles = add_items(tags)
|
|
self._group_tags[sub_group] = tags
|
|
self._group_valid_tags[sub_group] = valid_tags
|
|
else:
|
|
if rebuild:
|
|
tags = tag_groups[gid]
|
|
valid_tags = self._group_valid_tags[sub_group]
|
|
add_tags = [t for t in tags if t not in valid_tags]
|
|
del_tags = [t for t in valid_tags if t not in tags]
|
|
if len(add_tags) > 0:
|
|
valid_tags, server_handles = add_items(add_tags)
|
|
valid_tags = self._group_valid_tags[sub_group] + valid_tags
|
|
if len(del_tags) > 0:
|
|
remove_items(del_tags)
|
|
valid_tags = [t for t in valid_tags if t not in del_tags]
|
|
self._group_tags[sub_group] = tags
|
|
self._group_valid_tags[sub_group] = valid_tags
|
|
if source == "hybrid":
|
|
data_source = SOURCE_DEVICE
|
|
else:
|
|
tags = self._group_tags[sub_group]
|
|
valid_tags = self._group_valid_tags[sub_group]
|
|
if sync:
|
|
server_handles = [item.ServerHandle for item in opc_items]
|
|
tag_value = {}
|
|
tag_quality = {}
|
|
tag_time = {}
|
|
tag_error = {}
|
|
if sync:
|
|
values = []
|
|
errors = []
|
|
qualities = []
|
|
timestamps = []
|
|
if len(valid_tags) > 0:
|
|
server_handles.insert(0, 0)
|
|
if source != "hybrid":
|
|
data_source = SOURCE_CACHE if source == "cache" else SOURCE_DEVICE
|
|
if self.trace:
|
|
self.trace("SyncRead(%s)" % data_source)
|
|
try:
|
|
values, errors, qualities, timestamps = opc_group.SyncRead(data_source, len(server_handles) - 1, server_handles)
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "SyncRead: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
for i, tag in enumerate(valid_tags):
|
|
tag_value[tag] = values[i]
|
|
tag_quality[tag] = qualities[i]
|
|
tag_time[tag] = timestamps[i]
|
|
tag_error[tag] = errors[i]
|
|
|
|
elif len(valid_tags) > 0:
|
|
if self._tx_id >= 65535:
|
|
self._tx_id = 0
|
|
else:
|
|
self._tx_id += 1
|
|
if source != "hybrid":
|
|
data_source = SOURCE_CACHE if source == "cache" else SOURCE_DEVICE
|
|
if self.trace:
|
|
self.trace("AsyncRefresh(%s)" % data_source)
|
|
try:
|
|
opc_group.AsyncRefresh(data_source, self._tx_id)
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "AsyncRefresh: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
tx_id = 0
|
|
start = time.time() * 1000
|
|
while tx_id != self._tx_id:
|
|
now = time.time() * 1000
|
|
if now - start > timeout:
|
|
raise TimeoutError("Callback: Timeout waiting for data")
|
|
if self.callback_queue.empty():
|
|
pythoncom.PumpWaitingMessages()
|
|
else:
|
|
tx_id, handles, values, qualities, timestamps = self.callback_queue.get()
|
|
|
|
for i, h in enumerate(handles):
|
|
tag = self._group_handles_tag[sub_group][h]
|
|
tag_value[tag] = values[i]
|
|
tag_quality[tag] = qualities[i]
|
|
tag_time[tag] = timestamps[i]
|
|
|
|
for tag in tags:
|
|
if tag in tag_value and not sync:
|
|
if not len(valid_tags) > 0 or sync or tag_error[tag] == 0:
|
|
value = tag_value[tag]
|
|
if type(value) == pywintypes.TimeType:
|
|
value = str(value)
|
|
quality = quality_str(tag_quality[tag])
|
|
timestamp = str(tag_time[tag])
|
|
else:
|
|
value = None
|
|
quality = "Error"
|
|
timestamp = None
|
|
if include_error:
|
|
error_msgs[tag] = self._opc.GetErrorString(tag_error[tag]).strip("\r\n")
|
|
else:
|
|
value = None
|
|
quality = "Error"
|
|
timestamp = None
|
|
if include_error:
|
|
if tag not in error_msgs:
|
|
error_msgs[tag] = ""
|
|
if single:
|
|
if include_error:
|
|
yield (
|
|
value, quality, timestamp, error_msgs[tag])
|
|
else:
|
|
yield (
|
|
value, quality, timestamp)
|
|
else:
|
|
if include_error:
|
|
yield (
|
|
tag, value, quality, timestamp, error_msgs[tag])
|
|
yield (
|
|
tag, value, quality, timestamp)
|
|
|
|
if group == None:
|
|
try:
|
|
if not sync:
|
|
if opc_group.Name in self._group_hooks:
|
|
if self.trace:
|
|
self.trace("CloseEvents(%s)" % opc_group.Name)
|
|
self._group_hooks[opc_group.Name].close()
|
|
if self.trace:
|
|
self.trace("RemoveGroup(%s)" % opc_group.Name)
|
|
opc_groups.Remove(opc_group.Name)
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "RemoveGroup: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "read: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
def read(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False):
|
|
"""Return list of (value, quality, time) tuples for the specified tag(s)"""
|
|
tags_list, single, valid = type_check(tags)
|
|
if not valid:
|
|
raise TypeError("read(): 'tags' parameter must be a string or a list of strings")
|
|
else:
|
|
num_health_tags = len([t for t in tags_list if t[None[:1]] == "@"])
|
|
num_opc_tags = len([t for t in tags_list if t[None[:1]] != "@"])
|
|
if num_health_tags > 0:
|
|
if num_opc_tags > 0:
|
|
raise TypeError("read(): system health and OPC tags cannot be included in the same group")
|
|
results = self._read_health(tags)
|
|
else:
|
|
results = self.iread(tags, group, size, pause, source, update, timeout, sync, include_error, rebuild)
|
|
if single:
|
|
return list(results)[0]
|
|
return list(results)
|
|
|
|
def _read_health(self, tags):
|
|
"""Return values of special system health monitoring tags"""
|
|
self._update_tx_time()
|
|
tags, single, valid = type_check(tags)
|
|
time_str = time.strftime("%x %H:%M:%S")
|
|
results = []
|
|
for t in tags:
|
|
if t == "@MemFree":
|
|
value = SystemHealth.mem_free()
|
|
else:
|
|
if t == "@MemUsed":
|
|
value = SystemHealth.mem_used()
|
|
else:
|
|
if t == "@MemTotal":
|
|
value = SystemHealth.mem_total()
|
|
else:
|
|
if t == "@MemPercent":
|
|
value = SystemHealth.mem_percent()
|
|
else:
|
|
if t == "@DiskFree":
|
|
value = SystemHealth.disk_free()
|
|
else:
|
|
if t == "@SineWave":
|
|
value = SystemHealth.sine_wave()
|
|
else:
|
|
if t == "@SawWave":
|
|
value = SystemHealth.saw_wave()
|
|
else:
|
|
if t == "@CpuUsage":
|
|
if self.cpu == None:
|
|
self.cpu = SystemHealth.CPU()
|
|
time.sleep(0.1)
|
|
value = self.cpu.get_usage()
|
|
else:
|
|
value = None
|
|
m = re.match("@TaskMem\\((.*?)\\)", t)
|
|
if m:
|
|
image_name = m.group(1)
|
|
value = SystemHealth.task_mem(image_name)
|
|
else:
|
|
m = re.match("@TaskCpu\\((.*?)\\)", t)
|
|
if m:
|
|
image_name = m.group(1)
|
|
value = SystemHealth.task_cpu(image_name)
|
|
else:
|
|
m = re.match("@TaskExists\\((.*?)\\)", t)
|
|
if m:
|
|
image_name = m.group(1)
|
|
value = SystemHealth.task_exists(image_name)
|
|
if value == None:
|
|
quality = "Error"
|
|
else:
|
|
quality = "Good"
|
|
if single:
|
|
results.append((value, quality, time_str))
|
|
results.append((t, value, quality, time_str))
|
|
|
|
return results
|
|
|
|
def iwrite(self, tag_value_pairs, size=None, pause=0, include_error=False):
|
|
"""Iterable version of write()"""
|
|
try:
|
|
self._update_tx_time()
|
|
pythoncom.CoInitialize()
|
|
|
|
def _valid_pair(p):
|
|
if type(p) in (list, tuple):
|
|
if len(p) >= 2:
|
|
if type(p[0]) in (str, bytes):
|
|
return True
|
|
return False
|
|
|
|
if type(tag_value_pairs) not in (list, tuple):
|
|
raise TypeError("write(): 'tag_value_pairs' parameter must be a (tag, value) tuple or a list of (tag,value) tuples")
|
|
else:
|
|
if tag_value_pairs == None:
|
|
tag_value_pairs = [
|
|
""]
|
|
single = False
|
|
else:
|
|
if type(tag_value_pairs[0]) in (str, bytes):
|
|
tag_value_pairs = [
|
|
tag_value_pairs]
|
|
single = True
|
|
else:
|
|
single = False
|
|
invalid_pairs = [p for p in tag_value_pairs if not _valid_pair(p)]
|
|
if len(invalid_pairs) > 0:
|
|
raise TypeError("write(): 'tag_value_pairs' parameter must be a (tag, value) tuple or a list of (tag,value) tuples")
|
|
names = [tag[0] for tag in tag_value_pairs]
|
|
tags = [tag[0] for tag in tag_value_pairs]
|
|
values = [tag[1] for tag in tag_value_pairs]
|
|
if size:
|
|
name_groups = [names[i[:i + size]] for i in range(0, len(names), size)]
|
|
tag_groups = [tags[i[:i + size]] for i in range(0, len(tags), size)]
|
|
value_groups = [values[i[:i + size]] for i in range(0, len(values), size)]
|
|
else:
|
|
name_groups = [
|
|
names]
|
|
tag_groups = [
|
|
tags]
|
|
value_groups = [values]
|
|
num_groups = len(tag_groups)
|
|
status = []
|
|
for gid in range(num_groups):
|
|
if gid > 0:
|
|
if pause > 0:
|
|
time.sleep(pause / 1000.0)
|
|
opc_groups = self._opc.OPCGroups
|
|
opc_group = opc_groups.Add()
|
|
opc_items = opc_group.OPCItems
|
|
names = name_groups[gid]
|
|
tags = tag_groups[gid]
|
|
values = value_groups[gid]
|
|
names.insert(0, 0)
|
|
errors = []
|
|
try:
|
|
errors = opc_items.Validate(len(names) - 1, names)
|
|
except:
|
|
pass
|
|
|
|
n = 1
|
|
valid_tags = []
|
|
valid_values = []
|
|
client_handles = []
|
|
error_msgs = {}
|
|
for i, tag in enumerate(tags):
|
|
if errors[i] == 0:
|
|
valid_tags.append(tag)
|
|
valid_values.append(values[i])
|
|
client_handles.append(n)
|
|
error_msgs[tag] = ""
|
|
n += 1
|
|
|
|
client_handles.insert(0, 0)
|
|
valid_tags.insert(0, 0)
|
|
server_handles = []
|
|
errors = []
|
|
try:
|
|
server_handles, errors = opc_items.AddItems(len(client_handles) - 1, valid_tags, client_handles)
|
|
except:
|
|
pass
|
|
|
|
valid_tags_tmp = []
|
|
valid_values_tmp = []
|
|
server_handles_tmp = []
|
|
valid_tags.pop(0)
|
|
for i, tag in enumerate(valid_tags):
|
|
if errors[i] == 0:
|
|
valid_tags_tmp.append(tag)
|
|
valid_values_tmp.append(valid_values[i])
|
|
server_handles_tmp.append(server_handles[i])
|
|
error_msgs[tag] = ""
|
|
|
|
valid_tags = valid_tags_tmp
|
|
valid_values = valid_values_tmp
|
|
server_handles = server_handles_tmp
|
|
server_handles.insert(0, 0)
|
|
valid_values.insert(0, 0)
|
|
errors = []
|
|
if len(valid_values) > 1:
|
|
try:
|
|
errors = opc_group.SyncWrite(len(server_handles) - 1, server_handles, valid_values)
|
|
except:
|
|
pass
|
|
|
|
n = 0
|
|
for tag in tags:
|
|
if tag in valid_tags:
|
|
if errors[n] == 0:
|
|
status = "Success"
|
|
else:
|
|
status = "Error"
|
|
if include_error:
|
|
error_msgs[tag] = self._opc.GetErrorString(errors[n])
|
|
n += 1
|
|
else:
|
|
status = "Error"
|
|
if include_error:
|
|
error_msgs[tag] = error_msgs[tag].strip("\r\n")
|
|
if single:
|
|
if include_error:
|
|
yield (
|
|
status, error_msgs[tag])
|
|
else:
|
|
yield status
|
|
elif include_error:
|
|
yield (
|
|
tag, status, error_msgs[tag])
|
|
else:
|
|
yield (
|
|
tag, status)
|
|
|
|
opc_groups.Remove(opc_group.Name)
|
|
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "write: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
def write(self, tag_value_pairs, size=None, pause=0, include_error=False):
|
|
"""Write list of (tag, value) pair(s) to the server"""
|
|
if type(tag_value_pairs) in (list, tuple) and type(tag_value_pairs[0]) in (list, tuple):
|
|
single = False
|
|
else:
|
|
single = True
|
|
status = self.iwrite(tag_value_pairs, size, pause, include_error)
|
|
if single:
|
|
return list(status)[0]
|
|
return list(status)
|
|
|
|
def groups(self):
|
|
"""Return a list of active tag groups"""
|
|
return self._groups.keys()
|
|
|
|
def remove(self, groups):
|
|
"""Remove the specified tag group(s)"""
|
|
try:
|
|
pythoncom.CoInitialize()
|
|
opc_groups = self._opc.OPCGroups
|
|
if type(groups) in (str, bytes):
|
|
groups = [
|
|
groups]
|
|
single = True
|
|
else:
|
|
single = False
|
|
status = []
|
|
for group in groups:
|
|
if group in self._groups:
|
|
for i in range(self._groups[group]):
|
|
sub_group = "%s.%d" % (group, i)
|
|
if sub_group in self._group_hooks:
|
|
if self.trace:
|
|
self.trace("CloseEvents(%s)" % sub_group)
|
|
self._group_hooks[sub_group].close()
|
|
try:
|
|
if self.trace:
|
|
self.trace("RemoveGroup(%s)" % sub_group)
|
|
errors = opc_groups.Remove(sub_group)
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "RemoveGroup: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
del self._group_tags[sub_group]
|
|
del self._group_valid_tags[sub_group]
|
|
del self._group_handles_tag[sub_group]
|
|
del self._group_server_handles[sub_group]
|
|
|
|
del self._groups[group]
|
|
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "remove: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
def iproperties(self, tags, id=None):
|
|
"""Iterable version of properties()"""
|
|
try:
|
|
self._update_tx_time()
|
|
pythoncom.CoInitialize()
|
|
tags, single_tag, valid = type_check(tags)
|
|
if not valid:
|
|
raise TypeError("properties(): 'tags' parameter must be a string or a list of strings")
|
|
try:
|
|
id.remove(0)
|
|
include_name = True
|
|
except:
|
|
include_name = False
|
|
|
|
if id != None:
|
|
descriptions = []
|
|
if isinstance(id, list) or isinstance(id, tuple):
|
|
property_id = list(id)
|
|
single_property = False
|
|
else:
|
|
property_id = [
|
|
id]
|
|
single_property = True
|
|
for i in property_id:
|
|
descriptions.append("Property id %d" % i)
|
|
|
|
else:
|
|
single_property = False
|
|
properties = []
|
|
for tag in tags:
|
|
if id == None:
|
|
descriptions = []
|
|
property_id = []
|
|
count, property_id, descriptions, datatypes = self._opc.QueryAvailableProperties(tag)
|
|
tag_properties = list(map((lambda x, y: (x, y)), property_id, descriptions))
|
|
property_id = [p for p, d in tag_properties if p > 0]
|
|
descriptions = [d for p, d in tag_properties if p > 0]
|
|
else:
|
|
property_id.insert(0, 0)
|
|
values = []
|
|
errors = []
|
|
values, errors = self._opc.GetItemProperties(tag, len(property_id) - 1, property_id)
|
|
property_id.pop(0)
|
|
values = [str(v) if type(v) == pywintypes.TimeType else v for v in values]
|
|
try:
|
|
i = property_id.index(1)
|
|
values[i] = vt[values[i]]
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
i = property_id.index(3)
|
|
values[i] = quality_str(values[i])
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
i = property_id.index(5)
|
|
values[i] = ACCESS_RIGHTS[values[i]]
|
|
except:
|
|
pass
|
|
|
|
if id != None:
|
|
if single_property:
|
|
if single_tag:
|
|
tag_properties = values
|
|
else:
|
|
tag_properties = [
|
|
values]
|
|
else:
|
|
tag_properties = list(map((lambda x, y: (x, y)), property_id, values))
|
|
else:
|
|
tag_properties = list(map((lambda x, y, z: (x, y, z)), property_id, descriptions, values))
|
|
tag_properties.insert(0, (0, "Item ID (virtual property)", tag))
|
|
if include_name:
|
|
tag_properties.insert(0, (0, tag))
|
|
tag_properties = single_tag or [tuple([tag] + list(p)) for p in tag_properties]
|
|
for p in tag_properties:
|
|
yield p
|
|
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "properties: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
def properties(self, tags, id=None):
|
|
"""Return list of property tuples (id, name, value) for the specified tag(s) """
|
|
if type(tags) not in (list, tuple) and type(id) not in (type(None), list, tuple):
|
|
single = True
|
|
else:
|
|
single = False
|
|
props = self.iproperties(tags, id)
|
|
if single:
|
|
return list(props)[0]
|
|
return list(props)
|
|
|
|
def ilist(self, paths='*', recursive=False, flat=False, include_type=False):
|
|
"""Iterable version of list()"""
|
|
try:
|
|
self._update_tx_time()
|
|
pythoncom.CoInitialize()
|
|
try:
|
|
browser = self._opc.CreateBrowser()
|
|
except:
|
|
return
|
|
else:
|
|
paths, single, valid = type_check(paths)
|
|
if not valid:
|
|
raise TypeError("list(): 'paths' parameter must be a string or a list of strings")
|
|
if len(paths) == 0:
|
|
paths = [
|
|
"*"]
|
|
nodes = {}
|
|
for path in paths:
|
|
if flat:
|
|
browser.MoveToRoot()
|
|
browser.Filter = ""
|
|
browser.ShowLeafs(True)
|
|
pattern = re.compile("^%s$" % wild2regex(path), re.IGNORECASE)
|
|
matches = filter(pattern.search, browser)
|
|
if include_type:
|
|
matches = [(x, node_type) for x in matches]
|
|
for node in matches:
|
|
yield node
|
|
|
|
continue
|
|
queue = []
|
|
queue.append(path)
|
|
while len(queue) > 0:
|
|
tag = queue.pop(0)
|
|
browser.MoveToRoot()
|
|
browser.Filter = ""
|
|
pattern = None
|
|
path_str = "/"
|
|
path_list = tag.replace(".", "/").split("/")
|
|
path_list = [p for p in path_list if len(p) > 0]
|
|
found_filter = False
|
|
path_postfix = "/"
|
|
for i, p in enumerate(path_list):
|
|
if found_filter:
|
|
path_postfix += p + "/"
|
|
else:
|
|
if p.find("*") >= 0:
|
|
pattern = re.compile("^%s$" % wild2regex(p), re.IGNORECASE)
|
|
found_filter = True
|
|
|
|
browser.ShowBranches()
|
|
if len(browser) == 0:
|
|
browser.ShowLeafs(False)
|
|
lowest_level = True
|
|
node_type = "Leaf"
|
|
else:
|
|
lowest_level = False
|
|
node_type = "Branch"
|
|
matches = filter(pattern.search, browser)
|
|
if not lowest_level:
|
|
if recursive:
|
|
queue += [path_str + x + path_postfix for x in matches]
|
|
elif lowest_level:
|
|
matches = [exceptional(browser.GetItemID, x)(x) for x in matches]
|
|
if include_type:
|
|
matches = [(x, node_type) for x in matches]
|
|
for node in matches:
|
|
if node not in nodes:
|
|
yield node
|
|
nodes[node] = True
|
|
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "list: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
def list(self, paths='*', recursive=False, flat=False, include_type=False):
|
|
"""Return list of item nodes at specified path(s) (tree browser)"""
|
|
nodes = self.ilist(paths, recursive, flat, include_type)
|
|
return list(nodes)
|
|
|
|
def servers(self, opc_host='localhost'):
|
|
"""Return list of available OPC servers"""
|
|
try:
|
|
pythoncom.CoInitialize()
|
|
servers = self._opc.GetOPCServers(opc_host)
|
|
servers = [s for s in servers if s != None]
|
|
return servers
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "servers: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
def info(self):
|
|
"""Return list of (name, value) pairs about the OPC server"""
|
|
try:
|
|
self._update_tx_time()
|
|
pythoncom.CoInitialize()
|
|
info_list = []
|
|
if self._open_serv:
|
|
mode = "OpenOPC"
|
|
else:
|
|
mode = "DCOM"
|
|
info_list += [("Protocol", mode)]
|
|
if mode == "OpenOPC":
|
|
info_list += [("Gateway Host", "%s:%s" % (self._open_host, self._open_port))]
|
|
info_list += [("Gateway Version", "%s" % __version__)]
|
|
info_list += [("Class", self.opc_class)]
|
|
info_list += [("Client Name", self._opc.ClientName)]
|
|
info_list += [("OPC Host", self.opc_host)]
|
|
info_list += [("OPC Server", self._opc.ServerName)]
|
|
info_list += [("State", OPC_STATUS[self._opc.ServerState])]
|
|
info_list += [("Version", "%d.%d (Build %d)" % (self._opc.MajorVersion, self._opc.MinorVersion, self._opc.BuildNumber))]
|
|
try:
|
|
browser = self._opc.CreateBrowser()
|
|
browser_type = BROWSER_TYPE[browser.Organization]
|
|
except:
|
|
browser_type = "Not Supported"
|
|
|
|
info_list += [("Browser", browser_type)]
|
|
info_list += [("Start Time", str(self._opc.StartTime))]
|
|
info_list += [("Current Time", str(self._opc.CurrentTime))]
|
|
info_list += [("Vendor", self._opc.VendorInfo)]
|
|
return info_list
|
|
except pythoncom.com_error as err:
|
|
try:
|
|
error_msg = "info: %s" % self._get_error_str(err)
|
|
raise OPCError(error_msg)
|
|
finally:
|
|
err = None
|
|
del err
|
|
|
|
def ping(self):
|
|
"""Check if we are still talking to the OPC server"""
|
|
try:
|
|
opc_serv_time = int(float(self._opc.CurrentTime) * 1000000.0)
|
|
if opc_serv_time == self._prev_serv_time:
|
|
return False
|
|
self._prev_serv_time = opc_serv_time
|
|
return True
|
|
except pythoncom.com_error:
|
|
return False
|
|
|
|
def _get_error_str(self, err):
|
|
"""Return the error string for a OPC or COM error code"""
|
|
hr, msg, exc, arg = err.args
|
|
if exc == None:
|
|
error_str = str(msg)
|
|
else:
|
|
scode = exc[5]
|
|
try:
|
|
opc_err_str = unicode(self._opc.GetErrorString(scode)).strip("\r\n")
|
|
except:
|
|
opc_err_str = None
|
|
|
|
try:
|
|
com_err_str = unicode(pythoncom.GetScodeString(scode)).strip("\r\n")
|
|
except:
|
|
com_err_str = None
|
|
|
|
if opc_err_str == None and com_err_str == None:
|
|
error_str = str(scode)
|
|
else:
|
|
if opc_err_str == com_err_str:
|
|
error_str = opc_err_str
|
|
else:
|
|
if opc_err_str == None:
|
|
error_str = com_err_str
|
|
else:
|
|
if com_err_str == None:
|
|
error_str = opc_err_str
|
|
else:
|
|
error_str = "%s (%s)" % (opc_err_str, com_err_str)
|
|
return error_str
|
|
|
|
def _update_tx_time(self):
|
|
"""Update the session's last transaction time in the Gateway Service"""
|
|
if self._open_serv:
|
|
self._open_serv._tx_times[self._open_guid] = time.time()
|
|
|
|
def __getitem__(self, key):
|
|
"""Read single item (tag as dictionary key)"""
|
|
value, quality, time = self.read(key)
|
|
return value
|
|
|
|
def __setitem__(self, key, value):
|
|
"""Write single item (tag as dictionary key)"""
|
|
self.write((key, value))
|