Files
2025-04-30 08:48:49 -05:00

1158 lines
48 KiB
Python

# uncompyle6 version 3.9.2
# Python bytecode version base 3.7.0 (3394)
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
# [Clang 14.0.6 ]
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/OpenOPC.py
# Compiled at: 2024-04-18 03:12:57
# Size of source mod 2**32: 42815 bytes
import os, sys, time, types, string, socket, re, Pyro4.core
from multiprocessing import Queue
__version__ = "1.2.0"
current_client = None
if os.name == "nt":
try:
import win32com.client, win32com.server.util, win32event, pythoncom, pywintypes, SystemHealth
pywintypes.datetime = pywintypes.TimeType
vt = dict([(pythoncom.__dict__[vtype], vtype) for vtype in pythoncom.__dict__.keys() if vtype[None[:2]] == "VT"])
win32com.client.gencache.is_readonly = False
win32com.client.gencache.Rebuild(verbose=0)
except ImportError:
win32com_found = False
else:
win32com_found = True
else:
win32com_found = False
SOURCE_CACHE = 1
SOURCE_DEVICE = 2
OPC_STATUS = (0, 'Running', 'Failed', 'NoConfig', 'Suspended', 'Test')
BROWSER_TYPE = (0, 'Hierarchical', 'Flat')
ACCESS_RIGHTS = (0, 'Read', 'Write', 'Read/Write')
OPC_QUALITY = ('Bad', 'Uncertain', 'Unknown', 'Good')
OPC_CLASS = "Matrikon.OPC.Automation;Graybox.OPC.DAWrapper;HSCOPC.Automation;RSI.OPCAutomation;OPC.Automation"
OPC_SERVER = "Hci.TPNServer;HwHsc.OPCServer;opc.deltav.1;AIM.OPC.1;Yokogawa.ExaopcDAEXQ.1;OSI.DA.1;OPC.PHDServerDA.1;Aspen.Infoplus21_DA.1;National Instruments.OPCLabVIEW;RSLinx OPC Server;KEPware.KEPServerEx.V4;Matrikon.OPC.Simulation;Prosys.OPC.Simulation;CCOPC.XMLWrapper.1;OPC.SimaticHMI.CoRtHmiRTm.1"
OPC_CLIENT = "OpenOPC"
def quality_str(quality_bits):
"""Convert OPC quality bits to a descriptive string"""
quality = quality_bits >> 6 & 3
return OPC_QUALITY[quality]
def type_check(tags):
"""Perform a type check on a list of tags"""
if type(tags) in (list, tuple):
single = False
else:
if tags == None:
tags = []
single = False
else:
tags = [
tags]
single = True
if len([t for t in tags if type(t) not in (str, bytes)]) == 0:
valid = True
else:
valid = False
return (tags, single, valid)
def wild2regex(string):
"""Convert a Unix wildcard glob into a regular expression"""
return string.replace(".", "\\.").replace("*", ".*").replace("?", ".").replace("!", "^")
def tags2trace(tags):
"""Convert a list tags into a formatted string suitable for the trace callback log"""
arg_str = ""
for i, t in enumerate(tags[1[:None]]):
if i > 0:
arg_str += ","
arg_str += "%s" % t
return arg_str
def exceptional(func, alt_return=None, alt_exceptions=(Exception,), final=None, catch=None):
"""Turns exceptions into an alternative return value"""
def _exceptional(*args, **kwargs):
try:
try:
return func(*args, **kwargs)
except alt_exceptions:
return alt_return
except:
if catch:
return catch(sys.exc_info(), (lambda: func(*args, **kwargs)))
raise
finally:
if final:
final()
return _exceptional
def get_sessions(host='localhost', port=7766):
"""Return sessions in OpenOPC Gateway Service as GUID:host hash"""
import Pyro4.core
server_obj = Pyro4.Proxy("PYRO:opc@{0}:{1}".format(host, port))
return server_obj.get_clients()
def open_client(host='localhost', port=7766, debug_log=None):
"""Connect to the specified OpenOPC Gateway Service"""
import Pyro4.core
server_obj = Pyro4.Proxy("PYRO:opc@{0}:{1}".format(host, port))
server_obj.opcda_set_debug_log(debug_log)
return server_obj.create_client()
class TimeoutError(Exception):
def __init__(self, txt):
Exception.__init__(self, txt)
class OPCError(Exception):
def __init__(self, txt):
Exception.__init__(self, txt)
class GroupEvents:
def __init__(self):
global current_client
self.client = current_client
def OnDataChange(self, TransactionID, NumItems, ClientHandles, ItemValues, Qualities, TimeStamps):
self.client.callback_queue.put((TransactionID, ClientHandles, ItemValues, Qualities, TimeStamps))
@Pyro4.expose
class client:
def __init__(self, opc_class=None, client_name=None):
"""Instantiate OPC automation class"""
self.callback_queue = Queue()
pythoncom.CoInitialize()
if opc_class == None:
if "OPC_CLASS" in os.environ:
opc_class = os.environ["OPC_CLASS"]
else:
opc_class = OPC_CLASS
opc_class_list = opc_class.split(";")
for i, c in enumerate(opc_class_list):
try:
self._opc = win32com.client.gencache.EnsureDispatch(c, 0)
self.opc_class = c
break
except pythoncom.com_error as err:
try:
if i == len(opc_class_list) - 1:
error_msg = "Dispatch: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
self._event = win32event.CreateEvent(None, 0, 0, None)
self.opc_server = None
self.opc_host = None
self.client_name = client_name
self._groups = {}
self._group_tags = {}
self._group_valid_tags = {}
self._group_server_handles = {}
self._group_handles_tag = {}
self._group_hooks = {}
self._open_serv = None
self._open_self = None
self._open_host = None
self._open_port = None
self._open_guid = None
self._prev_serv_time = None
self._tx_id = 0
self.trace = None
self.cpu = None
def set_trace(self, trace):
if self._open_serv == None:
self.trace = trace
def connect(self, opc_server=None, opc_host='localhost'):
"""Connect to the specified OPC server"""
pythoncom.CoInitialize()
if opc_server == None:
if self.opc_server == None:
if "OPC_SERVER" in os.environ:
opc_server = os.environ["OPC_SERVER"]
else:
opc_server = OPC_SERVER
else:
opc_server = self.opc_server
opc_host = self.opc_host
opc_server_list = opc_server.split(";")
connected = False
for s in opc_server_list:
try:
if self.trace:
self.trace("Connect(%s,%s)" % (s, opc_host))
self._opc.Connect(s, opc_host)
except pythoncom.com_error as err:
try:
if len(opc_server_list) == 1:
error_msg = "Connect: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
else:
try:
if self.client_name == None:
if "OPC_CLIENT" in os.environ:
self._opc.ClientName = os.environ["OPC_CLIENT"]
else:
self._opc.ClientName = OPC_CLIENT
else:
self._opc.ClientName = self.client_name
except:
pass
connected = True
break
if not connected:
raise OPCError("Connect: Cannot connect to any of the servers in the OPC_SERVER list")
time.sleep(0.01)
self.opc_server = opc_server
if opc_host == "localhost":
opc_host = socket.gethostname()
self.opc_host = opc_host
self._groups = {}
self._group_tags = {}
self._group_valid_tags = {}
self._group_server_handles = {}
self._group_handles_tag = {}
self._group_hooks = {}
def GUID(self):
return self._open_guid
def close(self, del_object=True):
"""Disconnect from the currently connected OPC server"""
try:
try:
pythoncom.CoInitialize()
self.remove(self.groups())
except pythoncom.com_error as err:
try:
error_msg = "Disconnect: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
except OPCError:
pass
finally:
if self.trace:
self.trace("Disconnect()")
self._opc.Disconnect()
if self._open_serv:
if del_object:
self._open_serv.release_client(self._open_self)
def iread(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False):
"""Iterable version of read()"""
global current_client
def add_items(tags):
names = list(tags)
names.insert(0, 0)
errors = []
if self.trace:
self.trace("Validate(%s)" % tags2trace(names))
else:
try:
errors = opc_items.Validate(len(names) - 1, names)
except:
pass
valid_tags = []
valid_values = []
client_handles = []
if sub_group not in self._group_handles_tag:
self._group_handles_tag[sub_group] = {}
n = 0
else:
if len(self._group_handles_tag[sub_group]) > 0:
n = max(self._group_handles_tag[sub_group]) + 1
else:
n = 0
for i, tag in enumerate(tags):
if errors[i] == 0:
valid_tags.append(tag)
client_handles.append(n)
self._group_handles_tag[sub_group][n] = tag
n += 1
else:
if include_error:
error_msgs[tag] = self._opc.GetErrorString(errors[i])
if self.trace and errors[i] != 0:
self.trace("%s failed validation" % tag)
client_handles.insert(0, 0)
valid_tags.insert(0, 0)
server_handles = []
errors = []
if self.trace:
self.trace("AddItems(%s)" % tags2trace(valid_tags))
try:
server_handles, errors = opc_items.AddItems(len(client_handles) - 1, valid_tags, client_handles)
except:
pass
valid_tags_tmp = []
server_handles_tmp = []
valid_tags.pop(0)
if sub_group not in self._group_server_handles:
self._group_server_handles[sub_group] = {}
for i, tag in enumerate(valid_tags):
if errors[i] == 0:
valid_tags_tmp.append(tag)
server_handles_tmp.append(server_handles[i])
self._group_server_handles[sub_group][tag] = server_handles[i]
valid_tags = valid_tags_tmp
server_handles = server_handles_tmp
return (
valid_tags, server_handles)
def remove_items(tags):
if self.trace:
self.trace("RemoveItems(%s)" % tags2trace([""] + tags))
server_handles = [self._group_server_handles[sub_group][tag] for tag in tags]
server_handles.insert(0, 0)
errors = []
try:
errors = opc_items.Remove(len(server_handles) - 1, server_handles)
except pythoncom.com_error as err:
try:
error_msg = "RemoveItems: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
try:
self._update_tx_time()
pythoncom.CoInitialize()
if include_error:
sync = True
if sync:
update = -1
tags, single, valid = type_check(tags)
if not valid:
raise TypeError("iread(): 'tags' parameter must be a string or a list of strings")
elif group in self._groups:
num_groups = rebuild or self._groups[group]
data_source = SOURCE_CACHE
else:
if size:
tag_groups = [tags[i[:i + size]] for i in range(0, len(tags), size)]
else:
tag_groups = [
tags]
num_groups = len(tag_groups)
data_source = SOURCE_DEVICE
results = []
for gid in range(num_groups):
if gid > 0:
if pause > 0:
time.sleep(pause / 1000.0)
else:
error_msgs = {}
opc_groups = self._opc.OPCGroups
opc_groups.DefaultGroupUpdateRate = update
if group == None:
try:
if self.trace:
self.trace("AddGroup()")
opc_group = opc_groups.Add()
except pythoncom.com_error as err:
try:
error_msg = "AddGroup: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
sub_group = group
new_group = True
else:
sub_group = "%s.%d" % (group, gid)
try:
if self.trace:
self.trace("GetOPCGroup(%s)" % sub_group)
opc_group = opc_groups.GetOPCGroup(sub_group)
new_group = False
except:
try:
if self.trace:
self.trace("AddGroup(%s)" % sub_group)
opc_group = opc_groups.Add(sub_group)
except pythoncom.com_error as err:
try:
error_msg = "AddGroup: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
self._groups[str(group)] = len(tag_groups)
new_group = True
opc_items = opc_group.OPCItems
if new_group:
opc_group.IsSubscribed = 1
opc_group.IsActive = 1
if not sync:
if self.trace:
self.trace("WithEvents(%s)" % opc_group.Name)
current_client = self
self._group_hooks[opc_group.Name] = win32com.client.WithEvents(opc_group, GroupEvents)
tags = tag_groups[gid]
valid_tags, server_handles = add_items(tags)
self._group_tags[sub_group] = tags
self._group_valid_tags[sub_group] = valid_tags
else:
if rebuild:
tags = tag_groups[gid]
valid_tags = self._group_valid_tags[sub_group]
add_tags = [t for t in tags if t not in valid_tags]
del_tags = [t for t in valid_tags if t not in tags]
if len(add_tags) > 0:
valid_tags, server_handles = add_items(add_tags)
valid_tags = self._group_valid_tags[sub_group] + valid_tags
if len(del_tags) > 0:
remove_items(del_tags)
valid_tags = [t for t in valid_tags if t not in del_tags]
self._group_tags[sub_group] = tags
self._group_valid_tags[sub_group] = valid_tags
if source == "hybrid":
data_source = SOURCE_DEVICE
else:
tags = self._group_tags[sub_group]
valid_tags = self._group_valid_tags[sub_group]
if sync:
server_handles = [item.ServerHandle for item in opc_items]
tag_value = {}
tag_quality = {}
tag_time = {}
tag_error = {}
if sync:
values = []
errors = []
qualities = []
timestamps = []
if len(valid_tags) > 0:
server_handles.insert(0, 0)
if source != "hybrid":
data_source = SOURCE_CACHE if source == "cache" else SOURCE_DEVICE
if self.trace:
self.trace("SyncRead(%s)" % data_source)
try:
values, errors, qualities, timestamps = opc_group.SyncRead(data_source, len(server_handles) - 1, server_handles)
except pythoncom.com_error as err:
try:
error_msg = "SyncRead: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
for i, tag in enumerate(valid_tags):
tag_value[tag] = values[i]
tag_quality[tag] = qualities[i]
tag_time[tag] = timestamps[i]
tag_error[tag] = errors[i]
elif len(valid_tags) > 0:
if self._tx_id >= 65535:
self._tx_id = 0
else:
self._tx_id += 1
if source != "hybrid":
data_source = SOURCE_CACHE if source == "cache" else SOURCE_DEVICE
if self.trace:
self.trace("AsyncRefresh(%s)" % data_source)
try:
opc_group.AsyncRefresh(data_source, self._tx_id)
except pythoncom.com_error as err:
try:
error_msg = "AsyncRefresh: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
tx_id = 0
start = time.time() * 1000
while tx_id != self._tx_id:
now = time.time() * 1000
if now - start > timeout:
raise TimeoutError("Callback: Timeout waiting for data")
if self.callback_queue.empty():
pythoncom.PumpWaitingMessages()
else:
tx_id, handles, values, qualities, timestamps = self.callback_queue.get()
for i, h in enumerate(handles):
tag = self._group_handles_tag[sub_group][h]
tag_value[tag] = values[i]
tag_quality[tag] = qualities[i]
tag_time[tag] = timestamps[i]
for tag in tags:
if tag in tag_value and not sync:
if not len(valid_tags) > 0 or sync or tag_error[tag] == 0:
value = tag_value[tag]
if type(value) == pywintypes.TimeType:
value = str(value)
quality = quality_str(tag_quality[tag])
timestamp = str(tag_time[tag])
else:
value = None
quality = "Error"
timestamp = None
if include_error:
error_msgs[tag] = self._opc.GetErrorString(tag_error[tag]).strip("\r\n")
else:
value = None
quality = "Error"
timestamp = None
if include_error:
if tag not in error_msgs:
error_msgs[tag] = ""
if single:
if include_error:
yield (
value, quality, timestamp, error_msgs[tag])
else:
yield (
value, quality, timestamp)
else:
if include_error:
yield (
tag, value, quality, timestamp, error_msgs[tag])
yield (
tag, value, quality, timestamp)
if group == None:
try:
if not sync:
if opc_group.Name in self._group_hooks:
if self.trace:
self.trace("CloseEvents(%s)" % opc_group.Name)
self._group_hooks[opc_group.Name].close()
if self.trace:
self.trace("RemoveGroup(%s)" % opc_group.Name)
opc_groups.Remove(opc_group.Name)
except pythoncom.com_error as err:
try:
error_msg = "RemoveGroup: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
except pythoncom.com_error as err:
try:
error_msg = "read: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
def read(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False):
"""Return list of (value, quality, time) tuples for the specified tag(s)"""
tags_list, single, valid = type_check(tags)
if not valid:
raise TypeError("read(): 'tags' parameter must be a string or a list of strings")
else:
num_health_tags = len([t for t in tags_list if t[None[:1]] == "@"])
num_opc_tags = len([t for t in tags_list if t[None[:1]] != "@"])
if num_health_tags > 0:
if num_opc_tags > 0:
raise TypeError("read(): system health and OPC tags cannot be included in the same group")
results = self._read_health(tags)
else:
results = self.iread(tags, group, size, pause, source, update, timeout, sync, include_error, rebuild)
if single:
return list(results)[0]
return list(results)
def _read_health(self, tags):
"""Return values of special system health monitoring tags"""
self._update_tx_time()
tags, single, valid = type_check(tags)
time_str = time.strftime("%x %H:%M:%S")
results = []
for t in tags:
if t == "@MemFree":
value = SystemHealth.mem_free()
else:
if t == "@MemUsed":
value = SystemHealth.mem_used()
else:
if t == "@MemTotal":
value = SystemHealth.mem_total()
else:
if t == "@MemPercent":
value = SystemHealth.mem_percent()
else:
if t == "@DiskFree":
value = SystemHealth.disk_free()
else:
if t == "@SineWave":
value = SystemHealth.sine_wave()
else:
if t == "@SawWave":
value = SystemHealth.saw_wave()
else:
if t == "@CpuUsage":
if self.cpu == None:
self.cpu = SystemHealth.CPU()
time.sleep(0.1)
value = self.cpu.get_usage()
else:
value = None
m = re.match("@TaskMem\\((.*?)\\)", t)
if m:
image_name = m.group(1)
value = SystemHealth.task_mem(image_name)
else:
m = re.match("@TaskCpu\\((.*?)\\)", t)
if m:
image_name = m.group(1)
value = SystemHealth.task_cpu(image_name)
else:
m = re.match("@TaskExists\\((.*?)\\)", t)
if m:
image_name = m.group(1)
value = SystemHealth.task_exists(image_name)
if value == None:
quality = "Error"
else:
quality = "Good"
if single:
results.append((value, quality, time_str))
results.append((t, value, quality, time_str))
return results
def iwrite(self, tag_value_pairs, size=None, pause=0, include_error=False):
"""Iterable version of write()"""
try:
self._update_tx_time()
pythoncom.CoInitialize()
def _valid_pair(p):
if type(p) in (list, tuple):
if len(p) >= 2:
if type(p[0]) in (str, bytes):
return True
return False
if type(tag_value_pairs) not in (list, tuple):
raise TypeError("write(): 'tag_value_pairs' parameter must be a (tag, value) tuple or a list of (tag,value) tuples")
else:
if tag_value_pairs == None:
tag_value_pairs = [
""]
single = False
else:
if type(tag_value_pairs[0]) in (str, bytes):
tag_value_pairs = [
tag_value_pairs]
single = True
else:
single = False
invalid_pairs = [p for p in tag_value_pairs if not _valid_pair(p)]
if len(invalid_pairs) > 0:
raise TypeError("write(): 'tag_value_pairs' parameter must be a (tag, value) tuple or a list of (tag,value) tuples")
names = [tag[0] for tag in tag_value_pairs]
tags = [tag[0] for tag in tag_value_pairs]
values = [tag[1] for tag in tag_value_pairs]
if size:
name_groups = [names[i[:i + size]] for i in range(0, len(names), size)]
tag_groups = [tags[i[:i + size]] for i in range(0, len(tags), size)]
value_groups = [values[i[:i + size]] for i in range(0, len(values), size)]
else:
name_groups = [
names]
tag_groups = [
tags]
value_groups = [values]
num_groups = len(tag_groups)
status = []
for gid in range(num_groups):
if gid > 0:
if pause > 0:
time.sleep(pause / 1000.0)
opc_groups = self._opc.OPCGroups
opc_group = opc_groups.Add()
opc_items = opc_group.OPCItems
names = name_groups[gid]
tags = tag_groups[gid]
values = value_groups[gid]
names.insert(0, 0)
errors = []
try:
errors = opc_items.Validate(len(names) - 1, names)
except:
pass
n = 1
valid_tags = []
valid_values = []
client_handles = []
error_msgs = {}
for i, tag in enumerate(tags):
if errors[i] == 0:
valid_tags.append(tag)
valid_values.append(values[i])
client_handles.append(n)
error_msgs[tag] = ""
n += 1
client_handles.insert(0, 0)
valid_tags.insert(0, 0)
server_handles = []
errors = []
try:
server_handles, errors = opc_items.AddItems(len(client_handles) - 1, valid_tags, client_handles)
except:
pass
valid_tags_tmp = []
valid_values_tmp = []
server_handles_tmp = []
valid_tags.pop(0)
for i, tag in enumerate(valid_tags):
if errors[i] == 0:
valid_tags_tmp.append(tag)
valid_values_tmp.append(valid_values[i])
server_handles_tmp.append(server_handles[i])
error_msgs[tag] = ""
valid_tags = valid_tags_tmp
valid_values = valid_values_tmp
server_handles = server_handles_tmp
server_handles.insert(0, 0)
valid_values.insert(0, 0)
errors = []
if len(valid_values) > 1:
try:
errors = opc_group.SyncWrite(len(server_handles) - 1, server_handles, valid_values)
except:
pass
n = 0
for tag in tags:
if tag in valid_tags:
if errors[n] == 0:
status = "Success"
else:
status = "Error"
if include_error:
error_msgs[tag] = self._opc.GetErrorString(errors[n])
n += 1
else:
status = "Error"
if include_error:
error_msgs[tag] = error_msgs[tag].strip("\r\n")
if single:
if include_error:
yield (
status, error_msgs[tag])
else:
yield status
elif include_error:
yield (
tag, status, error_msgs[tag])
else:
yield (
tag, status)
opc_groups.Remove(opc_group.Name)
except pythoncom.com_error as err:
try:
error_msg = "write: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
def write(self, tag_value_pairs, size=None, pause=0, include_error=False):
"""Write list of (tag, value) pair(s) to the server"""
if type(tag_value_pairs) in (list, tuple) and type(tag_value_pairs[0]) in (list, tuple):
single = False
else:
single = True
status = self.iwrite(tag_value_pairs, size, pause, include_error)
if single:
return list(status)[0]
return list(status)
def groups(self):
"""Return a list of active tag groups"""
return self._groups.keys()
def remove(self, groups):
"""Remove the specified tag group(s)"""
try:
pythoncom.CoInitialize()
opc_groups = self._opc.OPCGroups
if type(groups) in (str, bytes):
groups = [
groups]
single = True
else:
single = False
status = []
for group in groups:
if group in self._groups:
for i in range(self._groups[group]):
sub_group = "%s.%d" % (group, i)
if sub_group in self._group_hooks:
if self.trace:
self.trace("CloseEvents(%s)" % sub_group)
self._group_hooks[sub_group].close()
try:
if self.trace:
self.trace("RemoveGroup(%s)" % sub_group)
errors = opc_groups.Remove(sub_group)
except pythoncom.com_error as err:
try:
error_msg = "RemoveGroup: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
del self._group_tags[sub_group]
del self._group_valid_tags[sub_group]
del self._group_handles_tag[sub_group]
del self._group_server_handles[sub_group]
del self._groups[group]
except pythoncom.com_error as err:
try:
error_msg = "remove: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
def iproperties(self, tags, id=None):
"""Iterable version of properties()"""
try:
self._update_tx_time()
pythoncom.CoInitialize()
tags, single_tag, valid = type_check(tags)
if not valid:
raise TypeError("properties(): 'tags' parameter must be a string or a list of strings")
try:
id.remove(0)
include_name = True
except:
include_name = False
if id != None:
descriptions = []
if isinstance(id, list) or isinstance(id, tuple):
property_id = list(id)
single_property = False
else:
property_id = [
id]
single_property = True
for i in property_id:
descriptions.append("Property id %d" % i)
else:
single_property = False
properties = []
for tag in tags:
if id == None:
descriptions = []
property_id = []
count, property_id, descriptions, datatypes = self._opc.QueryAvailableProperties(tag)
tag_properties = list(map((lambda x, y: (x, y)), property_id, descriptions))
property_id = [p for p, d in tag_properties if p > 0]
descriptions = [d for p, d in tag_properties if p > 0]
else:
property_id.insert(0, 0)
values = []
errors = []
values, errors = self._opc.GetItemProperties(tag, len(property_id) - 1, property_id)
property_id.pop(0)
values = [str(v) if type(v) == pywintypes.TimeType else v for v in values]
try:
i = property_id.index(1)
values[i] = vt[values[i]]
except:
pass
try:
i = property_id.index(3)
values[i] = quality_str(values[i])
except:
pass
try:
i = property_id.index(5)
values[i] = ACCESS_RIGHTS[values[i]]
except:
pass
if id != None:
if single_property:
if single_tag:
tag_properties = values
else:
tag_properties = [
values]
else:
tag_properties = list(map((lambda x, y: (x, y)), property_id, values))
else:
tag_properties = list(map((lambda x, y, z: (x, y, z)), property_id, descriptions, values))
tag_properties.insert(0, (0, "Item ID (virtual property)", tag))
if include_name:
tag_properties.insert(0, (0, tag))
tag_properties = single_tag or [tuple([tag] + list(p)) for p in tag_properties]
for p in tag_properties:
yield p
except pythoncom.com_error as err:
try:
error_msg = "properties: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
def properties(self, tags, id=None):
"""Return list of property tuples (id, name, value) for the specified tag(s) """
if type(tags) not in (list, tuple) and type(id) not in (type(None), list, tuple):
single = True
else:
single = False
props = self.iproperties(tags, id)
if single:
return list(props)[0]
return list(props)
def ilist(self, paths='*', recursive=False, flat=False, include_type=False):
"""Iterable version of list()"""
try:
self._update_tx_time()
pythoncom.CoInitialize()
try:
browser = self._opc.CreateBrowser()
except:
return
else:
paths, single, valid = type_check(paths)
if not valid:
raise TypeError("list(): 'paths' parameter must be a string or a list of strings")
if len(paths) == 0:
paths = [
"*"]
nodes = {}
for path in paths:
if flat:
browser.MoveToRoot()
browser.Filter = ""
browser.ShowLeafs(True)
pattern = re.compile("^%s$" % wild2regex(path), re.IGNORECASE)
matches = filter(pattern.search, browser)
if include_type:
matches = [(x, node_type) for x in matches]
for node in matches:
yield node
continue
queue = []
queue.append(path)
while len(queue) > 0:
tag = queue.pop(0)
browser.MoveToRoot()
browser.Filter = ""
pattern = None
path_str = "/"
path_list = tag.replace(".", "/").split("/")
path_list = [p for p in path_list if len(p) > 0]
found_filter = False
path_postfix = "/"
for i, p in enumerate(path_list):
if found_filter:
path_postfix += p + "/"
else:
if p.find("*") >= 0:
pattern = re.compile("^%s$" % wild2regex(p), re.IGNORECASE)
found_filter = True
browser.ShowBranches()
if len(browser) == 0:
browser.ShowLeafs(False)
lowest_level = True
node_type = "Leaf"
else:
lowest_level = False
node_type = "Branch"
matches = filter(pattern.search, browser)
if not lowest_level:
if recursive:
queue += [path_str + x + path_postfix for x in matches]
elif lowest_level:
matches = [exceptional(browser.GetItemID, x)(x) for x in matches]
if include_type:
matches = [(x, node_type) for x in matches]
for node in matches:
if node not in nodes:
yield node
nodes[node] = True
except pythoncom.com_error as err:
try:
error_msg = "list: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
def list(self, paths='*', recursive=False, flat=False, include_type=False):
"""Return list of item nodes at specified path(s) (tree browser)"""
nodes = self.ilist(paths, recursive, flat, include_type)
return list(nodes)
def servers(self, opc_host='localhost'):
"""Return list of available OPC servers"""
try:
pythoncom.CoInitialize()
servers = self._opc.GetOPCServers(opc_host)
servers = [s for s in servers if s != None]
return servers
except pythoncom.com_error as err:
try:
error_msg = "servers: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
def info(self):
"""Return list of (name, value) pairs about the OPC server"""
try:
self._update_tx_time()
pythoncom.CoInitialize()
info_list = []
if self._open_serv:
mode = "OpenOPC"
else:
mode = "DCOM"
info_list += [("Protocol", mode)]
if mode == "OpenOPC":
info_list += [("Gateway Host", "%s:%s" % (self._open_host, self._open_port))]
info_list += [("Gateway Version", "%s" % __version__)]
info_list += [("Class", self.opc_class)]
info_list += [("Client Name", self._opc.ClientName)]
info_list += [("OPC Host", self.opc_host)]
info_list += [("OPC Server", self._opc.ServerName)]
info_list += [("State", OPC_STATUS[self._opc.ServerState])]
info_list += [("Version", "%d.%d (Build %d)" % (self._opc.MajorVersion, self._opc.MinorVersion, self._opc.BuildNumber))]
try:
browser = self._opc.CreateBrowser()
browser_type = BROWSER_TYPE[browser.Organization]
except:
browser_type = "Not Supported"
info_list += [("Browser", browser_type)]
info_list += [("Start Time", str(self._opc.StartTime))]
info_list += [("Current Time", str(self._opc.CurrentTime))]
info_list += [("Vendor", self._opc.VendorInfo)]
return info_list
except pythoncom.com_error as err:
try:
error_msg = "info: %s" % self._get_error_str(err)
raise OPCError(error_msg)
finally:
err = None
del err
def ping(self):
"""Check if we are still talking to the OPC server"""
try:
opc_serv_time = int(float(self._opc.CurrentTime) * 1000000.0)
if opc_serv_time == self._prev_serv_time:
return False
self._prev_serv_time = opc_serv_time
return True
except pythoncom.com_error:
return False
def _get_error_str(self, err):
"""Return the error string for a OPC or COM error code"""
hr, msg, exc, arg = err.args
if exc == None:
error_str = str(msg)
else:
scode = exc[5]
try:
opc_err_str = unicode(self._opc.GetErrorString(scode)).strip("\r\n")
except:
opc_err_str = None
try:
com_err_str = unicode(pythoncom.GetScodeString(scode)).strip("\r\n")
except:
com_err_str = None
if opc_err_str == None and com_err_str == None:
error_str = str(scode)
else:
if opc_err_str == com_err_str:
error_str = opc_err_str
else:
if opc_err_str == None:
error_str = com_err_str
else:
if com_err_str == None:
error_str = opc_err_str
else:
error_str = "%s (%s)" % (opc_err_str, com_err_str)
return error_str
def _update_tx_time(self):
"""Update the session's last transaction time in the Gateway Service"""
if self._open_serv:
self._open_serv._tx_times[self._open_guid] = time.time()
def __getitem__(self, key):
"""Read single item (tag as dictionary key)"""
value, quality, time = self.read(key)
return value
def __setitem__(self, key, value):
"""Write single item (tag as dictionary key)"""
self.write((key, value))