From b78b6a17534bbb3b48836d4ccd194b9c9ffe0a48 Mon Sep 17 00:00:00 2001 From: imcovangent <I.vanGent@tudelft.nl> Date: Sun, 3 Dec 2017 18:53:45 +0100 Subject: [PATCH] Further developments on knowledge base deprecation work. Former-commit-id: 47303a7309eedb29ce8fa39d9758441f23ccab8a --- examples/scripts/tu_delft_wing_design.txt | 2 +- kadmos/cmdows/cmdows.py | 454 ++++++++----- kadmos/graph/graph_kadmos.py | 2 +- kadmos/knowledgebase/initiation.py | 752 ---------------------- kadmos/utilities/general.py | 29 + kadmos/utilities/xml.py | 11 +- 6 files changed, 344 insertions(+), 906 deletions(-) delete mode 100644 kadmos/knowledgebase/initiation.py diff --git a/examples/scripts/tu_delft_wing_design.txt b/examples/scripts/tu_delft_wing_design.txt index 7b3200f52..155ffbd99 100644 --- a/examples/scripts/tu_delft_wing_design.txt +++ b/examples/scripts/tu_delft_wing_design.txt @@ -1 +1 @@ -The wing design case descirbed here was developed at TU Delft and takes a variety of different diciplines into account. \ No newline at end of file +The wing design case described here was developed at TU Delft and takes a variety of different disciplines into account. \ No newline at end of file diff --git a/kadmos/cmdows/cmdows.py b/kadmos/cmdows/cmdows.py index 723756081..ee792afa3 100644 --- a/kadmos/cmdows/cmdows.py +++ b/kadmos/cmdows/cmdows.py @@ -7,8 +7,12 @@ from lxml.etree import ElementTree from collections import Counter, OrderedDict # Settings for the logger +from kadmos.utilities.general import dict_to_ord_dict, assert_dict_keys from kadmos.utilities.strings import find_between, find_until -from kadmos.utilities.xml import get_uid_search_xpath, Element +from kadmos.utilities.xml import get_uid_search_xpath, ExtendedElement + +# Settings for the parser +parser = etree.XMLParser(remove_blank_text=True) logger = logging.getLogger(__name__) @@ -16,13 +20,21 @@ logger = logging.getLogger(__name__) class CMDOWS(object): """Class for with various methods for checking and manipulating CMDOWS files""" + # ----------------------------------------- # + # Initialization and check functions # + # ----------------------------------------- # def __init__(self, file_path=None, element=None): + self.SINGLE_MULTI_OPTIONS = ['single', 'multi'] + self.VERIFICATION_ELEMENTS = ['method', 'verifier', 'result', 'date', 'version'] + self.MODEL_DEFINITION_ELEMENTS = ['reference_data_set', 'analysis_method', 'fitting_method'] + self.DATA_EXCHANGE_SETTINGS_ELEMENTS = ['dataserver', 'urlsite', 'web_authentication_protocol', 'context', + 'folder', 'polling_time', 'max_iterations', 'time_out', + 'shared_file_policy', 'servermutex'] if file_path: self.file = file_path if element is not None: self.root = ElementTree(element).getroot() if file_path and element is None: - parser = etree.XMLParser(remove_blank_text=True) self.root = ElementTree(file=file_path, parser=parser).getroot() def version(self): @@ -101,7 +113,7 @@ class CMDOWS(object): return result def check_uids(self): - """Method so check if all uIDs are actually unique in a CMDOWS file""" + """Method to check if all uIDs are actually unique in a CMDOWS file""" ids = [element.attrib['uID'] for element in self.root.xpath('.//*[@uID]')] result = (len(ids) == len(set(ids))) if not result: @@ -119,6 +131,140 @@ class CMDOWS(object): logger.warning('The following uIDs do not exist although they are referred to: ' + ', '.join(invalids)) return result + def assert_element_tag(self, el, expected_tag): + """Method to assert that the tag of an element is as expected.""" + if expected_tag: + assert el.tag == expected_tag, 'Element should have tag {}, but has tag: {}.'.format(expected_tag, el.tag) + + # ----------------------------------------- # + # Get functions # + # ----------------------------------------- # + def get_inputs_uids(self, exblock_uid): + """Method to collect the inputs of a CMDOWS file executableBlock entry""" + assert self.get_element_of_uid(exblock_uid).getparent().getparent().tag == 'executableBlocks', \ + 'UID ' + exblock_uid + ' does not seem to refer to an executableBlock.' + xpath = self.get_xpath_of_uid(exblock_uid) + return self.root.xpath(xpath + '/inputs/input/parameterUID/text()') + + def get_outputs_uids(self, exblock_uid): + """Method to collect the outputs of a CMDOWS file executableBlock entry""" + assert self.get_element_of_uid(exblock_uid).getparent().getparent().tag == 'executableBlocks', \ + 'UID ' + exblock_uid + ' does not seem to refer to an executableBlock.' + xpath = self.get_xpath_of_uid(exblock_uid) + return self.root.xpath(xpath + '/outputs/output/parameterUID/text()') + + def get_element_of_uid(self, uid, expected_tag=None): + """Method to get the element based on a UID value.""" + xpath_expression = get_uid_search_xpath(uid) + els = self.root.xpath(xpath_expression) + if len(els) > 1: + raise AssertionError('Multiple elements with UID ' + uid + ' found. Use "check_uids()" to check if all UIDs' + ' are unique.') + elif len(els) == 0: + raise AssertionError('Could not find element with UID ' + uid + '.') + self.assert_element_tag(els[0], expected_tag) + return els[0] + + def get_xpath_of_uid(self, uid, expected_tag=None): + """Method to get the xpath based on a UID value.""" + el = self.get_element_of_uid(uid, expected_tag=None) + return el.getroottree().getpath(el) + + def get_executable_blocks_uids(self): + """Method to get a list of all the executable block UIDs present in the file.""" + uid_list = [] + # First collect the executable blocks from the main element + el = self.root.xpath('/cmdows/executableBlocks') + assert len(el) == 1, '"/cmdows/executableBlocks" is not a unique XPath. Check given CMDOWS file structure.' + for exblock_types in el[0].iterchildren(): + for blocks in exblock_types.iterchildren(): + try: + uid_list.append(blocks.attrib['uID']) + except: + raise AttributeError('Could not find the uID attribute for this element: {}.'.format(blocks)) + # Then collect the executable blocks from the architecture elements + el = self.root.xpath('/cmdows/architectureElements/executableBlocks') + if el: + assert len(el) == 1, '"/cmdows/architectureElements/executableBlocks" is not a unique XPath. ' \ + 'Check given CMDOWS file structure.' + for arblock_type in el[0].iterchildren(): + for arblock in arblock_type.iterchildren(): + try: + uid_list.append(arblock.attrib['uID']) + except: + if arblock_type.tag in ['coordinators', 'optimizers', 'convergers', 'consistencyConstraintFunctions']: + raise AttributeError('Could not find the uID attribute for this element: {}.'.format(arblock)) + return uid_list + + def get_parameters_uids(self): + """Method to get a list of all the parameter UIDs present in the file.""" + uid_list = [] + # First collect the parameters from the main element + el = self.root.xpath('/cmdows/parameters') + assert len(el) == 1, '"/cmdows/parameters" is not a unique XPath. Check given CMDOWS file structure.' + for param in el[0].iterchildren(): + try: + uid_list.append(param.attrib['uID']) + except: + raise AttributeError('Could not find the uID attribute for this element: {}.'.format(param)) + # Then collect the parameters from the architecture elements + el = self.root.xpath('/cmdows/architectureElements/parameters') + if el: + assert len(el) == 1, '"/cmdows/architectureElements/parameters" is not a unique XPath. ' \ + 'Check given CMDOWS file structure.' + for param_type in el[0].iterchildren(): + for param in param_type.iterchildren(): + try: + uid_list.append(param.attrib['uID']) + except: + raise AttributeError('Could not find the uID attribute for this element: {}.'.format(param)) + return uid_list + + def get_design_competences_uids(self): + """Method to get a list of all the design competences UIDs present in the file.""" + uid_list = [] + el = self.root.xpath('/cmdows/executableBlocks/designCompetences') + assert len(el) <= 1, '"/cmdows/executableBlocks/designCompetences" is not a unique XPath. ' \ + 'Check given CMDOWS file structure.' + if len(el) == 1: + for dc in el[0].iterchildren(): + try: + uid_list.append(dc.attrib['uID']) + except: + raise AttributeError('Could not find the uID attribute for this DC element: {}.'.format(dc)) + return uid_list + + def get_mathematical_functions_uids(self): + """Method to get a list of all the mathematical functions UIDs present in the file.""" + uid_list = [] + el = self.root.xpath('/cmdows/executableBlocks/mathematicalFunctions') + assert len(el) <= 1, '"/cmdows/executableBlocks/mathematicalFunctions" is not a unique XPath. ' \ + 'Check given CMDOWS file structure.' + if len(el) == 1: + for mf in el[0].iterchildren(): + try: + uid_list.append(mf.attrib['uID']) + except: + raise AttributeError('Could not find the uID attribute for this MF element: {}.'.format(mf)) + return uid_list + + def get_used_parameter_uids(self): + """Method to get a list of all the parameter UIDs used in the file.""" + uid_list = [] + el = self.root.xpath('/cmdows/executableBlocks/mathematicalFunctions') + assert len(el) <= 1, '"/cmdows/executableBlocks/mathematicalFunctions" is not a unique XPath. ' \ + 'Check given CMDOWS file structure.' + if len(el) == 1: + for mf in el[0].iterchildren(): + try: + uid_list.append(mf.attrib['uID']) + except: + raise AttributeError('Could not find the uID attribute for this MF element: {}.'.format(mf)) + return uid_list + + # ----------------------------------------- # + # Add / change file functions # + # ----------------------------------------- # def add_contact(self, name, email, uid, company=None, department=None, function=None, address=None, telephone=None, country=None, roles=None): """Method to add a contact element to the organization branch.""" @@ -134,14 +280,15 @@ class CMDOWS(object): # Add the contact details contact_element = Element('contact', uID=uid) - contact_element._add_element('name', name) - contact_element._add_element('email', email) - contact_element._add_element('company', company) if company else None - contact_element._add_element('department', department) if department else None - contact_element._add_element('function', function) if function else None - contact_element._add_element('address', address) if address else None - contact_element._add_element('telephone', telephone) if telephone else None - contact_element._add_element('country', country) if country else None + contact_element.add('name', name) + contact_element.add('email', email) + contact_element.add('company', company, only_add_if_valued=True) + contact_element.add('department', department, only_add_if_valued=True) + contact_element.add('function', function, only_add_if_valued=True) + contact_element.add('address', address, only_add_if_valued=True) + contact_element.add('telephone', telephone, only_add_if_valued=True) + contact_element.add('country', country, only_add_if_valued=True) + parent_element.append(contact_element) if roles: @@ -153,6 +300,132 @@ class CMDOWS(object): raise IOError('Invalid type for roles provided: {}.'.format(type(roles))) return + def add_dc_general_info(self, dc_uid, description, status=None, creation_date=None, owner_uid=None, creator_uid=None, + operator_uid=None, model_definition=None): + """Method to add a general info element to a dc branch.""" + + # Assert that there is a DC element + xpath = self.get_xpath_of_uid(dc_uid, expected_tag='designCompetence') + + # Assert that the contact_uids exist + self.get_element_of_uid(owner_uid) if owner_uid else None + self.get_element_of_uid(creator_uid) if creator_uid else None + self.get_element_of_uid(operator_uid) if operator_uid else None + + # Assert that the model definition is a dict with the right elements + if model_definition: + assert_dict_keys(model_definition, self.MODEL_DEFINITION_ELEMENTS, all_keys_required=False) + model_definition = dict_to_ord_dict(model_definition, self.MODEL_DEFINITION_ELEMENTS) + + # Assert that there is no existing general info element or else remove it + self.remove_element_based_on_xpath(xpath + '/metadata/generalInfo') + + # Ensure element to metadata parent + parent_element = self.ensure_abs_xpath(xpath + '/metadata') + + # Add the contact details + general_info_element = Element('generalInfo') + general_info_element.add('description', description) + general_info_element.add('status', status, only_add_if_valued=True) + general_info_element.add('creation_date', creation_date, only_add_if_valued=True, camel_case_conversion=True) + general_info_element.add('owner', {'contactUID': owner_uid}) if owner_uid else None + general_info_element.add('creator', {'contactUID': creator_uid}, only_add_if_valued=True) if creator_uid else None + general_info_element.add('operator', {'contactUID': operator_uid}, only_add_if_valued=True) if operator_uid else None + general_info_element.add('model_definition', model_definition, camel_case_conversion=True, + only_add_if_valued=True) + parent_element.append(general_info_element) + return + + def add_dc_performance_info(self, dc_uid, precision=None, fidelity_level=None, run_time=None, verification=None): + """Method to add a performance info element to a DC branch.""" + + # Assert that there is a DC element + xpath = self.get_xpath_of_uid(dc_uid, expected_tag='designCompetence') + + # Assert that there is at least one element with a value + assert sum(x is not None for x in [precision, fidelity_level, run_time, verification]) > 0, \ + 'At least one element must have a value.' + + # Assert that the verification is a dict with the right elements + if verification: + assert_dict_keys(verification, self.VERIFICATION_ELEMENTS, all_keys_required=True) + # Assert the verifier is a contact + self.get_element_of_uid(verification['verifier']) + + # Assert that there is no existing general info element or else remove it + self.remove_element_based_on_xpath(xpath + '/metadata/performanceInfo') + + # Ensure elements to metadata parent + parent_element = self.ensure_abs_xpath(xpath + '/metadata') + + # Add the contact details + performance_info_element = Element('performanceInfo') + performance_info_element.add('precision', precision, only_add_if_valued=True) + performance_info_element.add('fidelityLevel', fidelity_level, only_add_if_valued=True) + performance_info_element.add('run_time', run_time, only_add_if_valued=True, camel_case_conversion=True) + parent_element.append(performance_info_element) + + self.add_dc_verification(dc_uid, verification['method'], verification['verifier'], verification['result'], + verification['date'], verification['version']) + return + + def add_dc_verification(self, dc_uid, method, verifier, result, date, version): + """Method to add a verification to a DC""" + + # Assert that there is a DC element + xpath = self.get_xpath_of_uid(dc_uid, expected_tag='designCompetence') + + # Assert that the verifier is a contact + self.get_element_of_uid(verifier, expected_tag='contact') if verifier else None + + # Ensure elements to verifications parent + parent_element = self.ensure_abs_xpath(xpath + '/metadata/performanceInfo/verifications') + + # Find verifications element + verification_element = Element('verification') + verification_element.add('method', method) + verification_element.add('verifier', {'contactUID' : verifier}, camel_case_conversion=False) + verification_element.add('result', result) + verification_element.add('date', date) + verification_element.add('version', version) + + parent_element.append(verification_element) + return + + def add_dc_remote_component_info(self, dc_uid, single_or_multi_execution, job_name, remote_engineer, + notification_message, data_exchange_dict=None): + """Method to add a remote execution info element to a dc branch.""" + + # Assert that there is a DC element + xpath = self.get_xpath_of_uid(dc_uid, expected_tag='designCompetence') + + # Assert the single/multi input + assert single_or_multi_execution in self.SINGLE_MULTI_OPTIONS, 'single_or_multi_execution should be either {}, now is {}'.format(self.SINGLE_MULTI_OPTIONS) + + # Assert the remote_engineer is a contact + self.get_element_of_uid(remote_engineer, expected_tag='contact') + + if data_exchange_dict: + assert_dict_keys(data_exchange_dict, self.DATA_EXCHANGE_SETTINGS_ELEMENTS) + data_exchange_dict = dict_to_ord_dict(data_exchange_dict, self.DATA_EXCHANGE_SETTINGS_ELEMENTS) + + # Assert that there is an execution info element and remove a potential existing remoteComponent element + parent_element = self.ensure_abs_xpath(xpath + '/metadata/executionInfo') + self.remove_element_based_on_xpath(xpath + '/metadata/executionInfo/remoteComponentInfo') + + # Add the remote component element + remote_component_element = Element('remoteComponentInfo') + remote_component_element.add('job_settings', + value=OrderedDict((('single_or_multi_execution',single_or_multi_execution), + ('job_name', job_name), + ('remote_engineer', {'contact_UID' : remote_engineer}), + ('notification_message', notification_message))), + camel_case_conversion=True) + remote_component_element.add('data_exchange_settings', value=data_exchange_dict, only_add_if_valued=True, + camel_case_conversion=True) + parent_element.append(remote_component_element) + return + def add_actor(self, contact_uid, role): """Method to add a role element to the organization branch.""" @@ -170,6 +443,18 @@ class CMDOWS(object): parent_element.append(contact_element) return + def add_subelement(self, element, subelement_xpath_tag): + if "@uID" in subelement_xpath_tag: + el_tag = find_until(subelement_xpath_tag, '[') + uid_attr = find_between(subelement_xpath_tag, '[@uID="', '"]') + element.append(Element(el_tag, uID=uid_attr)) + elif "[" in subelement_xpath_tag: + el_tag = find_until(subelement_xpath_tag, '[') + element.append(Element(el_tag)) + else: + element.append(Element(subelement_xpath_tag)) + return + def ensure_abs_xpath(self, xpath): """Method to ensure that the elements given by an absolute XPath exist.""" split_path = xpath.split('/') @@ -185,18 +470,6 @@ class CMDOWS(object): self.add_subelement(el_pr[0], eltag) return self.root.xpath(local_xpath)[0] - def add_subelement(self, element, subelement_xpath_tag): - if "@uID" in subelement_xpath_tag: - el_tag = find_until(subelement_xpath_tag, '[') - uid_attr = find_between(subelement_xpath_tag, '[@uID="', '"]') - element.append(Element(el_tag, uID=uid_attr)) - elif "[" in subelement_xpath_tag: - el_tag = find_until(subelement_xpath_tag, '[') - element.append(Element(el_tag)) - else: - element.append(Element(subelement_xpath_tag)) - return - def resolve_uids(self): """Method to rename duplicate UIDs in a CMDOWS file""" logger.warning('The resolve_uids method is a hack and should not be used.') @@ -209,26 +482,15 @@ class CMDOWS(object): for duplicate_id, duplicate_element in enumerate(duplicate_elements): duplicate_element.attrib['uID'] = duplicate_element.attrib['uID'] + '_' + str(duplicate_id) - def get_inputs_uids(self, exblock_uid): - """Method to collect the inputs of a CMDOWS file executableBlock entry""" - assert self.get_element_of_uid(exblock_uid).getparent().getparent().tag == 'executableBlocks', \ - 'UID ' + exblock_uid + ' does not seem to refer to an executableBlock.' - xpath = self.get_xpath_of_uid(exblock_uid) - return self.root.xpath(xpath + '/inputs/input/parameterUID/text()') - + # ----------------------------------------- # + # Remove functions # + # ----------------------------------------- # def remove_inputs(self, exblock_uid): """Method to remove the inputs of a CMDOWS file executableBlock entry""" assert self.get_element_of_uid(exblock_uid).getparent().getparent().tag == 'executableBlocks', \ 'UID ' + exblock_uid + ' does not seem to refer to an executableBlock.' self.remove_children_of(exblock_uid, children_to_remove=['inputs']) - def get_outputs_uids(self, exblock_uid): - """Method to collect the outputs of a CMDOWS file executableBlock entry""" - assert self.get_element_of_uid(exblock_uid).getparent().getparent().tag == 'executableBlocks', \ - 'UID ' + exblock_uid + ' does not seem to refer to an executableBlock.' - xpath = self.get_xpath_of_uid(exblock_uid) - return self.root.xpath(xpath + '/outputs/output/parameterUID/text()') - def remove_outputs(self, exblock_uid): """Method to remove the outputs of a CMDOWS file executableBlock entry""" assert self.get_element_of_uid(exblock_uid).getparent().getparent().tag == 'executableBlocks', \ @@ -281,8 +543,7 @@ class CMDOWS(object): def remove_element_based_on_uid(self, uid, expected_tag=None): """Method to remove an element based on its UID.""" el = self.get_element_of_uid(uid) - if expected_tag: - assert el.tag == expected_tag, 'Element should have tag {}, but has tag: {}.'.format(expected_tag, el.tag) + self.assert_element_tag(el, expected_tag) el.getparent().remove(el) def remove_element_based_on_xpath(self, xpath, expected_amount=None, expected_text=None, higher_level_removal=None): @@ -326,114 +587,9 @@ class CMDOWS(object): if not els: self.remove_element_based_on_xpath('/cmdows/header/organization/organigram/' + role + 's', expected_amount=1) - def get_element_of_uid(self, uid): - """Method to get the element based on a UID value.""" - xpath_expression = get_uid_search_xpath(uid) - els = self.root.xpath(xpath_expression) - if len(els) > 1: - raise AssertionError('Multiple elements with UID ' + uid + ' found. Use "check_uids()" to check if all UIDs' - ' are unique.') - elif len(els) == 0: - raise AssertionError('Could not find element with UID ' + uid + '.') - return els[0] - - def get_xpath_of_uid(self, uid): - """Method to get the xpath based on a UID value.""" - el = self.get_element_of_uid(uid) - return el.getroottree().getpath(el) - - def get_executable_blocks_uids(self): - """Method to get a list of all the executable block UIDs present in the file.""" - uid_list = [] - # First collect the executable blocks from the main element - el = self.root.xpath('/cmdows/executableBlocks') - assert len(el) == 1, '"/cmdows/executableBlocks" is not a unique XPath. Check given CMDOWS file structure.' - for exblock_types in el[0].iterchildren(): - for blocks in exblock_types.iterchildren(): - try: - uid_list.append(blocks.attrib['uID']) - except: - raise AttributeError('Could not find the uID attribute for this element: {}.'.format(blocks)) - # Then collect the executable blocks from the architecture elements - el = self.root.xpath('/cmdows/architectureElements/executableBlocks') - if el: - assert len(el) == 1, '"/cmdows/architectureElements/executableBlocks" is not a unique XPath. ' \ - 'Check given CMDOWS file structure.' - for arblock_type in el[0].iterchildren(): - for arblock in arblock_type.iterchildren(): - try: - uid_list.append(arblock.attrib['uID']) - except: - if arblock_type.tag in ['coordinators', 'optimizers', 'convergers', 'consistencyConstraintFunctions']: - raise AttributeError('Could not find the uID attribute for this element: {}.'.format(arblock)) - return uid_list - - def get_parameters_uids(self): - """Method to get a list of all the parameter UIDs present in the file.""" - uid_list = [] - # First collect the parameters from the main element - el = self.root.xpath('/cmdows/parameters') - assert len(el) == 1, '"/cmdows/parameters" is not a unique XPath. Check given CMDOWS file structure.' - for param in el[0].iterchildren(): - try: - uid_list.append(param.attrib['uID']) - except: - raise AttributeError('Could not find the uID attribute for this element: {}.'.format(param)) - # Then collect the parameters from the architecture elements - el = self.root.xpath('/cmdows/architectureElements/parameters') - if el: - assert len(el) == 1, '"/cmdows/architectureElements/parameters" is not a unique XPath. ' \ - 'Check given CMDOWS file structure.' - for param_type in el[0].iterchildren(): - for param in param_type.iterchildren(): - try: - uid_list.append(param.attrib['uID']) - except: - raise AttributeError('Could not find the uID attribute for this element: {}.'.format(param)) - return uid_list - - def get_design_competences_uids(self): - """Method to get a list of all the design competences UIDs present in the file.""" - uid_list = [] - el = self.root.xpath('/cmdows/executableBlocks/designCompetences') - assert len(el) <= 1, '"/cmdows/executableBlocks/designCompetences" is not a unique XPath. ' \ - 'Check given CMDOWS file structure.' - if len(el) == 1: - for dc in el[0].iterchildren(): - try: - uid_list.append(dc.attrib['uID']) - except: - raise AttributeError('Could not find the uID attribute for this DC element: {}.'.format(dc)) - return uid_list - - def get_mathematical_functions_uids(self): - """Method to get a list of all the mathematical functions UIDs present in the file.""" - uid_list = [] - el = self.root.xpath('/cmdows/executableBlocks/mathematicalFunctions') - assert len(el) <= 1, '"/cmdows/executableBlocks/mathematicalFunctions" is not a unique XPath. ' \ - 'Check given CMDOWS file structure.' - if len(el) == 1: - for mf in el[0].iterchildren(): - try: - uid_list.append(mf.attrib['uID']) - except: - raise AttributeError('Could not find the uID attribute for this MF element: {}.'.format(mf)) - return uid_list - - def get_used_parameter_uids(self): - """Method to get a list of all the parameter UIDs used in the file.""" - uid_list = [] - el = self.root.xpath('/cmdows/executableBlocks/mathematicalFunctions') - assert len(el) <= 1, '"/cmdows/executableBlocks/mathematicalFunctions" is not a unique XPath. ' \ - 'Check given CMDOWS file structure.' - if len(el) == 1: - for mf in el[0].iterchildren(): - try: - uid_list.append(mf.attrib['uID']) - except: - raise AttributeError('Could not find the uID attribute for this MF element: {}.'.format(mf)) - return uid_list - + # ----------------------------------------- # + # Export functions # + # ----------------------------------------- # def save(self, file_path=None, pretty_print=False, method='xml', xml_declaration=True, encoding='UTF-8'): """Method to save a manipulated CMDOWS file""" if file_path: @@ -444,3 +600,7 @@ class CMDOWS(object): raise IOError('Please specify the path for the CMDOWS file.') ElementTree(self.root).write(file_path, pretty_print=pretty_print, method=method, xml_declaration=xml_declaration, encoding=encoding) + +# Set element on the module level +parser.set_element_class_lookup(etree.ElementDefaultClassLookup(element=ExtendedElement)) +Element = parser.makeelement diff --git a/kadmos/graph/graph_kadmos.py b/kadmos/graph/graph_kadmos.py index af6d7ccf8..b296044f3 100644 --- a/kadmos/graph/graph_kadmos.py +++ b/kadmos/graph/graph_kadmos.py @@ -3969,7 +3969,7 @@ def _read_io_xml_file(file_path, mode, xsd_check): # check if toolspecific nodes found in file if any("toolspecific" in node for node in dataDict["leafNodeSet"]): - logger.warning("'toolspecific' nodes found in {}".format(os.path.split(file_path)[0])) + logger.warning("'toolspecific' nodes found in {}".format(os.path.split(file_path)[1])) return dataDict diff --git a/kadmos/knowledgebase/initiation.py b/kadmos/knowledgebase/initiation.py deleted file mode 100644 index cfead72c0..000000000 --- a/kadmos/knowledgebase/initiation.py +++ /dev/null @@ -1,752 +0,0 @@ -import json -import logging -import networkx as nx -import os -import shutil -import re -from lxml import etree - -from kadmos.graph import KadmosGraph, RepositoryConnectivityGraph -import kadmos.utilities.mapping as MU -from kadmos.utilities.prompting import user_prompt_yes_no, user_prompt_select_options -from kadmos.utilities.printing import print_indexed_list - - -# Settings for the logger -logger = logging.getLogger(__name__) - - -class KnowledgeBaseInitiator(object): - """ - This class initiates the knowledge base for a given set of tool blueprints. The user is able to select tools from the - database of blueprints. - - """ - - - def __init__(self, knowledge_base_path, schema_files_dir=None, specific_files_dir = None): - """ - ADD DESCRIPTION - """ - - # save inputs as attributes - self.kb_path = knowledge_base_path - self.schema_files_dir = schema_files_dir - self.specific_files_dir = specific_files_dir - self.circularConnections= {} - - # ----> HARDCODE <---- - - self.WORKDIR_SUB = ['SCHEMATIC_BASE', 'SPECIFIC_BASE'] # directories must be present in working directory - - self.GEN_INFO = ["name", "version", "creator", "description", "schema_input", "schema_output"] # prescribed general info - - self.EXEC_INFO = ["mode", "description", "prog_env", "toolspecific", "runtime", "precision", "fidelity"] # prescribed execution info - - self.STANDARD_KB_NAME = "UNNAMED_KB_DIR" # knowledge base folder name if none indicated at initiation - - self.FILEPATTERN = "(-input-schema.xml)$|(-output-schema.xml)$|(-info.json)$" # file patterns for matching - - ########## execute Class Methods ######## - - # perform checks on kb schema directory and kb directory - self.check_knowledge_base_schema() - - # get function files from schema directory and save in instance: self.function_files - self._get_function_files() - - # read data from provided schema files and save them in instance: self.function_data - self._get_function_data() - - # get function graphs in kb schema according to their execution modes - self.get_function_graphs_in_kb_schema() - - - def check_knowledge_base_schema(self): - """ - This function checks whether the provided knowledge base schema directory exists or not. If not, error is thrown. - It then proceeds to check the existence of the specified knowledge base directory, and creates a knowledge base - directory based on user input. - If initiation is done with a specified directory name, the user will be prompted about the decision to replace - an existing directory, if one is found with the same name. Otherwise, the specified directory is created without - prompts. If no knowledge base directory is given, the standard name will be used (self.STANDARD_KB_NAME). - - :return: self.schema_files_dir_path: Full path to the schema files directory - :return: self.specific_files_dir: Depending on user-input, new or old kb directory name - :return: self.specific_files_dir_path: Full path to the created knowledge base - """ - - # assert that knowledge base dir and subdirs exist - assert isinstance(self.kb_path, basestring), "Provided 'KB_path' argument must be of type string." - assert os.path.exists(self.kb_path), "Provided 'KB_path' does not exist." - assert os.path.isdir(self.kb_path), "Provided 'KB_path' must be a directory." - assert all(os.path.isdir(os.path.join(self.kb_path, dir)) for dir in self.WORKDIR_SUB), \ - "All of the directories must be present in the working directory: {}".format(', '.join(self.WORKDIR_SUB)) - - # define schematic and specific base paths - self.schema_base_path = os.path.join(self.kb_path, 'SCHEMATIC_BASE') - self.spec_base_path = os.path.join(self.kb_path, 'SPECIFIC_BASE') - - # check if schema files directory exist; if not, let user select one - if self.schema_files_dir is not None: - assert isinstance(self.schema_files_dir, basestring), \ - "The provided schematic base directory must be of type string." - assert os.path.isdir(os.path.join(self.schema_base_path, self.schema_files_dir)), \ - "Could not find schema dir {} directory in {}".format(self.schema_files_dir, self.schema_base_path) - self.schema_files_dir_path = os.path.join(self.schema_base_path, self.schema_files_dir) - - else: # if schema dir not provided - - # get schema file directories - schema_dirs = [obj for obj in os.listdir(self.schema_base_path) if os.path.isdir(os.path.join(self.schema_base_path, obj))] - assert schema_dirs, "No schema file directories found in {}".format(self.kb_path, 'SCHEMATIC_BASE') - - if len(schema_dirs) > 1: - - # print schema dirs and let user choose - mssg = "\nThe following directories were found in {}: " - print_indexed_list(*schema_dirs, message=mssg) - user_sel = user_prompt_select_options(*schema_dirs, allow_multi=False) - self.schema_files_dir = next(iter(user_sel)) - - else: # if only one present, take that one - self.schema_files_dir = next(iter(schema_dirs)) - - # define path of schema files directory - self.schema_files_dir_path = os.path.join(self.schema_base_path, self.schema_files_dir) - - # create new knowledge base directory - if self.specific_files_dir is not None: - assert isinstance(self.specific_files_dir, basestring), "The specific files directory must be a string." - - # make sure that name does not already exist, otherwise ask user if overwrite or not - if os.path.exists(os.path.join(self.spec_base_path, self.specific_files_dir)): - - mssg = "The provided directory name '{}' already exists in:\n'{}'.\n\nWould you like to overwrite its contents?".format(self.specific_files_dir, self.spec_base_path) - usr_sel = user_prompt_yes_no(message=mssg) - # usr_sel = 0 #TODO:DEMO - - # either create a unique directory or delete directory contents - if usr_sel: - shutil.rmtree(os.path.join(self.spec_base_path, self.specific_files_dir)) - os.makedirs(os.path.join(self.spec_base_path, self.specific_files_dir)) - else: - self.specific_files_dir = self._create_unique_directory(self.spec_base_path, self.specific_files_dir) - - else: # create that directory - os.makedirs(os.path.join(self.spec_base_path, self.specific_files_dir)) - - else: # create dir with standard name - if os.path.exists(os.path.join(self.spec_base_path, self.STANDARD_KB_NAME)): - self.specific_files_dir = self._create_unique_directory(self.spec_base_path, self.STANDARD_KB_NAME) - else: - self.specific_files_dir = self.STANDARD_KB_NAME - os.makedirs(os.path.join(self.spec_base_path, self.specific_files_dir)) - - # store specific files dir - self.specific_files_dir_path = os.path.join(self.spec_base_path, self.specific_files_dir) - - # print init message - initString = "\nInitiating KADMOS for Knowledge Base Schema: {}\n".format(self.schema_files_dir) - print "\n{0}{1}{0}\n".format("#"*(len(initString)-2), initString) - - print "NOTE: Writing to {}.\n".format(self.specific_files_dir_path) - - return - - - def _create_unique_directory(self, parent_dir_path, directory): - """ - This helper function takes the desired name and adds squared brackets to its name containing an index, making the - directory unique. The new directory is created in the provided parent path. - - :param parent_dir_path: Path to parent directory - :param directory: Desired directory name - :return: alternate_dir: Alternative directory name - """ - - # loop through alternative names until a non-existing name is found; create dir - kb_dir_idx = 0 - alternate_dir = "" - while True: - kb_dir_idx += 1 - alternate_dir = "{}[{}]".format(directory, kb_dir_idx) - if not os.path.exists(os.path.join(parent_dir_path, alternate_dir)): - break - - os.makedirs(os.path.join(parent_dir_path, alternate_dir)) - - return alternate_dir - - - def _get_function_files(self): - """ - (PRIVATE) This function - - :return: - """ - # TODO: function description! - # TODO: MAKE SURE THAT (a) DIR IS NOT EMPTY and (b) FILE STRUCTURE IS THERE (INFO, INPUT, OUTPUT)! - - # Read input and output XML files and info json files, save them in self.function_files - self.function_files = dict(input=[], output=[], info=[]) - self.function_list = [] - - # setup pattern to match type of file (info, input, output) - typePattern = "-[a-zA-Z]*." - - files_in_dir = os.listdir(self.schema_files_dir_path) - - listOfTools = [] - for file in files_in_dir: - matchEnding = re.search(self.FILEPATTERN, file) - if matchEnding: - listOfTools.append(file[:-len(matchEnding.group())]) - listOfTools = list(set(listOfTools)) # make elements unique - - # prompt user for tools selection - # ignoreTools = self._ignore_tools_for_kb(listOfTools) - ignoreTools = [] #TODO:DEMO - - for file in files_in_dir: - matchEnding = re.search(self.FILEPATTERN, file) # match name ending - if matchEnding: - # if the file matches any in the ignoreList, skip - if file[:-len(matchEnding.group())] in ignoreTools: - continue - matchType = re.search(typePattern, matchEnding.group()) - self.function_files[matchType.group()[1:-1]].append(file) - else: - if not file.endswith("mapping.json"): # TODO: add mapping file properly - print "Could not identify the type of {}, please make sure files adhere to naming conventions".format(file) - - # assert that the correct function files present, not just amount - redundantFiles = [] # contains functions with missing info-file - infoFiles = [file[:-len('-info.json')] for file in self.function_files['info']] - for inout in ['input', 'output']: - removeList = [] # list used as cross check - inOutFiles = [file[:-len("-{}-schema.xml".format(inout))] for file in self.function_files[inout]] - for infoFile in infoFiles: - if infoFile in inOutFiles: - removeList.append(infoFile) - else: - raise ValueError, "Can not find {}-{}-schema.xml in '{}'. Please check for correct spelling of info, input and output files.".format(infoFile, inout, self.schema_files_dir) - - redundantFiles += [file for file in inOutFiles if file not in removeList] - - assert not redundantFiles, "The following tools have missing -info.json files: \n" + "\n".join(set(redundantFiles)) - - return - - - def _ignore_tools_for_kb(self, listOfTools): - """ - (PRIVATE) This helper function prompts the user to make a selection from a given list of tools on which tools to - ignore from this list for further analysis. It first lists the available tools in the console, and asks the user - whether to ignore any of them. If "No" is chosen, the function is exited with an empty list. If "Yes", the user - is asked to give the corresponding indices of the tools to remove. - - :param listOfTools: List of tools found in repository - :return: ignoreTools: List of tools that will be ignored when functions are loaded into init-object - """ - - assert isinstance(listOfTools, list), "'listOfTools' argument must be of type 'list'." - - # print list of tools found in schema dir - mssg = "The following tools have been found in the {} directory:".format(self.schema_files_dir) - print_indexed_list(message=mssg, *listOfTools) - - ignoreTools = [] # initiate list of tools to ignore - - # prompt user to select tools to ignore, if user chooses to ignore tools - mssg = "Would you like to ignore any of the listed tools?" - user_input = user_prompt_yes_no(message=mssg) - if user_input == 1: - selectMssg = "Please select all tools you would like to ignore (separate by space):" - ignoreTools = user_prompt_select_options(message=selectMssg, *listOfTools) - - if not ignoreTools or user_input == 0: # if empty list - print "No tools are ignored." - - return ignoreTools - - - def _get_function_data(self): - """" - (PRIVATE) This function adds a new attribute functionData to the class instance that contains all information in the knowledge base. - - functionData = [ - { - "info": { - "general_info": {"name": str, "type": str, "version": float, "creator": str, "description": str}, - "execution_info": [{"mode": str, "runtime": int, "precision": float, "fidelity": str}, # mode 1 - ... ] - } - , - "input": { - "leafNodes": [ {"xpath": str, "tag": str, "attributes": dict, "value": str, "level": int}, ...] # list of all input leafNodes - "completeNodeSet": [] # list of ALL nodes (for convenience) - "leafNodeSet": [] # list of all leaf nodes (for convenience) - }, - "output": { - "leafNodes": [ {"xpath": str, "tag": str, "attributes": dict, "value": str, "level": int}, ...], # list of all output leafNodes - "completeNodeSet": [] # list of ALL nodes (for convenience) - "leafNodeSet": [] # list of all leaf nodes (for convenience) - - } - }, # tool1 - ... - ] - - :return self.function_data - """ - - self.function_data = [] - - # loop through info-files and extract relevant information from info and corresponding input, output files - for file in self.function_files["info"]: - - # initiate a dict for each function - funcDict = {'info': {'general_info': {}, 'execution_info': [] }} - - # open -info.json to read data - with open(os.path.join(self.schema_files_dir_path, file)) as info: - print "loading {}".format(os.path.join(self.schema_files_dir_path, file)) - infoData = json.load(info) - - # make sure that execution and general info is provided in info-file - for info in ["execution_info", "general_info"]: - assert info in infoData, "{}-key is missing in {}. Please check for spelling errors.".format(info, file) - - # add function info from file to funcDict - for inf in self.GEN_INFO: # looping through general info - - # make sure that function name and type is defined, is string - if inf == 'name': - funcName = infoData["general_info"].get(inf) - assert isinstance(funcName, basestring), "Function name in {} must be of type 'string'!".format(file) - assert len(funcName)>0, "Function name in {} must be non-empty string!".format(file) - - # add general info if provided in file - try: funcDict['info']['general_info'][inf] = infoData["general_info"].get(inf) - except KeyError: - print "Function {} was not found for {} and not added to knowledge base.".format(inf, funcName) - - # assert that exection info is provided for at least one mode - execInfo = infoData["execution_info"] - assert isinstance(execInfo, list), "'execution_info' in info-file for {} must be a list (of dicts).".format(funcName) - assert len(execInfo)>0, "The {} 'execution_info' must have at least one defined mode. Please add a function mode to the info-file.".format(funcName) - - # loop through execution info for each mode - for dictIndex, modeDict in enumerate(execInfo): - - assert isinstance(modeDict, dict), "Each element in 'execution_info'-list in {} must be dictionary.".format(file) - - #add mode dict to exec info - funcDict['info']['execution_info'].append({}) - - # make sure mode name is defined - mode = modeDict["mode"] - assert isinstance(mode, basestring), "Execution mode names in {} must be defined string(s) in the info-file.".format(file) - assert re.match("^[a-zA-Z0-9_]+$", mode), "Execution mode {} in {} must be non-empty string of alphanumeric characters (and underscore).".format(file) - - # add execution info by mode to function dictionary (if that information is provided) - for inf in self.EXEC_INFO: - if inf in modeDict: - funcDict['info']['execution_info'][dictIndex][inf] = modeDict[inf] # add the information to dictionary - else: - raise KeyError, "'{}'-information for mode {} of {} is not available in the info-file!".format(inf, mode, funcName) - - # ensure that the execution modes given in info-file are unique - funcModes = [execDict['mode'] for execDict in funcDict['info']['execution_info']] - if not len(funcModes) == len(set(funcModes)): - duplicateModes = set([mode for mode in funcModes if funcModes.count(mode) > 1]) - raise ValueError, "Duplicate function mode(s) [{}] found in {}.".format(", ".join(duplicateModes), file) - - # get input and output data - for inOut in ['input', 'output']: - funcDict[inOut] = self._get_function_input_output_data(file, inOut, funcDict) - - # add function dictionary to list of function data - self.function_data.append(funcDict) - - # check if circular coupling exists for this function - outputSet = set(funcDict['output']['leafNodeSet']) - for nodeDict in funcDict['input']["leafNodes"]: - nodePath = nodeDict["xpath"] - if nodePath in outputSet: - self.circularConnections[funcName] = [] - self.circularConnections[funcName].append(nodePath) - - return - - - def _get_function_input_output_data(self, file, inOut, functionDict): - """ - (PRIVATE) This helper function writes the information on all leaf nodes from the input and output XML files to a dictionary. \ - If XML file is empty, it empty dict is returned. The element paths are checked for uniqueness. - - :param file: info-file corresponding to the analyzed function - :param inOut: must be "input" or "output" - :param functionDict: - :return: dict: dictionary containing all XPaths and leaf nodes - """ - # initiate data dictionary - dataDict = {"leafNodes": [], "completeNodeSet": [],"leafNodeSet": []} - - # define file and file path to read data from (based on info-file) - func = file[:-10] # remove -info.json to get file name - mapFile = "{}-{}-schema.xml".format(func, inOut) - parseFile = os.path.join(self.schema_files_dir_path, mapFile) - - # if XML file is empty, return empty dict, else parse file - if os.stat(parseFile).st_size == 0: # check size of file - return dataDict - else: - tree = etree.parse(parseFile) - - # remove comments from tree, if any present - comments = tree.xpath("//comment()") - for c in comments: - p = c.getparent() - p.remove(c) - - # iterate through tree and add data to dict, only touch leaf nodes - leafNodesList = [] - completeNodeList = [] - for el in tree.iter(): - elemData = {} - elemPath = MU.xpath_to_uid_xpath(tree.getpath(el), el) - - # check whether indices in path --> uniqueness - indexPattern = ('\[[0-9]+\]') - if re.search(indexPattern, elemPath): - raise ValueError, "Element {} in {} has index and is not unique!".format(elemPath, mapFile) - - # append path to list of all nodes - completeNodeList.append(elemPath) - - if not el.getchildren(): # if leaf node - - # append path to list of leaf nodes - leafNodesList.append(elemPath) - - # add element data to function dict - elemData['xpath'] = elemPath - elemData['tag'] = el.tag - elemData['attributes'] = el.attrib - elemData['level'] = elemPath.count('/') - 1 - elemData['value'] = el.text # adding None if empty - - # add 'modes' attribute if it does not exist - elemData['modes'] = self._get_execution_modes_for_element(el, tree, mapFile, functionDict) - - # remove whitespace from start/end of string, or add None - if el.text is not None: - elemData['value'] = el.text.strip() - else: - elemData['value'] = el.text # adding None if empty - - # add element data to data dict - dataDict['leafNodes'].append(elemData) - - # add complete list of input/output nodes (for convenience, performance later on) - dataDict["leafNodeSet"] = set(leafNodesList) - - # add list of ALL nodes to dictionary (for convenience, performance later on) - dataDict["completeNodeSet"] = set(completeNodeList) - - # check if toolspecific nodes found in file - if any("toolspecific" in node for node in dataDict["leafNodeSet"]): - logger.waning("'toolspecific' nodes found in {}".format(mapFile)) - - return dataDict - - - def _get_execution_modes_for_element(self, element, tree, file, functionDict): - """ - (PRIVATE) This function retrieves the modes attribute of the child node or of its ancestors. If multiple modes - are defined in its ancestry, a warning is given and only the lowest modes definition is returned. Ancestry is - checked for 'modes' attributes regardless of whether it is present in it leaf-node or not. - Once the modes are retrieved, they are checked for validity (present in info-file) and "negativity" (mode - attributes can either be positive or negative). NOTE: If no modes are given in a leaf-node, this node is applied - to ALL function modes. - - :param element: xml element, leaf-node - :param tree: element tree of the current element - :param file: file that is currently analyzed - :param functionDict: data dict containing execution and info data - :return: execModes: string containing all function modes applied to this element - """ - # get element xpath - elementPath = tree.getpath(element) - - # get function modes from info file and assert that they are unique - funcModes = [execDict['mode'] for execDict in functionDict['info']['execution_info']] - execModes = '' # NOTE: if no modes indicated, all modes are applied to node - modesFound = False - - if 'modes' in element.attrib and re.search("[^\s]", element.attrib['modes']): # if 'modes' key present and has characters - assert isinstance(element.attrib['modes'], basestring), "If provided, modes-attribute of elemeent {} in {} must be of type string.".format(elementPath, file) - execModes = element.attrib['modes'] - modesFound = True - - for anc in element.iterancestors(): - if 'modes' in anc.attrib and re.search("[^\s]", anc.attrib['modes']): - if not modesFound: - modesFound = True - execModes = anc.attrib['modes'] - else: - print "WARNING: Multiple 'modes' attributes found in ancestry of element {} in {}; lowest one is applied.".format(elementPath, file) - break - - if re.search("[^\s]", execModes): # if contains any characters - - # get all modes - modesList = execModes.split() - - # check if modes negative (all either negative or positive) - modesNegative = False - negPattern = "^-" - if any(re.search(negPattern, mode) for mode in modesList): - modesNegative = True - - assert all(re.search(negPattern, mode) for mode in modesList), \ - "Either all or none of the 'modes'-attributes of element {} in {} must be negative!".format(elementPath, file) - - # check if each mode is valid (use its positive if modesNegative) - for mode in modesList: - if modesNegative: - assert mode[1:] in funcModes, "Execution mode '{}' of node {} (or its ancestor) in {} was not found in the info-file. Please check spelling or alter info-file.".format(mode[1:], elementPath, file) - else: - assert mode in funcModes, "Execution mode '{}' of node {} (or its ancestor) in {} was not found in the info-file. Please check spelling or alter info-file.".format(mode, elementPath, file) - - return execModes - - - def get_function_graphs_in_kb_schema(self): - """ - This class method generates all graphs for all present functions in the knowledge base, and add these to - the class instance as: self.functionGraphs. - - :return: self.functionGraphs - """ - # get list of all functions in KB - funcList = [self.function_data[i]['info']["general_info"]['name'] for i in range(len(self.function_data))] - - # initiate list of function graphs and add graphs to it - graphList = [] - for func in funcList: - graphList.append(self.get_function_graph(func)) - - # add list of function graphs to class instance - self.functionGraphs = graphList - - return - - - def get_function_graph(self, funcName, inOut=None): - """ - This function builds a directed graph (KadmosGraph object) for the specified function using the "networkx" package. If inOut - argument is specified, only the input or output of the function will be included in the graph, otherwise both. - - The "modes" functionality enables the addition of function modes to certain nodes. If a node is required in certain - execution modes for a tool, the attribute "modes" will indicate this in the CPACS element. When creating function graphs, - graphs are created for EACH mode. Nodes that are only present in certain modes will therefore only be added to the - corresponding function graph. - - :param: funcName: function name for which the graph is generated; must be present in knowledge base. - :param: inOut: default = None; if specified, must be "input" or "output" string. Specification of this argument enables the generation of the function graph with only input or output variables. - :return: functionGraph - """ - - assert isinstance(funcName, basestring), 'Provided function name must be a string!' - - # assert funcName exists and get index of function in self.function_data list - funcIndex = None - for idx, funcDict in enumerate(self.function_data): - if funcDict['info']['general_info']['name'] == funcName: - funcIndex = idx # funcIndex is index of the function in list - break - assert funcIndex is not None, "The provided function name can not be found in knowledge base." - - # assert inOut, if defined, is string and either input or output - if inOut is not None: - assert isinstance(inOut, basestring), "inOut argument must be a string if specified." - assert inOut.lower() in ["input", "output"], "inOut argument must be either 'input' or 'output'." - else: - inOut = ["input", "output"] - - # initiate directed graph and list of edges - DG, edgeTpls = KadmosGraph(), [] - - # get all execution modes for the function from function info - funcModes = set([infoDict['mode'] for infoDict in funcDict['info']['execution_info']]) - - # loop input and output elements - for io in inOut: - - # loop through xml leaf-nodes and add them to function-tuples list, i.e output: (function, leaf-node) - for nodeDict in funcDict[io]['leafNodes']: - modesAttr = nodeDict['modes'] # modes-attrib always present - node = nodeDict['xpath'] # will not contain any indeces - - if re.search("[^\s]", modesAttr): # if contains any characters - - # get all modes - nodeModes = modesAttr.split() - - # check if modes negative (all either negative or positive) - modesNegative = False - negPattern = "^-" - if any(re.search(negPattern, mode) for mode in nodeModes): - - # make sure that all modes in list "positive" for later processing; remove minus sign - # (modesNegative ensures that modes are still regarded as negative) - modesNegative = True - for i, mode in enumerate(nodeModes): - nodeModes[i] = mode[1:] - - """ - Add relevant modes to addModes list (modes that apply to the node). The addModes list plays an - important part in the creation of function graphs, since the creation of function-leafNode tuples - depends on it. - """ - if modesNegative: - # add to all modes but the ones indicated - addModes = [mode for mode in funcModes if not mode in nodeModes] - else: - # add to all modes the ones indicated - addModes = list(set(nodeModes) & set(funcModes)) - - else: - # add to all modes - addModes = list(funcModes) - - # add edge tuple for the node and function - edgeTpls += self._create_edge_tuples(funcName, funcModes, io, node, addModes) - - # loop through all function-leafnode-tuples and add them to graph - DG.add_edges_from(edgeTpls, coupling_strength = 1) # each edge has connection strength of one - - # add node attributes to each node in graph - self._add_node_attribs(funcDict, funcName, funcModes, DG) - - DG.name = funcName - - return DG - - # TODO: (maybe?) add functionality to ignore certain running modes; similar to ignoreList function - - def _create_edge_tuples(self, funcName, funcModes, inOut, node, addModes): - """ - (PRIVATE) This helper function creates a list of edge tuples in order to generate a graph. The function label - depends on the mode that the edge tuple is applied to. If the list of modes is larger than one, the mode name - is indicated in square brackets, i.e. Function[MODE]. - - :param funcName: Function name - :param funcModes: List of all modes that apply to that function - :param inOut: Specifies whether input or output nodes - :param node: Name (xpath) of the input or output node - :param addModes: List of modes that the node is applied to - :return: tpls: list of function-leafNode-tuples that will be added to graph - """ - # initiate list of tuples - tpls = [] - - #make sure that at least one mode is present - assert len(addModes)>0, "At least one mode must be present for node {} of {}!".format(node, funcName) - - for mode in addModes: - - # if more than one mode, use mode in brackets - if len(funcModes) > 1: - funcLabel = '{}[{}]'.format(funcName, mode) - else: - funcLabel = funcName - - # determine whether input or output node - if inOut.lower() == 'input': - tpl = (node, funcLabel) - else: - tpl = (funcLabel, node) - - tpls.append(tpl) - - return tpls - - - def _add_node_attribs(self, funcDict, funcName, funcModes, G): - """ - (PRIVATE) Function that adds node attributes to the nodes in the graph. If the node is - - :param funcDict: Dictionary conainting all function data - :param funcName: Function name - :param funcModes: List of modes applied to the nodes - :param G: Function graph w/o attribs - :return: G: Function graph w/ attribs - """ - - for node in G.nodes_iter(): - if re.match(funcName, node): # if node matches function name - - # add attributes to node - G.node[node]['shape'] = 's' # square for functions - G.node[node]['category'] = 'function' - G.node[node]['label'] = node - G.node[node]['name'] = funcName - G.node[node]['level'] = None - - # check if node has brackets to retrieve execution mode - modePattern = "\[.+\]" - match = re.search(modePattern, node) - if match: - fMode = match.group(0)[1:-1] # take matching string and remove brackets - assert fMode in funcModes, 'Something went wrong! Could not find execution mode {} for {} in list of execution modes.'.format( - fMode, funcName) - else: - fMode = next(iter(funcModes)) # get only execution mode in set - - # loop over execution modes of function and add execution info to graph node - for execMode in funcDict['info']['execution_info']: - if fMode == execMode['mode']: - - # loop over execution info and add the provided information to node; raise error if info missing - for info in self.EXEC_INFO: - G.node[node][info] = execMode[info] - - else: # otherwise variable node - - # add attributes to node - G.node[node]['shape'] = 'o' # circle for variables - G.node[node]['category'] = 'variable' - G.node[node]['label'] = node.split('/')[-1] - G.node[node]['level'] = node.count('/') - 1 - G.node[node]['execution_time'] = 1 # TODO: this is not really needed for nodes - - return G - - - def get_MCG(self, name='RCG-GRAPH'): - """ - Function to create Maximal Connectivity Graph (Pate, 2014) by composing a list of graphs. - - :param: name: Name of the RCG - :return: maximal connectivity graph (RCG) - """ - # TODO: Check with get_rcg function of knowledgebase.py - - functionGraphs = self.functionGraphs - - MCG = RepositoryConnectivityGraph() # initialize RCG - - logger.info('Composing RCG...') - for g in functionGraphs: - MCG = nx.compose(MCG, g) - logger.info('Successfully composed RCG.') - - # Add kb_path and name attribute - MCG.name = name - MCG.kb_schema_path = self.schema_files_dir_path - - return MCG \ No newline at end of file diff --git a/kadmos/utilities/general.py b/kadmos/utilities/general.py index 5f66c59be..13011eff3 100644 --- a/kadmos/utilities/general.py +++ b/kadmos/utilities/general.py @@ -5,6 +5,7 @@ import re import sys import subprocess import logging +from collections import OrderedDict from random import choice @@ -12,6 +13,31 @@ from random import choice logger = logging.getLogger(__name__) +def assert_dict_keys(dic, expected_keys, all_keys_required=False): + """Method to assert that a dictionary has the expected keys and (optionally) to check if is has all the keys.""" + assert isinstance(dic, dict), 'dic should be a dictionary.' + dict_keys = dic.keys() + if all_keys_required: + assert not set(expected_keys).symmetric_difference(dict_keys), 'Not all expected keys are in the dictionary.' + else: + assert set(dict_keys).issubset(expected_keys), 'Dictionary keys are not a subset of the expected keys.' + + +def dict_to_ord_dict(dic, key_order): + """Method to transform a Python dictionary into a Python ordered dictionary. Note that the key_order list can have + items that are not present in the given dictionary. All keys of the dictionary should be in the order though.""" + assert isinstance(dic, dict), 'dic should be of type dict.' + assert isinstance(key_order, list), 'key_order should be of type list.' + assert len(key_order) == len(set(key_order)), 'key_order should consist of unique elements.' + ord_dict = OrderedDict() + for key in key_order: + if key in dic: + ord_dict.update({key:dic[key]}) + del dic[key] + assert not dic, 'Dictionary still contains elements with keys: {}'.format(dic.keys()) + return ord_dict + + def color_list(): """ A list of distinguisable colors. @@ -380,6 +406,9 @@ def make_camel_case(string, make_plural_option=False): words = word_regex_pattern.split(string) string = "".join(w.lower() if i is 0 else w.title() for i, w in enumerate(words)) + if string[-3:] == 'Uid': + string = string[:-3] + 'UID' # TODO: This can also be solved more generically. + if make_plural_option: string = make_plural(string) diff --git a/kadmos/utilities/xml.py b/kadmos/utilities/xml.py index 1b7f47f62..8b430c11a 100644 --- a/kadmos/utilities/xml.py +++ b/kadmos/utilities/xml.py @@ -13,7 +13,7 @@ from general import make_camel_case, unmake_camel_case, make_singular logger = logging.getLogger(__name__) # Settings for the parser -parser = etree.XMLParser() +parser = etree.XMLParser(remove_blank_text=True) def recursively_stringify(tree): @@ -163,7 +163,6 @@ class ExtendedElement(etree.ElementBase): def addloop(self, iter_nesting, function_grouping): # TODO: Make this function more elegant and maybe add it to another class - loop_elements = self.add('loopElements') if type(iter_nesting) == str: @@ -185,7 +184,7 @@ class ExtendedElement(etree.ElementBase): return loop_elements - def add(self, tag, value=None, attrib=None, camel_case_conversion=False, **extra): + def add(self, tag, value=None, attrib=None, camel_case_conversion=False, only_add_if_valued=False, **extra): """Method to add a new sub element to the element :param tag: The sub element tag @@ -199,7 +198,9 @@ class ExtendedElement(etree.ElementBase): :return: An element instance :rtype: Element """ - + # Check if value is mandatory for element addition + if only_add_if_valued and value is None: + return if camel_case_conversion: tag = make_camel_case(tag) @@ -359,7 +360,7 @@ class ExtendedElement(etree.ElementBase): else: for element in elements: # Values - if element.text is not None: + if not element.getchildren(): value = element.findasttext() else: value = element.finddict(element, namespaces, ordered, camel_case_conversion) -- GitLab