Source code for discrete_optimization.pickup_vrp.solver.lp_solver_pymip

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import random
from typing import (
    Any,
    Callable,
    Dict,
    Hashable,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

import mip
import networkx as nx

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.lp_tools import ParametersMilp, PymipMilpSolver
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
    TupleFitness,
)
from discrete_optimization.pickup_vrp.gpdp import GPDP, Edge, GPDPSolution, Node
from discrete_optimization.pickup_vrp.solver.lp_solver import (
    TemporaryResult,
    build_graph_solution,
    construct_edges_in_out_dict,
    convert_temporaryresult_to_gpdpsolution,
    reevaluate_result,
)
from discrete_optimization.pickup_vrp.solver.pickup_vrp_solver import SolverPickupVrp

logger = logging.getLogger(__name__)


[docs] class MipModelException(Exception): def __init__(self, message: str): self.message = message def __str__(self) -> str: return self.message def __repr__(self) -> str: return self.message
[docs] def retrieve_current_solution( get_var_value_for_current_solution: Callable[[Any], float], get_obj_value_for_current_solution: Callable[[], float], variable_decisions: Dict[str, Any], ) -> Tuple[Dict[str, Dict[Hashable, Any]], float]: results: Dict[str, Dict[Hashable, Any]] = {} xsolution: Dict[int, Dict[Edge, int]] = { v: {} for v in variable_decisions["variables_edges"] } obj = float(get_obj_value_for_current_solution()) for vehicle in variable_decisions["variables_edges"]: for edge in variable_decisions["variables_edges"][vehicle]: value = get_var_value_for_current_solution( variable_decisions["variables_edges"][vehicle][edge] ) if value <= 0.1: continue xsolution[vehicle][edge] = 1 results["variables_edges"] = cast(Dict[Hashable, Any], xsolution) for key in variable_decisions: if key == "variables_edges": continue results[key] = {} for key_2 in variable_decisions[key]: if isinstance(variable_decisions[key][key_2], dict): results[key][key_2] = {} for key_3 in variable_decisions[key][key_2]: if isinstance(variable_decisions[key][key_2][key_3], dict): results[key][key_2][key_3] = {} for key_4 in variable_decisions[key][key_2][key_3]: value = get_var_value_for_current_solution( variable_decisions[key][key_2][key_3] ) results[key][key_2][key_3][key_4] = value else: value = get_var_value_for_current_solution( variable_decisions[key][key_2][key_3] ) results[key][key_2][key_3] = value else: value = get_var_value_for_current_solution( variable_decisions[key][key_2] ) results[key][key_2] = value return results, obj
[docs] class LinearFlowSolver(PymipMilpSolver, SolverPickupVrp): problem: GPDP def __init__( self, problem: GPDP, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs: Any, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.model: Optional[mip.Model] = None self.constraint_on_edge: Dict[int, Any] = {} self.variable_order: Dict[Node, Any] = {}
[docs] def one_visit_per_node( self, model: mip.Model, nodes_of_interest: Iterable[Node], variables_edges: Dict[int, Dict[Edge, Any]], edges_in_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], edges_out_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], ) -> None: constraint_one_visit = {} for node in nodes_of_interest: constraint_one_visit[node] = model.add_constr( mip.quicksum( [variables_edges[x[0]][x[1]] for x in edges_in_all_vehicles[node]] ) == 1, name="visit_" + str(node), ) constraint_one_visit[(node, "out")] = model.add_constr( mip.quicksum( [variables_edges[x[0]][x[1]] for x in edges_out_all_vehicles[node]] ) == 1, name="visitout_" + str(node), )
[docs] def one_visit_per_clusters( self, model: mip.Model, nodes_of_interest: Iterable[Node], variables_edges: Dict[int, Dict[Edge, Any]], edges_in_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], edges_out_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], ) -> None: constraint_cluster: Dict[Hashable, Any] = {} for cluster in self.problem.clusters_to_node: constraint_cluster[cluster] = model.add_constr( mip.quicksum( [ variables_edges[x[0]][x[1]] for node in self.problem.clusters_to_node[cluster] for x in edges_in_all_vehicles[node] if node in nodes_of_interest and x[0] in self.problem.vehicles_representative ] ) >= 1, name="visit_" + str(cluster), ) constraint_cluster[(cluster, "out")] = model.add_constr( mip.quicksum( [ variables_edges[x[0]][x[1]] for node in self.problem.clusters_to_node[cluster] for x in edges_out_all_vehicles[node] if node in nodes_of_interest and x[0] in self.problem.vehicles_representative ] ) == 1, name="visitout_" + str(cluster), )
[docs] def resources_constraint( self, model: mip.Model, variables_edges: Dict[int, Dict[Edge, Any]], edges_in_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], edges_out_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], ) -> Dict[str, Dict[str, Dict[Node, Any]]]: resources = self.problem.resources_set resources_variable_coming = { r: { node: model.add_var( var_type=mip.CONTINUOUS, name="res_coming_" + str(r) + "_" + str(node), ) for node in edges_in_all_vehicles } for r in resources } resources_variable_leaving = { r: { node: model.add_var( var_type=mip.CONTINUOUS, name="res_leaving_" + str(r) + "_" + str(node), ) for node in edges_in_all_vehicles } for r in resources } for v in self.problem.origin_vehicle: for r in resources_variable_coming: model.add_constr( resources_variable_coming[r][self.problem.origin_vehicle[v]] == self.problem.resources_flow_node[self.problem.origin_vehicle[v]][ r ] ) model.add_constr( resources_variable_leaving[r][self.problem.origin_vehicle[v]] == self.problem.resources_flow_node[self.problem.origin_vehicle[v]][ r ] ) all_origin = set(self.problem.origin_vehicle.values()) for r in resources_variable_coming: for node in resources_variable_coming[r]: for vehicle, edge in edges_in_all_vehicles[node]: if edge[0] == edge[1]: continue model.add_constr( resources_variable_coming[r][node] >= resources_variable_leaving[r][edge[0]] + self.problem.resources_flow_edges[edge][r] - 100000 * (1 - variables_edges[vehicle][edge]) ) if node not in all_origin: model.add_constr( resources_variable_leaving[r][node] == resources_variable_coming[r][node] + self.problem.resources_flow_node[node][r] ) return { "resources_variable_coming": resources_variable_coming, "resources_variable_leaving": resources_variable_leaving, }
[docs] def simple_capacity_constraint( self, model: mip.Model, variables_edges: Dict[int, Dict[Edge, Any]], edges_in_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], edges_out_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], ) -> Dict[str, Dict[int, Dict[str, Any]]]: # Case where we don't have to track the resource flow etc... # we just want to check that we respect the capacity constraint (like in VRP with capacity) # we admit that the resource demand/consumption are positive. # corresponding to negative flow values in the problem definition. consumption_per_vehicle: Dict[int, Dict[str, Any]] = { v: { r: model.add_var( var_type=mip.CONTINUOUS, lb=0, ub=self.problem.capacities[v][r][1], name="consumption_v_" + str(v) + "_" + str(r), ) for r in self.problem.resources_set } for v in self.problem.capacities } for v in consumption_per_vehicle: for r in consumption_per_vehicle[v]: model.add_constr( consumption_per_vehicle[v][r] == mip.quicksum( variables_edges[v][e] * ( -self.problem.resources_flow_node[e[1]][r] - self.problem.resources_flow_edges[e][r] ) for e in variables_edges[v] if e[1] != self.problem.origin_vehicle[v] ) ) # redundant : model.add_constr( consumption_per_vehicle[v][r] <= self.problem.capacities[v][r][1], name="constraint_capa_" + str(v) + "_" + str(r), ) return {"consumption_per_vehicle": consumption_per_vehicle}
[docs] def time_evolution( self, model: mip.Model, variables_edges: Dict[int, Dict[Edge, Any]], edges_in_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], edges_out_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], ) -> Dict[str, Dict[Node, Any]]: time_coming = { node: model.add_var( var_type=mip.CONTINUOUS, name="time_coming_" + str(node) ) for node in edges_in_all_vehicles } time_leaving = { node: model.add_var( var_type=mip.CONTINUOUS, name="time_leaving_" + str(node) ) for node in edges_in_all_vehicles } for v in self.problem.origin_vehicle: model.add_constr(time_coming[self.problem.origin_vehicle[v]] == 0) all_origin = set(self.problem.origin_vehicle.values()) for node in time_leaving: for vehicle, edge in edges_in_all_vehicles[node]: if edge[0] == edge[1]: continue model.add_constr( time_coming[node] >= time_leaving[edge[0]] + self.problem.time_delta[edge[0]][edge[1]] - 100000 * (1 - variables_edges[vehicle][edge]) ) if node not in all_origin: model.add_constr( time_leaving[node] - time_coming[node] - self.problem.time_delta_node[node] >= 0 ) return {"time_coming": time_coming, "time_leaving": time_leaving}
[docs] def init_model(self, **kwargs: Any) -> None: solver_name = kwargs.get("solver_name", mip.CBC) model: mip.Model = mip.Model( "GPDP-flow", sense=mip.MINIMIZE, solver_name=solver_name ) include_backward = kwargs.get("include_backward", True) include_triangle = kwargs.get("include_triangle", False) include_subtour = kwargs.get("include_subtour", False) include_resources = kwargs.get("include_resources", False) include_capacity = kwargs.get("include_capacity", False) include_time_evolution = kwargs.get("include_time_evolution", False) one_visit_per_node = kwargs.get("one_visit_per_node", True) one_visit_per_cluster = kwargs.get("one_visit_per_cluster", False) nb_vehicle = self.problem.number_vehicle name_vehicles = sorted(self.problem.origin_vehicle.keys()) if self.problem.graph is None: self.problem.compute_graph() if self.problem.graph is None: raise RuntimeError( "self.problem.graph cannot be None " "after calling self.problem.compute_graph()." ) graph = self.problem.graph nodes_of_interest = [ n for n in graph.get_nodes() if n not in self.problem.nodes_origin and n not in self.problem.nodes_target ] variables_edges: Dict[int, Dict[Edge, Any]] if self.problem.any_grouping: variables_edges = {j: {} for j in range(nb_vehicle)} for k in self.problem.group_identical_vehicles: j = self.problem.group_identical_vehicles[k][0] variables_edges[j] = { e: model.add_var( var_type=mip.BINARY, name="flow_" + str(j) + "_" + str(e), obj=graph.edges_infos_dict[e]["distance"], ) for e in self.problem.get_edges_for_vehicle(j) } if len(self.problem.group_identical_vehicles[k]) > 0: for vv in self.problem.group_identical_vehicles[k][1:]: variables_edges[vv] = variables_edges[j] else: variables_edges = { j: { e: model.add_var( var_type=mip.BINARY, name="flow_" + str(j) + "_" + str(e), obj=graph.edges_infos_dict[e]["distance"], ) for e in self.problem.get_edges_for_vehicle(j) } for j in range(nb_vehicle) } self.nodes_of_interest = nodes_of_interest self.variables_edges = variables_edges all_variables: Dict[str, Dict[Any, Any]] = {"variables_edges": variables_edges} constraint_loop = {} for vehicle in variables_edges: for e in variables_edges[vehicle]: if e[0] == e[1]: constraint_loop[(vehicle, e)] = model.add_constr( variables_edges[vehicle][e] == 0, name="loop_" + str((vehicle, e)), ) ( edges_in_all_vehicles, edges_out_all_vehicles, edges_in_per_vehicles, edges_out_per_vehicles, edges_in_all_vehicles_cluster, edges_out_all_vehicles_cluster, edges_in_per_vehicles_cluster, edges_out_per_vehicles_cluster, ) = construct_edges_in_out_dict( variables_edges=variables_edges, clusters_dict=self.problem.clusters_dict ) constraints_out_flow: Dict[Tuple[int, Node], Any] = {} constraints_in_flow: Dict[Union[Tuple[int, Node], Node], Any] = {} constraints_flow_conservation: Dict[ Union[Tuple[int, Node], Tuple[int, Node, str]], Any ] = {} count_origin: Dict[Node, int] = {} count_target: Dict[Node, int] = {} for vehicle in range(nb_vehicle): node_origin = self.problem.origin_vehicle[name_vehicles[vehicle]] node_target = self.problem.target_vehicle[name_vehicles[vehicle]] if node_origin not in count_origin: count_origin[node_origin] = 0 if node_target not in count_target: count_target[node_target] = 0 count_origin[node_origin] += 1 count_target[node_target] += 1 for vehicle in range(nb_vehicle): node_origin = self.problem.origin_vehicle[name_vehicles[vehicle]] node_target = self.problem.target_vehicle[name_vehicles[vehicle]] same_node = node_origin == node_target constraints_out_flow[(vehicle, node_origin)] = model.add_constr( mip.quicksum( [ variables_edges[vehicle][edge] for edge in edges_out_per_vehicles[vehicle].get( node_origin, set() ) if edge[1] != node_origin or same_node ] ) == count_origin[node_origin], name="outflow_" + str((vehicle, node_origin)), ) constraints_in_flow[(vehicle, node_target)] = model.add_constr( mip.quicksum( [ variables_edges[vehicle][edge] for edge in edges_in_per_vehicles[vehicle].get( node_target, set() ) if edge[0] != node_target or same_node ] ) == count_target[node_target], name="inflow_" + str((vehicle, node_target)), ) # Avoid loop constraints_out_flow[(vehicle, node_target)] = model.add_constr( mip.quicksum( [ variables_edges[vehicle][edge] for edge in edges_out_per_vehicles[vehicle].get( node_target, set() ) if edge[1] != node_target or same_node ] ) <= 0, name="outflow_" + str((vehicle, node_target)), ) for vehicle in range(nb_vehicle): origin = self.problem.origin_vehicle[name_vehicles[vehicle]] target = self.problem.target_vehicle[name_vehicles[vehicle]] model.add_constr( mip.quicksum( [ variables_edges[v][edge] for v, edge in edges_out_all_vehicles[origin] if self.problem.origin_vehicle[name_vehicles[v]] != origin ] ) == 0 ) model.add_constr( mip.quicksum( [ variables_edges[v][edge] for v, edge in edges_in_all_vehicles[target] if self.problem.target_vehicle[name_vehicles[v]] != target ] ) == 0 ) for vehicle in range(nb_vehicle): node_origin = self.problem.origin_vehicle[name_vehicles[vehicle]] node_target = self.problem.target_vehicle[name_vehicles[vehicle]] same_node = node_origin == node_target for node in edges_in_per_vehicles[vehicle]: if same_node or node not in {node_origin, node_target}: constraints_flow_conservation[(vehicle, node)] = model.add_constr( mip.quicksum( [ variables_edges[vehicle][e] for e in edges_in_per_vehicles[vehicle].get(node, set()) if e[1] != e[0] ] + [ -variables_edges[vehicle][e] for e in edges_out_per_vehicles[vehicle].get( node, set() ) if e[1] != e[0] ] ) == 0, name="convflow_" + str((vehicle, node)), ) constraints_flow_conservation[ (vehicle, node, "in") ] = model.add_constr( mip.quicksum( [ variables_edges[vehicle][e] for e in edges_in_per_vehicles[vehicle].get(node, set()) if e[1] != e[0] ] ) <= 1, name="valueflow_" + str((vehicle, node)), ) if include_backward: constraint_tour_2length = {} cnt_tour = 0 for vehicle in range(nb_vehicle): node_origin = self.problem.origin_vehicle[name_vehicles[vehicle]] node_target = self.problem.target_vehicle[name_vehicles[vehicle]] for edge in variables_edges[vehicle]: if (edge[1], edge[0]) in variables_edges[vehicle]: if (edge[1], edge[0]) == edge: continue if edge[0] == node_origin or edge[1] == node_origin: continue if edge[0] == node_target or edge[1] == node_target: continue constraint_tour_2length[cnt_tour] = model.add_constr( variables_edges[vehicle][edge] + variables_edges[vehicle][(edge[1], edge[0])] <= 1, name="tour_" + str((vehicle, edge)), ) cnt_tour += 1 if include_triangle: constraint_triangle = {} cnt_triangle = 0 for node in graph.graph_nx.nodes(): neigh = set([n for n in nx.neighbors(graph.graph_nx, node)]) neigh_2 = { nn: neigh.intersection( [n for n in nx.neighbors(graph.graph_nx, nn)] ) for nn in neigh } for node_neigh in neigh_2: if len(neigh_2[node_neigh]) >= 1: for node_neigh_neigh in neigh_2[node_neigh]: for vehicle in range(nb_vehicle): constraint_triangle[cnt_triangle] = model.add_constr( variables_edges[vehicle][(node, node_neigh)] + variables_edges[vehicle][ (node_neigh, node_neigh_neigh) ] + variables_edges[vehicle][(node_neigh_neigh, node)] <= 2, name="triangle_" + str(cnt_triangle), ) cnt_triangle += 1 if include_subtour: variables_order = { node: model.add_var( var_type=mip.CONTINUOUS, lb=0, ub=len(self.problem.all_nodes) + 1, name="order_" + str(node), ) for node in edges_in_all_vehicles } constraints_order = {} for vehicle in range(nb_vehicle): node_origin = self.problem.origin_vehicle[name_vehicles[vehicle]] constraints_order[node_origin] = model.add_constr( variables_order[node_origin] == 0, name="order_" + str(node_origin) ) use_big_m = True use_indicator = False for node in variables_order: if node not in constraints_order: for vehicle, edge in edges_in_all_vehicles[node]: if use_big_m: constraints_order[node] = model.add_constr( variables_order[node] >= variables_order[edge[0]] + 1 - 1000 * (1 - variables_edges[vehicle][edge]), name="order_" + str(node), ) if use_indicator: raise MipModelException( "indicator constraint dont exist in pymip" ) for vehicle in range(nb_vehicle): for node in [self.problem.origin_vehicle[name_vehicles[vehicle]]]: if node in edges_in_all_vehicles: constraints_in_flow[node] = model.add_constr( mip.quicksum( [ variables_edges[x[0]][x[1]] for x in edges_in_all_vehicles[node] ] ) == 0, name="no_inflow_start_" + str(node), ) if one_visit_per_node: self.one_visit_per_node( model=model, nodes_of_interest=nodes_of_interest, variables_edges=variables_edges, edges_in_all_vehicles=edges_in_all_vehicles, edges_out_all_vehicles=edges_out_all_vehicles, ) if one_visit_per_cluster: self.one_visit_per_clusters( model=model, nodes_of_interest=nodes_of_interest, variables_edges=variables_edges, edges_in_all_vehicles=edges_in_all_vehicles, edges_out_all_vehicles=edges_out_all_vehicles, ) if include_capacity: all_variables.update( self.simple_capacity_constraint( model=model, variables_edges=variables_edges, edges_in_all_vehicles=edges_in_all_vehicles, edges_out_all_vehicles=edges_out_all_vehicles, ) ) if include_resources: all_variables.update( self.resources_constraint( model=model, variables_edges=variables_edges, edges_in_all_vehicles=edges_in_all_vehicles, edges_out_all_vehicles=edges_out_all_vehicles, ) ) if include_time_evolution: all_variables.update( self.time_evolution( model=model, variables_edges=variables_edges, edges_in_all_vehicles=edges_in_all_vehicles, edges_out_all_vehicles=edges_out_all_vehicles, ) ) self.edges_in_all_vehicles: Dict[ Node, Set[Tuple[int, Edge]] ] = edges_in_all_vehicles self.edges_out_all_vehicles: Dict[ Node, Set[Tuple[int, Edge]] ] = edges_out_all_vehicles self.edges_in_per_vehicles: Dict[ int, Dict[Node, Set[Edge]] ] = edges_in_per_vehicles self.edges_out_per_vehicles: Dict[ int, Dict[Node, Set[Edge]] ] = edges_out_per_vehicles self.edges_in_all_vehicles_cluster: Dict[ Hashable, Set[Tuple[int, Edge]] ] = edges_in_all_vehicles_cluster self.edges_out_all_vehicles_cluster: Dict[ Hashable, Set[Tuple[int, Edge]] ] = edges_out_all_vehicles_cluster self.edges_in_per_vehicles_cluster: Dict[ int, Dict[Hashable, Set[Edge]] ] = edges_in_per_vehicles_cluster self.edges_out_per_vehicles_cluster: Dict[ int, Dict[Hashable, Set[Edge]] ] = edges_out_per_vehicles_cluster self.variable_decisions = all_variables self.model = model self.model.sense = mip.MINIMIZE self.solver_name = solver_name self.clusters_version = one_visit_per_cluster
[docs] def retrieve_solutions( self, parameters_milp: ParametersMilp, **kwargs: Any ) -> ResultStorage: kwargs["limit_store"] = False return super().retrieve_solutions(parameters_milp=parameters_milp, **kwargs)
[docs] def retrieve_current_solution( self, get_var_value_for_current_solution: Callable[[Any], float], get_obj_value_for_current_solution: Callable[[], float], ) -> Solution: """ Not used here as GurobiMilpSolver.solve() is overriden """ temporaryresult = self.retrieve_current_temporaryresult( get_var_value_for_current_solution=get_var_value_for_current_solution, get_obj_value_for_current_solution=get_obj_value_for_current_solution, ) return convert_temporaryresult_to_gpdpsolution( temporaryresult=temporaryresult, problem=self.problem )
[docs] def retrieve_ith_temporaryresult(self, i: int) -> TemporaryResult: get_var_value_for_current_solution = ( lambda var: self.get_var_value_for_ith_solution(var=var, i=i) ) get_obj_value_for_current_solution = ( lambda: self.get_obj_value_for_ith_solution(i=i) ) return self.retrieve_current_temporaryresult( get_var_value_for_current_solution=get_var_value_for_current_solution, get_obj_value_for_current_solution=get_obj_value_for_current_solution, )
[docs] def retrieve_current_temporaryresult( self, get_var_value_for_current_solution: Callable[[Any], float], get_obj_value_for_current_solution: Callable[[], float], ) -> TemporaryResult: res, obj = retrieve_current_solution( get_var_value_for_current_solution=get_var_value_for_current_solution, get_obj_value_for_current_solution=get_obj_value_for_current_solution, variable_decisions=self.variable_decisions, ) if self.problem.graph is None: raise RuntimeError( "self.problem.graph cannot be None " "when calling retrieve_ith_temporaryresult()." ) temporaryresult = build_graph_solution( results=res, obj=obj, graph=self.problem.graph ) reevaluate_result(temporaryresult, problem=self.problem) return temporaryresult
[docs] def solve_one_iteration( self, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any ) -> List[TemporaryResult]: self.optimize_model(parameters_milp=parameters_milp, **kwargs) list_temporary_results: List[TemporaryResult] = [] for i in range(self.nb_solutions): list_temporary_results.append(self.retrieve_ith_temporaryresult(i=i)) return list_temporary_results
[docs] def solve_iterative( self, parameters_milp: Optional[ParametersMilp] = None, callbacks: Optional[List[Callback]] = None, **kwargs: Any, ) -> ResultStorage: # wrap all callbacks in a single one callbacks_list = CallbackList(callbacks=callbacks) # start of solve callback callbacks_list.on_solve_start(solver=self) if self.model is None: self.init_model(**kwargs) if self.model is None: # for mypy raise RuntimeError( "self.model must not be None after self.init_model()." ) if parameters_milp is None: parameters_milp = ParametersMilp.default() do_lns = kwargs.get("do_lns", True) reinit_model_at_each_iteration = kwargs.get( "reinit", self.solver_name == mip.CBC ) nb_iteration_max = kwargs.get("nb_iteration_max", 10) solutions: List[TemporaryResult] = self.solve_one_iteration( parameters_milp=parameters_milp, **kwargs ) first_solution: TemporaryResult = solutions[0] if ( (first_solution.rebuilt_dict is None) or (first_solution.connected_components_per_vehicle is None) or (first_solution.component_global is None) ): raise RuntimeError( "Temporary result attributes rebuilt_dict, component_global" "and connected_components_per_vehicle cannot be None after solving." ) if reinit_model_at_each_iteration: self.model.reset() self.init_model(**kwargs) subtour: Union[SubtourAddingConstraint, SubtourAddingConstraintCluster] if self.clusters_version: subtour = SubtourAddingConstraintCluster( problem=self.problem, linear_solver=self ) else: subtour = SubtourAddingConstraint(problem=self.problem, linear_solver=self) res = ResultStorage( list_solution_fits=self.convert_temporaryresults(solutions), mode_optim=self.params_objective_function.sense_function, ) list_constraints_tuples: List[Any] = [] list_constraints_tuples += subtour.adding_component_constraints( [first_solution] ) nb_iteration = 0 if ( max( [ len(first_solution.connected_components_per_vehicle[v]) for v in first_solution.connected_components_per_vehicle ] ) == 1 ): finished = True else: finished = callbacks_list.on_step_end( step=nb_iteration, res=res, solver=self ) while not finished: rebuilt_dict = first_solution.rebuilt_dict c = ConstraintHandlerOrWarmStart( linear_solver=self, problem=self.problem, do_lns=do_lns ) c.adding_constraint(rebuilt_dict) solutions = self.solve_one_iteration( parameters_milp=parameters_milp, **kwargs ) first_solution = solutions[0] if ( (first_solution.rebuilt_dict is None) or (first_solution.connected_components_per_vehicle is None) or (first_solution.component_global is None) ): raise RuntimeError( "Temporary result attributes rebuilt_dict, component_global" "and connected_components_per_vehicle cannot be None after solving." ) if self.clusters_version: subtour = SubtourAddingConstraintCluster( problem=self.problem, linear_solver=self ) else: subtour = SubtourAddingConstraint( problem=self.problem, linear_solver=self ) if reinit_model_at_each_iteration: self.model.reset() self.init_model(**kwargs) self.reapply_constraint(list_constraints_tuples) self.constraint_on_edge = {} list_constraints_tuples += subtour.adding_component_constraints( [first_solution] ) if ( max( [ len(first_solution.connected_components_per_vehicle[v]) for v in first_solution.connected_components_per_vehicle ] ) == 1 and not do_lns ): finished = True else: nb_iteration += 1 res.list_solution_fits += self.convert_temporaryresults(solutions) stopping = callbacks_list.on_step_end( step=nb_iteration, res=res, solver=self ) finished = stopping or nb_iteration > nb_iteration_max # end of solve callback callbacks_list.on_solve_end(res=res, solver=self) return res
[docs] def solve( self, parameters_milp: Optional[ParametersMilp] = None, callbacks: Optional[List[Callback]] = None, **kwargs: Any, ) -> ResultStorage: return self.solve_iterative( parameters_milp=parameters_milp, callbacks=callbacks, **kwargs, )
[docs] def convert_temporaryresults( self, temporary_results: List[TemporaryResult] ) -> List[Tuple[Solution, Union[float, TupleFitness]]]: list_solution_fits: List[Tuple[Solution, Union[float, TupleFitness]]] = [] for temporaryresult in temporary_results: solution = convert_temporaryresult_to_gpdpsolution( temporaryresult=temporaryresult, problem=self.problem ) fit = self.aggreg_from_sol(solution) list_solution_fits.append((solution, fit)) return list_solution_fits
[docs] def reapply_constraint( self, list_constraint_tuple: List[Tuple[Set[Tuple[int, int]], int]] ) -> None: if self.model is None: raise RuntimeError( "self.model must not be None when calling reapply_constraint()." ) for edge, val in list_constraint_tuple: self.model.add_constr( mip.quicksum( [ self.variable_decisions["variables_edges"][e[0]][e[1]] for e in edge ] ) >= val )
[docs] class SubtourAddingConstraint: def __init__( self, problem: GPDP, linear_solver: LinearFlowSolver, lazy: bool = False ): self.problem = problem self.linear_solver = linear_solver self.lazy = lazy
[docs] def adding_component_constraints( self, list_solution: List[TemporaryResult] ) -> List[Any]: c = [] for l in list_solution: if l.connected_components_per_vehicle is None: raise RuntimeError( "Temporary result attributes " "connected_components_per_vehicle cannot be None after solving." ) for v in l.connected_components_per_vehicle: if self.lazy: c += update_model_lazy( self.problem, self.linear_solver, l.connected_components_per_vehicle[v], self.linear_solver.edges_in_all_vehicles, self.linear_solver.edges_out_all_vehicles, )[1] else: c += update_model( self.problem, self.linear_solver, l.connected_components_per_vehicle[v], self.linear_solver.edges_in_all_vehicles, self.linear_solver.edges_out_all_vehicles, )[1] return c
[docs] class SubtourAddingConstraintCluster: def __init__(self, problem: GPDP, linear_solver: LinearFlowSolver): self.problem = problem self.linear_solver = linear_solver
[docs] def adding_component_constraints( self, list_solution: List[TemporaryResult] ) -> List[Any]: c = [] for l in list_solution: if l.connected_components_per_vehicle is None: raise RuntimeError( "Temporary result attributes " "connected_components_per_vehicle cannot be None after solving." ) for v in l.connected_components_per_vehicle: connected_components = l.connected_components_per_vehicle[v] conn = [] for x in connected_components: s = set([self.problem.clusters_dict[k] for k in x[0]]) conn += [(s, len(s))] c += update_model_cluster_tsp( self.problem, self.linear_solver, conn, self.linear_solver.edges_in_all_vehicles_cluster, self.linear_solver.edges_out_all_vehicles_cluster, ) return c
[docs] class ConstraintHandlerOrWarmStart: def __init__( self, linear_solver: LinearFlowSolver, problem: GPDP, do_lns: bool = True, remove_constr: bool = True, ): self.linear_solver = linear_solver self.problem = problem self.do_lns = do_lns self.remove_constr = remove_constr
[docs] def adding_constraint(self, rebuilt_dict: Dict[int, List[Node]]) -> None: if self.linear_solver.model is None: raise RuntimeError("self.linear_solver.model cannot be None at this point") vehicle_keys = self.linear_solver.variable_decisions["variables_edges"].keys() edges_to_add: Dict[int, Set[Edge]] = { v: set() for v in vehicle_keys if rebuilt_dict[v] is not None } for v in rebuilt_dict: if rebuilt_dict[v] is None: continue edges_to_add[v].update( {(e0, e1) for e0, e1 in zip(rebuilt_dict[v][:-1], rebuilt_dict[v][1:])} ) logger.info(f"edges to add {edges_to_add}") edges_missing = { (v, e) for e in edges_to_add[v] if e not in self.linear_solver.variable_decisions["variables_edges"][v] } logger.info(f"missing : {edges_missing}") if len(edges_missing) > 0: logger.warning("Some edges are missing.") if self.do_lns: for iedge in self.linear_solver.constraint_on_edge: try: self.linear_solver.model.remove( self.linear_solver.constraint_on_edge[iedge] ) except IndexError as e: pass self.linear_solver.constraint_on_edge = {} edges_to_constraint: Dict[int, Set[Edge]] = { v: set() for v in range(self.problem.number_vehicle) } vehicle_to_not_constraints = set( random.sample( range(self.problem.number_vehicle), min(2, self.problem.number_vehicle) ) ) for v in range(self.problem.number_vehicle): if v not in vehicle_to_not_constraints: edges_to_constraint[v].update( set( random.sample( list( self.linear_solver.variable_decisions[ "variables_edges" ][v] ), int( 0.9 * len( self.linear_solver.variable_decisions[ "variables_edges" ][v] ) ), ) ) ) else: edges_to_constraint[v].update( set( random.sample( list( self.linear_solver.variable_decisions[ "variables_edges" ][v] ), int( 0.4 * len( self.linear_solver.variable_decisions[ "variables_edges" ][v] ) ), ) ) ) logger.info( ( sum([len(edges_to_constraint[v]) for v in edges_to_constraint]), " edges constraint over ", sum( [ len(self.linear_solver.variable_decisions["variables_edges"][v]) for v in self.linear_solver.variable_decisions[ "variables_edges" ] ] ), ) ) iedge = 0 start_list = [] for v in vehicle_keys: logger.debug(f"Rebuild dict = , {rebuilt_dict[v]}") for edge in self.linear_solver.variable_decisions["variables_edges"][v]: val = 0.0 if v in edges_to_add and edge in edges_to_add[v]: start_list.append( ( self.linear_solver.variable_decisions["variables_edges"][v][ edge ], 1.0, ) ) val = 1.0 else: start_list.append( ( self.linear_solver.variable_decisions["variables_edges"][v][ edge ], 0.0, ) ) if self.do_lns: if ( rebuilt_dict[v] is not None and v in edges_to_constraint and edge in edges_to_constraint[v] ): self.linear_solver.constraint_on_edge[ iedge ] = self.linear_solver.model.add_constr( self.linear_solver.variable_decisions["variables_edges"][v][ edge ] == val, name="c_" + str(v) + "_" + str(edge) + "_" + str(val), ) iedge += 1 self.linear_solver.model.start = start_list
[docs] def update_model_cluster_tsp( problem: GPDP, lp_solver: LinearFlowSolver, components_global: List[Tuple[Set[Hashable], int]], edges_in_all_vehicles_cluster: Dict[Hashable, Set[Tuple[int, Edge]]], edges_out_all_vehicles_cluster: Dict[Hashable, Set[Tuple[int, Edge]]], ) -> List[Any]: if lp_solver.model is None: raise RuntimeError("self.lp_solver.model cannot be None at this point.") len_component_global = len(components_global) list_constraints = [] if len_component_global > 1: logger.debug(f"Nb component : {len_component_global}") for s in components_global: edge_in_of_interest = [ e for cluster in s[0] for e in edges_in_all_vehicles_cluster[cluster] if problem.clusters_dict[e[1][0]] not in s[0] and problem.clusters_dict[e[1][1]] in s[0] ] edge_out_of_interest = [ e for cluster in s[0] for e in edges_out_all_vehicles_cluster[cluster] if problem.clusters_dict[e[1][0]] in s[0] and problem.clusters_dict[e[1][1]] not in s[0] ] if not any( problem.clusters_dict[problem.target_vehicle[v]] in s[0] for v in problem.origin_vehicle ): list_constraints += [ lp_solver.model.add_constr( mip.quicksum( [ lp_solver.variable_decisions["variables_edges"][e[0]][ e[1] ] for e in edge_out_of_interest ] ) >= 1, name="component_out_" + str(s)[:10], ) ] if not any( problem.clusters_dict[problem.origin_vehicle[v]] in s[0] for v in problem.origin_vehicle ): list_constraints += [ lp_solver.model.add_constr( mip.quicksum( [ lp_solver.variable_decisions["variables_edges"][e[0]][ e[1] ] for e in edge_in_of_interest ] ) >= 1, name="component_in_" + str(s)[:10], ) ] return list_constraints
[docs] def update_model( problem: GPDP, lp_solver: LinearFlowSolver, components_global: List[Tuple[Set[Node], int]], edges_in_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], edges_out_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], ) -> Tuple[List[Any], List[Tuple[List[Tuple[int, Edge]], int]]]: if lp_solver.model is None: raise RuntimeError("self.lp_solver.model cannot be None at this point.") len_component_global = len(components_global) list_constraints = [] list_constraints_tuple: List[Tuple[List[Tuple[int, Edge]], int]] = [] if len_component_global > 1: logger.debug(f"Nb component : {len_component_global}") for s in components_global: edge_in_of_interest = [ e for n in s[0] for e in edges_in_all_vehicles[n] if e[1][0] not in s[0] and e[1][1] in s[0] ] edge_out_of_interest = [ e for n in s[0] for e in edges_out_all_vehicles[n] if e[1][0] in s[0] and e[1][1] not in s[0] ] if not any( problem.target_vehicle[v] in s[0] for v in problem.origin_vehicle ): list_constraints_tuple += [(edge_out_of_interest, 1)] list_constraints += [ lp_solver.model.add_constr( mip.quicksum( [ lp_solver.variable_decisions["variables_edges"][e[0]][ e[1] ] for e in edge_out_of_interest ] ) >= 1, name="component_out_" + str(s)[:10], ) ] if not any( problem.origin_vehicle[v] in s[0] for v in problem.origin_vehicle ): list_constraints_tuple += [(edge_in_of_interest, 1)] list_constraints += [ lp_solver.model.add_constr( mip.quicksum( [ lp_solver.variable_decisions["variables_edges"][e[0]][ e[1] ] for e in edge_in_of_interest ] ) >= 1, name="component_in_" + str(s)[:10], ) ] if len_component_global > 1: constraints_order = {} try: variable_order = lp_solver.model.variable_order except: variable_order = { node: lp_solver.model.add_var( var_type=mip.CONTINUOUS, name="order_" + str(node) ) for node in edges_in_all_vehicles } for vehicle in range(lp_solver.problem.number_vehicle): node_origin = lp_solver.problem.origin_vehicle[vehicle] constraints_order[node_origin] = lp_solver.model.add_constr( variable_order[node_origin] == 0, name="order_" + str(node_origin) ) lp_solver.variable_order = variable_order c = max(components_global, key=lambda x: x[1]) for s in [c]: use_big_m = True for node in s[0]: if node not in constraints_order: for vehicle, edge in edges_in_all_vehicles[node]: if edge[0] == edge[1]: continue if use_big_m: constraints_order[node] = lp_solver.model.add_constr( variable_order[node] >= variable_order[edge[0]] + 1 - 1000 * ( 1 - lp_solver.variable_decisions["variables_edges"][ vehicle ][edge] ), name="order_" + str(node), ) return list_constraints, list_constraints_tuple
[docs] def update_model_lazy( problem: GPDP, lp_solver: LinearFlowSolver, components_global: List[Tuple[Set[Node], int]], edges_in_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], edges_out_all_vehicles: Dict[Node, Set[Tuple[int, Edge]]], do_constraint_on_order: bool = False, ) -> List[Any]: if lp_solver.model is None: raise RuntimeError("self.lp_solver.model cannot be None at this point.") len_component_global = len(components_global) list_constraints = [] if len_component_global > 1: logger.debug(f"Nb component : {len_component_global}") for s in components_global: edge_in_of_interest = [ e for n in s[0] for e in edges_in_all_vehicles[n] if e[1][0] not in s[0] and e[1][1] in s[0] ] edge_out_of_interest = [ e for n in s[0] for e in edges_out_all_vehicles[n] if e[1][0] in s[0] and e[1][1] not in s[0] ] if not any( problem.target_vehicle[v] in s[0] for v in problem.origin_vehicle ): list_constraints += [ lp_solver.model.add_lazy_constr( mip.quicksum( [ lp_solver.variable_decisions["variables_edges"][e[0]][ e[1] ] for e in edge_out_of_interest ] ) >= 1 ) ] if not any( problem.origin_vehicle[v] in s[0] for v in problem.origin_vehicle ): list_constraints += [ lp_solver.model.add_lazy_constr( mip.quicksum( [ lp_solver.variable_decisions["variables_edges"][e[0]][ e[1] ] for e in edge_in_of_interest ] ) >= 1 ) ] if len_component_global > 1 and do_constraint_on_order: constraints_order = {} try: variable_order = lp_solver.variable_order except: variable_order = { node: lp_solver.model.add_var( var_type=mip.CONTINUOUS, name="order_" + str(node) ) for node in edges_in_all_vehicles } for vehicle in range(lp_solver.problem.number_vehicle): node_origin = lp_solver.problem.origin_vehicle[vehicle] constraints_order[node_origin] = lp_solver.model.add_constr( variable_order[node_origin] == 0 ) lp_solver.variable_order = variable_order c = max(components_global, key=lambda x: x[1]) for s in [c]: use_big_m = True for node in s[0]: if node not in constraints_order: for vehicle, edge in edges_in_all_vehicles[node]: if edge[0] == edge[1]: continue if use_big_m: constraints_order[node] = lp_solver.model.add_lazy_constr( variable_order[node] >= variable_order[edge[0]] + 1 - 1000 * ( 1 - lp_solver.variable_decisions["variables_edges"][ vehicle ][edge] ) ) return list_constraints