# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import time
from abc import abstractmethod
from typing import Any, Hashable, List, Mapping, Optional
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_problem import (
ModeOptim,
ParamsObjectiveFunction,
Problem,
)
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.hyperparameters.hyperparametrizable import (
Hyperparametrizable,
)
from discrete_optimization.generic_tools.lp_tools import MilpSolver, ParametersMilp
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
logger = logging.getLogger(__name__)
[docs]
class ConstraintHandler(Hyperparametrizable):
[docs]
@abstractmethod
def adding_constraint_from_results_store(
self, milp_solver: MilpSolver, result_storage: ResultStorage
) -> Mapping[Hashable, Any]:
...
[docs]
@abstractmethod
def remove_constraints_from_previous_iteration(
self, milp_solver: MilpSolver, previous_constraints: Mapping[Hashable, Any]
) -> None:
...
[docs]
class InitialSolution(Hyperparametrizable):
[docs]
@abstractmethod
def get_starting_solution(self) -> ResultStorage:
...
[docs]
class InitialSolutionFromSolver(InitialSolution):
def __init__(self, solver: SolverDO, **kwargs: Any):
self.solver = solver
self.dict = kwargs
[docs]
@abstractmethod
def get_starting_solution(self) -> ResultStorage:
return self.solver.solve(**self.dict)
[docs]
class TrivialInitialSolution(InitialSolution):
def __init__(self, solution: ResultStorage, **kwargs: Any):
self.solution = solution
self.dict = kwargs
[docs]
@abstractmethod
def get_starting_solution(self) -> ResultStorage:
return self.solution
[docs]
class PostProcessSolution(Hyperparametrizable):
# From solution from MIP or CP you can build other solution.
# Here you can have many different approaches:
# if solution from mip/cp are not feasible you can code a repair function
# you can also do mall changes (filling gap in a schedule) to try to improve the solution
# you can also run algorithms from the new found solution.
[docs]
@abstractmethod
def build_other_solution(self, result_storage: ResultStorage) -> ResultStorage:
...
[docs]
class TrivialPostProcessSolution(PostProcessSolution):
def __init__(self, **kwargs):
...
[docs]
def build_other_solution(self, result_storage: ResultStorage) -> ResultStorage:
return result_storage
[docs]
class LNS_MILP(SolverDO):
def __init__(
self,
problem: Problem,
milp_solver: MilpSolver,
initial_solution_provider: InitialSolution,
constraint_handler: ConstraintHandler,
post_process_solution: Optional[PostProcessSolution] = None,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
):
super().__init__(
problem=problem, params_objective_function=params_objective_function
)
self.milp_solver = milp_solver
self.initial_solution_provider = initial_solution_provider
self.constraint_handler = constraint_handler
self.post_process_solution: PostProcessSolution
if post_process_solution is None:
self.post_process_solution = TrivialPostProcessSolution()
else:
self.post_process_solution = post_process_solution
[docs]
def solve_lns(
self,
parameters_milp: ParametersMilp,
nb_iteration_lns: int,
nb_iteration_no_improvement: Optional[int] = None,
skip_first_iteration: Optional[bool] = False,
callbacks: Optional[List[Callback]] = None,
**args: Any,
) -> ResultStorage:
# wrap all callbacks in a single one
callbacks_list = CallbackList(callbacks=callbacks)
# start of solve callback
callbacks_list.on_solve_start(solver=self)
sense = self.params_objective_function.sense_function
if nb_iteration_no_improvement is None:
nb_iteration_no_improvement = 2 * nb_iteration_lns
current_nb_iteration_no_improvement = 0
if not skip_first_iteration:
store_lns = self.initial_solution_provider.get_starting_solution()
init_solution, objective = store_lns.get_best_solution_fit()
constraint_iterable = (
self.constraint_handler.adding_constraint_from_results_store(
milp_solver=self.milp_solver, result_storage=store_lns
)
)
best_objective = objective
# end of step callback: stopping?
stopping = callbacks_list.on_step_end(step=0, res=store_lns, solver=self)
else:
best_objective = float("inf")
constraint_iterable = {"empty": []}
store_lns = None
stopping = False
if not stopping:
for iteration in range(nb_iteration_lns):
result_store = self.milp_solver.solve(
parameters_milp=parameters_milp, **args
)
logger.debug("Solved !!!")
bsol, fit = result_store.get_best_solution_fit()
logger.debug(f"Fitness = {fit}")
logger.debug("Post Process..")
result_store = self.post_process_solution.build_other_solution(
result_store
)
bsol, fit = result_store.get_best_solution_fit()
logger.debug(f"After postpro = {fit}")
if sense == ModeOptim.MAXIMIZATION and fit >= best_objective:
if fit > best_objective:
current_nb_iteration_no_improvement = 0
else:
current_nb_iteration_no_improvement += 1
best_objective = fit
if sense == ModeOptim.MINIMIZATION and fit <= best_objective:
if fit < best_objective:
current_nb_iteration_no_improvement = 0
else:
current_nb_iteration_no_improvement += 1
best_objective = fit
if skip_first_iteration and iteration == 0:
store_lns = result_store
for s, f in result_store.list_solution_fits:
if store_lns is None: # for mypy, should never happen
raise RuntimeError(
"store_lns should have been initialized for now"
)
store_lns.add_solution(solution=s, fitness=f)
logger.debug("Removing constraint:")
self.constraint_handler.remove_constraints_from_previous_iteration(
milp_solver=self.milp_solver,
previous_constraints=constraint_iterable,
)
logger.debug("Adding constraint:")
constraint_iterable = (
self.constraint_handler.adding_constraint_from_results_store(
milp_solver=self.milp_solver, result_storage=result_store
)
)
if current_nb_iteration_no_improvement > nb_iteration_no_improvement:
logger.info("Finish LNS with maximum no improvement iteration ")
break
# end of step callback: stopping?
if skip_first_iteration:
step = iteration
else:
step = iteration + 1
stopping = callbacks_list.on_step_end(
step=step, res=store_lns, solver=self
)
if stopping:
break
if store_lns is None: # for mypy, should never happen
raise RuntimeError("store_lns should have been initialized for now")
# end of solve callback
callbacks_list.on_solve_end(res=store_lns, solver=self)
return store_lns
[docs]
def solve(
self,
parameters_milp: ParametersMilp,
nb_iteration_lns: int,
nb_iteration_no_improvement: Optional[int] = None,
skip_first_iteration: Optional[bool] = False,
callbacks: Optional[List[Callback]] = None,
**kwargs: Any,
) -> ResultStorage:
return self.solve_lns(
parameters_milp=parameters_milp,
nb_iteration_lns=nb_iteration_lns,
nb_iteration_no_improvement=nb_iteration_no_improvement,
skip_first_iteration=skip_first_iteration,
callbacks=callbacks,
**kwargs,
)