# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import random
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
from deap import algorithms, base, creator, tools
from discrete_optimization.generic_tools.do_mutation import Mutation
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
ModeOptim,
ObjectiveHandling,
ParamsObjectiveFunction,
Problem,
Solution,
TypeAttribute,
build_evaluate_function_aggregated,
)
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.ea.deap_wrappers import generic_mutate_wrapper
from discrete_optimization.generic_tools.ea.ga import (
DeapCrossover,
DeapMutation,
DeapSelection,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
TupleFitness,
)
logger = logging.getLogger(__name__)
_default_crossovers = {
TypeAttribute.LIST_BOOLEAN: DeapCrossover.CX_UNIFORM,
TypeAttribute.LIST_INTEGER: DeapCrossover.CX_ONE_POINT,
TypeAttribute.LIST_INTEGER_SPECIFIC_ARITY: DeapCrossover.CX_ONE_POINT,
TypeAttribute.PERMUTATION: DeapCrossover.CX_UNIFORM_PARTIALY_MATCHED,
}
_default_mutations = {
TypeAttribute.LIST_BOOLEAN: DeapMutation.MUT_FLIP_BIT,
TypeAttribute.LIST_INTEGER: DeapMutation.MUT_UNIFORM_INT,
TypeAttribute.LIST_INTEGER_SPECIFIC_ARITY: DeapMutation.MUT_UNIFORM_INT,
TypeAttribute.PERMUTATION: DeapMutation.MUT_SHUFFLE_INDEXES,
}
[docs]
class Nsga(SolverDO):
"""NSGA
Args:
problem:
the problem to solve
encoding:
name (str) of an encoding registered in the register solution of Problem
or a dictionary of the form {'type': TypeAttribute, 'n': int} where type refers to a TypeAttribute and n
to the dimension of the problem in this encoding (e.g. length of the vector)
by default, the first encoding in the problem register_solution will be used.
"""
def __init__(
self,
problem: Problem,
objectives: Union[str, List[str]],
mutation: Optional[Union[Mutation, DeapMutation]] = None,
crossover: Optional[DeapCrossover] = None,
encoding: Optional[Union[str, Dict[str, Any]]] = None,
objective_weights: Optional[List[float]] = None,
pop_size: int = 100,
max_evals: Optional[int] = None,
mut_rate: float = 0.1,
crossover_rate: float = 0.9,
deap_verbose: bool = True,
):
self.problem = problem
if not hasattr(self.problem, "evaluate_from_encoding"):
raise ValueError("self.problem shoud define an evaluate_from_encoding()")
# self.problem.evaluate_from_encoding: Callable[[List[int], str], Dict[str, float]]
self._pop_size = pop_size
if max_evals is not None:
self._max_evals = max_evals
else:
self._max_evals = 100 * self._pop_size
logger.warning(
"No value specified for max_evals. Using the default 10*pop_size - This should really be set carefully"
)
self._mut_rate = mut_rate
self._crossover_rate = crossover_rate
self._deap_verbose = deap_verbose
# set encoding
register_solution: EncodingRegister = problem.get_attribute_register()
encoding_name = None
if encoding is not None and isinstance(encoding, str):
# check name specified is in problem register
if encoding in register_solution.dict_attribute_to_type.keys():
encoding_name = encoding
self._encoding_variable_name = register_solution.dict_attribute_to_type[
encoding_name
]["name"]
self._encoding_type = register_solution.dict_attribute_to_type[
encoding_name
]["type"][0]
self.n = register_solution.dict_attribute_to_type[encoding_name]["n"]
if self._encoding_type == TypeAttribute.LIST_INTEGER:
self.arity = register_solution.dict_attribute_to_type[
encoding_name
]["arity"]
self.arities = [self.arity for i in range(self.n)]
else:
self.arity = None
if self._encoding_type == TypeAttribute.LIST_INTEGER_SPECIFIC_ARITY:
self.arities = register_solution.dict_attribute_to_type[
encoding_name
]["arities"]
if encoding is not None and isinstance(encoding, Dict):
# check there is a type key and a n key
if (
"name" in encoding.keys()
and "type" in encoding.keys()
and "n" in encoding.keys()
):
encoding_name = "custom"
self._encoding_variable_name = encoding["name"]
self._encoding_type = encoding["type"][0]
self.n = encoding["n"]
if "arity" in encoding.keys():
self.arity = encoding["arity"]
self.arities = [self.arity for i in range(self.n)]
if "arities" in encoding.keys():
self.arities = register_solution.dict_attribute_to_type[
encoding_name
]["arities"]
else:
logger.warning(
"Erroneous encoding provided as input (encoding name not matching encoding of problem or custom "
"definition not respecting encoding dict entry format, trying to use default one instead"
)
if encoding_name is None:
if len(register_solution.dict_attribute_to_type.keys()) == 0:
raise Exception(
"An encoding of type TypeAttribute should be specified or at least 1 TypeAttribute "
"should be defined in the RegisterSolution of your Problem"
)
encoding_name = list(register_solution.dict_attribute_to_type.keys())[0]
self._encoding_variable_name = register_solution.dict_attribute_to_type[
encoding_name
]["name"]
self._encoding_type = register_solution.dict_attribute_to_type[
encoding_name
]["type"][0]
self.n = register_solution.dict_attribute_to_type[encoding_name]["n"]
if self._encoding_type == TypeAttribute.LIST_INTEGER:
self.arity = register_solution.dict_attribute_to_type[encoding_name][
"arity"
]
self.arities = [self.arity for i in range(self.n)]
else:
self.arity = None
if self._encoding_type == TypeAttribute.LIST_INTEGER_SPECIFIC_ARITY:
self.arities = register_solution.dict_attribute_to_type[encoding_name][
"arities"
]
self._encoding_name = encoding_name
if self._encoding_type == TypeAttribute.LIST_BOOLEAN:
self.arity = 2
self.arities = [2 for i in range(self.n)]
logger.debug(
f"Encoding used by the GA: {self._encoding_name}: {self._encoding_type} of length {self.n}"
)
if isinstance(objectives, str):
self._objectives = [objectives]
else:
self._objectives = objectives
self._objective_weights: List[float]
if (objective_weights is None) or (
objective_weights is not None
and (len(objective_weights) != len(self._objectives))
):
logger.warning(
"Objective weight issue: no weight given or size of weights and objectives lists mismatch. "
"Setting all weights to default 1 value."
)
self._objective_weights = [1 for i in range(len(self._objectives))]
else:
self._objective_weights = objective_weights
self.params_objective_function = ParamsObjectiveFunction(
objective_handling=ObjectiveHandling.MULTI_OBJ,
objectives=self._objectives,
weights=self._objective_weights,
sense_function=ModeOptim.MAXIMIZATION,
)
self.aggreg_from_sol: Union[
Callable[[Solution], float], Callable[[Solution], TupleFitness]
]
(
self.aggreg_from_sol,
self.aggreg_from_dict,
) = build_evaluate_function_aggregated(
problem=problem, params_objective_function=self.params_objective_function
)
nobj = len(self._objectives)
ref_points = tools.uniform_reference_points(nobj=nobj)
# DEAP toolbox setup
self._toolbox = base.Toolbox()
# Define representation
creator.create("fitness", base.Fitness, weights=tuple(self._objective_weights))
creator.create(
"individual", list, fitness=creator.fitness
) # associate the fitness function to the individual type
# Create the individuals required by the encoding
if self._encoding_type == TypeAttribute.LIST_BOOLEAN:
self._toolbox.register(
"bit", random.randint, 0, 1
) # Each element of a solution is a bit (i.e. an int between 0 and 1 incl.)
self._toolbox.register(
"individual",
tools.initRepeat,
creator.individual,
self._toolbox.bit,
n=self.n,
) # An individual (aka solution) contains n bits
elif self._encoding_type == TypeAttribute.PERMUTATION:
self._toolbox.register(
"permutation_indices", random.sample, range(self.n), self.n
)
self._toolbox.register(
"individual",
tools.initIterate,
creator.individual,
self._toolbox.permutation_indices,
)
elif self._encoding_type == TypeAttribute.LIST_INTEGER:
self._toolbox.register("int_val", random.randint, 0, self.arity - 1)
self._toolbox.register(
"individual",
tools.initRepeat,
creator.individual,
self._toolbox.int_val,
n=self.n,
)
elif self._encoding_type == TypeAttribute.LIST_INTEGER_SPECIFIC_ARITY:
gen_idx = lambda: [random.randint(0, arity - 1) for arity in self.arities]
self._toolbox.register(
"individual", tools.initIterate, creator.individual, gen_idx
)
self._toolbox.register(
"population",
tools.initRepeat,
list,
self._toolbox.individual,
n=self._pop_size,
) # A population is made of pop_size individuals
# Define objective function
self._toolbox.register(
"evaluate",
self.evaluate_problem,
)
# Define crossover
if crossover is None:
self._crossover = _default_crossovers[self._encoding_type]
else:
self._crossover = crossover
if self._crossover == DeapCrossover.CX_UNIFORM:
self._toolbox.register("mate", tools.cxUniform, indpb=self._crossover_rate)
elif self._crossover == DeapCrossover.CX_ONE_POINT:
self._toolbox.register("mate", tools.cxOnePoint)
elif self._crossover == DeapCrossover.CX_TWO_POINT:
self._toolbox.register("mate", tools.cxTwoPoint)
elif self._crossover == DeapCrossover.CX_UNIFORM_PARTIALY_MATCHED:
self._toolbox.register("mate", tools.cxUniformPartialyMatched, indpb=0.5)
elif self._crossover == DeapCrossover.CX_ORDERED:
self._toolbox.register("mate", tools.cxOrdered)
elif self._crossover == DeapCrossover.CX_PARTIALY_MATCHED:
self._toolbox.register("mate", tools.cxPartialyMatched)
else:
logger.warning("Crossover of specified type not handled!")
# Define mutation
self._mutation: Union[Mutation, DeapMutation]
if mutation is None:
self._mutation = _default_mutations[self._encoding_type]
else:
self._mutation = mutation
if isinstance(self._mutation, Mutation):
self._toolbox.register(
"mutate",
generic_mutate_wrapper,
problem=self.problem,
encoding_name=self._encoding_variable_name,
indpb=self._mut_rate,
solution_fn=self.problem.get_solution_type(),
custom_mutation=mutation,
)
elif isinstance(self._mutation, DeapMutation):
if self._mutation == DeapMutation.MUT_FLIP_BIT:
self._toolbox.register(
"mutate", tools.mutFlipBit, indpb=self._mut_rate
) # Choice of mutation operator
elif self._mutation == DeapMutation.MUT_SHUFFLE_INDEXES:
self._toolbox.register(
"mutate", tools.mutShuffleIndexes, indpb=self._mut_rate
) # Choice of mutation operator
elif self._mutation == DeapMutation.MUT_UNIFORM_INT:
self._toolbox.register(
"mutate",
tools.mutUniformInt,
low=0,
up=self.arities,
indpb=self._mut_rate,
)
# No choice of selection: In NSGA, only 1 selection: Non Dominated Sorted Selection
self._toolbox.register("select", tools.selNSGA3, ref_points=ref_points)
[docs]
def evaluate_problem(self, int_vector: List[int]) -> Tuple[float, ...]:
objective_values = self.problem.evaluate_from_encoding( # type: ignore
int_vector, self._encoding_variable_name
)
val = tuple([objective_values[obj_name] for obj_name in self._objectives])
return val
[docs]
def solve(self, **kwargs: Any) -> ResultStorage:
# Define the statistics to collect at each generation
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean, axis=0)
stats.register("std", np.std, axis=0)
stats.register("min", np.min, axis=0)
stats.register("max", np.max, axis=0)
logbook = tools.Logbook()
logbook.header = "gen", "evals", "std", "min", "avg", "max"
# Initialise the population (here at random)
pop = self._toolbox.population()
invalid_ind = [ind for ind in pop if not ind.fitness.valid]
fitnesses = self._toolbox.map(self._toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
# Compile statistics about the population
record = stats.compile(pop)
logbook.record(gen=0, evals=len(invalid_ind), **record)
logger.debug(logbook.stream)
# Begin the generational process
ngen = int(self._max_evals / self._pop_size)
logger.debug(f"ngen: {ngen}")
for gen in range(1, ngen):
offspring = algorithms.varAnd(
pop, self._toolbox, self._crossover_rate, self._mut_rate
)
# Evaluate the individuals with an invalid fitness
invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
fitnesses = self._toolbox.map(self._toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
# Select the next generation population from parents and offspring
pop = self._toolbox.select(pop + offspring, self._pop_size)
# Compile statistics about the new population
record = stats.compile(pop)
logbook.record(gen=gen, evals=len(invalid_ind), **record)
logger.debug(logbook.stream)
sols = []
for s in pop:
s_pure_int = [i for i in s]
kwargs = {self._encoding_variable_name: s_pure_int, "problem": self.problem}
problem_sol = self.problem.get_solution_type()(**kwargs)
fits = self.aggreg_from_sol(problem_sol)
sols.append((problem_sol, fits))
rs = ResultStorage(
list_solution_fits=sols,
best_solution=None,
mode_optim=self.params_objective_function.sense_function,
)
return rs