Source code for discrete_optimization.facility.solvers.gphh_facility

"""Genetic programming based solver for facility location problem.
"""

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import operator
from enum import Enum
from typing import Any, Callable, Dict, Iterable, List, Optional, Set

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import numpy.typing as npt
from deap import algorithms, creator, gp, tools
from deap.base import Fitness, Toolbox
from deap.gp import (
    Primitive,
    PrimitiveSet,
    PrimitiveSetTyped,
    PrimitiveTree,
    Terminal,
    genHalfAndHalf,
)

from discrete_optimization.facility.facility_model import FacilityProblem
from discrete_optimization.facility.solvers.facility_solver import SolverFacility
from discrete_optimization.facility.solvers.greedy_solvers import (
    GreedySolverDistanceBased,
)
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.ghh_tools import argsort, protected_div
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)


[docs] def distance( problem: FacilityProblem, customer_index: int, **kwargs: Any ) -> List[float]: """Compute distance to facilitied for a given customer index Args: problem (FacilityProblem): problem instance customer_index (int): customer index to compute distances to facilities """ return [ problem.evaluate_customer_facility( facility=f, customer=problem.customers[customer_index] ) for f in problem.facilities ]
[docs] def demand_minus_capacity( problem: FacilityProblem, customer_index: int, **kwargs: Any ) -> List[float]: """Compute demand-capacity feature for a given customer_index Args: problem (FacilityProblem): problem instance customer_index (int): customer index to compute distances to facilities """ return [ problem.customers[customer_index].demand - f.capacity for f in problem.facilities ]
[docs] def capacity( problem: FacilityProblem, customer_index: int, **kwargs: Any ) -> List[float]: """Capacity feature. Args: problem (FacilityProblem): problem instance customer_index (int): [unused] customer index to compute distances to facilities """ return [f.capacity for f in problem.facilities]
[docs] def closest_facility( problem: FacilityProblem, customer_index: int, **kwargs: Any ) -> int: """Closest facility feature for a given customer index. Args: problem (FacilityProblem): problem instance customer_index (int): [unused] customer index to compute distances to facilities Returns (int): closest facility index """ return min( range(len(problem.facilities)), key=lambda x: problem.evaluate_customer_facility( facility=problem.facilities[x], customer=problem.customers[customer_index] ), )
[docs] class FeatureEnum(Enum): DISTANCE = "distance" CAPACITIES = "capacities" DEMAND_MINUS_CAPACITY = "demand_minus_capacity"
feature_function_map: Dict[FeatureEnum, Callable[..., Iterable[float]]] = { FeatureEnum.DISTANCE: distance, FeatureEnum.CAPACITIES: capacity, FeatureEnum.DEMAND_MINUS_CAPACITY: demand_minus_capacity, }
[docs] class ParametersGPHH: """Custom class to parametrize the GPHH solver. Attributes: set_feature: the set of feature to consider set_primitives: set of operator/primitive to consider. """ def __init__( self, set_feature: Set[FeatureEnum], set_primitives: PrimitiveSetTyped, tournament_ratio: float, pop_size: int, n_gen: int, min_tree_depth: int, max_tree_depth: int, crossover_rate: float, mutation_rate: float, deap_verbose: bool, ): self.set_feature = set_feature self.set_primitives = set_primitives self.tournament_ratio = tournament_ratio self.pop_size = pop_size self.n_gen = n_gen self.min_tree_depth = min_tree_depth self.max_tree_depth = max_tree_depth self.crossover_rate = crossover_rate self.mutation_rate = mutation_rate self.deap_verbose = deap_verbose
[docs] @staticmethod def default() -> "ParametersGPHH": set_feature = { FeatureEnum.DISTANCE, FeatureEnum.DEMAND_MINUS_CAPACITY, } pset = PrimitiveSetTyped("main", [list, list], list) # take profit, list of ressource consumption, avearage delta consumption pset.addPrimitive( lambda x, y: [max(xx, yy) for xx, yy in zip(x, y)], [list, list], list, name="max_element", ) pset.addPrimitive( lambda x, y: [min(xx, yy) for xx, yy in zip(x, y)], [list, list], list, name="min_element", ) pset.addPrimitive( lambda x, y: [protected_div(xx, yy) for xx, yy in zip(x, y)], [list, list], list, name="protected_div_list", ) pset.addPrimitive( lambda x, y: [xx - yy for xx, yy in zip(x, y)], [list, list], list, name="sub_list", ) pset.addPrimitive( lambda x, y: [xx + yy for xx, yy in zip(x, y)], [list, list], list, name="plus_list", ) pset.addTerminal(1, int, name="dummy") return ParametersGPHH( set_feature=set_feature, set_primitives=pset, tournament_ratio=0.1, pop_size=10, n_gen=2, min_tree_depth=1, max_tree_depth=4, crossover_rate=0.7, mutation_rate=0.3, deap_verbose=True, )
[docs] class GPHH(SolverFacility): def __init__( self, training_domains: List[FacilityProblem], problem: FacilityProblem, weight: int = 1, params_gphh: Optional[ParametersGPHH] = None, params_objective_function: Optional[ParamsObjectiveFunction] = None, ): super().__init__( problem=problem, params_objective_function=params_objective_function ) self.training_domains = training_domains if params_gphh is None: self.params_gphh = ParametersGPHH.default() else: self.params_gphh = params_gphh self.set_feature = self.params_gphh.set_feature self.list_feature = list(self.set_feature) self.list_feature_names = [value.value for value in list(self.list_feature)] self.pset: PrimitiveSet = self.init_primitives(self.params_gphh.set_primitives) self.weight = weight self.greedy_solver = GreedySolverDistanceBased(problem=self.problem)
[docs] def init_model(self, **kwargs: Any) -> None: tournament_ratio = self.params_gphh.tournament_ratio pop_size = self.params_gphh.pop_size min_tree_depth = self.params_gphh.min_tree_depth max_tree_depth = self.params_gphh.max_tree_depth creator.create("FitnessMin", Fitness, weights=(self.weight,)) creator.create("Individual", PrimitiveTree, fitness=creator.FitnessMin) self.toolbox = Toolbox() self.toolbox.register( "expr", genHalfAndHalf, pset=self.pset, min_=min_tree_depth, max_=max_tree_depth, ) self.toolbox.register( "individual", tools.initIterate, creator.Individual, self.toolbox.expr ) self.toolbox.register( "population", tools.initRepeat, list, self.toolbox.individual ) self.toolbox.register("compile", gp.compile, pset=self.pset) self.toolbox.register( "evaluate", self.evaluate_heuristic, domains=self.training_domains ) self.toolbox.register( "select", tools.selTournament, tournsize=int(tournament_ratio * pop_size) ) self.toolbox.register("mate", gp.cxOnePoint) self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=max_tree_depth) self.toolbox.register( "mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset ) self.toolbox.decorate( "mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17) ) self.toolbox.decorate( "mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17) ) stats_fit = tools.Statistics(lambda ind: ind.fitness.values) stats_size = tools.Statistics(len) mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size) mstats.register("avg", np.mean) mstats.register("std", np.std) mstats.register("min", np.min) mstats.register("max", np.max)
[docs] def solve(self, **kwargs: Any) -> ResultStorage: stats_fit = tools.Statistics(lambda ind: ind.fitness.values) stats_size = tools.Statistics(len) mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size) mstats.register("avg", np.mean) mstats.register("std", np.std) mstats.register("min", np.min) mstats.register("max", np.max) pop = self.toolbox.population(n=self.params_gphh.pop_size) hof = tools.HallOfFame(1000) self.hof = hof pop, log = algorithms.eaSimple( pop, self.toolbox, cxpb=self.params_gphh.crossover_rate, mutpb=self.params_gphh.mutation_rate, ngen=self.params_gphh.n_gen, stats=mstats, halloffame=hof, verbose=True, ) self.best_heuristic = hof[0] self.final_pop = pop self.func_heuristic = self.toolbox.compile(expr=self.best_heuristic) result = self.build_solution( domain=self.problem, func_heuristic=self.func_heuristic ) return result
[docs] def init_primitives(self, pset: PrimitiveSetTyped) -> PrimitiveSet: for i in range(len(self.list_feature)): pset.renameArguments(**{"ARG" + str(i): self.list_feature[i].value}) return pset
[docs] def build_solution( self, domain: FacilityProblem, individual: Optional[Any] = None, func_heuristic: Optional[Callable[..., npt.ArrayLike]] = None, ) -> ResultStorage: if func_heuristic is None: func_heuristic = self.toolbox.compile(expr=individual) raw_values: List[Iterable[int]] = [] for j in range(domain.customer_count): input_features: List[Iterable[float]] = [ feature_function_map[lf](problem=domain, customer_index=j) for lf in self.list_feature ] output_value = func_heuristic(*input_features) raw_values.append(argsort(output_value)) result = self.greedy_solver.solve( **{"prio": {c: raw_values[c] for c in range(len(raw_values))}} ) return result
[docs] def evaluate_heuristic( self, individual: Any, domains: List[FacilityProblem] ) -> List[float]: vals: List[float] = [] func_heuristic = self.toolbox.compile(expr=individual) for domain in domains: result = self.build_solution( individual=individual, domain=domain, func_heuristic=func_heuristic ) value: float = result.get_best_solution_fit()[1] # type: ignore # could also be None or TupleFitness vals.append(value) fitness = [np.mean(vals)] return [fitness[0] - 10 * self.evaluate_complexity(individual)]
[docs] def evaluate_complexity(self, individual: Any) -> float: all_primitives_list = [] all_features_list = [] for i in range(len(individual)): if isinstance(individual[i], Primitive): all_primitives_list.append(individual[i].name) elif isinstance(individual[i], Terminal): all_features_list.append(individual[i].value) n_operators = len(all_primitives_list) n_features = len(all_features_list) val = 1.0 * n_operators + 1.0 * n_features return val
[docs] def plot_solution(self) -> None: nodes, edges, labels = gp.graph(self.best_heuristic) g = nx.Graph() g.add_nodes_from(nodes) g.add_edges_from(edges) pos = nx.drawing.spring_layout(g) nx.draw_networkx_nodes(g, pos) nx.draw_networkx_edges(g, pos) nx.draw_networkx_labels(g, pos, labels) plt.show()