diff --git a/kadmos/graph/graph_data.py b/kadmos/graph/graph_data.py index 8b571abccb38d573bc432503c195c14b57e1cbb8..eb886bd0e19176b1b6d70bf846984e768fcba816 100644 --- a/kadmos/graph/graph_data.py +++ b/kadmos/graph/graph_data.py @@ -692,8 +692,14 @@ class DataGraph(KadmosGraph): coupled_functions_order = [] for partition, nodes in enumerate(partitions): if len(nodes) > 1: - nodes = self.minimize_feedback(list(nodes), method, multi_start=multi_start, rcb=rcb, - use_runtime_info=use_runtime_info, coupling_dict=coupling_dict) + if 'local_convergers' in self.graph['problem_formulation']: + nodes = self.get_possible_function_order(method, multi_start=multi_start, rcb=rcb, + use_runtime_info=use_runtime_info, + coupling_dict=coupling_dict, + node_selection=list(nodes)) + else: + nodes = self.minimize_feedback(list(nodes), method, multi_start=multi_start, rcb=rcb, + use_runtime_info=use_runtime_info, coupling_dict=coupling_dict) partitions[partition] = nodes coupled_functions_order.extend(nodes) # Make sure the function orders in the partitions are consistent with the overall function order @@ -3042,22 +3048,31 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): if 'levels' in doe_settings: pf_doe_settings['levels'] = doe_settings['levels'] - def partition_graph(self, n_parts, use_runtime_info=False, tpwgts=None, recursive=True, contig=False, - local_convergers=False, coupling_dict=None, rcb=1.0, node_selection=None): + def partition_graph(self, n_parts, node_selection=None, use_runtime_info=False, local_convergers=False, + rcb_partitioning=0.0, rcb_order=1.0, coupling_dict=None, tpwgts=None): """Partition a graph using the Metis algorithm (http://glaros.dtc.umn.edu/gkhome/metis/metis/overview). - :param n_parts: number of partitions requested (algorithm might provide less) + :param n_parts: number of partitions requested (algorithm might provide less if the number of partitions is + close to the number of nodes) :type n_parts: int - :param use_runtime_info: + :param node_selection: option to give the nodes that need to be included in the partitions. If none are given, + the nodes in the partitions are selected based on the mdao architecture + :type node_selection: list + :param use_runtime_info: option to take the runtime information of the nodes into account when determining the + partitions :type use_runtime_info: bool + :param local_convergers: option to add local convergers to the partitions if feedback within the partition exist + :type local_convergers: bool + :param rcb_partitioning: runtime-coupling balance for partitioning, relative importance between cut edges and + runtime while making the partitions. 1: min cut edges, 0: min runtime + :type rcb_partitioning: float + :param rcb_order: runtime-coupling balance for sequencing, relative importance between feedback and runtime + while determining the function order within the partitions. 1: min feedback, 0: min runtime + :type rcb_order: float + :param coupling_dict: coupling dictionary indicating the input/output relations between the nodes + :type coupling_dict: dict :param tpwgts: list of target partition weights :type tpwgts: list - :param recursive: option to use the recursive bisection or k-way partitioning algorithm - :type recursive: bool - :param contig: Metis option - :type contig: bool - :param local_convergers: - :type local_convergers: bool .. note:: partitioning can only be performed on undirected graphs. Therefore every graph input is translated into an undirected graph. @@ -3080,18 +3095,15 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): nodes_to_partition = list(node_selection) # For IDF, all functions in the optimizer loop are partitioned except for the objective and constraint functions elif self.graph['problem_formulation']['mdao_architecture'] == self.OPTIONS_ARCHITECTURES[2]: - # Get post-design-variables, coupled and post-coupling functions mg_function_ordering = self.get_mg_function_ordering() post_des_vars = mg_function_ordering[self.FUNCTION_ROLES[4]] post_couplings = mg_function_ordering[self.FUNCTION_ROLES[2]] coupled_nodes = mg_function_ordering[self.FUNCTION_ROLES[1]] - # Get objective and constraint functions constr_obj_vars = self.find_all_nodes(attr_cond=['problem_role', '==', self.PROBLEM_ROLES_VARS[2]]) + \ self.find_all_nodes(attr_cond=['problem_role', '==', self.PROBLEM_ROLES_VARS[1]]) constr_obj_funcs = [self.get_sources(variable)[0] for variable in constr_obj_vars] - # Get the nodes that need to be partitioned nodes_to_partition = [node for node in post_des_vars if node not in constr_obj_funcs] + coupled_nodes + \ [node for node in post_couplings if node not in constr_obj_funcs] @@ -3106,8 +3118,6 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): for node in nodes_to_partition: assert 'run_time' in self.nodes[node]['performance_info'], 'Run time missing for function ' \ '{}'.format(node) - assert len(nodes_to_partition) >= n_parts, 'Number of partitions ({}) exceeds number of nodes ({})'.format( - n_parts, len(nodes_to_partition)) # Get initial function graph of the nodes that need to be partitioned subgraph = self.get_subgraph_by_function_nodes(nodes_to_partition) @@ -3124,10 +3134,14 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): min_f, min_variables, min_time = float("inf"), float("inf"), float("inf") number_of_iterations_not_improved = 0 function_graph = initial_function_graph.deepcopy() + if 'problem_formulation' in self.graph and 'function_order' in self.graph['problem_formulation']: + previous_function_order = self.graph['problem_formulation']['function_order'] + else: + previous_function_order = nodes_to_partition # Calculate maximum load imbalance based on the objective - if rcb == 0: - ufactor = 1 + if rcb_partitioning == 0: + initial_ufactor = 1 else: if use_runtime_info: runtimes = [self.nodes[node]['performance_info']['run_time'] for node in nodes_to_partition] @@ -3136,10 +3150,19 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): else: max_runtime_part = total_time - (n_parts - 1) max_load_imbalance = max_runtime_part / (total_time / float(n_parts)) - ufactor = 1 if max_load_imbalance == 1.0 else int((max_load_imbalance - 1.0) * 1000 * rcb) + initial_ufactor = 1 if max_load_imbalance == 1.0 else int((max_load_imbalance - 1.0) * 1000 * + rcb_partitioning) + # If the number of nodes equals the number of required partitions return the solution immediately if len(nodes_to_partition) == n_parts: best_partitions = [[node] for node in nodes_to_partition] + # If the number of nodes is less than the number of required partitions return the solution immediately and give + # a warning + elif len(nodes_to_partition) < n_parts: + best_partitions = [[node] for node in nodes_to_partition] + logger.warning('Number of partitions ({0}) exceeds number of nodes ({1}). The solution for {1} partitions ' + 'will be returned'.format(n_parts, len(nodes_to_partition))) + # Else partition the graph else: while True: # Combine coupling strengths of feedforward and feedback connections between two nodes to get an @@ -3167,14 +3190,14 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): g_und.graph['node_weight_attr'] = 'run_time' g_und.graph['edge_weight_attr'] = 'coupling_strength' + # Reset maximum load imbalance to initial value + ufactor = initial_ufactor + # Partition graph using metis while True: - (edgecuts, parts) = metis.part_graph(g_und, n_parts, tpwgts=tpwgts, recursive=recursive, - contig=contig, ufactor=ufactor) + (edgecuts, parts) = metis.part_graph(g_und, n_parts, tpwgts=tpwgts, recursive=True, ufactor=ufactor) if len(set(parts)) != n_parts and ufactor != 1: ufactor = 1 if ufactor < 101 else ufactor - 100 - logger.warning('Metis returned less than {} partitions. Maximum unbalance factor will be ' - 'changed'.format(n_parts)) continue else: break @@ -3187,28 +3210,32 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): for idx, node in enumerate(g_und.nodes): if parts[idx] == part: nodes.extend(node.split('--') if '--' in node else [node]) + # Get an initial guess for the function order based on the previous partitioning (in order to speed + # up the process) + nodes = [node for node in previous_function_order if node in nodes] # Minimize feedback within the partition - if not nodes: - logger.warning('Metis returned less than {} partitions. Some partitions will be empty'.format( - n_parts)) - else: + if nodes: if local_convergers: - nodes = self.get_possible_function_order('single-swap', node_selection=nodes, rcb=1, + nodes = self.get_possible_function_order('single-swap', node_selection=nodes, rcb=rcb_order, coupling_dict=coupling_dict, use_runtime_info=use_runtime_info) else: - nodes = self.minimize_feedback(nodes, 'single-swap', rcb=1, coupling_dict=coupling_dict, + nodes = self.minimize_feedback(nodes, 'single-swap', rcb=rcb_order, + coupling_dict=coupling_dict, use_runtime_info=use_runtime_info) # Add nodes to the partition list partitions.append(nodes) + # Update function order for next iteration + previous_function_order = [node for part_order in partitions for node in part_order] # Evaluate the properties of the partitioning - n_variables, partition_variables, system_variables, runtime = subgraph.get_partition_info( + n_variables, partition_variables, system_variables, runtime = self.get_partition_info( partitions, coupling_dict=coupling_dict, use_runtime_info=use_runtime_info, local_convergers=local_convergers) # Decide whether new solution is better than the best solution found so far - f = rcb * (n_variables / float(total_couplings)) + (1 - rcb) * (max(runtime) / float(total_time)) + f = rcb_partitioning * (n_variables / float(total_couplings)) + (1 - rcb_partitioning) * \ + (max(runtime) / float(total_time)) if (n_variables == min_variables and max(runtime) < min_time) or \ (max(runtime) == min_time and n_variables < min_variables) or (f < min_f): best_partitions, min_f, min_variables, min_time = partitions, f, n_variables, max(runtime) @@ -3220,31 +3247,32 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): if number_of_iterations_not_improved > 2: break - # Merge the nodes that can be merged based on process + # Reset the function graph to the initial graph (without merged nodes) function_graph = initial_function_graph.deepcopy() + + # Merge the nodes that can be merged based on process for partition in partitions: nodes = list(partition) - while nodes: - merge_nodes, run_times = [], [] - for idx, node in enumerate(nodes): - # If the nodes before the current node do not supply input to the current node, the nodes - # can be merged - if not set(nodes[:idx]).intersection(coupling_dict[node]): - merge_nodes.append(node) - run_times.append(self.nodes[node]['performance_info']['run_time'] if use_runtime_info - else 1) - else: - break - # Merge the nodes only when the resulting number of nodes is still enough to get the required - # number of partitions - if len(merge_nodes) > 1 and (nx.number_of_nodes(function_graph) - len(merge_nodes) + 1) >= \ - n_parts: - new_node_label = '--'.join(merge_nodes) - function_graph = function_graph.merge_parallel_functions(merge_nodes, - new_label=new_node_label) - function_graph.nodes[new_node_label]['performance_info'] = {'run_time': max(run_times)} - for node in merge_nodes: - nodes.pop(nodes.index(node)) + previous_node = [] + for node in nodes: + # Check if current node can be merged with the previous node. Nodes are only merged if the + # resulting number of nodes is still enough to get the required number of partitions + prev_nodes = previous_node.split('--') if '--' in previous_node else [previous_node] + if previous_node and not any(prev_node in coupling_dict[node] or node in + coupling_dict[prev_node] for prev_node in prev_nodes) and \ + len(function_graph.nodes) - 1 >= n_parts: + merge_nodes = [previous_node, node] + new_node = '--'.join(merge_nodes) + runtime = max(function_graph.nodes[func]['performance_info']['run_time'] for func in + merge_nodes) if use_runtime_info else 1 + function_graph.add_node(new_node, category='function') + for merge_node in merge_nodes: + function_graph = nx.contracted_nodes(function_graph, new_node, merge_node, + self_loops=False) + function_graph.nodes[new_node]['performance_info'] = {'run_time': runtime} + previous_node = new_node + else: + previous_node = node # Get correct coupling strengths in case merged nodes exist in the graph for node1 in function_graph.nodes(): @@ -3260,6 +3288,11 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): if coupling_strength != 0: function_graph.edges[node1, node2]['coupling_strength'] = coupling_strength + # Check if enough partitions are returned, if not remove empty partitions and give a warning + best_partitions = [partition for partition in best_partitions if partition] + if len(best_partitions) < n_parts: + logger.warning('Metis returned {} instead of {} partitions'.format(len(best_partitions), n_parts)) + # Add local convergers if there are feedback loops in the partitions convergers = [] if local_convergers: @@ -3267,22 +3300,6 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): if self.check_for_coupling(partition, only_feedback=True): convergers.append(part_nr) - # Update the function order - if node_selection: - function_order = [node for partition in best_partitions for node in partition] - elif self.graph['problem_formulation']['mdao_architecture'] == self.OPTIONS_ARCHITECTURES[2]: - # noinspection PyUnboundLocalVariable - pre_desvars_order = self.sort_nodes_for_process(mg_function_ordering[self.FUNCTION_ROLES[3]]) - partitioned_nodes_order = [node for partition in best_partitions for node in partition] - # noinspection PyUnboundLocalVariable - constr_obj_order = self.sort_nodes_for_process(constr_obj_funcs) - function_order = pre_desvars_order + partitioned_nodes_order + constr_obj_order - else: - pre_coupled_order = self.sort_nodes_for_process(function_ordering[self.FUNCTION_ROLES[0]]) - partitioned_nodes_order = [node for partition in best_partitions for node in partition] - post_coupling_order = self.sort_nodes_for_process(function_ordering[self.FUNCTION_ROLES[2]]) - function_order = pre_coupled_order + partitioned_nodes_order + post_coupling_order - # Add partition id to the nodes for idx, partition in enumerate(best_partitions): for node in partition: @@ -3291,32 +3308,38 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): # Add partition to the input graph if 'problem_formulation' not in self.graph: self.graph['problem_formulation'] = dict() - self.graph['problem_formulation']['coupled_functions_groups'] = best_partitions self.graph['problem_formulation']['local_convergers'] = convergers self.graph['problem_formulation']['jacobi_convergence'] = [] self.graph['problem_formulation']['sequence_partitions'] = [] - if not node_selection: - self.graph['problem_formulation']['function_order'] = function_order return def get_partition_info(self, partitions, coupling_dict=None, use_runtime_info=False, local_convergers=False): - """ Function to get the number of feedback variables in the partitions and the number system variables (feedback - and feedforward variables between partitions) + """ Function to get information about the partitions. The functions returns: + + - Total number of connections to converge, divided in: + - Number of feedback connections in each partition + - Total number of cut edges due to the partitioning + - Estimation of the runtime for each partition :param partitions: list which indicates which nodes are in which partition :type partitions: list - :param coupling_dict: + :param coupling_dict: coupling dictionary indicating the input/output relations between the nodes :type coupling_dict: dict - :param use_runtime_info: + :param use_runtime_info: option to take the runtime information of the nodes into account when determining the + partitions :type use_runtime_info: bool - :param local_convergers: + :param local_convergers: option to add local convergers to the partitions if feedback within the partition exist :param local_convergers: bool - :return partition_variables: - :rtype partition_variables: list of sets - :return system_variables: - :rtype system_variables: set + :return total_connections: total number of connections to converge + :return partition_variables: number of feedback connections in each partition + :return system_variables: total number of cut edges due to the partitioning + :return run_time_partitions: runtime estimation for each partition + + ..note:: If a converger is present in a partition, the iterations are not taken into account in the runtime + estimation + """ # Get complete function order of nodes in the partitions @@ -3350,27 +3373,26 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): # Calculate runtime run_time_partitions = [] for partition, nodes in enumerate(partitions): + # If a local converger is present in the partition, nodes are divided in pre/post/coupled functions if local_convergers and self.check_for_coupling(nodes, only_feedback=True): - # Get runtime pre-coupled nodes - pre_coupling_nodes = [] - for idx, node in enumerate(nodes): - if not set(nodes[idx + 1:]).intersection(coupling_dict[node]): - pre_coupling_nodes = nodes[:idx + 1] - else: + pre_coupling_nodes, post_coupling_nodes = [], [] + idx1 = 0 + # Get pre-coupled nodes + for idx1, node in enumerate(nodes): + if set(nodes[idx1 + 1:]).intersection(coupling_dict[node]): + pre_coupling_nodes = nodes[:idx1] break - # Get runtime post-coupled nodes - nodes = [node for node in nodes if node not in pre_coupling_nodes] - input_nodes = [] - post_coupling_nodes = [] - coupled_nodes = list(nodes) - input_nodes.extend([node for node in coupling_dict[node] if node in nodes[idx:]] for idx, node in - enumerate(nodes)) - for idx, node in reversed(list(enumerate(nodes))): - if node not in input_nodes: - post_coupling_nodes = nodes[idx:] - coupled_nodes = nodes[:idx] - else: + # Get the nodes that provide feedback + feedback_nodes = [node for idx, func in enumerate(nodes) for node in coupling_dict[func] if node in + nodes[idx:]] + # Get post-coupling nodes + for idx2, node in reversed(list(enumerate(nodes[idx1:]))): + if node in feedback_nodes: + post_coupling_nodes = nodes[idx2 + 1:] break + # Get coupled nodes + coupled_nodes = [node for node in nodes if node not in pre_coupling_nodes + post_coupling_nodes] + # Calculate runtime (multiple runs of coupled nodes for convergence is not taken into account) run_time_partition = self.get_runtime_sequence(pre_coupling_nodes, coupling_dict=coupling_dict, use_runtime_info=use_runtime_info) + \ self.get_runtime_sequence(coupled_nodes, coupling_dict=coupling_dict, @@ -3384,63 +3406,139 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): return total_connections, partition_connections, system_connections, run_time_partitions - def select_number_of_partitions(self, partition_range, use_runtime_info=False, plot_pareto_front=False, - local_convergers=False, coupling_dict=None, rcb=1.0): + def select_number_of_partitions(self, partition_range='pareto', node_selection=None, use_runtime_info=False, + local_convergers=False, rcb_partitioning=0.0, rcb_order=1.0, coupling_dict=None, + plot_solutions=False): """ Function to evaluate the properties of different number of partitions and to select the best one. - :param partition_range: range of number of partitions that need to be evaluated - :type partition_range: list - :param include_run_time: - :type include_run_time: - :param plot_pareto_front: Option to plot the characteristics of different number of partitions - :type plot_pareto_front: bool - :return: + :param partition_range: range of number of partitions that need to be evaluated. If set to 'pareto', a pareto + front will be calculated. + :type partition_range: list or str + :param node_selection: option to give the nodes that need to be included in the partitions. If none are given, + the nodes in the partitions are selected based on the mdao architecture + :type node_selection: list + :param use_runtime_info: option to take the runtime information of the nodes into account when determining the + partitions + :type use_runtime_info: bool + :param local_convergers: option to add local convergers to the partitions if feedback within the partition exist + :type local_convergers: bool + :param rcb_partitioning: runtime-coupling balance for partitioning, relative importance between cut edges and + runtime while making the partitions. 1: min cut edges, 0: min runtime + :type rcb_partitioning: float + :param rcb_order: runtime-coupling balance for sequencing, relative importance between feedback and runtime + while determining the function order within the partitions. 1: min feedback, 0: min runtime + :type rcb_order: float + :param coupling_dict: coupling dictionary indicating the input/output relations between the nodes + :type coupling_dict: dict + :param plot_solutions: option to plot the characteristics of different number of partitions + :type plot_solutions: bool """ # Input assertions assert 'function_ordering' in self.graph['problem_formulation'], 'Function ordering is missing' - coupled_nodes = self.graph['problem_formulation']['function_ordering'][self.FUNCTION_ROLES[1]] + + # Get coupling dictionary + if not coupling_dict: + coupling_dict = self.get_coupling_dictionary() + + # Get the nodes that need to be partitioned + if node_selection: + nodes_to_partition = list(node_selection) + # For IDF, all functions in the optimizer loop are partitioned except for the objective and constraint functions + elif self.graph['problem_formulation']['mdao_architecture'] == self.OPTIONS_ARCHITECTURES[2]: + # Get post-design-variables, coupled and post-coupling functions + mg_function_ordering = self.get_mg_function_ordering() + post_des_vars = mg_function_ordering[self.FUNCTION_ROLES[4]] + post_couplings = mg_function_ordering[self.FUNCTION_ROLES[2]] + coupled_nodes = mg_function_ordering[self.FUNCTION_ROLES[1]] + # Get objective and constraint functions + constr_obj_vars = self.find_all_nodes(attr_cond=['problem_role', '==', self.PROBLEM_ROLES_VARS[2]]) + \ + self.find_all_nodes(attr_cond=['problem_role', '==', self.PROBLEM_ROLES_VARS[1]]) + constr_obj_funcs = [self.get_sources(variable)[0] for variable in constr_obj_vars] + # Get the nodes that need to be partitioned + nodes_to_partition = [node for node in post_des_vars if node not in constr_obj_funcs] + coupled_nodes + \ + [node for node in post_couplings if node not in constr_obj_funcs] + # When a converger is present or no system architecture selected, only the coupled functions are partitioned + else: + function_ordering = self.graph['problem_formulation']['function_ordering'] + coupled_nodes = function_ordering[self.FUNCTION_ROLES[1]] + nodes_to_partition = coupled_nodes + + # Check runtime if use_runtime_info: - for node in coupled_nodes: + for node in nodes_to_partition: assert 'run_time' in self.nodes[node]['performance_info'], 'Run time missing for function ' \ '{}'.format(node) - partition_info = [] - partition_results = dict() + # Get the partition range if a pareto front is needed + if partition_range == 'pareto': + pareto = True + partition_range = range(len(nodes_to_partition) + 1)[1:] + else: + pareto = False - if not coupling_dict: - coupling_dict = self.get_coupling_dictionary() + # Initialize variables + partition_info, partition_results = [], [] + # Partition graph for idx, n_partitions in enumerate(partition_range): - - # Partition graph graph = self.deepcopy() - print 'Number of partitions: ', n_partitions - graph.partition_graph(n_partitions, coupling_dict=coupling_dict, use_runtime_info=use_runtime_info, - local_convergers=local_convergers, rcb=rcb) - partitions = graph.graph['problem_formulation']['coupled_functions_groups'] - local_convergers = graph.graph['problem_formulation']['local_convergers'] - - # Evaluate graph - total_var, partition_variables, system_variables, runtime = graph.get_partition_info(partitions, - use_runtime_info=use_runtime_info, - coupling_dict=coupling_dict, - local_convergers=local_convergers) + logger.info('Calculating the solution for {} partitions'.format(n_partitions)) + + if n_partitions == 1: + # Get function order + partitions = graph.minimize_feedback(nodes_to_partition, 'hybrid-swap', rcb=rcb_order, + coupling_dict=coupling_dict, use_runtime_info=use_runtime_info) + partition_variables, runtime = graph.get_feedback_info(partitions, coupling_dict=coupling_dict, + use_runtime_info=use_runtime_info) + # Save information + partition_info.append([idx, n_partitions, [partition_variables], 0, partition_variables, runtime]) + partition_results.append([runtime, partition_variables, idx, partitions, []]) + else: + graph.partition_graph(n_partitions, node_selection=nodes_to_partition, coupling_dict=coupling_dict, + use_runtime_info=use_runtime_info, local_convergers=local_convergers, + rcb_partitioning=rcb_partitioning, rcb_order=rcb_order) + partitions = graph.graph['problem_formulation']['coupled_functions_groups'] + local_convs = graph.graph['problem_formulation']['local_convergers'] + + # Evaluate graph + total_var, partition_variables, system_variables, runtime = graph.get_partition_info( + partitions, use_runtime_info=use_runtime_info, coupling_dict=coupling_dict, + local_convergers=local_convergers) - # Save partition information - partition_info.append([idx, n_partitions, partition_variables, system_variables, total_var, max(runtime)]) - partition_results[n_partitions] = dict() - partition_results[n_partitions]['partitions'] = partitions - partition_results[n_partitions]['local_convergers'] = local_convergers - partition_results[n_partitions]['function_order'] = graph.graph['problem_formulation']['function_order'] + # Get number of partitions (Metis can return less partitions in case the number of partitions is close + # to the number of nodes in the partitions) + n_parts = len(partitions) + + # Save partition information + partition_info.append([idx, n_parts, partition_variables, system_variables, total_var, + max(runtime)]) + partition_results.append([max(runtime), total_var, idx, partitions, local_convs]) + + # If pareto front, get optimal results + if pareto: + # Obtain pareto front + sorted_solutions = sorted(partition_results) + pareto_front = [sorted_solutions[0]] + for solution in sorted_solutions: + if solution[1] < pareto_front[-1][1] and solution[0] != pareto_front[-1][0]: + pareto_front.append(solution) + accepted_solutions = [solution[2] for solution in pareto_front] + # Get optimal solutions + partition_info = [result for result in partition_info if result[0] in accepted_solutions] + partition_results = list(pareto_front) + partition_results.reverse() + for idx, solution in enumerate(partition_info): + solution[0] = idx + partition_range = [solution[1] for solution in partition_info] # Print partition information in table - header = ['Option', '# partitions', '# feedback in partitions', '# system variables', 'Total # variables', + header = ['Option', '# partitions', '# feedback in partitions', '# system connections', 'Total # connections', 'Runtime'] printing.print_in_table(partition_info, headers=header) # Show the options in a graph - if plot_pareto_front: + if plot_solutions: from matplotlib.ticker import MaxNLocator fig, ax = plt.subplots() plt_x, plt_y, txt = [], [], [] @@ -3464,83 +3562,105 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): idx = partition_range.index(int(sel[0])) # Get result - self.graph['problem_formulation']['coupled_functions_groups'] = partition_results[partition_range[idx]][ - 'partitions'] - self.graph['problem_formulation']['local_convergers'] = partition_results[partition_range[idx]][ - 'local_convergers'] - self.graph['problem_formulation']['jacobi_convergence'] = [] - self.graph['problem_formulation']['sequence_partitions'] = [] - self.graph['problem_formulation']['function_order'] = partition_results[partition_range[idx]]['function_order'] + if partition_range[idx] != 1: + + # Add partition id to the nodes + for part_nr, partition in enumerate(partition_results[idx][3]): + for node in partition: + self.nodes[node]['partition_id'] = part_nr + + # Add partition to the input graph + self.graph['problem_formulation']['coupled_functions_groups'] = partition_results[idx][3] + self.graph['problem_formulation']['local_convergers'] = partition_results[idx][4] + self.graph['problem_formulation']['jacobi_convergence'] = [] + self.graph['problem_formulation']['sequence_partitions'] = [] return - def select_distributed_architecture(self): - """ Function for easy selection of a distributed architecture for a partitioned graph. + def change_settings_distributed_convergence(self): + """ Function to change the settings when the distributed convergence is set as architecture. - :return: Extended problem formulation + Options: + - Add or remove a local converger for each partition + - Set the convergence method within each partition (Gauss-Seidel (default) or Jacobi) + - Let partitions run in sequence instead of parallel """ - # Check if graph is partitioned - assert 'coupled_functions_groups' in self.graph['problem_formulation'], 'Graph is not partitioned. ' \ - 'Distributed architecture is not ' \ - 'possible' + # Input assertions + assert 'coupled_functions_groups' in self.graph['problem_formulation'], 'Graph is not yet partitioned' assert len(self.graph['problem_formulation']['coupled_functions_groups']) > 1, 'Graph must have at least two ' \ - 'partitions to select a ' \ - 'distributed architecture' + 'partitions' - # Get graph info + # Get graph information partitions = self.graph['problem_formulation']['coupled_functions_groups'] - coupling_dict = self.get_coupling_dictionary() - # Select which partitions need a local converger - msg = 'Please select which partitions need a local converger:' + # Change the local convergers + # Check which partitions have feedback and can have a local converger + possible_convergers = [] + for idx, partition in enumerate(partitions): + if not nx.is_directed_acyclic_graph(self.get_subgraph_by_function_nodes(partition)): + possible_convergers.append(idx) + + msg = 'Please select which partitions need a local converger (options: {}):'.format(possible_convergers) while True: local_convergers = prompting.user_prompt_select_options(*range(len(partitions)), allow_empty=True, allow_multi=True, message=msg) - # Check if feedback exists in the chosen partitions - valid_input = True - for converger in local_convergers: - feedback = False - for idx, node in enumerate(partitions[converger]): - if set(partitions[converger][idx+1:]).intersection(coupling_dict[node]): - feedback = True - if not feedback: - print 'Partition {} does not contain feedback and therefore it cannot have a local ' \ - 'converger'.format(converger) - valid_input = False - if not valid_input: - continue - break + # Check whether a valid input is given + if all(converger in possible_convergers for converger in local_convergers): + break + else: + print 'Partitions {} do not contain feedback and therefore they cannot have a local ' \ + 'converger'.format(list(set(local_convergers).symmetric_difference(possible_convergers) - + set(possible_convergers))) - # Select which partitions needs to be solved using the Jacobi method instead of Gauss-Seidel - msg = 'Please select which partitions must be solved using a Jacobi convergence instead of Gauss-Seidel' + # Change convergence method of each partition + msg = 'Please select which partitions must be solved using a Jacobi convergence:' jacobi_convergence = prompting.user_prompt_select_options(*range(len(partitions)), allow_empty=True, allow_multi=True, message=msg) - # Select which partitions must be executed in sequence # TODO: add more checks - msg = 'Please select which partitions must be run in sequence (e.g. [[1, 2], [3, 4]])' + # Select which partitions must be executed in sequence + msg = 'Please select which partitions must be run in sequence (e.g. [[0, 1], [2, 3]]):' while True: valid_input = True sequence_partitions = prompting.user_prompt_string(allow_empty=True, message=msg) sequence_partitions = eval(sequence_partitions) if sequence_partitions else [] - sequence_partitions = [sequence_partitions] if not any(isinstance(el, list) for el in sequence_partitions) \ + sequence_partitions = [sequence_partitions] if not any( + isinstance(el, list) for el in sequence_partitions) \ else sequence_partitions - if not isinstance(sequence_partitions, list) or not any(isinstance(el, list) for el in sequence_partitions): + # Do some checks to see whether a valid input is given + if not isinstance(sequence_partitions, list) or not any( + isinstance(el, list) for el in sequence_partitions): print 'Input should be a list or list of lists' valid_input = False + unique_parts = set() + n_parts = 0 for sequence in sequence_partitions: - for element in sequence: + for idx, element in enumerate(sequence): if not isinstance(element, int): - print 'Invalid input given' + print 'Only integers are allowed!' + valid_input = False + if idx != 0 and sequence[idx] < sequence[idx - 1]: + print 'Partitions must be in increasing order' valid_input = False + if element not in range(len(partitions)): + print 'Partition {} is not present in the graph. Existing partitions are: {}'.format( + element, range(len(partitions))) + valid_input = False + unique_parts.update(set(sequence)) + n_parts += len(sequence) + if not len(unique_parts) == n_parts: + print 'Each partition can only occur once in the list' + valid_input = False + # Ask for new input if the given input is not valid if not valid_input: continue break - # todo: remove or improve - print 'local converger:', local_convergers - print 'jacobi convergence:', jacobi_convergence - print 'sequence partitions:', sequence_partitions + # Print summary of the selected options + print 'Selected options:' + print 'Local convergers:', local_convergers + print 'Jacobi convergence:', jacobi_convergence + print 'Sequences of partitions:', sequence_partitions self.graph['problem_formulation']['local_convergers'] = local_convergers self.graph['problem_formulation']['jacobi_convergence'] = jacobi_convergence diff --git a/kadmos/graph/graph_process.py b/kadmos/graph/graph_process.py index 761ba7379f108e6fe804384a35f1d0f108af764d..a058dcae9c465d912efa69485d4fb5d61cd19749 100644 --- a/kadmos/graph/graph_process.py +++ b/kadmos/graph/graph_process.py @@ -392,8 +392,8 @@ class MdaoProcessGraph(ProcessGraph): convs = self.find_all_nodes( attr_cond=['architecture_role', '==', self.ARCHITECTURE_ROLES_FUNS[2]]) # converger if len(convs) >= 1: - sys_conv = [item for item in convs if - self.SUBSYS_SUFFIX in item and self.SUBSYS_SUFFIX + str(idx) in item] + sys_conv = [item for item in convs if self.SUBSYS_PREFIX + self.CONVERGER_STRING + + self.SUBSYS_SUFFIX + str(idx) == item] assert len(sys_conv) <= 1, '{} subsystem convergers found, max one expected'.format(len(sys_conv)) convs = sys_conv diagonal_order.extend(convs) # converger