# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import pickle
import time
from typing import Any, List, Optional
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_mutation import Mutation
from discrete_optimization.generic_tools.do_problem import (
ModeOptim,
ParamsObjectiveFunction,
Problem,
Solution,
build_evaluate_function_aggregated,
)
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.ls.local_search import (
ModeMutation,
RestartHandler,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ParetoFront,
ResultStorage,
)
logger = logging.getLogger(__name__)
[docs]
class HillClimber(SolverDO):
def __init__(
self,
problem: Problem,
mutator: Mutation,
restart_handler: RestartHandler,
mode_mutation: ModeMutation,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
store_solution: bool = False,
):
super().__init__(
problem=problem, params_objective_function=params_objective_function
)
self.mutator = mutator
self.restart_handler = restart_handler
self.mode_mutation = mode_mutation
self.params_objective_function = params_objective_function
if params_objective_function is not None:
self.mode_optim = params_objective_function.sense_function
else:
self.mode_optim = ModeOptim.MAXIMIZATION
self.store_solution = store_solution
[docs]
def solve(
self,
initial_variable: Solution,
nb_iteration_max: int,
callbacks: Optional[List[Callback]] = None,
**kwargs: Any,
) -> ResultStorage:
callbacks_list = CallbackList(callbacks=callbacks)
objective = self.aggreg_from_dict(self.problem.evaluate(initial_variable))
cur_variable = initial_variable.copy()
if self.store_solution:
store = ResultStorage(
list_solution_fits=[(initial_variable, objective)],
best_solution=initial_variable.copy(),
limit_store=True,
nb_best_store=1000,
)
else:
store = ResultStorage(
list_solution_fits=[(initial_variable, objective)],
best_solution=initial_variable.copy(),
limit_store=True,
nb_best_store=1,
)
cur_objective = objective
cur_best_objective = objective
self.restart_handler.best_fitness = objective
self.restart_handler.solution_best = initial_variable.copy()
iteration = 0
# start of solve callback
callbacks_list.on_solve_start(solver=self)
while iteration < nb_iteration_max:
accept = False
local_improvement = False
global_improvement = False
if self.mode_mutation == ModeMutation.MUTATE:
nv, move = self.mutator.mutate(cur_variable)
objective = self.aggreg_from_sol(nv)
elif self.mode_mutation == ModeMutation.MUTATE_AND_EVALUATE:
nv, move, objective_dict_values = self.mutator.mutate_and_compute_obj(
cur_variable
)
objective = self.aggreg_from_dict(objective_dict_values)
if self.mode_optim == ModeOptim.MINIMIZATION and objective < cur_objective:
accept = True
local_improvement = True
global_improvement = objective < cur_best_objective
elif (
self.mode_optim == ModeOptim.MAXIMIZATION and objective > cur_objective
):
accept = True
local_improvement = True
global_improvement = objective > cur_best_objective
if accept:
cur_objective = objective
cur_variable = nv
else:
cur_variable = move.backtrack_local_move(nv)
if self.store_solution:
store.add_solution(nv.copy(), objective)
if global_improvement:
logger.debug(f"iter {iteration}")
logger.debug(f"new obj {objective} better than {cur_best_objective}")
cur_best_objective = objective
if not self.store_solution:
store.add_solution(cur_variable.copy(), objective)
# Update the temperature
self.restart_handler.update(
nv, objective, global_improvement, local_improvement
)
# Update info in restart handler
cur_variable, cur_objective = self.restart_handler.restart(
cur_variable, cur_objective
)
# possibly restart somewhere
iteration += 1
# end of step callback: stopping?
stopping = callbacks_list.on_step_end(
step=iteration, res=store, solver=self
)
if stopping:
break
store.finalize()
# end of solve callback
callbacks_list.on_solve_end(res=store, solver=self)
return store
[docs]
class HillClimberPareto(HillClimber):
def __init__(
self,
problem: Problem,
mutator: Mutation,
restart_handler: RestartHandler,
mode_mutation: ModeMutation,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
store_solution: bool = False,
):
super().__init__(
problem=problem,
mutator=mutator,
restart_handler=restart_handler,
mode_mutation=mode_mutation,
params_objective_function=params_objective_function,
store_solution=store_solution,
)
[docs]
def solve(
self,
initial_variable: Solution,
nb_iteration_max: int,
update_iteration_pareto: int = 1000,
callbacks: Optional[List[Callback]] = None,
**kwargs: Any,
) -> ParetoFront:
callbacks_list = CallbackList(callbacks=callbacks)
objective = self.aggreg_from_dict(self.problem.evaluate(initial_variable))
pareto_front = ParetoFront(
list_solution_fits=[(initial_variable, objective)],
best_solution=initial_variable.copy(),
limit_store=True,
nb_best_store=1000,
)
cur_variable = initial_variable.copy()
cur_objective = objective
cur_best_objective = objective
self.restart_handler.best_fitness = objective
self.restart_handler.solution_best = initial_variable.copy()
iteration = 0
# start of solve callback
callbacks_list.on_solve_start(solver=self)
while iteration < nb_iteration_max:
accept = False
local_improvement = False
global_improvement = False
if iteration % update_iteration_pareto == 0:
pareto_front.finalize()
if self.mode_mutation == ModeMutation.MUTATE:
nv, move = self.mutator.mutate(cur_variable)
objective = self.aggreg_from_sol(nv)
elif self.mode_mutation == ModeMutation.MUTATE_AND_EVALUATE:
nv, move, objective_dict_values = self.mutator.mutate_and_compute_obj(
cur_variable
)
objective = self.aggreg_from_dict(objective_dict_values)
if self.mode_optim == ModeOptim.MINIMIZATION and objective < cur_objective:
accept = True
local_improvement = True
global_improvement = objective < cur_best_objective
pareto_front.add_solution(nv.copy(), objective)
elif (
self.mode_optim == ModeOptim.MINIMIZATION and objective == cur_objective
):
accept = True
local_improvement = True
global_improvement = objective == cur_best_objective
pareto_front.add_solution(nv.copy(), objective)
elif (
self.mode_optim == ModeOptim.MAXIMIZATION and objective > cur_objective
):
accept = True
local_improvement = True
global_improvement = objective > cur_best_objective
pareto_front.add_solution(nv.copy(), objective)
elif (
self.mode_optim == ModeOptim.MAXIMIZATION and objective == cur_objective
):
accept = True
local_improvement = True
global_improvement = objective == cur_best_objective
pareto_front.add_solution(nv.copy(), objective)
if accept:
logger.debug(f"Accept : {objective}")
cur_objective = objective
cur_variable = nv
else:
cur_variable = move.backtrack_local_move(nv)
if global_improvement:
logger.debug(f"iter {iteration}")
logger.debug(f"new obj {objective} better than {cur_best_objective}")
cur_best_objective = objective
# Update the temperature
self.restart_handler.update(
nv, objective, global_improvement, local_improvement
)
logger.debug(f"Len pareto : {pareto_front.len_pareto_front()}")
# Update info in restart handler
cur_variable, cur_objective = self.restart_handler.restart(
cur_variable, cur_objective
)
# possibly restart somewhere
iteration += 1
# end of step callback: stopping?
stopping = callbacks_list.on_step_end(
step=iteration, res=pareto_front, solver=self
)
if stopping:
break
pareto_front.finalize()
# end of solve callback
callbacks_list.on_solve_end(res=pareto_front, solver=self)
return pareto_front