Source code for discrete_optimization.vrp.solver.lp_vrp_iterative

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from __future__ import annotations

import logging
import random
from copy import deepcopy
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np

from discrete_optimization.generic_tools.do_solver import ResultStorage
from discrete_optimization.vrp.solver.vrp_solver import SolverVrp
from discrete_optimization.vrp.vrp_model import (
    Customer2D,
    VrpProblem,
    VrpProblem2D,
    VrpSolution,
    compute_length,
    length,
)

try:
    import gurobipy
except ImportError:
    gurobi_available = False
else:
    gurobi_available = True
    from gurobipy import GRB, Model, Var, quicksum


logger = logging.getLogger(__name__)

Node = Tuple[int, int]
Edge = Tuple[Node, Node]


[docs] def build_matrice_distance_np( customer_count: int, customers: Sequence[Customer2D] ) -> Tuple[np.ndarray, np.ndarray]: matrix_x = np.ones((customer_count, customer_count), dtype=np.int_) matrix_y = np.ones((customer_count, customer_count), dtype=np.int_) for i in range(customer_count): matrix_x[i, :] *= int(customers[i].x) matrix_y[i, :] *= int(customers[i].y) matrix_x = matrix_x - np.transpose(matrix_x) matrix_y = matrix_y - np.transpose(matrix_y) distances = np.abs(matrix_x) + np.abs(matrix_y) sorted_distance = np.argsort(distances, axis=1) return sorted_distance, distances
[docs] def build_graph_pruned_vrp( vrp_problem: VrpProblem2D, ) -> Tuple[ nx.DiGraph, nx.DiGraph, Dict[int, Set[Edge]], Dict[int, Set[Edge]], Dict[Node, Set[Edge]], Dict[Node, Set[Edge]], ]: customer_count = vrp_problem.customer_count customers = vrp_problem.customers vehicle_count = vrp_problem.vehicle_count sd, d = build_matrice_distance_np(customer_count, customers) g = nx.DiGraph() g.add_nodes_from( [(v, i) for i in range(customer_count) for v in range(vehicle_count)] ) shape = sd.shape[0] edges_in_customers: Dict[int, Set[Edge]] = {i: set() for i in range(customer_count)} edges_out_customers: Dict[int, Set[Edge]] = { i: set() for i in range(customer_count) } edges_in_merged_graph: Dict[Node, Set[Edge]] = {node: set() for node in g.nodes()} edges_out_merged_graph: Dict[Node, Set[Edge]] = {node: set() for node in g.nodes()} for i in range(shape): nodes_to_add: Iterable[int] = sd[i, 1:10] for n in nodes_to_add: for v in range(vehicle_count): if n == i: continue node_1 = (v, i) node_2 = (v, n) g.add_edge( node_1, node_2, weight=length(customers[i], customers[n]), demand=customers[n].demand, ) g.add_edge( node_2, node_1, weight=length(customers[i], customers[n]), demand=customers[i].demand, ) edges_in_merged_graph[node_2].add((node_1, node_2)) edges_out_merged_graph[node_1].add((node_1, node_2)) edges_in_merged_graph[node_1].add((node_2, node_1)) edges_out_merged_graph[node_2].add((node_2, node_1)) edges_in_customers[n].add((node_1, node_2)) edges_out_customers[i].add((node_1, node_2)) edges_in_customers[i].add((node_2, node_1)) edges_out_customers[n].add((node_2, node_1)) nodes_to_add = range(i, min(i + 5, customer_count)) for n in nodes_to_add: for v in range(vehicle_count): if n == i: continue node_1 = (v, i) node_2 = (v, n) g.add_edge( node_1, node_2, weight=length(customers[i], customers[n]), demand=customers[n].demand, ) g.add_edge( node_2, node_1, weight=length(customers[i], customers[n]), demand=customers[i].demand, ) edges_in_merged_graph[node_2].add((node_1, node_2)) edges_out_merged_graph[node_1].add((node_1, node_2)) edges_in_merged_graph[node_1].add((node_2, node_1)) edges_out_merged_graph[node_2].add((node_2, node_1)) edges_in_customers[n].add((node_1, node_2)) edges_out_customers[i].add((node_1, node_2)) edges_in_customers[i].add((node_2, node_1)) edges_out_customers[n].add((node_2, node_1)) for v in range(vehicle_count): nodes_to_add = [vrp_problem.start_indexes[v], vrp_problem.end_indexes[v]] for n in nodes_to_add: if n == i: continue node_1 = (v, i) node_2 = (v, n) g.add_edge( node_1, node_2, weight=length(customers[i], customers[n]), demand=customers[n].demand, ) g.add_edge( node_2, node_1, weight=length(customers[i], customers[n]), demand=customers[i].demand, ) edges_in_merged_graph[node_2].add((node_1, node_2)) edges_out_merged_graph[node_1].add((node_1, node_2)) edges_in_merged_graph[node_1].add((node_2, node_1)) edges_out_merged_graph[node_2].add((node_2, node_1)) edges_in_customers[n].add((node_1, node_2)) edges_out_customers[i].add((node_1, node_2)) edges_in_customers[i].add((node_2, node_1)) edges_out_customers[n].add((node_2, node_1)) g_empty = nx.DiGraph() g_empty.add_nodes_from( [(v, i) for i in range(customer_count) for v in range(vehicle_count)] ) return ( g, g_empty, edges_in_customers, edges_out_customers, edges_in_merged_graph, edges_out_merged_graph, )
[docs] def compute_start_end_flows_info( start_indexes: List[int], end_indexes: List[int] ) -> Tuple[Dict[int, Dict[str, Any]], Dict[int, Dict[str, Any]]]: start_to_i: Dict[int, Dict[str, Any]] = {} end_to_i: Dict[int, Dict[str, Any]] = {} for i in range(len(start_indexes)): s = start_indexes[i] e = end_indexes[i] if s not in start_to_i: start_to_i[s] = {"nb": 0, "vehicle": set()} if e not in end_to_i: end_to_i[e] = {"nb": 0, "vehicle": set()} start_to_i[s]["nb"] += 1 start_to_i[s]["vehicle"].add(i) end_to_i[e]["nb"] += 1 end_to_i[e]["vehicle"].add(i) return start_to_i, end_to_i
[docs] def init_model_lp( g: nx.DiGraph, edges: Set[Edge], edges_in_customers: Dict[int, Set[Edge]], edges_out_customers: Dict[int, Set[Edge]], edges_in_merged_graph: Dict[Node, Set[Edge]], edges_out_merged_graph: Dict[Node, Set[Edge]], edges_warm_set: Set[Edge], fraction: float, start_indexes: List[int], end_indexes: List[int], vehicle_count: int, vehicle_capacity: List[float], do_lns: bool = False, include_backward: bool = True, include_triangle: bool = False, ) -> Tuple[ "Model", Dict[Edge, "Var"], Dict[Union[str, int], Any], Dict[str, Any], Dict[int, Any], ]: tsp_model = Model("VRP-master") x_var: Dict[Edge, Var] = {} # decision variables on edges constraint_on_edge: Dict[int, Any] = {} edges_to_constraint: Set[Edge] = set() if do_lns: edges_to_constraint = set( random.sample(list(edges), int(fraction * len(edges))) ) for iedge in constraint_on_edge: tsp_model.remove(constraint_on_edge[iedge]) iedge = 0 for e in edges: x_var[e] = tsp_model.addVar( vtype=GRB.BINARY, obj=g[e[0]][e[1]]["weight"], name="x_" + str(e) ) val = 0 if e in edges_warm_set: x_var[e].start = 1 x_var[e].varhintval = 1 val = 1 else: x_var[e].start = 0 x_var[e].varhintval = 0 if e in edges_to_constraint: constraint_on_edge[iedge] = tsp_model.addLConstr( x_var[e] == val, name=str((e, iedge)) ) iedge += 1 constraint_tour_2length: Dict[int, Any] = {} cnt_tour = 0 if include_backward: for edge in edges: if (edge[1], edge[0]) in edges: if (edge[1], edge[0]) == edge: continue if ( edge[0][1] == start_indexes[edge[0][0]] or edge[1][1] == start_indexes[edge[0][0]] ): continue if ( edge[0][1] == end_indexes[edge[0][0]] or edge[1][1] == end_indexes[edge[0][0]] ): continue constraint_tour_2length[cnt_tour] = tsp_model.addLConstr( x_var[edge] + x_var[(edge[1], edge[0])] <= 1, name="tour_" + str(edge), ) cnt_tour += 1 if include_triangle: constraint_triangle: Dict[int, Any] = {} for node in g.nodes(): neigh = set([n for n in nx.neighbors(g, node)]) neigh_2 = { nn: neigh.intersection([n for n in nx.neighbors(g, nn)]) for nn in neigh } for node_neigh in neigh_2: if len(neigh_2[node_neigh]) >= 1: for node_neigh_neigh in neigh_2[node_neigh]: constraint_triangle[cnt_tour] = tsp_model.addLConstr( x_var[(node, node_neigh)] + x_var[(node_neigh, node_neigh_neigh)] + x_var[(node_neigh_neigh, node)] <= 2 ) tsp_model.update() constraint_flow_in: Dict[Union[str, int], Any] = {} constraint_flow_out: Dict[str, Any] = {} start_to_i, end_to_i = compute_start_end_flows_info(start_indexes, end_indexes) for s in start_to_i: for vehicle in start_to_i[s]["vehicle"]: constraint_flow_out["start_v_" + str(vehicle)] = tsp_model.addLConstr( quicksum( [x_var[e] for e in edges_out_customers[s] if e[0][0] == vehicle] ) == 1, name="start_v_" + str(vehicle), ) for s in end_to_i: for vehicle in end_to_i[s]["vehicle"]: constraint_flow_in["end_v_" + str(vehicle)] = tsp_model.addLConstr( quicksum( [x_var[e] for e in edges_in_customers[s] if e[0][0] == vehicle] ) == 1, name="end_v_" + str(vehicle), ) for customer in edges_in_customers: if customer in end_to_i or customer in start_to_i: # Already dealt by previous constraints continue else: constraint_flow_in[customer] = tsp_model.addLConstr( quicksum([x_var[e] for e in edges_in_customers[customer]]) == 1, name="in_" + str(customer), ) c_flow = {} for n in edges_in_merged_graph: if start_indexes[n[0]] == end_indexes[n[0]] or n[1] not in [ start_indexes[n[0]], end_indexes[n[0]], ]: c_flow[n] = tsp_model.addLConstr( quicksum( [x_var[e] for e in edges_in_merged_graph[n]] + [-x_var[e] for e in edges_out_merged_graph[n]] ) == 0, name="flow_" + str(n), ) for v in range(vehicle_count): tsp_model.addLConstr( quicksum( [g[e[0]][e[1]]["demand"] * x_var[e] for e in edges if e[0][0] == v] ) <= vehicle_capacity[v], name="capa_" + str(v), ) tsp_model.setParam("TimeLimit", 800) tsp_model.modelSense = GRB.MINIMIZE tsp_model.setParam(GRB.Param.Threads, 8) tsp_model.setParam(GRB.Param.PoolSolutions, 10000) tsp_model.setParam(GRB.Param.Method, -1) tsp_model.setParam("MIPGapAbs", 0.01) tsp_model.setParam("MIPGap", 0.003) tsp_model.setParam("Heuristics", 0.1) return tsp_model, x_var, constraint_flow_in, constraint_flow_out, constraint_on_edge
[docs] def update_graph( g: nx.DiGraph, edges: Set[Edge], edges_in_customers: Dict[int, Set[Edge]], edges_out_customers: Dict[int, Set[Edge]], edges_in_merged_graph: Dict[Node, Set[Edge]], edges_out_merged_graph: Dict[Node, Set[Edge]], edges_missing: Set[Edge], customers: Sequence[Customer2D], ) -> Tuple[ nx.DiGraph, Set[Edge], Dict[int, Set[Edge]], Dict[int, Set[Edge]], Dict[Node, Set[Edge]], Dict[Node, Set[Edge]], ]: for edge in edges_missing: g.add_edge( edge[0], edge[1], weight=length(customers[edge[0][1]], customers[edge[1][1]]), demand=customers[edge[1][1]].demand, ) g.add_edge( edge[1], edge[0], weight=length(customers[edge[0][1]], customers[edge[1][1]]), demand=customers[edge[0][1]].demand, ) edges_in_merged_graph[edge[1]].add((edge[0], edge[1])) edges_out_merged_graph[edge[0]].add((edge[0], edge[1])) edges_in_customers[edge[1][1]].add((edge[0], edge[1])) edges_out_customers[edge[0][1]].add((edge[0], edge[1])) edges_in_merged_graph[edge[0]].add((edge[1], edge[0])) edges_out_merged_graph[edge[1]].add((edge[1], edge[0])) edges_in_customers[edge[0][1]].add((edge[1], edge[0])) edges_out_customers[edge[1][1]].add((edge[1], edge[0])) edges.add((edge[0], edge[1])) edges.add((edge[1], edge[0])) return ( g, edges, edges_in_customers, edges_out_customers, edges_in_merged_graph, edges_out_merged_graph, )
[docs] def build_warm_edges_and_update_graph( vrp_problem: VrpProblem, vrp_solution: VrpSolution, graph: nx.DiGraph, edges: Set[Edge], edges_in_merged_graph: Dict[Node, Set[Edge]], edges_out_merged_graph: Dict[Node, Set[Edge]], edges_in_customers: Dict[int, Set[Edge]], edges_out_customers: Dict[int, Set[Edge]], ) -> Tuple[List[List[Edge]], Set[Edge]]: vehicle_paths = deepcopy(vrp_solution.list_paths) edges_warm: List[List[Edge]] = [] edges_warm_set: Set[Edge] = set() for i in range(len(vehicle_paths)): vehicle_paths[i] = ( [vrp_problem.start_indexes[i]] + vehicle_paths[i] + [vrp_problem.end_indexes[i]] ) edges_warm_sublist = [ ((i, v1), (i, v2)) for v1, v2 in zip(vehicle_paths[i][:-1], vehicle_paths[i][1:]) ] edges_warm_subset = set(edges_warm_sublist) edges_warm.append(edges_warm_sublist) edges_warm_set.update(edges_warm_subset) missing_edge = [e for e in edges_warm_subset if e not in edges] for edge in missing_edge: graph.add_edge( edge[0], edge[1], weight=vrp_problem.evaluate_function_indexes(edge[0][1], edge[1][1]), demand=vrp_problem.customers[edge[1][1]].demand, ) graph.add_edge( edge[1], edge[0], weight=vrp_problem.evaluate_function_indexes(edge[1][1], edge[0][1]), demand=vrp_problem.customers[edge[0][1]].demand, ) edges_in_merged_graph[edge[1]].add((edge[0], edge[1])) edges_out_merged_graph[edge[0]].add((edge[0], edge[1])) edges_in_customers[edge[1][1]].add((edge[0], edge[1])) edges_out_customers[edge[0][1]].add((edge[0], edge[1])) edges_in_merged_graph[edge[0]].add((edge[1], edge[0])) edges_out_merged_graph[edge[1]].add((edge[1], edge[0])) edges_in_customers[edge[0][1]].add((edge[1], edge[0])) edges_out_customers[edge[1][1]].add((edge[1], edge[0])) edges.add((edge[0], edge[1])) edges.add((edge[1], edge[0])) return edges_warm, edges_warm_set
[docs] class VRPIterativeLP(SolverVrp): edges: Set[Edge] edges_in_customers: Dict[int, Set[Edge]] edges_out_customers: Dict[int, Set[Edge]] edges_in_merged_graph: Dict[Node, Set[Edge]] edges_out_merged_graph: Dict[Node, Set[Edge]] edges_warm_set: Set[Edge] problem: VrpProblem2D model: Optional[Model] = None x_var: Optional[Dict[Edge, Var]] = None constraint_on_edge: Optional[Dict[int, Any]] = None
[docs] def init_model(self, **kwargs: Any) -> None: ( g, g_empty, edges_in_customers, edges_out_customers, edges_in_merged_graph, edges_out_merged_graph, ) = build_graph_pruned_vrp(self.problem) initial_solution = kwargs.get("initial_solution", None) if initial_solution is None: solution = self.problem.get_dummy_solution() else: vehicle_tours_b = initial_solution solution = VrpSolution( problem=self.problem, list_start_index=self.problem.start_indexes, list_end_index=self.problem.end_indexes, list_paths=vehicle_tours_b, length=None, lengths=None, capacities=None, ) edges = set(g.edges()) edges_warm, edges_warm_set = build_warm_edges_and_update_graph( vrp_problem=self.problem, vrp_solution=solution, graph=g, edges=edges, edges_in_merged_graph=edges_in_merged_graph, edges_out_merged_graph=edges_out_merged_graph, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, ) do_lns = kwargs.get("do_lns", False) fraction = kwargs.get("fraction_lns", 0.9) ( tsp_model, x_var, constraint_flow_in, constraint_flow_out, constraint_on_edge, ) = init_model_lp( g=g, edges=edges, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, edges_in_merged_graph=edges_in_merged_graph, edges_out_merged_graph=edges_out_merged_graph, edges_warm_set=edges_warm_set, start_indexes=self.problem.start_indexes, end_indexes=self.problem.end_indexes, do_lns=do_lns, fraction=fraction, vehicle_count=self.problem.vehicle_count, vehicle_capacity=self.problem.vehicle_capacities, ) self.model = tsp_model self.x_var = x_var self.constraint_on_edge = constraint_on_edge self.graph = g self.edges = edges self.edges_in_customers = edges_in_customers self.edges_out_customers = edges_out_customers self.edges_in_merged_graph = edges_in_merged_graph self.edges_out_merged_graph = edges_out_merged_graph self.edges_warm_set = edges_warm_set
[docs] def solve(self, **kwargs: Any) -> ResultStorage: do_lns = kwargs.get("do_lns", False) plot = kwargs.get("plot", True) fraction = kwargs.get("fraction_lns", 0.9) nb_iteration_max = kwargs.get("nb_iteration_max", 20) if self.model is None or self.x_var is None or self.constraint_on_edge is None: self.init_model(**kwargs) if ( self.model is None or self.x_var is None or self.constraint_on_edge is None ): raise RuntimeError( "model, x_var and constraint_on_edge attributes " "should not be None after init_model()" ) limit_time_s = kwargs.get("limit_time_s", 10) self.model.setParam("TimeLimit", limit_time_s) self.model.optimize() objective = self.model.getObjective().getValue() # Query number of multiple objectives, and number of solutions finished = False solutions: List[Dict[int, Set[Edge]]] = [] cost: List[float] = [] nb_components: List[int] = [] iteration = 0 rebuilt_solution: List[Dict[int, List[Node]]] = [] rebuilt_obj: List[float] = [] best_solution_rebuilt_index = 0 best_solution_objective_rebuilt = float("inf") vehicle_count = self.problem.vehicle_count customers = self.problem.customers edges_in_customers = self.edges_in_customers edges_out_customers = self.edges_out_customers edges_in_merged_graph = self.edges_in_merged_graph edges_out_merged_graph = self.edges_out_merged_graph edges = self.edges edges_warm_set = self.edges_warm_set g = self.graph while not finished: solutions_ll = retrieve_solutions(self.model, self.x_var, vehicle_count, g) solutions += [solutions_ll[0][-1]] cost += [objective] ( x_solution, rebuilt_dict, obj, components, components_global, component_all, component_global_all, ) = reevaluate_solutions( solutions=solutions_ll, vehicle_count=vehicle_count, g=g, vrp_problem=self.problem, ) for comp in component_all: update_model_2( model=self.model, x_var=self.x_var, components_global=comp, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, ) nb_components += [len(components_global)] rebuilt_solution += [rebuilt_dict] rebuilt_obj += [obj] logger.debug(f"Objective rebuilt : {rebuilt_obj[-1]}") if obj < best_solution_objective_rebuilt: best_solution_objective_rebuilt = obj best_solution_rebuilt_index = iteration iteration += 1 edges_to_add: Set[Edge] = set() for v in rebuilt_dict: edges_to_add.update( { (e0, e1) for e0, e1 in zip(rebuilt_dict[v][:-1], rebuilt_dict[v][1:]) } ) edges_missing = {e for e in edges_to_add if e not in edges} if len(edges_missing) > 0 or not finished: ( g, edges, edges_in_customers, edges_out_customers, edges_in_merged_graph, edges_out_merged_graph, ) = update_graph( g=g, edges=edges, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, edges_in_merged_graph=edges_in_merged_graph, edges_out_merged_graph=edges_out_merged_graph, edges_missing=edges_missing, customers=customers, ) self.model.reset() self.model = None ( tsp_model, x_var, constraint_flow_in, constraint_flow_out, constraint_on_edge, ) = init_model_lp( g=g, edges=edges, edges_in_customers=edges_in_customers, edges_out_customers=edges_out_customers, edges_in_merged_graph=edges_in_merged_graph, edges_out_merged_graph=edges_out_merged_graph, edges_warm_set=edges_warm_set, start_indexes=self.problem.start_indexes, end_indexes=self.problem.end_indexes, do_lns=do_lns, fraction=fraction, vehicle_count=self.problem.vehicle_count, vehicle_capacity=self.problem.vehicle_capacities, ) self.model = tsp_model self.model.setParam("TimeLimit", limit_time_s) self.x_var = x_var self.constraint_on_edge = constraint_on_edge self.graph = g self.edges = edges self.edges_in_customers = edges_in_customers self.edges_out_customers = edges_out_customers self.edges_in_merged_graph = edges_in_merged_graph self.edges_out_merged_graph = edges_out_merged_graph self.edges_warm_set = edges_warm_set for iedge in self.constraint_on_edge: self.model.remove(self.constraint_on_edge[iedge]) edges_to_constraint = set(self.x_var.keys()) if do_lns: for iedge in self.constraint_on_edge: self.model.remove(self.constraint_on_edge[iedge]) self.model.update() self.constraint_on_edge = {} edges_to_constraint = set() vehicle = set(random.sample(range(vehicle_count), 3)) edges_to_constraint.update( set([e for e in edges if e[0][0] not in vehicle]) ) logger.debug( ( len(edges_to_constraint), " edges constraint over ", len(edges), ) ) iedge = 0 x_var = self.x_var if all((e in edges) for e in edges_to_add): for e in x_var: val = 0 if e in edges_to_add: x_var[e].start = 1 x_var[e].varhintval = 1 val = 1 else: x_var[e].start = 0 x_var[e].varhintval = 0 if e in edges_to_constraint: if do_lns: self.constraint_on_edge[iedge] = self.model.addLConstr( x_var[e] == val, name=str((e, iedge)) ) iedge += 1 else: pass self.model.update() self.model.optimize() objective = self.model.getObjective().getValue() finished = finished or iteration >= nb_iteration_max if plot: plot_solve( solutions=solutions, customers=customers, rebuilt_solution=rebuilt_solution, cost=cost, rebuilt_obj=rebuilt_obj, ) logger.debug(f"Best obj : {best_solution_objective_rebuilt}") solution = VrpSolution( problem=self.problem, list_start_index=self.problem.start_indexes, list_end_index=self.problem.end_indexes, list_paths=[ [x[1] for x in rebuilt_solution[best_solution_rebuilt_index][l][1:-1]] for l in sorted(rebuilt_solution[best_solution_rebuilt_index]) ], length=None, lengths=None, capacities=None, ) _ = self.problem.evaluate(solution) fit = self.aggreg_from_sol(solution) return ResultStorage( list_solution_fits=[(solution, fit)], mode_optim=self.params_objective_function.sense_function, )
[docs] def plot_solve( solutions: List[Dict[int, Set[Edge]]], customers: Sequence[Customer2D], rebuilt_solution: List[Dict[int, List[Node]]], cost: List[float], rebuilt_obj: List[float], ) -> None: fig, ax = plt.subplots(2) for i in range(len(solutions)): ll = [] for v in solutions[i]: for e in solutions[i][v]: ll.append( ax[0].plot( [customers[e[0][1]].x, customers[e[1][1]].x], [customers[e[0][1]].y, customers[e[1][1]].y], color="b", ) ) ax[1].plot( [customers[n[1]].x for n in rebuilt_solution[i][v]], [customers[n[1]].y for n in rebuilt_solution[i][v]], color="orange", ) ax[0].set_title("iter " + str(i) + " obj=" + str(int(cost[i]))) ax[1].set_title("iter " + str(i) + " obj=" + str(int(rebuilt_obj[i]))) plt.draw() plt.pause(0.01) if len(solutions) > 1 and i < len(solutions) - 1: ax[0].cla() ax[1].cla() # ax[0].lines = [] # ax[1].lines = [] plt.show()
[docs] def build_the_cycles( x_solution: Set[Edge], component: Set[Node], start_index: Node, end_index: Node ) -> Tuple[List[Node], Dict[Node, int]]: edge_of_interest = { e for e in x_solution if e[1] in component and e[0] in component } innn = {e[1]: e for e in edge_of_interest} outt = {e[0]: e for e in edge_of_interest} if start_index in outt: some_node = start_index else: some_node = next(e[0] for e in edge_of_interest) end_node = some_node if end_index not in innn else end_index path = [some_node] cur_edge = outt[some_node] indexes = {some_node: 0} cur_index = 1 while cur_edge[1] != end_node: path += [cur_edge[1]] indexes[cur_edge[1]] = cur_index cur_index += 1 cur_edge = outt[cur_edge[1]] if end_index in innn: path += [end_node] indexes[end_node] = cur_index return path, indexes
[docs] def rebuild_tsp_routine( sorted_connected_component: List[Tuple[Set[Node], int]], paths_component: Dict[int, List[Node]], node_to_component: Dict[Node, int], indexes: Dict[int, Dict[Node, int]], graph: nx.DiGraph, edges: Set[Edge], evaluate_function_indexes: Callable[[int, int], float], vrp_model: VrpProblem, start_index: Node, end_index: Node, ) -> Tuple[List[Node], float]: rebuilded_path: List[Node] = list(paths_component[node_to_component[start_index]]) component_end: int = node_to_component[end_index] component_reconnected: Set[int] = {node_to_component[start_index]} path_set: Set[Node] = set(rebuilded_path) total_length_path: int = len(rebuilded_path) while len(component_reconnected) < len(sorted_connected_component): if ( len(component_reconnected) == len(sorted_connected_component) - 1 and end_index != start_index and node_to_component[end_index] != node_to_component[start_index] ): rebuilded_path = rebuilded_path + paths_component[component_end] component_reconnected.add(component_end) else: index_path: Dict[Node, List[int]] = {} for i in range(len(rebuilded_path)): if rebuilded_path[i] not in index_path: index_path[rebuilded_path[i]] = [] index_path[rebuilded_path[i]] += [i] edge_out_of_interest: Set[Edge] = { e for e in edges if e[0] in path_set and e[1] not in path_set } edge_in_of_interest: Set[Edge] = { e for e in edges if e[0] not in path_set and e[1] in path_set } min_out_edge = None min_index_in_path = None min_component = None min_dist = float("inf") backup_min_out_edge = None backup_min_in_edge = None backup_min_index_in_path = None backup_min_component = None backup_min_dist = float("inf") for e in edge_out_of_interest: index_in = index_path[e[0]][0] index_in_1 = min(index_path[e[0]][0] + 1, total_length_path - 1) next_node_1 = rebuilded_path[index_in_1] component_e1 = node_to_component[e[1]] if ( component_e1 == component_end and len(component_reconnected) < len(sorted_connected_component) - 1 ): continue index_component_e1 = indexes[component_e1][e[1]] index_component_e1_plus1 = index_component_e1 + 1 if index_component_e1_plus1 >= len(paths_component[component_e1]): index_component_e1_plus1 = 0 next_node_component_e1 = paths_component[component_e1][ index_component_e1_plus1 ] if (next_node_component_e1, next_node_1) in edge_in_of_interest: cost = ( graph[e[0]][e[1]]["weight"] + graph[next_node_component_e1][next_node_1]["weight"] - graph[e[0]][next_node_1]["weight"] ) if cost < min_dist: min_component = node_to_component[e[1]] min_out_edge = e min_index_in_path = index_in min_dist = cost else: cost = graph[e[0]][e[1]]["weight"] if cost < backup_min_dist: backup_min_component = node_to_component[e[1]] backup_min_out_edge = e backup_min_in_edge = (next_node_component_e1, next_node_1) backup_min_index_in_path = index_in backup_min_dist = cost if ( min_out_edge is None or min_component is None or min_index_in_path is None ): if ( backup_min_component is None or backup_min_out_edge is None or backup_min_in_edge is None or backup_min_index_in_path is None ): # for mypy to realize that we must have define backup values at this point raise RuntimeError("backup values cannot be None now.") logger.debug("Backup") e = backup_min_in_edge graph.add_edge( e[0], e[1], weight=evaluate_function_indexes(e[0][0], e[1][0]) ) graph.add_edge( e[1], e[0], weight=evaluate_function_indexes(e[1][0], e[0][0]) ) min_out_edge = backup_min_out_edge min_index_in_path = backup_min_index_in_path min_component = backup_min_component len_this_component = len(paths_component[min_component]) logger.debug(f"len this component : {len_this_component}") index_of_in_component = indexes[min_component][min_out_edge[1]] new_component: List[Node] = [ paths_component[min_component][ (index_of_in_component + i) % len_this_component ] for i in range(0, -len_this_component, -1) ] logger.debug(f"path component {paths_component[min_component]}") logger.debug(f"New component : {new_component}") rebuilded_path = ( rebuilded_path[: (min_index_in_path + 1)] + new_component + rebuilded_path[(min_index_in_path + 1) :] ) for node1, node2 in zip(new_component[:-1], new_component[1:]): if (node1, node2) not in graph.edges(): graph.add_edge( node1, node2, weight=evaluate_function_indexes(node1[0], node2[0]), ) path_set = set(rebuilded_path) total_length_path = len(rebuilded_path) component_reconnected.add(min_component) lengths, obj, capacities = compute_length( start_index=start_index[1], end_index=end_index[1], solution=[x[1] for x in rebuilded_path[1:-1]], list_customers=vrp_model.customers, method=vrp_model.evaluate_function_indexes, ) return rebuilded_path, obj
[docs] def retrieve_solutions( model: "Model", x_var: Dict[Edge, "Var"], vehicle_count: int, g: nx.DiGraph ) -> List[Tuple[nx.DiGraph, Dict[int, nx.DiGraph], Dict[int, Set[Edge]]]]: nSolutions = model.SolCount solutions: List[Tuple[nx.Digraph, Dict[int, nx.DiGraph], Dict[int, Set[Edge]]]] = [] for s in range(nSolutions): # Set which solution we will query from now on g_empty = {v: nx.DiGraph() for v in range(vehicle_count)} g_merge = nx.DiGraph() x_solution: Dict[int, Set[Edge]] = {v: set() for v in range(vehicle_count)} model.params.SolutionNumber = s for e in x_var: value = x_var[e].getAttr("Xn") if value >= 0.5: vehicle = e[0][0] x_solution[vehicle].add(e) clients = e[0], e[1] if clients[0] not in g_empty[vehicle]: g_empty[vehicle].add_node(clients[0]) if clients[1] not in g_empty[vehicle]: g_empty[vehicle].add_node(clients[1]) if clients[0][1] not in g_merge: g_merge.add_node(clients[0][1]) if clients[1][1] not in g_merge: g_merge.add_node(clients[1][1]) g_empty[vehicle].add_edge( clients[0], clients[1], weight=g[e[0]][e[1]]["weight"] ) g_merge.add_edge( clients[0][1], clients[1][1], weight=g[e[0]][e[1]]["weight"] ) solutions += [ ( g_merge.copy(), g_empty, x_solution.copy(), ) ] print(f"quality of {s}-th solution", model.getAttr("PoolObjVal")) return solutions
[docs] def reevaluate_solutions( solutions: List[Tuple[nx.DiGraph, Dict[int, nx.DiGraph], Dict[int, Set[Edge]]]], vehicle_count: int, g: nx.DiGraph, vrp_problem: VrpProblem, ) -> Tuple[ Dict[int, Set[Edge]], Dict[int, List[Node]], float, Dict[int, List[Tuple[Set[Node], int]]], List[Tuple[Set[Node], int]], List[Dict[int, List[Tuple[Set[Node], int]]]], List[List[Tuple[Set[Node], int]]], ]: rebuilt_solution: List[Dict[int, List[Node]]] = [] rebuilt_obj: List[float] = [] nb_components: List[List[int]] = [] components: List[Dict[int, List[Tuple[Set[Node], int]]]] = [] components_global: List[List[Tuple[Set[Node], int]]] = [] solutions_list: List[Dict[int, Set[Edge]]] = [] logger.debug(f"Vehicle count : {vehicle_count}") for g_merge, g_empty, x_solution in solutions: connected_components: Dict[int, List[Tuple[Set[Node], int]]] = { v: [(set(e), len(e)) for e in nx.weakly_connected_components(g_empty[v])] for v in g_empty } logger.debug( ( "Connected component : ", [len(connected_components[v]) for v in connected_components], ) ) sorted_connected_component: Dict[int, List[Tuple[Set[Node], int]]] = { v: sorted(connected_components[v], key=lambda x: x[1], reverse=True) for v in connected_components } components += [sorted_connected_component] nb_components += [ [len(sorted_connected_component[v]) for v in sorted_connected_component] ] solutions_list += [x_solution.copy()] paths_component: Dict[int, Dict[int, List[Node]]] = { v: {} for v in range(vehicle_count) } indexes_component: Dict[int, Dict[int, Dict[Node, int]]] = { v: {} for v in range(vehicle_count) } node_to_component: Dict[int, Dict[Node, int]] = { v: {} for v in range(vehicle_count) } rebuilt_dict: Dict[int, List[Node]] = {} objective_dict: Dict[int, float] = {} component_global: List[Tuple[Set[Node], int]] = [ (set(e), len(e)) for e in nx.weakly_connected_components(g_merge) ] components_global += [component_global] nb_component_global = len(component_global) logger.debug(f"Global : {nb_component_global}") for v in range(vehicle_count): graph_of_interest = nx.subgraph( g, [e[0] for e in x_solution[v]] + [e[1] for e in x_solution[v]] ).copy() nb = len(sorted_connected_component[v]) for i in range(nb): s = sorted_connected_component[v][i] paths_component[v][i], indexes_component[v][i] = build_the_cycles( x_solution=x_solution[v], component=s[0], start_index=(v, vrp_problem.start_indexes[v]), end_index=(v, vrp_problem.end_indexes[v]), ) node_to_component[v].update({p: i for p in paths_component[v][i]}) rebuilt_dict[v], objective_dict[v] = rebuild_tsp_routine( sorted_connected_component=sorted_connected_component[v], paths_component=paths_component[v], node_to_component=node_to_component[v], start_index=(v, vrp_problem.start_indexes[v]), end_index=(v, vrp_problem.end_indexes[v]), indexes=indexes_component[v], graph=graph_of_interest, edges=set(graph_of_interest.edges()), evaluate_function_indexes=vrp_problem.evaluate_function_indexes, vrp_model=vrp_problem, ) rebuilt_solution += [rebuilt_dict] rebuilt_obj += [sum(list(objective_dict.values()))] logger.debug(("Rebuilt : ", rebuilt_solution, rebuilt_obj)) index_best = min(range(len(rebuilt_obj)), key=lambda x: rebuilt_obj[x]) logger.debug(f"{index_best} / {len(rebuilt_obj)}") logger.debug(f"best : {rebuilt_obj[index_best]}") return ( solutions_list[index_best], rebuilt_solution[index_best], rebuilt_obj[index_best], components[index_best], components_global[index_best], components, components_global, )
[docs] def update_model_2( model: "Model", x_var: Dict[Edge, "Var"], components_global: Dict[int, List[Tuple[Set[Node], int]]], edges_in_customers: Dict[int, Set[Edge]], edges_out_customers: Dict[int, Set[Edge]], ) -> None: for vehicle in components_global: if len(components_global[vehicle]) > 1: logger.debug( f"Nb component for vehicle {vehicle}: {len(components_global[vehicle])}" ) for s in components_global[vehicle]: customers_component: Set[int] = {customer for vehicle, customer in s[0]} edge_in_of_interest = [ e for customer in customers_component for e in edges_in_customers[customer] if e[0][1] not in s[0] and e[1][1] in customers_component ] edge_out_of_interest = [ e for customer in customers_component for e in edges_out_customers[customer] if e[1][1] not in customers_component and e[0][1] in customers_component ] model.addLConstr(quicksum([x_var[e] for e in edge_in_of_interest]) >= 1) model.addLConstr( quicksum([x_var[e] for e in edge_out_of_interest]) >= 1 ) model.update()