# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import pickle
import random
import time
from abc import abstractmethod
from typing import Any, Callable, Dict, List, Optional
import numpy as np
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_mutation import Mutation
from discrete_optimization.generic_tools.do_problem import (
ModeOptim,
ObjectiveHandling,
ParamsObjectiveFunction,
Problem,
Solution,
)
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.ls.local_search import (
ModeMutation,
RestartHandler,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
logger = logging.getLogger(__name__)
[docs]
class TemperatureScheduling:
nb_iteration: int
restart_handler: RestartHandler
temperature: float
[docs]
@abstractmethod
def next_temperature(self) -> float:
...
[docs]
class SimulatedAnnealing(SolverDO):
aggreg_from_sol: Callable[[Solution], float]
aggreg_from_dict: Callable[[Dict[str, float]], float]
def __init__(
self,
problem: Problem,
mutator: Mutation,
restart_handler: RestartHandler,
temperature_handler: TemperatureScheduling,
mode_mutation: ModeMutation,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
store_solution: bool = False,
):
super().__init__(
problem=problem, params_objective_function=params_objective_function
)
self.mutator = mutator
self.restart_handler = restart_handler
self.temperature_handler = temperature_handler
self.mode_mutation = mode_mutation
if (
self.params_objective_function.objective_handling
== ObjectiveHandling.MULTI_OBJ
):
raise NotImplementedError(
"SimulatedAnnealing is not implemented for multi objective optimization."
)
self.mode_optim = self.params_objective_function.sense_function
self.store_solution = store_solution
[docs]
def solve(
self,
initial_variable: Solution,
nb_iteration_max: int,
callbacks: Optional[List[Callback]] = None,
**kwargs: Any,
) -> ResultStorage:
callbacks_list = CallbackList(callbacks=callbacks)
objective = self.aggreg_from_dict(self.problem.evaluate(initial_variable))
cur_variable = initial_variable.copy()
cur_objective = objective
cur_best_objective = objective
if self.store_solution:
store = ResultStorage(
list_solution_fits=[(initial_variable, objective)],
best_solution=initial_variable.copy(),
limit_store=True,
mode_optim=self.params_objective_function.sense_function,
nb_best_store=1000,
)
else:
store = ResultStorage(
list_solution_fits=[(initial_variable, objective)],
best_solution=initial_variable.copy(),
limit_store=True,
mode_optim=self.params_objective_function.sense_function,
nb_best_store=1,
)
self.restart_handler.best_fitness = objective
self.restart_handler.solution_best = initial_variable.copy()
iteration = 0
# start of solve callback
callbacks_list.on_solve_start(solver=self)
while iteration < nb_iteration_max:
local_improvement = False
global_improvement = False
if self.mode_mutation == ModeMutation.MUTATE:
nv, move = self.mutator.mutate(cur_variable)
objective = self.aggreg_from_dict(self.problem.evaluate(nv))
else: # self.mode_mutation == ModeMutation.MUTATE_AND_EVALUATE:
nv, move, objective_dict_values = self.mutator.mutate_and_compute_obj(
cur_variable
)
objective = self.aggreg_from_dict(objective_dict_values)
logger.debug(
f"{iteration} / {nb_iteration_max} {objective} {cur_objective}"
)
if self.mode_optim == ModeOptim.MINIMIZATION and objective < cur_objective:
accept = True
local_improvement = True
global_improvement = objective < cur_best_objective
elif (
self.mode_optim == ModeOptim.MAXIMIZATION and objective > cur_objective
):
accept = True
local_improvement = True
global_improvement = objective > cur_best_objective
else:
r = random.random()
fac = 1 if self.mode_optim == ModeOptim.MAXIMIZATION else -1
p = np.exp(
fac
* (objective - cur_objective)
/ self.temperature_handler.temperature
)
accept = p > r
if accept:
cur_objective = objective
cur_variable = nv
logger.debug(f"iter accepted {iteration}")
logger.debug(f"acceptance {objective}")
else:
cur_variable = move.backtrack_local_move(nv)
if self.store_solution:
store.add_solution(nv.copy(), objective)
if global_improvement:
logger.info(f"iter {iteration}")
logger.info(f"new obj {objective} better than {cur_best_objective}")
cur_best_objective = objective
if not self.store_solution:
store.add_solution(cur_variable.copy(), objective)
self.temperature_handler.next_temperature()
# Update the temperature
self.restart_handler.update(
nv, objective, global_improvement, local_improvement
)
# Update info in restart handler
cur_variable, cur_objective = self.restart_handler.restart( # type: ignore
cur_variable, cur_objective
)
# possibly restart somewhere
iteration += 1
# end of step callback: stopping?
stopping = callbacks_list.on_step_end(
step=iteration, res=store, solver=self
)
if stopping:
break
store.finalize()
# end of solve callback
callbacks_list.on_solve_end(res=store, solver=self)
return store
[docs]
class TemperatureSchedulingFactor(TemperatureScheduling):
def __init__(
self,
temperature: float,
restart_handler: RestartHandler,
coefficient: float = 0.99,
):
self.temperature = temperature
self.restart_handler = restart_handler
self.coefficient = coefficient
[docs]
def next_temperature(self) -> float:
self.temperature *= self.coefficient
return self.temperature