From 278fea26d37a080996f261721b889d433b64ee1c Mon Sep 17 00:00:00 2001 From: Anne-Liza <a.m.r.m.bruggeman@student.tudelft.nl> Date: Tue, 7 Aug 2018 11:46:08 +0200 Subject: [PATCH] Added balancing factor to partitioning algorithm Former-commit-id: c5f1cd10035853080b0d58674737c91fb3bea8fc --- kadmos/graph/graph_data.py | 254 ++++++++++++++++++++----------------- 1 file changed, 137 insertions(+), 117 deletions(-) diff --git a/kadmos/graph/graph_data.py b/kadmos/graph/graph_data.py index bc2ee86ee..ca1fb9b03 100644 --- a/kadmos/graph/graph_data.py +++ b/kadmos/graph/graph_data.py @@ -3069,6 +3069,7 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): # Input assertions if not node_selection: assert 'function_ordering' in self.graph['problem_formulation'], 'Function ordering is missing' + assert n_parts > 1, 'Number of partitions must be greater than 1' # Get coupling dictionary if not coupling_dict: @@ -3100,11 +3101,13 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): coupled_nodes = function_ordering[self.FUNCTION_ROLES[1]] nodes_to_partition = coupled_nodes - # Check runtime + # Check runtime and number of nodes if use_runtime_info: for node in nodes_to_partition: assert 'run_time' in self.nodes[node]['performance_info'], 'Run time missing for function ' \ '{}'.format(node) + assert len(nodes_to_partition) >= n_parts, 'Number of partitions ({}) exceeds number of nodes ({})'.format( + n_parts, len(nodes_to_partition)) # Get initial function graph of the nodes that need to be partitioned subgraph = self.get_subgraph_by_function_nodes(nodes_to_partition) @@ -3122,129 +3125,146 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): number_of_iterations_not_improved = 0 function_graph = initial_function_graph.deepcopy() - while True: - # Combine coupling strengths of feedforward and feedback connections between two nodes to get an undirected - # graph with the correct edge weights - remove_edges = [] - for edge in function_graph.edges(): - if (edge[0], edge[1]) in remove_edges: - continue - elif (edge[1], edge[0]) in function_graph.edges(): - function_graph.edges[edge[0], edge[1]]['coupling_strength'] += function_graph.edges[ - edge[1], edge[0]]['coupling_strength'] - remove_edges.append((edge[1], edge[0])) - for edge in remove_edges: - function_graph.remove_edge(edge[0], edge[1]) - - # Get undirected graph - g_und = function_graph.to_undirected() - - # Add runtime to the nodes of the undirected graph for metis - for node in g_und.nodes(): - g_und.nodes[node]['run_time'] = g_und.nodes[node]['performance_info']['run_time'] if \ - use_runtime_info else 1 - - # Set the runtime as node weight and the coupling strength as edge weight for metis - g_und.graph['node_weight_attr'] = 'run_time' - g_und.graph['edge_weight_attr'] = 'coupling_strength' - - # Partition graph using metis - (edgecuts, parts) = metis.part_graph(g_und, n_parts, tpwgts=tpwgts, recursive=recursive, contig=contig) - - # Create a list with the nodes in each partition - partitions = [] - for part in range(n_parts): - - # Get nodes in this partition - nodes = [] - for idx, node in enumerate(g_und.nodes): - if parts[idx] == part: - nodes.extend(node.split('--') if '--' in node else [node]) - - # Minimize feedback within the partition - if not nodes: - logger.warning('Metis returned less than {} partitions. Some partitions will be empty'.format( - n_parts)) - else: - nodes = self.get_possible_function_order('single-swap', node_selection=nodes, rcb=rcb, - coupling_dict=coupling_dict, - use_runtime_info=use_runtime_info) if local_convergers \ - else self.minimize_feedback(nodes, 'single-swap', rcb=rcb, coupling_dict=coupling_dict, - use_runtime_info=use_runtime_info) - - # Add nodes to the partition list - partitions.append(nodes) - - # Evaluate the properties of the partitioning - partition_variables, system_variables, runtime = subgraph.get_partition_info( - partitions, coupling_dict=coupling_dict, use_runtime_info=use_runtime_info, - local_convergers=local_convergers) - n_variables = system_variables + sum(partition_variables) - - # Decide whether new solution is better than the best solution found so far - f = rcb * (n_variables / float(total_couplings)) + (1 - rcb) * (max(runtime) / float(total_time)) - if (n_variables == min_variables and max(runtime) < min_time) or \ - (max(runtime) == min_time and n_variables < min_variables) or (f < min_f): - best_partitions, min_f, min_variables, min_time = partitions, f, n_variables, max(runtime) - number_of_iterations_not_improved = 0 + # Calculate maximum load imbalance based on the objective + if rcb == 0: + ufactor = 1 + else: + if use_runtime_info: + runtimes = [self.nodes[node]['performance_info']['run_time'] for node in nodes_to_partition] + lowest_runtimes = sorted(runtimes)[:n_parts - 1] + max_runtime_part = total_time - sum(lowest_runtimes) else: - number_of_iterations_not_improved += 1 + max_runtime_part = total_time - (n_parts - 1) + max_load_imbalance = max_runtime_part / (total_time / float(n_parts)) + ufactor = 1 if max_load_imbalance == 1.0 else int((max_load_imbalance - 1.0) * 1000 * rcb) - # If the third iteration does not give an improvement the iterations are stopped - if number_of_iterations_not_improved > 2: - break + if len(nodes_to_partition) == n_parts: + best_partitions = [[node] for node in nodes_to_partition] + else: + while True: + # Combine coupling strengths of feedforward and feedback connections between two nodes to get an + # undirected graph with the correct edge weights + remove_edges = [] + for edge in function_graph.edges(): + if (edge[0], edge[1]) in remove_edges: + continue + elif (edge[1], edge[0]) in function_graph.edges(): + function_graph.edges[edge[0], edge[1]]['coupling_strength'] += function_graph.edges[ + edge[1], edge[0]]['coupling_strength'] + remove_edges.append((edge[1], edge[0])) + for edge in remove_edges: + function_graph.remove_edge(edge[0], edge[1]) + + # Get undirected graph + g_und = function_graph.to_undirected() + + # Add runtime to the nodes of the undirected graph for metis + for node in g_und.nodes(): + g_und.nodes[node]['run_time'] = g_und.nodes[node]['performance_info']['run_time'] if \ + use_runtime_info else 1 + + # Set the runtime as node weight and the coupling strength as edge weight for metis + g_und.graph['node_weight_attr'] = 'run_time' + g_und.graph['edge_weight_attr'] = 'coupling_strength' + + # Partition graph using metis + while True: + (edgecuts, parts) = metis.part_graph(g_und, n_parts, tpwgts=tpwgts, recursive=recursive, + contig=contig, ufactor=ufactor) + if len(set(parts)) != n_parts and ufactor != 1: + ufactor = 1 if ufactor < 101 else ufactor - 100 + logger.warning('Metis returned less than {} partitions. Maximum unbalance factor will be ' + 'changed'.format(n_parts)) + continue + else: + break - # Merge the nodes that can be merged based on process - function_graph = initial_function_graph.deepcopy() - for partition in partitions: - nodes = list(partition) - while nodes: - merge_nodes, run_times = [], [] - for idx, node in enumerate(nodes): - # If the nodes before the current node do not supply input to the current node, the nodes can - # be merged - if not set(nodes[:idx]).intersection(coupling_dict[node]): - merge_nodes.append(node) - run_times.append(self.nodes[node]['performance_info']['run_time'] if use_runtime_info else - 1) + # Create a list with the nodes in each partition + partitions = [] + for part in range(n_parts): + # Get nodes in this partition + nodes = [] + for idx, node in enumerate(g_und.nodes): + if parts[idx] == part: + nodes.extend(node.split('--') if '--' in node else [node]) + # Minimize feedback within the partition + if not nodes: + logger.warning('Metis returned less than {} partitions. Some partitions will be empty'.format( + n_parts)) + else: + if local_convergers: + nodes = self.get_possible_function_order('single-swap', node_selection=nodes, rcb=1, + coupling_dict=coupling_dict, + use_runtime_info=use_runtime_info) else: - break - if len(merge_nodes) > 1: - new_node_label = '--'.join(merge_nodes) - try: + nodes = self.minimize_feedback(nodes, 'single-swap', rcb=1, coupling_dict=coupling_dict, + use_runtime_info=use_runtime_info) + # Add nodes to the partition list + partitions.append(nodes) + + # Evaluate the properties of the partitioning + n_variables, partition_variables, system_variables, runtime = subgraph.get_partition_info( + partitions, coupling_dict=coupling_dict, use_runtime_info=use_runtime_info, + local_convergers=local_convergers) + + # Decide whether new solution is better than the best solution found so far + f = rcb * (n_variables / float(total_couplings)) + (1 - rcb) * (max(runtime) / float(total_time)) + if (n_variables == min_variables and max(runtime) < min_time) or \ + (max(runtime) == min_time and n_variables < min_variables) or (f < min_f): + best_partitions, min_f, min_variables, min_time = partitions, f, n_variables, max(runtime) + number_of_iterations_not_improved = 0 + else: + number_of_iterations_not_improved += 1 + + # If the third iteration does not give an improvement the iterations are stopped + if number_of_iterations_not_improved > 2: + break + + # Merge the nodes that can be merged based on process + function_graph = initial_function_graph.deepcopy() + for partition in partitions: + nodes = list(partition) + while nodes: + merge_nodes, run_times = [], [] + for idx, node in enumerate(nodes): + # If the nodes before the current node do not supply input to the current node, the nodes + # can be merged + if not set(nodes[:idx]).intersection(coupling_dict[node]): + merge_nodes.append(node) + run_times.append(self.nodes[node]['performance_info']['run_time'] if use_runtime_info + else 1) + else: + break + # Merge the nodes only when the resulting number of nodes is still enough to get the required + # number of partitions + if len(merge_nodes) > 1 and (nx.number_of_nodes(function_graph) - len(merge_nodes) + 1) >= \ + n_parts: + new_node_label = '--'.join(merge_nodes) function_graph = function_graph.merge_parallel_functions(merge_nodes, new_label=new_node_label) function_graph.nodes[new_node_label]['performance_info'] = {'run_time': max(run_times)} - except AssertionError: - pass - for node in merge_nodes: - nodes.pop(nodes.index(node)) - - # Get correct coupling strengths in case merged nodes exist in the graph - for node1 in function_graph.nodes(): - for node2 in function_graph.nodes(): - coupling_strength = 0 - source_nodes = node1.split('--') if '--' in node1 else [node1] - target_nodes = node2.split('--') if '--' in node2 else [node2] - for source in source_nodes: - for target in target_nodes: - if (source, target) in initial_function_graph.edges(): - coupling_strength += initial_function_graph.edges[source, target][ - 'coupling_strength'] - if coupling_strength != 0: - function_graph.edges[node1, node2]['coupling_strength'] = coupling_strength + for node in merge_nodes: + nodes.pop(nodes.index(node)) + + # Get correct coupling strengths in case merged nodes exist in the graph + for node1 in function_graph.nodes(): + for node2 in function_graph.nodes(): + coupling_strength = 0 + source_nodes = node1.split('--') if '--' in node1 else [node1] + target_nodes = node2.split('--') if '--' in node2 else [node2] + for source in source_nodes: + for target in target_nodes: + if (source, target) in initial_function_graph.edges(): + coupling_strength += initial_function_graph.edges[source, target][ + 'coupling_strength'] + if coupling_strength != 0: + function_graph.edges[node1, node2]['coupling_strength'] = coupling_strength # Add local convergers if there are feedback loops in the partitions convergers = [] if local_convergers: for part_nr, partition in enumerate(best_partitions): - converger = False - for idx, node in enumerate(partition): - if not set(partition[idx:]).intersection(coupling_dict[node]): - continue - else: - converger = True - if converger: + if self.check_for_coupling(partition, only_feedback=True): convergers.append(part_nr) # Update the function order @@ -3264,7 +3284,7 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): function_order = pre_coupled_order + partitioned_nodes_order + post_coupling_order # Add partition to the input graph - if not 'problem_formulation' in self.graph: + if 'problem_formulation' not in self.graph: self.graph['problem_formulation'] = dict() self.graph['problem_formulation']['coupled_functions_groups'] = best_partitions @@ -3320,6 +3340,7 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): elif source in function_order and source not in nodes: system_connections += coupling_dict[target][source] partition_connections.append(partition_feedback) + total_connections = system_connections + sum(partition_connections) # Calculate runtime run_time_partitions = [] @@ -3356,7 +3377,7 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): use_runtime_info=use_runtime_info) run_time_partitions.append(run_time_partition) - return partition_connections, system_connections, run_time_partitions + return total_connections, partition_connections, system_connections, run_time_partitions def select_number_of_partitions(self, partition_range, use_runtime_info=False, plot_pareto_front=False, local_convergers=False, coupling_dict=None, rcb=1.0): @@ -3396,11 +3417,10 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): local_convergers = graph.graph['problem_formulation']['local_convergers'] # Evaluate graph - partition_variables, system_variables, runtime = graph.get_partition_info(partitions, + total_var, partition_variables, system_variables, runtime = graph.get_partition_info(partitions, use_runtime_info=use_runtime_info, coupling_dict=coupling_dict, local_convergers=local_convergers) - total_var = system_variables + sum(partition_variables) # Save partition information partition_info.append([idx, n_partitions, partition_variables, system_variables, total_var, max(runtime)]) @@ -3433,7 +3453,7 @@ class FundamentalProblemGraph(DataGraph, KeChainMixin): plt.show() # Select the number of partitions - selmsg = 'Please select number of partitions:' + selmsg = 'Please select the desired option:' sel = prompting.user_prompt_select_options(*partition_range, message=selmsg, allow_empty=False, allow_multi=False) idx = partition_range.index(int(sel[0])) -- GitLab