Source code for discrete_optimization.generic_tools.lp_tools

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
from abc import abstractmethod
from enum import Enum
from typing import Any, Callable, List, Optional, Tuple, Union

import mip

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.do_problem import Solution
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.exceptions import SolveEarlyStop
from discrete_optimization.generic_tools.result_storage.multiobj_utils import (
    TupleFitness,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)

try:
    import gurobipy
except ImportError:
    gurobi_available = False
else:
    gurobi_available = True
    GRB = gurobipy.GRB

try:
    import docplex
    from docplex.mp.constr import LinearConstraint
    from docplex.mp.dvar import Var
    from docplex.mp.model import Model
    from docplex.mp.progress import SolutionListener
    from docplex.mp.solution import SolveSolution
except ImportError:
    cplex_available = False
else:
    cplex_available = True


logger = logging.getLogger(__name__)


[docs] class MilpSolverName(Enum): CBC = 0 GRB = 1
map_solver = {MilpSolverName.GRB: mip.GRB, MilpSolverName.CBC: mip.CBC}
[docs] class ParametersMilp: def __init__( self, time_limit: int, pool_solutions: int, mip_gap_abs: float, mip_gap: float, retrieve_all_solution: bool, n_solutions_max: int, pool_search_mode: int = 0, ): self.time_limit = time_limit self.pool_solutions = pool_solutions self.mip_gap_abs = mip_gap_abs self.mip_gap = mip_gap self.retrieve_all_solution = retrieve_all_solution self.n_solutions_max = n_solutions_max self.pool_search_mode = pool_search_mode
[docs] @staticmethod def default() -> "ParametersMilp": return ParametersMilp( time_limit=30, pool_solutions=10000, mip_gap_abs=0.0000001, mip_gap=0.000001, retrieve_all_solution=True, n_solutions_max=10000, )
[docs] class MilpSolver(SolverDO): model: Optional[Any]
[docs] @abstractmethod def init_model(self, **kwargs: Any) -> None: ...
[docs] def retrieve_solutions( self, parameters_milp: ParametersMilp, **kwargs ) -> ResultStorage: """Retrieve solutions found by internal solver. Args: parameters_milp: **kwargs: passed to ResultStorage.__init__() Returns: """ if parameters_milp.retrieve_all_solution: n_solutions = min(parameters_milp.n_solutions_max, self.nb_solutions) else: n_solutions = 1 list_solution_fits: List[Tuple[Solution, Union[float, TupleFitness]]] = [] for i in range(n_solutions): solution = self.retrieve_ith_solution(i=i) fit = self.aggreg_from_sol(solution) list_solution_fits.append((solution, fit)) return ResultStorage( list_solution_fits=list_solution_fits, mode_optim=self.params_objective_function.sense_function, best_solution=min(list_solution_fits, key=lambda x: x[1])[0], **kwargs, )
[docs] @abstractmethod def retrieve_current_solution( self, get_var_value_for_current_solution: Callable[[Any], float], get_obj_value_for_current_solution: Callable[[], float], ) -> Solution: """Retrieve current solution from internal gurobi solution. This converts internal gurobi solution into a discrete-optimization Solution. This method can be called after the solve in `retrieve_solutions()` or during solve within a gurobi/pymilp/cplex callback. The difference will be the `get_var_value_for_current_solution` and `get_obj_value_for_current_solution` callables passed. Args: get_var_value_for_current_solution: function extracting the value of the given variable for the current solution will be different when inside a callback or after the solve is finished get_obj_value_for_current_solution: function extracting the value of the objective for the current solution. Returns: the converted solution at d-o format """ ...
[docs] def retrieve_ith_solution(self, i: int) -> Solution: """Retrieve i-th solution from internal milp model. Args: i: Returns: """ get_var_value_for_current_solution = ( lambda var: self.get_var_value_for_ith_solution(var=var, i=i) ) get_obj_value_for_current_solution = ( lambda: self.get_obj_value_for_ith_solution(i=i) ) return self.retrieve_current_solution( get_var_value_for_current_solution=get_var_value_for_current_solution, get_obj_value_for_current_solution=get_obj_value_for_current_solution, )
[docs] @abstractmethod def solve( self, callbacks: Optional[List[Callback]] = None, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any, ) -> ResultStorage: ...
[docs] @abstractmethod def get_var_value_for_ith_solution(self, var: Any, i: int) -> float: """Get value for i-th solution of a given variable.""" pass
[docs] @abstractmethod def get_obj_value_for_ith_solution(self, i: int) -> float: """Get objective value for i-th solution.""" pass
@property @abstractmethod def nb_solutions(self) -> int: """Number of solutions found by the solver.""" pass
[docs] class PymipMilpSolver(MilpSolver): """Milp solver wrapping a solver from pymip library.""" model: Optional[mip.Model] = None
[docs] def solve( self, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any ) -> ResultStorage: if parameters_milp is None: parameters_milp = ParametersMilp.default() self.optimize_model(parameters_milp=parameters_milp, **kwargs) return self.retrieve_solutions(parameters_milp=parameters_milp)
[docs] def prepare_model( self, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any ) -> None: """Set Gurobi Model parameters according to parameters_milp""" if parameters_milp is None: parameters_milp = ParametersMilp.default() if self.model is None: self.init_model(**kwargs) if self.model is None: raise RuntimeError( "self.model must not be None after self.init_model()." ) self.model.max_mip_gap = parameters_milp.mip_gap self.model.max_mip_gap_abs = parameters_milp.mip_gap_abs self.model.sol_pool_size = parameters_milp.pool_solutions self.model.max_seconds = parameters_milp.time_limit self.model.max_solutions = parameters_milp.n_solutions_max
[docs] def optimize_model( self, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any ) -> None: """Optimize the mip Model. The solutions are yet to be retrieved via `self.retrieve_solutions()`. """ if self.model is None: self.init_model(**kwargs) if self.model is None: raise RuntimeError( "self.model must not be None after self.init_model()." ) self.prepare_model(parameters_milp=parameters_milp, **kwargs) self.model.optimize() logger.info(f"Solver found {self.model.num_solutions} solutions") logger.info(f"Objective : {self.model.objective_value}")
[docs] def get_var_value_for_ith_solution(self, var: mip.Var, i: int) -> float: # type: ignore # avoid isinstance checks for efficiency """Get value for i-th solution of a given variable.""" return var.xi(i)
[docs] def get_obj_value_for_ith_solution(self, i: int) -> float: """Get objective value for i-th solution.""" if self.model is None: # for mypy raise RuntimeError( "self.model should not be None when calling this method." ) return self.model.objective_values[i]
@property def nb_solutions(self) -> int: """Number of solutions found by the solver.""" if self.model is None: return 0 else: return self.model.num_solutions
[docs] class GurobiMilpSolver(MilpSolver): """Milp solver wrapping a solver from gurobi library.""" model: Optional["gurobipy.Model"] = None early_stopping_exception: Optional[Exception] = None
[docs] def solve( self, callbacks: Optional[List[Callback]] = None, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any, ) -> ResultStorage: self.early_stopping_exception = None callbacks_list = CallbackList(callbacks=callbacks) if parameters_milp is None: parameters_milp = ParametersMilp.default() # callback: solve start callbacks_list.on_solve_start(solver=self) if self.model is None: self.init_model(**kwargs) if self.model is None: raise RuntimeError( "self.model must not be None after self.init_model()." ) self.prepare_model(parameters_milp=parameters_milp, **kwargs) # wrap user callback in a gurobi callback gurobi_callback = GurobiCallback(do_solver=self, callback=callbacks_list) self.model.optimize(gurobi_callback) # raise potential exception found during callback (useful for optuna pruning, and debugging) if self.early_stopping_exception: if isinstance(self.early_stopping_exception, SolveEarlyStop): logger.info(self.early_stopping_exception) else: raise self.early_stopping_exception # get result storage res = gurobi_callback.res # callback: solve end callbacks_list.on_solve_end(res=res, solver=self) return res
[docs] def prepare_model( self, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any ) -> None: """Set Gurobi Model parameters according to parameters_milp""" if self.model is None: self.init_model(**kwargs) if self.model is None: # for mypy raise RuntimeError( "self.model must not be None after self.init_model()." ) if parameters_milp is None: parameters_milp = ParametersMilp.default() self.model.setParam(gurobipy.GRB.Param.TimeLimit, parameters_milp.time_limit) self.model.setParam(gurobipy.GRB.Param.MIPGapAbs, parameters_milp.mip_gap_abs) self.model.setParam(gurobipy.GRB.Param.MIPGap, parameters_milp.mip_gap) self.model.setParam( gurobipy.GRB.Param.PoolSolutions, parameters_milp.pool_solutions ) self.model.setParam("PoolSearchMode", parameters_milp.pool_search_mode)
[docs] def optimize_model( self, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any ) -> None: """Optimize the Gurobi Model. The solutions are yet to be retrieved via `self.retrieve_solutions()`. No callbacks are passed to the internal solver, and no result_storage is created """ if self.model is None: self.init_model(**kwargs) if self.model is None: raise RuntimeError( "self.model must not be None after self.init_model()." ) self.prepare_model(parameters_milp=parameters_milp, **kwargs) self.model.optimize() logger.info(f"Problem has {self.model.NumObj} objectives") logger.info(f"Solver found {self.model.SolCount} solutions") logger.info(f"Objective : {self.model.getObjective().getValue()}")
[docs] def get_var_value_for_ith_solution(self, var: "gurobipy.Var", i: int): # type: ignore # avoid isinstance checks for efficiency """Get value for i-th solution of a given variable.""" if self.model is None: # for mypy raise RuntimeError( "self.model should not be None when calling this method." ) self.model.params.SolutionNumber = i return var.getAttr("Xn")
[docs] def get_obj_value_for_ith_solution(self, i: int) -> float: """Get objective value for i-th solution.""" if self.model is None: # for mypy raise RuntimeError( "self.model should not be None when calling this method." ) self.model.params.SolutionNumber = i return self.model.getAttr("PoolObjVal")
@property def nb_solutions(self) -> int: """Number of solutions found by the solver.""" if self.model is None: return 0 else: return self.model.SolCount
[docs] class GurobiCallback: def __init__(self, do_solver: GurobiMilpSolver, callback: Callback): self.do_solver = do_solver self.callback = callback self.res = ResultStorage( [], mode_optim=self.do_solver.params_objective_function.sense_function, limit_store=False, ) self.nb_solutions = 0 def __call__(self, model, where) -> None: if where == GRB.Callback.MIPSOL: try: # retrieve and store new solution sol = self.do_solver.retrieve_current_solution( get_var_value_for_current_solution=model.cbGetSolution, get_obj_value_for_current_solution=lambda: model.cbGet( GRB.Callback.MIPSOL_OBJ ), ) fit = self.do_solver.aggreg_from_sol(sol) self.res.add_solution(solution=sol, fitness=fit) self.nb_solutions += 1 # end of step callback: stopping? stopping = self.callback.on_step_end( step=self.nb_solutions, res=self.res, solver=self.do_solver ) except Exception as e: # catch exceptions because gurobi ignore them and do not stop solving self.do_solver.early_stopping_exception = e stopping = True else: if stopping: self.do_solver.early_stopping_exception = SolveEarlyStop( f"{self.do_solver.__class__.__name__}.solve() stopped by user callback." ) if stopping: model.terminate()
[docs] class CplexMilpSolver(MilpSolver): model: Optional["docplex.mp.model.Model"] results_solve: Optional[List["SolveSolution"]]
[docs] def solve( self, parameters_milp: Optional[ParametersMilp] = None, **kwargs: Any ) -> ResultStorage: if not cplex_available: logger.debug( "One or several docplex didn't work, therefore your script might crash." ) if self.model is None: self.init_model(**kwargs) if self.model is None: # for mypy raise RuntimeError( "self.model must not be None after self.init_model()." ) if parameters_milp is None: parameters_milp = ParametersMilp.default() self.model.time_limit = parameters_milp.time_limit self.model.parameters.mip.tolerances.mipgap = parameters_milp.mip_gap listener = None if parameters_milp.retrieve_all_solution or parameters_milp.n_solutions_max > 1: class SolutionStorage(SolutionListener): def __init__(self): super().__init__() self.intermediate_solutions = [] def notify_solution(self, sol): self.intermediate_solutions += [sol] listener = SolutionStorage() self.model.add_progress_listener(listener) results: "SolveSolution" = self.model.solve(log_output=True) if listener is None: self.results_solve = [results] else: self.results_solve = listener.intermediate_solutions + [results] # logger.info(f"Solver found {results.get()} solutions") logger.info(f"Objective : {self.results_solve[-1].get_objective_value()}") return self.retrieve_solutions(parameters_milp=parameters_milp)
[docs] def get_var_value_for_ith_solution( self, var: "docplex.mp.dvar.Var", i: int ) -> float: return self.results_solve[i].get_var_value(var)
[docs] def get_obj_value_for_ith_solution(self, i: int) -> float: return self.results_solve[i]
@property def nb_solutions(self) -> int: """Number of solutions found by the solver.""" if self.results_solve is None: return 0 else: return len(self.results_solve)