Source code for discrete_optimization.generic_tools.lns_cp

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import random
import sys
import time
from abc import abstractmethod
from typing import Any, Dict, Iterable, List, Optional

import numpy as np
from minizinc import Instance

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.cp_tools import (
    CPSolver,
    MinizincCPSolver,
    ParametersCP,
    StatusSolver,
)
from discrete_optimization.generic_tools.do_problem import (
    ModeOptim,
    ParamsObjectiveFunction,
    Problem,
)
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
    IntegerHyperparameter,
    SubBrickHyperparameter,
    SubBrickKwargsHyperparameter,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparametrizable import (
    Hyperparametrizable,
)
from discrete_optimization.generic_tools.lns_mip import (
    InitialSolution,
    PostProcessSolution,
    TrivialPostProcessSolution,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
    fitness_class,
)

if sys.version_info >= (3, 8):
    from typing import TypedDict  # pylint: disable=no-name-in-module
else:
    from typing_extensions import TypedDict


logger = logging.getLogger(__name__)


[docs] class ConstraintHandler(Hyperparametrizable):
[docs] @abstractmethod def adding_constraint_from_results_store( self, cp_solver: CPSolver, child_instance: Instance, result_storage: ResultStorage, last_result_store: Optional[ResultStorage] = None, ) -> Iterable[Any]: ...
[docs] @abstractmethod def remove_constraints_from_previous_iteration( self, cp_solver: CPSolver, child_instance: Instance, previous_constraints: Iterable[Any], ) -> None: ...
[docs] class LNS_CP(SolverDO): hyperparameters = [ SubBrickHyperparameter("cp_solver_cls", choices=[], default=None), SubBrickKwargsHyperparameter( "cp_solver_kwargs", subbrick_hyperparameter="cp_solver_cls" ), SubBrickHyperparameter( "initial_solution_provider_cls", choices=[], default=None ), SubBrickKwargsHyperparameter( "initial_solution_provider_kwargs", subbrick_hyperparameter="initial_solution_provider_cls", ), SubBrickHyperparameter( "constraint_handler_cls", choices=[], default=None, ), SubBrickKwargsHyperparameter( "constraint_handler_kwargs", subbrick_hyperparameter="constraint_handler_cls", ), SubBrickHyperparameter( "post_process_solution_cls", choices=[], default=TrivialPostProcessSolution, ), SubBrickKwargsHyperparameter( "post_process_solution_kwargs", subbrick_hyperparameter="post_process_solution_cls", ), CategoricalHyperparameter( name="skip_first_iteration", choices=[True, False], default=False ), ] def __init__( self, problem: Problem, cp_solver: Optional[MinizincCPSolver] = None, initial_solution_provider: Optional[InitialSolution] = None, constraint_handler: Optional[ConstraintHandler] = None, post_process_solution: Optional[PostProcessSolution] = None, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs: Any, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) kwargs = self.complete_with_default_hyperparameters(kwargs) if cp_solver is None: if kwargs["cp_solver_kwargs"] is None: cp_solver_kwargs = kwargs else: cp_solver_kwargs = kwargs["cp_solver_kwargs"] if kwargs["cp_solver_cls"] is None: raise ValueError( "`cp_solver_cls` cannot be None if `cp_solver` is not specified." ) else: cp_solver_cls = kwargs["cp_solver_cls"] cp_solver = cp_solver_cls(problem=self.problem, **cp_solver_kwargs) cp_solver.init_model(**cp_solver_kwargs) self.cp_solver = cp_solver if constraint_handler is None: if kwargs["constraint_handler_kwargs"] is None: constraint_handler_kwargs = kwargs else: constraint_handler_kwargs = kwargs["constraint_handler_kwargs"] if kwargs["constraint_handler_cls"] is None: raise ValueError( "`constraint_handler_cls` cannot be None if `constraint_handler` is not specified." ) else: constraint_handler_cls = kwargs["constraint_handler_cls"] constraint_handler = constraint_handler_cls( problem=self.problem, **constraint_handler_kwargs ) self.constraint_handler = constraint_handler if post_process_solution is None: if kwargs["post_process_solution_kwargs"] is None: post_process_solution_kwargs = kwargs else: post_process_solution_kwargs = kwargs["post_process_solution_kwargs"] if kwargs["post_process_solution_cls"] is None: post_process_solution = None else: post_process_solution_cls = kwargs["post_process_solution_cls"] post_process_solution = post_process_solution_cls( problem=self.problem, params_objective_function=self.params_objective_function, **post_process_solution_kwargs, ) self.post_process_solution = post_process_solution if initial_solution_provider is None: if kwargs["initial_solution_provider_kwargs"] is None: initial_solution_provider_kwargs = kwargs else: initial_solution_provider_kwargs = kwargs[ "initial_solution_provider_kwargs" ] if kwargs["initial_solution_provider_cls"] is None: initial_solution_provider = ( None # ok if solve_lns with skip_first_iteration ) else: initial_solution_provider_cls = kwargs["initial_solution_provider_cls"] initial_solution_provider = initial_solution_provider_cls( problem=self.problem, params_objective_function=self.params_objective_function, **initial_solution_provider_kwargs, ) self.initial_solution_provider = initial_solution_provider
[docs] def solve_lns( self, parameters_cp: ParametersCP, nb_iteration_lns: int, nb_iteration_no_improvement: Optional[int] = None, skip_first_iteration: bool = False, stop_first_iteration_if_optimal: bool = True, callbacks: Optional[List[Callback]] = None, **kwargs: Any, ) -> ResultStorage: # wrap all callbacks in a single one callbacks_list = CallbackList(callbacks=callbacks) # start of solve callback callbacks_list.on_solve_start(solver=self) # manage None post_process_solution (can happen in subclasses __init__) if self.post_process_solution is None: self.post_process_solution = TrivialPostProcessSolution() sense = self.params_objective_function.sense_function if nb_iteration_no_improvement is None: nb_iteration_no_improvement = 2 * nb_iteration_lns current_nb_iteration_no_improvement = 0 if self.cp_solver.instance is None: self.cp_solver.init_model() if self.cp_solver.instance is None: # for mypy raise RuntimeError( "CP model instance must not be None after calling init_model()!" ) if not skip_first_iteration: if self.initial_solution_provider is None: raise ValueError( "self.initial_solution_provider cannot be None if not skip_first_iteration." ) store_lns = self.initial_solution_provider.get_starting_solution() store_lns = self.post_process_solution.build_other_solution(store_lns) init_solution, objective = store_lns.get_best_solution_fit() if init_solution is None: satisfy = False else: satisfy = self.problem.satisfy(init_solution) logger.debug(f"Satisfy Initial solution {satisfy}") try: logger.debug( f"Nb task preempted = {init_solution.get_nb_task_preemption()}" # type: ignore ) logger.debug(f"Nb max preemption = {init_solution.get_max_preempted()}") # type: ignore except: pass best_objective = objective # end of step callback: stopping? stopping = callbacks_list.on_step_end(step=0, res=store_lns, solver=self) else: best_objective = ( float("inf") if sense == ModeOptim.MINIMIZATION else -float("inf") ) store_lns = None stopping = False result_store: ResultStorage if not stopping: for iteration in range(nb_iteration_lns): logger.info( f"Starting iteration n° {iteration} current objective {best_objective}" ) with self.cp_solver.instance.branch() as child: if iteration == 0 and not skip_first_iteration or iteration >= 1: constraint_iterable = self.constraint_handler.adding_constraint_from_results_store( cp_solver=self.cp_solver, child_instance=child, result_storage=store_lns, last_result_store=store_lns if iteration == 0 else result_store, ) try: if iteration == 0: parameters_cp0 = parameters_cp.copy() parameters_cp0.time_limit = parameters_cp.time_limit_iter0 result_store = self.cp_solver.solve( parameters_cp=parameters_cp0, instance=child ) else: result_store = self.cp_solver.solve( parameters_cp=parameters_cp, instance=child ) logger.info(f"iteration n° {iteration} Solved !!!") logger.info(self.cp_solver.status_solver) if len(result_store.list_solution_fits) > 0: logger.debug("Solved !!!") bsol, fit = result_store.get_best_solution_fit() logger.debug(f"Fitness Before = {fit}") if bsol is not None: logger.debug( f"Satisfaction Before = {self.problem.satisfy(bsol)}" ) else: logger.debug(f"Satisfaction Before = {False}") logger.debug("Post Process..") result_store = ( self.post_process_solution.build_other_solution( result_store ) ) bsol, fit = result_store.get_best_solution_fit() if bsol is not None: logger.debug( f"Satisfaction After = {self.problem.satisfy(bsol)}" ) else: logger.debug(f"Satisfaction After = {False}") if ( sense == ModeOptim.MAXIMIZATION and fit >= best_objective ): if fit > best_objective: current_nb_iteration_no_improvement = 0 else: current_nb_iteration_no_improvement += 1 best_objective = fit elif sense == ModeOptim.MAXIMIZATION: current_nb_iteration_no_improvement += 1 elif ( sense == ModeOptim.MINIMIZATION and fit <= best_objective ): if fit < best_objective: current_nb_iteration_no_improvement = 0 else: current_nb_iteration_no_improvement += 1 best_objective = fit elif sense == ModeOptim.MINIMIZATION: current_nb_iteration_no_improvement += 1 if skip_first_iteration and iteration == 0: store_lns = result_store else: for s, f in list(result_store.list_solution_fits): store_lns.add_solution(solution=s, fitness=f) else: current_nb_iteration_no_improvement += 1 if ( skip_first_iteration and self.cp_solver.status_solver == StatusSolver.OPTIMAL and iteration == 0 and self.problem.satisfy(bsol) and stop_first_iteration_if_optimal ): logger.info("Finish LNS because found optimal solution") break except Exception as e: current_nb_iteration_no_improvement += 1 logger.warning(f"Failed ! reason : {e}") logger.debug( f"{current_nb_iteration_no_improvement} / {nb_iteration_no_improvement}" ) if ( current_nb_iteration_no_improvement > nb_iteration_no_improvement ): logger.info("Finish LNS with maximum no improvement iteration ") break # end of step callback: stopping? if skip_first_iteration: step = iteration else: step = iteration + 1 stopping = callbacks_list.on_step_end( step=step, res=store_lns, solver=self ) if stopping: break # end of solve callback callbacks_list.on_solve_end(res=store_lns, solver=self) return store_lns
[docs] def solve( self, nb_iteration_lns: int, parameters_cp: Optional[ParametersCP] = None, nb_iteration_no_improvement: Optional[int] = None, skip_first_iteration: bool = False, stop_first_iteration_if_optimal: bool = True, callbacks: Optional[List[Callback]] = None, **kwargs: Any, ) -> ResultStorage: if parameters_cp is None: parameters_cp = ParametersCP.default() return self.solve_lns( parameters_cp=parameters_cp, nb_iteration_lns=nb_iteration_lns, nb_iteration_no_improvement=nb_iteration_no_improvement, skip_first_iteration=skip_first_iteration, stop_first_iteration_if_optimal=stop_first_iteration_if_optimal, callbacks=callbacks, **kwargs, )
[docs] class LNS_CPlex(SolverDO): def __init__( self, problem: Problem, cp_solver: CPSolver, initial_solution_provider: InitialSolution, constraint_handler: ConstraintHandler, post_process_solution: Optional[PostProcessSolution] = None, params_objective_function: Optional[ParamsObjectiveFunction] = None, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.cp_solver = cp_solver self.initial_solution_provider = initial_solution_provider self.constraint_handler = constraint_handler self.post_process_solution: PostProcessSolution if post_process_solution is None: self.post_process_solution = TrivialPostProcessSolution() else: self.post_process_solution = post_process_solution
[docs] def solve_lns( self, parameters_cp: ParametersCP, nb_iteration_lns: int, nb_iteration_no_improvement: Optional[int] = None, skip_first_iteration: bool = False, callbacks: Optional[List[Callback]] = None, **kwargs: Any, ) -> ResultStorage: # wrap all callbacks in a single one callbacks_list = CallbackList(callbacks=callbacks) # start of solve callback callbacks_list.on_solve_start(solver=self) sense = self.params_objective_function.sense_function if nb_iteration_no_improvement is None: nb_iteration_no_improvement = 2 * nb_iteration_lns current_nb_iteration_no_improvement = 0 if not skip_first_iteration: store_lns = self.initial_solution_provider.get_starting_solution() store_lns = self.post_process_solution.build_other_solution(store_lns) init_solution, objective = store_lns.get_best_solution_fit() satisfy = self.problem.satisfy(init_solution) logger.debug(f"Satisfy Initial solution {satisfy}") try: logger.debug( f"Nb task preempted = {init_solution.get_nb_task_preemption()}" ) logger.debug(f"Nb max preemption = {init_solution.get_max_preempted()}") except: pass best_objective = objective # end of step callback: stopping? stopping = callbacks_list.on_step_end(step=0, res=store_lns, solver=self) else: best_objective = ( float("inf") if sense == ModeOptim.MINIMIZATION else -float("inf") ) store_lns = None stopping = False result_store: ResultStorage if not stopping: for iteration in range(nb_iteration_lns): logger.debug( f"Starting iteration n° {iteration} current objective {best_objective}" ) if iteration == 0 and not skip_first_iteration or iteration >= 1: constraint_iterable = ( self.constraint_handler.adding_constraint_from_results_store( cp_solver=self.cp_solver, child_instance=None, result_storage=store_lns, last_result_store=store_lns if iteration == 0 else result_store, ) ) try: if iteration == 0: p = parameters_cp.default() p.time_limit = parameters_cp.time_limit_iter0 result_store = self.cp_solver.solve(parameters_cp=parameters_cp) else: result_store = self.cp_solver.solve(parameters_cp=parameters_cp) logger.debug(f"iteration n° {iteration} Solved !!!") if len(result_store.list_solution_fits) > 0: logger.debug("Solved !!!") bsol, fit = result_store.get_best_solution_fit() logger.debug(f"Fitness Before = {fit}") logger.debug( f"Satisfaction Before = {self.problem.satisfy(bsol)}" ) logger.debug("Post Process..") result_store = self.post_process_solution.build_other_solution( result_store ) bsol, fit = result_store.get_best_solution_fit() logger.debug(f"Satisfy after : {self.problem.satisfy(bsol)}") if sense == ModeOptim.MAXIMIZATION and fit >= best_objective: if fit > best_objective: current_nb_iteration_no_improvement = 0 else: current_nb_iteration_no_improvement += 1 best_objective = fit elif sense == ModeOptim.MAXIMIZATION: current_nb_iteration_no_improvement += 1 elif sense == ModeOptim.MINIMIZATION and fit <= best_objective: if fit < best_objective: current_nb_iteration_no_improvement = 0 else: current_nb_iteration_no_improvement += 1 best_objective = fit elif sense == ModeOptim.MINIMIZATION: current_nb_iteration_no_improvement += 1 if skip_first_iteration and iteration == 0: store_lns = result_store else: for s, f in list(result_store.list_solution_fits): store_lns.add_solution(solution=s, fitness=f) else: current_nb_iteration_no_improvement += 1 except Exception as e: current_nb_iteration_no_improvement += 1 logger.warning(f"Failed ! reason : {e}") logger.debug( f"{current_nb_iteration_no_improvement} / {nb_iteration_no_improvement}" ) if current_nb_iteration_no_improvement > nb_iteration_no_improvement: logger.info("Finish LNS with maximum no improvement iteration ") break # end of step callback: stopping? if skip_first_iteration: step = iteration else: step = iteration + 1 stopping = callbacks_list.on_step_end( step=step, res=store_lns, solver=self ) if stopping: break # end of solve callback callbacks_list.on_solve_end(res=store_lns, solver=self) return store_lns
[docs] def solve( self, nb_iteration_lns: int, parameters_cp: Optional[ParametersCP] = None, nb_iteration_no_improvement: Optional[int] = None, skip_first_iteration: bool = False, callbacks: Optional[List[Callback]] = None, **kwargs: Any, ) -> ResultStorage: if parameters_cp is None: parameters_cp = ParametersCP.default() return self.solve_lns( parameters_cp=parameters_cp, nb_iteration_lns=nb_iteration_lns, nb_iteration_no_improvement=nb_iteration_no_improvement, skip_first_iteration=skip_first_iteration, callbacks=callbacks, **kwargs, )
[docs] class ConstraintStatus(TypedDict): nb_usage: int nb_improvement: int name: str
[docs] class ConstraintHandlerMix(ConstraintHandler): def __init__( self, problem: Problem, list_constraints_handler: List[ConstraintHandler], list_proba: List[float], update_proba: bool = True, tag_constraint_handler: Optional[List[str]] = None, sequential: bool = False, ): self.problem = problem self.list_constraints_handler = list_constraints_handler self.sequential = sequential if tag_constraint_handler is None: self.tag_constraint_handler = [ str(i) for i in range(len(self.list_constraints_handler)) ] else: self.tag_constraint_handler = tag_constraint_handler self.list_proba = np.array(list_proba) self.list_proba = self.list_proba / np.sum(self.list_proba) self.index_np = np.array(range(len(self.list_proba)), dtype=np.int_) self.current_iteration = 0 self.status: Dict[int, ConstraintStatus] = { i: { "nb_usage": 0, "nb_improvement": 0, "name": self.tag_constraint_handler[i], } for i in range(len(self.list_constraints_handler)) } self.last_index_param: Optional[int] = None self.last_fitness: Optional[fitness_class] = None self.update_proba = update_proba
[docs] def adding_constraint_from_results_store( self, cp_solver: CPSolver, child_instance: Instance, result_storage: ResultStorage, last_result_store: Optional[ResultStorage] = None, ) -> Iterable[Any]: new_fitness = result_storage.get_best_solution_fit()[1] if self.last_index_param is not None: if new_fitness != self.last_fitness: self.status[self.last_index_param]["nb_improvement"] += 1 self.last_fitness = new_fitness if self.update_proba: self.list_proba[self.last_index_param] *= 1.05 self.list_proba = self.list_proba / np.sum(self.list_proba) else: if self.update_proba: self.list_proba[self.last_index_param] *= 0.95 self.list_proba = self.list_proba / np.sum(self.list_proba) else: self.last_fitness = new_fitness if self.sequential: if self.last_index_param is not None: choice = (self.last_index_param + 1) % len( self.list_constraints_handler ) else: choice = 0 else: if random.random() <= 0.95: choice = np.random.choice(self.index_np, size=1, p=self.list_proba)[0] else: max_improvement = max( [ self.status[x]["nb_improvement"] / max(self.status[x]["nb_usage"], 1) for x in self.status ] ) choice = random.choice( [ x for x in self.status if self.status[x]["nb_improvement"] / max(self.status[x]["nb_usage"], 1) == max_improvement ] ) ch = self.list_constraints_handler[int(choice)] self.current_iteration += 1 self.last_index_param = choice self.status[self.last_index_param]["nb_usage"] += 1 logger.debug(f"Status {self.status}") return ch.adding_constraint_from_results_store( cp_solver, child_instance, result_storage, last_result_store )
[docs] def remove_constraints_from_previous_iteration( self, cp_solver: CPSolver, child_instance: Instance, previous_constraints: Iterable[Any], ) -> None: pass