# uncompyle6 version 3.9.2 # Python bytecode version base 3.7.0 (3394) # Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52) # [Clang 14.0.6 ] # Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/OpenOPC.py # Compiled at: 2024-04-18 03:12:57 # Size of source mod 2**32: 42815 bytes import os, sys, time, types, string, socket, re, Pyro4.core from multiprocessing import Queue __version__ = "1.2.0" current_client = None if os.name == "nt": try: import win32com.client, win32com.server.util, win32event, pythoncom, pywintypes, SystemHealth pywintypes.datetime = pywintypes.TimeType vt = dict([(pythoncom.__dict__[vtype], vtype) for vtype in pythoncom.__dict__.keys() if vtype[None[:2]] == "VT"]) win32com.client.gencache.is_readonly = False win32com.client.gencache.Rebuild(verbose=0) except ImportError: win32com_found = False else: win32com_found = True else: win32com_found = False SOURCE_CACHE = 1 SOURCE_DEVICE = 2 OPC_STATUS = (0, 'Running', 'Failed', 'NoConfig', 'Suspended', 'Test') BROWSER_TYPE = (0, 'Hierarchical', 'Flat') ACCESS_RIGHTS = (0, 'Read', 'Write', 'Read/Write') OPC_QUALITY = ('Bad', 'Uncertain', 'Unknown', 'Good') OPC_CLASS = "Matrikon.OPC.Automation;Graybox.OPC.DAWrapper;HSCOPC.Automation;RSI.OPCAutomation;OPC.Automation" OPC_SERVER = "Hci.TPNServer;HwHsc.OPCServer;opc.deltav.1;AIM.OPC.1;Yokogawa.ExaopcDAEXQ.1;OSI.DA.1;OPC.PHDServerDA.1;Aspen.Infoplus21_DA.1;National Instruments.OPCLabVIEW;RSLinx OPC Server;KEPware.KEPServerEx.V4;Matrikon.OPC.Simulation;Prosys.OPC.Simulation;CCOPC.XMLWrapper.1;OPC.SimaticHMI.CoRtHmiRTm.1" OPC_CLIENT = "OpenOPC" def quality_str(quality_bits): """Convert OPC quality bits to a descriptive string""" quality = quality_bits >> 6 & 3 return OPC_QUALITY[quality] def type_check(tags): """Perform a type check on a list of tags""" if type(tags) in (list, tuple): single = False else: if tags == None: tags = [] single = False else: tags = [ tags] single = True if len([t for t in tags if type(t) not in (str, bytes)]) == 0: valid = True else: valid = False return (tags, single, valid) def wild2regex(string): """Convert a Unix wildcard glob into a regular expression""" return string.replace(".", "\\.").replace("*", ".*").replace("?", ".").replace("!", "^") def tags2trace(tags): """Convert a list tags into a formatted string suitable for the trace callback log""" arg_str = "" for i, t in enumerate(tags[1[:None]]): if i > 0: arg_str += "," arg_str += "%s" % t return arg_str def exceptional(func, alt_return=None, alt_exceptions=(Exception,), final=None, catch=None): """Turns exceptions into an alternative return value""" def _exceptional(*args, **kwargs): try: try: return func(*args, **kwargs) except alt_exceptions: return alt_return except: if catch: return catch(sys.exc_info(), (lambda: func(*args, **kwargs))) raise finally: if final: final() return _exceptional def get_sessions(host='localhost', port=7766): """Return sessions in OpenOPC Gateway Service as GUID:host hash""" import Pyro4.core server_obj = Pyro4.Proxy("PYRO:opc@{0}:{1}".format(host, port)) return server_obj.get_clients() def open_client(host='localhost', port=7766, debug_log=None): """Connect to the specified OpenOPC Gateway Service""" import Pyro4.core server_obj = Pyro4.Proxy("PYRO:opc@{0}:{1}".format(host, port)) server_obj.opcda_set_debug_log(debug_log) return server_obj.create_client() class TimeoutError(Exception): def __init__(self, txt): Exception.__init__(self, txt) class OPCError(Exception): def __init__(self, txt): Exception.__init__(self, txt) class GroupEvents: def __init__(self): global current_client self.client = current_client def OnDataChange(self, TransactionID, NumItems, ClientHandles, ItemValues, Qualities, TimeStamps): self.client.callback_queue.put((TransactionID, ClientHandles, ItemValues, Qualities, TimeStamps)) @Pyro4.expose class client: def __init__(self, opc_class=None, client_name=None): """Instantiate OPC automation class""" self.callback_queue = Queue() pythoncom.CoInitialize() if opc_class == None: if "OPC_CLASS" in os.environ: opc_class = os.environ["OPC_CLASS"] else: opc_class = OPC_CLASS opc_class_list = opc_class.split(";") for i, c in enumerate(opc_class_list): try: self._opc = win32com.client.gencache.EnsureDispatch(c, 0) self.opc_class = c break except pythoncom.com_error as err: try: if i == len(opc_class_list) - 1: error_msg = "Dispatch: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err self._event = win32event.CreateEvent(None, 0, 0, None) self.opc_server = None self.opc_host = None self.client_name = client_name self._groups = {} self._group_tags = {} self._group_valid_tags = {} self._group_server_handles = {} self._group_handles_tag = {} self._group_hooks = {} self._open_serv = None self._open_self = None self._open_host = None self._open_port = None self._open_guid = None self._prev_serv_time = None self._tx_id = 0 self.trace = None self.cpu = None def set_trace(self, trace): if self._open_serv == None: self.trace = trace def connect(self, opc_server=None, opc_host='localhost'): """Connect to the specified OPC server""" pythoncom.CoInitialize() if opc_server == None: if self.opc_server == None: if "OPC_SERVER" in os.environ: opc_server = os.environ["OPC_SERVER"] else: opc_server = OPC_SERVER else: opc_server = self.opc_server opc_host = self.opc_host opc_server_list = opc_server.split(";") connected = False for s in opc_server_list: try: if self.trace: self.trace("Connect(%s,%s)" % (s, opc_host)) self._opc.Connect(s, opc_host) except pythoncom.com_error as err: try: if len(opc_server_list) == 1: error_msg = "Connect: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err else: try: if self.client_name == None: if "OPC_CLIENT" in os.environ: self._opc.ClientName = os.environ["OPC_CLIENT"] else: self._opc.ClientName = OPC_CLIENT else: self._opc.ClientName = self.client_name except: pass connected = True break if not connected: raise OPCError("Connect: Cannot connect to any of the servers in the OPC_SERVER list") time.sleep(0.01) self.opc_server = opc_server if opc_host == "localhost": opc_host = socket.gethostname() self.opc_host = opc_host self._groups = {} self._group_tags = {} self._group_valid_tags = {} self._group_server_handles = {} self._group_handles_tag = {} self._group_hooks = {} def GUID(self): return self._open_guid def close(self, del_object=True): """Disconnect from the currently connected OPC server""" try: try: pythoncom.CoInitialize() self.remove(self.groups()) except pythoncom.com_error as err: try: error_msg = "Disconnect: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err except OPCError: pass finally: if self.trace: self.trace("Disconnect()") self._opc.Disconnect() if self._open_serv: if del_object: self._open_serv.release_client(self._open_self) def iread(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False): """Iterable version of read()""" global current_client def add_items(tags): names = list(tags) names.insert(0, 0) errors = [] if self.trace: self.trace("Validate(%s)" % tags2trace(names)) else: try: errors = opc_items.Validate(len(names) - 1, names) except: pass valid_tags = [] valid_values = [] client_handles = [] if sub_group not in self._group_handles_tag: self._group_handles_tag[sub_group] = {} n = 0 else: if len(self._group_handles_tag[sub_group]) > 0: n = max(self._group_handles_tag[sub_group]) + 1 else: n = 0 for i, tag in enumerate(tags): if errors[i] == 0: valid_tags.append(tag) client_handles.append(n) self._group_handles_tag[sub_group][n] = tag n += 1 else: if include_error: error_msgs[tag] = self._opc.GetErrorString(errors[i]) if self.trace and errors[i] != 0: self.trace("%s failed validation" % tag) client_handles.insert(0, 0) valid_tags.insert(0, 0) server_handles = [] errors = [] if self.trace: self.trace("AddItems(%s)" % tags2trace(valid_tags)) try: server_handles, errors = opc_items.AddItems(len(client_handles) - 1, valid_tags, client_handles) except: pass valid_tags_tmp = [] server_handles_tmp = [] valid_tags.pop(0) if sub_group not in self._group_server_handles: self._group_server_handles[sub_group] = {} for i, tag in enumerate(valid_tags): if errors[i] == 0: valid_tags_tmp.append(tag) server_handles_tmp.append(server_handles[i]) self._group_server_handles[sub_group][tag] = server_handles[i] valid_tags = valid_tags_tmp server_handles = server_handles_tmp return ( valid_tags, server_handles) def remove_items(tags): if self.trace: self.trace("RemoveItems(%s)" % tags2trace([""] + tags)) server_handles = [self._group_server_handles[sub_group][tag] for tag in tags] server_handles.insert(0, 0) errors = [] try: errors = opc_items.Remove(len(server_handles) - 1, server_handles) except pythoncom.com_error as err: try: error_msg = "RemoveItems: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err try: self._update_tx_time() pythoncom.CoInitialize() if include_error: sync = True if sync: update = -1 tags, single, valid = type_check(tags) if not valid: raise TypeError("iread(): 'tags' parameter must be a string or a list of strings") elif group in self._groups: num_groups = rebuild or self._groups[group] data_source = SOURCE_CACHE else: if size: tag_groups = [tags[i[:i + size]] for i in range(0, len(tags), size)] else: tag_groups = [ tags] num_groups = len(tag_groups) data_source = SOURCE_DEVICE results = [] for gid in range(num_groups): if gid > 0: if pause > 0: time.sleep(pause / 1000.0) else: error_msgs = {} opc_groups = self._opc.OPCGroups opc_groups.DefaultGroupUpdateRate = update if group == None: try: if self.trace: self.trace("AddGroup()") opc_group = opc_groups.Add() except pythoncom.com_error as err: try: error_msg = "AddGroup: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err sub_group = group new_group = True else: sub_group = "%s.%d" % (group, gid) try: if self.trace: self.trace("GetOPCGroup(%s)" % sub_group) opc_group = opc_groups.GetOPCGroup(sub_group) new_group = False except: try: if self.trace: self.trace("AddGroup(%s)" % sub_group) opc_group = opc_groups.Add(sub_group) except pythoncom.com_error as err: try: error_msg = "AddGroup: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err self._groups[str(group)] = len(tag_groups) new_group = True opc_items = opc_group.OPCItems if new_group: opc_group.IsSubscribed = 1 opc_group.IsActive = 1 if not sync: if self.trace: self.trace("WithEvents(%s)" % opc_group.Name) current_client = self self._group_hooks[opc_group.Name] = win32com.client.WithEvents(opc_group, GroupEvents) tags = tag_groups[gid] valid_tags, server_handles = add_items(tags) self._group_tags[sub_group] = tags self._group_valid_tags[sub_group] = valid_tags else: if rebuild: tags = tag_groups[gid] valid_tags = self._group_valid_tags[sub_group] add_tags = [t for t in tags if t not in valid_tags] del_tags = [t for t in valid_tags if t not in tags] if len(add_tags) > 0: valid_tags, server_handles = add_items(add_tags) valid_tags = self._group_valid_tags[sub_group] + valid_tags if len(del_tags) > 0: remove_items(del_tags) valid_tags = [t for t in valid_tags if t not in del_tags] self._group_tags[sub_group] = tags self._group_valid_tags[sub_group] = valid_tags if source == "hybrid": data_source = SOURCE_DEVICE else: tags = self._group_tags[sub_group] valid_tags = self._group_valid_tags[sub_group] if sync: server_handles = [item.ServerHandle for item in opc_items] tag_value = {} tag_quality = {} tag_time = {} tag_error = {} if sync: values = [] errors = [] qualities = [] timestamps = [] if len(valid_tags) > 0: server_handles.insert(0, 0) if source != "hybrid": data_source = SOURCE_CACHE if source == "cache" else SOURCE_DEVICE if self.trace: self.trace("SyncRead(%s)" % data_source) try: values, errors, qualities, timestamps = opc_group.SyncRead(data_source, len(server_handles) - 1, server_handles) except pythoncom.com_error as err: try: error_msg = "SyncRead: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err for i, tag in enumerate(valid_tags): tag_value[tag] = values[i] tag_quality[tag] = qualities[i] tag_time[tag] = timestamps[i] tag_error[tag] = errors[i] elif len(valid_tags) > 0: if self._tx_id >= 65535: self._tx_id = 0 else: self._tx_id += 1 if source != "hybrid": data_source = SOURCE_CACHE if source == "cache" else SOURCE_DEVICE if self.trace: self.trace("AsyncRefresh(%s)" % data_source) try: opc_group.AsyncRefresh(data_source, self._tx_id) except pythoncom.com_error as err: try: error_msg = "AsyncRefresh: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err tx_id = 0 start = time.time() * 1000 while tx_id != self._tx_id: now = time.time() * 1000 if now - start > timeout: raise TimeoutError("Callback: Timeout waiting for data") if self.callback_queue.empty(): pythoncom.PumpWaitingMessages() else: tx_id, handles, values, qualities, timestamps = self.callback_queue.get() for i, h in enumerate(handles): tag = self._group_handles_tag[sub_group][h] tag_value[tag] = values[i] tag_quality[tag] = qualities[i] tag_time[tag] = timestamps[i] for tag in tags: if tag in tag_value and not sync: if not len(valid_tags) > 0 or sync or tag_error[tag] == 0: value = tag_value[tag] if type(value) == pywintypes.TimeType: value = str(value) quality = quality_str(tag_quality[tag]) timestamp = str(tag_time[tag]) else: value = None quality = "Error" timestamp = None if include_error: error_msgs[tag] = self._opc.GetErrorString(tag_error[tag]).strip("\r\n") else: value = None quality = "Error" timestamp = None if include_error: if tag not in error_msgs: error_msgs[tag] = "" if single: if include_error: yield ( value, quality, timestamp, error_msgs[tag]) else: yield ( value, quality, timestamp) else: if include_error: yield ( tag, value, quality, timestamp, error_msgs[tag]) yield ( tag, value, quality, timestamp) if group == None: try: if not sync: if opc_group.Name in self._group_hooks: if self.trace: self.trace("CloseEvents(%s)" % opc_group.Name) self._group_hooks[opc_group.Name].close() if self.trace: self.trace("RemoveGroup(%s)" % opc_group.Name) opc_groups.Remove(opc_group.Name) except pythoncom.com_error as err: try: error_msg = "RemoveGroup: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err except pythoncom.com_error as err: try: error_msg = "read: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err def read(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False): """Return list of (value, quality, time) tuples for the specified tag(s)""" tags_list, single, valid = type_check(tags) if not valid: raise TypeError("read(): 'tags' parameter must be a string or a list of strings") else: num_health_tags = len([t for t in tags_list if t[None[:1]] == "@"]) num_opc_tags = len([t for t in tags_list if t[None[:1]] != "@"]) if num_health_tags > 0: if num_opc_tags > 0: raise TypeError("read(): system health and OPC tags cannot be included in the same group") results = self._read_health(tags) else: results = self.iread(tags, group, size, pause, source, update, timeout, sync, include_error, rebuild) if single: return list(results)[0] return list(results) def _read_health(self, tags): """Return values of special system health monitoring tags""" self._update_tx_time() tags, single, valid = type_check(tags) time_str = time.strftime("%x %H:%M:%S") results = [] for t in tags: if t == "@MemFree": value = SystemHealth.mem_free() else: if t == "@MemUsed": value = SystemHealth.mem_used() else: if t == "@MemTotal": value = SystemHealth.mem_total() else: if t == "@MemPercent": value = SystemHealth.mem_percent() else: if t == "@DiskFree": value = SystemHealth.disk_free() else: if t == "@SineWave": value = SystemHealth.sine_wave() else: if t == "@SawWave": value = SystemHealth.saw_wave() else: if t == "@CpuUsage": if self.cpu == None: self.cpu = SystemHealth.CPU() time.sleep(0.1) value = self.cpu.get_usage() else: value = None m = re.match("@TaskMem\\((.*?)\\)", t) if m: image_name = m.group(1) value = SystemHealth.task_mem(image_name) else: m = re.match("@TaskCpu\\((.*?)\\)", t) if m: image_name = m.group(1) value = SystemHealth.task_cpu(image_name) else: m = re.match("@TaskExists\\((.*?)\\)", t) if m: image_name = m.group(1) value = SystemHealth.task_exists(image_name) if value == None: quality = "Error" else: quality = "Good" if single: results.append((value, quality, time_str)) results.append((t, value, quality, time_str)) return results def iwrite(self, tag_value_pairs, size=None, pause=0, include_error=False): """Iterable version of write()""" try: self._update_tx_time() pythoncom.CoInitialize() def _valid_pair(p): if type(p) in (list, tuple): if len(p) >= 2: if type(p[0]) in (str, bytes): return True return False if type(tag_value_pairs) not in (list, tuple): raise TypeError("write(): 'tag_value_pairs' parameter must be a (tag, value) tuple or a list of (tag,value) tuples") else: if tag_value_pairs == None: tag_value_pairs = [ ""] single = False else: if type(tag_value_pairs[0]) in (str, bytes): tag_value_pairs = [ tag_value_pairs] single = True else: single = False invalid_pairs = [p for p in tag_value_pairs if not _valid_pair(p)] if len(invalid_pairs) > 0: raise TypeError("write(): 'tag_value_pairs' parameter must be a (tag, value) tuple or a list of (tag,value) tuples") names = [tag[0] for tag in tag_value_pairs] tags = [tag[0] for tag in tag_value_pairs] values = [tag[1] for tag in tag_value_pairs] if size: name_groups = [names[i[:i + size]] for i in range(0, len(names), size)] tag_groups = [tags[i[:i + size]] for i in range(0, len(tags), size)] value_groups = [values[i[:i + size]] for i in range(0, len(values), size)] else: name_groups = [ names] tag_groups = [ tags] value_groups = [values] num_groups = len(tag_groups) status = [] for gid in range(num_groups): if gid > 0: if pause > 0: time.sleep(pause / 1000.0) opc_groups = self._opc.OPCGroups opc_group = opc_groups.Add() opc_items = opc_group.OPCItems names = name_groups[gid] tags = tag_groups[gid] values = value_groups[gid] names.insert(0, 0) errors = [] try: errors = opc_items.Validate(len(names) - 1, names) except: pass n = 1 valid_tags = [] valid_values = [] client_handles = [] error_msgs = {} for i, tag in enumerate(tags): if errors[i] == 0: valid_tags.append(tag) valid_values.append(values[i]) client_handles.append(n) error_msgs[tag] = "" n += 1 client_handles.insert(0, 0) valid_tags.insert(0, 0) server_handles = [] errors = [] try: server_handles, errors = opc_items.AddItems(len(client_handles) - 1, valid_tags, client_handles) except: pass valid_tags_tmp = [] valid_values_tmp = [] server_handles_tmp = [] valid_tags.pop(0) for i, tag in enumerate(valid_tags): if errors[i] == 0: valid_tags_tmp.append(tag) valid_values_tmp.append(valid_values[i]) server_handles_tmp.append(server_handles[i]) error_msgs[tag] = "" valid_tags = valid_tags_tmp valid_values = valid_values_tmp server_handles = server_handles_tmp server_handles.insert(0, 0) valid_values.insert(0, 0) errors = [] if len(valid_values) > 1: try: errors = opc_group.SyncWrite(len(server_handles) - 1, server_handles, valid_values) except: pass n = 0 for tag in tags: if tag in valid_tags: if errors[n] == 0: status = "Success" else: status = "Error" if include_error: error_msgs[tag] = self._opc.GetErrorString(errors[n]) n += 1 else: status = "Error" if include_error: error_msgs[tag] = error_msgs[tag].strip("\r\n") if single: if include_error: yield ( status, error_msgs[tag]) else: yield status elif include_error: yield ( tag, status, error_msgs[tag]) else: yield ( tag, status) opc_groups.Remove(opc_group.Name) except pythoncom.com_error as err: try: error_msg = "write: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err def write(self, tag_value_pairs, size=None, pause=0, include_error=False): """Write list of (tag, value) pair(s) to the server""" if type(tag_value_pairs) in (list, tuple) and type(tag_value_pairs[0]) in (list, tuple): single = False else: single = True status = self.iwrite(tag_value_pairs, size, pause, include_error) if single: return list(status)[0] return list(status) def groups(self): """Return a list of active tag groups""" return self._groups.keys() def remove(self, groups): """Remove the specified tag group(s)""" try: pythoncom.CoInitialize() opc_groups = self._opc.OPCGroups if type(groups) in (str, bytes): groups = [ groups] single = True else: single = False status = [] for group in groups: if group in self._groups: for i in range(self._groups[group]): sub_group = "%s.%d" % (group, i) if sub_group in self._group_hooks: if self.trace: self.trace("CloseEvents(%s)" % sub_group) self._group_hooks[sub_group].close() try: if self.trace: self.trace("RemoveGroup(%s)" % sub_group) errors = opc_groups.Remove(sub_group) except pythoncom.com_error as err: try: error_msg = "RemoveGroup: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err del self._group_tags[sub_group] del self._group_valid_tags[sub_group] del self._group_handles_tag[sub_group] del self._group_server_handles[sub_group] del self._groups[group] except pythoncom.com_error as err: try: error_msg = "remove: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err def iproperties(self, tags, id=None): """Iterable version of properties()""" try: self._update_tx_time() pythoncom.CoInitialize() tags, single_tag, valid = type_check(tags) if not valid: raise TypeError("properties(): 'tags' parameter must be a string or a list of strings") try: id.remove(0) include_name = True except: include_name = False if id != None: descriptions = [] if isinstance(id, list) or isinstance(id, tuple): property_id = list(id) single_property = False else: property_id = [ id] single_property = True for i in property_id: descriptions.append("Property id %d" % i) else: single_property = False properties = [] for tag in tags: if id == None: descriptions = [] property_id = [] count, property_id, descriptions, datatypes = self._opc.QueryAvailableProperties(tag) tag_properties = list(map((lambda x, y: (x, y)), property_id, descriptions)) property_id = [p for p, d in tag_properties if p > 0] descriptions = [d for p, d in tag_properties if p > 0] else: property_id.insert(0, 0) values = [] errors = [] values, errors = self._opc.GetItemProperties(tag, len(property_id) - 1, property_id) property_id.pop(0) values = [str(v) if type(v) == pywintypes.TimeType else v for v in values] try: i = property_id.index(1) values[i] = vt[values[i]] except: pass try: i = property_id.index(3) values[i] = quality_str(values[i]) except: pass try: i = property_id.index(5) values[i] = ACCESS_RIGHTS[values[i]] except: pass if id != None: if single_property: if single_tag: tag_properties = values else: tag_properties = [ values] else: tag_properties = list(map((lambda x, y: (x, y)), property_id, values)) else: tag_properties = list(map((lambda x, y, z: (x, y, z)), property_id, descriptions, values)) tag_properties.insert(0, (0, "Item ID (virtual property)", tag)) if include_name: tag_properties.insert(0, (0, tag)) tag_properties = single_tag or [tuple([tag] + list(p)) for p in tag_properties] for p in tag_properties: yield p except pythoncom.com_error as err: try: error_msg = "properties: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err def properties(self, tags, id=None): """Return list of property tuples (id, name, value) for the specified tag(s) """ if type(tags) not in (list, tuple) and type(id) not in (type(None), list, tuple): single = True else: single = False props = self.iproperties(tags, id) if single: return list(props)[0] return list(props) def ilist(self, paths='*', recursive=False, flat=False, include_type=False): """Iterable version of list()""" try: self._update_tx_time() pythoncom.CoInitialize() try: browser = self._opc.CreateBrowser() except: return else: paths, single, valid = type_check(paths) if not valid: raise TypeError("list(): 'paths' parameter must be a string or a list of strings") if len(paths) == 0: paths = [ "*"] nodes = {} for path in paths: if flat: browser.MoveToRoot() browser.Filter = "" browser.ShowLeafs(True) pattern = re.compile("^%s$" % wild2regex(path), re.IGNORECASE) matches = filter(pattern.search, browser) if include_type: matches = [(x, node_type) for x in matches] for node in matches: yield node continue queue = [] queue.append(path) while len(queue) > 0: tag = queue.pop(0) browser.MoveToRoot() browser.Filter = "" pattern = None path_str = "/" path_list = tag.replace(".", "/").split("/") path_list = [p for p in path_list if len(p) > 0] found_filter = False path_postfix = "/" for i, p in enumerate(path_list): if found_filter: path_postfix += p + "/" else: if p.find("*") >= 0: pattern = re.compile("^%s$" % wild2regex(p), re.IGNORECASE) found_filter = True browser.ShowBranches() if len(browser) == 0: browser.ShowLeafs(False) lowest_level = True node_type = "Leaf" else: lowest_level = False node_type = "Branch" matches = filter(pattern.search, browser) if not lowest_level: if recursive: queue += [path_str + x + path_postfix for x in matches] elif lowest_level: matches = [exceptional(browser.GetItemID, x)(x) for x in matches] if include_type: matches = [(x, node_type) for x in matches] for node in matches: if node not in nodes: yield node nodes[node] = True except pythoncom.com_error as err: try: error_msg = "list: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err def list(self, paths='*', recursive=False, flat=False, include_type=False): """Return list of item nodes at specified path(s) (tree browser)""" nodes = self.ilist(paths, recursive, flat, include_type) return list(nodes) def servers(self, opc_host='localhost'): """Return list of available OPC servers""" try: pythoncom.CoInitialize() servers = self._opc.GetOPCServers(opc_host) servers = [s for s in servers if s != None] return servers except pythoncom.com_error as err: try: error_msg = "servers: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err def info(self): """Return list of (name, value) pairs about the OPC server""" try: self._update_tx_time() pythoncom.CoInitialize() info_list = [] if self._open_serv: mode = "OpenOPC" else: mode = "DCOM" info_list += [("Protocol", mode)] if mode == "OpenOPC": info_list += [("Gateway Host", "%s:%s" % (self._open_host, self._open_port))] info_list += [("Gateway Version", "%s" % __version__)] info_list += [("Class", self.opc_class)] info_list += [("Client Name", self._opc.ClientName)] info_list += [("OPC Host", self.opc_host)] info_list += [("OPC Server", self._opc.ServerName)] info_list += [("State", OPC_STATUS[self._opc.ServerState])] info_list += [("Version", "%d.%d (Build %d)" % (self._opc.MajorVersion, self._opc.MinorVersion, self._opc.BuildNumber))] try: browser = self._opc.CreateBrowser() browser_type = BROWSER_TYPE[browser.Organization] except: browser_type = "Not Supported" info_list += [("Browser", browser_type)] info_list += [("Start Time", str(self._opc.StartTime))] info_list += [("Current Time", str(self._opc.CurrentTime))] info_list += [("Vendor", self._opc.VendorInfo)] return info_list except pythoncom.com_error as err: try: error_msg = "info: %s" % self._get_error_str(err) raise OPCError(error_msg) finally: err = None del err def ping(self): """Check if we are still talking to the OPC server""" try: opc_serv_time = int(float(self._opc.CurrentTime) * 1000000.0) if opc_serv_time == self._prev_serv_time: return False self._prev_serv_time = opc_serv_time return True except pythoncom.com_error: return False def _get_error_str(self, err): """Return the error string for a OPC or COM error code""" hr, msg, exc, arg = err.args if exc == None: error_str = str(msg) else: scode = exc[5] try: opc_err_str = unicode(self._opc.GetErrorString(scode)).strip("\r\n") except: opc_err_str = None try: com_err_str = unicode(pythoncom.GetScodeString(scode)).strip("\r\n") except: com_err_str = None if opc_err_str == None and com_err_str == None: error_str = str(scode) else: if opc_err_str == com_err_str: error_str = opc_err_str else: if opc_err_str == None: error_str = com_err_str else: if com_err_str == None: error_str = opc_err_str else: error_str = "%s (%s)" % (opc_err_str, com_err_str) return error_str def _update_tx_time(self): """Update the session's last transaction time in the Gateway Service""" if self._open_serv: self._open_serv._tx_times[self._open_guid] = time.time() def __getitem__(self, key): """Read single item (tag as dictionary key)""" value, quality, time = self.read(key) return value def __setitem__(self, key, value): """Write single item (tag as dictionary key)""" self.write((key, value))