Source code for discrete_optimization.rcpsp.robust_rcpsp

#  Copyright (c) 2023 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from collections import defaultdict
from enum import Enum
from typing import Any, Dict, Hashable, List, Sequence, Tuple

import numpy as np
from scipy.stats import poisson, randint, rv_discrete

from discrete_optimization.generic_tools.do_problem import (
    MethodAggregating,
    RobustProblem,
    Solution,
)
from discrete_optimization.rcpsp import RCPSPModel
from discrete_optimization.rcpsp.rcpsp_solution import RCPSPSolution


[docs] def tree() -> Dict[Any, Any]: return defaultdict(tree)
[docs] class AggregRCPSPModel(RobustProblem, RCPSPModel): list_problem: Sequence[RCPSPModel] def __init__( self, list_problem: Sequence[RCPSPModel], method_aggregating: MethodAggregating ): RobustProblem.__init__( self, list_problem=list_problem, method_aggregating=method_aggregating ) RCPSPModel.__init__( self, resources=list_problem[0].resources, non_renewable_resources=list_problem[0].non_renewable_resources, mode_details=list_problem[0].mode_details, successors=list_problem[0].successors, horizon=list_problem[0].horizon, horizon_multiplier=list_problem[0].horizon_multiplier, tasks_list=list_problem[0].tasks_list, source_task=list_problem[0].source_task, sink_task=list_problem[0].sink_task, name_task=list_problem[0].name_task, calendar_details=list_problem[0].calendar_details, special_constraints=list_problem[0].special_constraints if list_problem[0].do_special_constraints else None, relax_the_start_at_end=list_problem[0].relax_the_start_at_end, fixed_permutation=list_problem[0].fixed_permutation, fixed_modes=list_problem[0].fixed_modes, )
[docs] def get_dummy_solution(self) -> RCPSPSolution: a: RCPSPSolution = self.list_problem[0].get_dummy_solution() a._schedule_to_recompute = True return a
[docs] def get_unique_rcpsp_model(self) -> RCPSPModel: # Create a unique rcpsp instance coherent with the aggregating method. model = self.list_problem[0].copy() for job in model.mode_details: for mode in model.mode_details[job]: for res in model.mode_details[job][mode]: rs = np.array( [ self.list_problem[i].mode_details[job][mode][res] for i in range(self.nb_problem) ] ) agg = int(self.agg_vec(rs)) model.mode_details[job][mode][res] = agg return model
[docs] def evaluate_from_encoding( self, int_vector: List[int], encoding_name: str ) -> Dict[str, float]: fits = [ self.list_problem[i].evaluate_from_encoding(int_vector, encoding_name) for i in range(self.nb_problem) ] keys = fits[0].keys() aggreg = {} for k in keys: vals = np.array([fit[k] for fit in fits]) aggreg[k] = self.agg_vec(vals) return aggreg
[docs] def evaluate(self, variable: RCPSPSolution) -> Dict[str, float]: # type: ignore fits = [] for i in range(self.nb_problem): var: RCPSPSolution = variable.lazy_copy() var.rcpsp_schedule = {} var._schedule_to_recompute = True var.problem = self.list_problem[i] fit = self.list_problem[i].evaluate(var) fits += [fit] keys = fits[0].keys() aggreg = {} for k in keys: vals = np.array([fit[k] for fit in fits]) aggreg[k] = self.agg_vec(vals) return aggreg
[docs] class MethodBaseRobustification(Enum): AVERAGE = 0 WORST_CASE = 1 BEST_CASE = 2 PERCENTILE = 3 SAMPLE = 4
[docs] class MethodRobustification: method_base: MethodBaseRobustification percentile: float def __init__(self, method_base: MethodBaseRobustification, percentile: float = 50): self.method_base = method_base self.percentile = percentile
[docs] def create_poisson_laws_duration( rcpsp_model: RCPSPModel, range_around_mean: int = 3 ) -> Dict[Hashable, Dict[int, Dict[str, Tuple[int, int, int]]]]: poisson_dict: Dict[Hashable, Dict[int, Dict[str, Tuple[int, int, int]]]] = {} source = rcpsp_model.source_task sink = rcpsp_model.sink_task for job in rcpsp_model.mode_details: poisson_dict[job] = {} for mode in rcpsp_model.mode_details[job]: poisson_dict[job][mode] = {} duration = rcpsp_model.mode_details[job][mode]["duration"] if job in {source, sink}: poisson_dict[job][mode]["duration"] = (duration, duration, duration) else: min_duration = max(1, duration - range_around_mean) max_duration = duration + range_around_mean poisson_dict[job][mode]["duration"] = ( min_duration, duration, max_duration, ) return poisson_dict
[docs] def create_poisson_laws_resource( rcpsp_model: RCPSPModel, range_around_mean: int = 1 ) -> Dict[Hashable, Dict[int, Dict[str, Tuple[int, int, int]]]]: poisson_dict: Dict[Hashable, Dict[int, Dict[str, Tuple[int, int, int]]]] = {} source = rcpsp_model.source_task sink = rcpsp_model.sink_task limit_resource = rcpsp_model.resources resources_non_renewable = rcpsp_model.non_renewable_resources for job in rcpsp_model.mode_details: poisson_dict[job] = {} for mode in rcpsp_model.mode_details[job]: poisson_dict[job][mode] = {} for resource in rcpsp_model.mode_details[job][mode]: if resource == "duration": continue if resource in resources_non_renewable: continue resource_consumption = rcpsp_model.mode_details[job][mode][resource] if job in {source, sink}: poisson_dict[job][mode][resource] = ( resource_consumption, resource_consumption, resource_consumption, ) else: min_rc = max(0, resource_consumption - range_around_mean) max_rc = min( resource_consumption + range_around_mean, rcpsp_model.get_max_resource_capacity(resource), ) poisson_dict[job][mode][resource] = ( min_rc, resource_consumption, max_rc, ) return poisson_dict
[docs] def create_poisson_laws( base_rcpsp_model: RCPSPModel, range_around_mean_resource: int = 1, range_around_mean_duration: int = 3, do_uncertain_resource: bool = True, do_uncertain_duration: bool = True, ) -> Dict[Hashable, Dict[int, Dict[str, Tuple[int, int, int]]]]: poisson_laws: Dict[Hashable, Dict[int, Dict[str, Tuple[int, int, int]]]] = tree() if do_uncertain_duration: poisson_laws_duration = create_poisson_laws_duration( base_rcpsp_model, range_around_mean=range_around_mean_duration ) for job in poisson_laws_duration: for mode in poisson_laws_duration[job]: for res in poisson_laws_duration[job][mode]: poisson_laws[job][mode][res] = poisson_laws_duration[job][mode][res] if do_uncertain_resource: poisson_laws_resource = create_poisson_laws_resource( base_rcpsp_model, range_around_mean=range_around_mean_resource ) for job in poisson_laws_resource: for mode in poisson_laws_resource[job]: for res in poisson_laws_resource[job][mode]: poisson_laws[job][mode][res] = poisson_laws_resource[job][mode][res] return poisson_laws
[docs] class UncertainRCPSPModel: def __init__( self, base_rcpsp_model: RCPSPModel, poisson_laws: Dict[Hashable, Dict[int, Dict[str, Tuple[int, int, int]]]], uniform_law: bool = True, ): self.base_rcpsp_model = base_rcpsp_model self.poisson_laws = poisson_laws self.probas: Dict[Hashable, Dict[int, Dict[str, Dict[str, Any]]]] = {} for activity in poisson_laws: self.probas[activity] = {} for mode in poisson_laws[activity]: self.probas[activity][mode] = {} for detail in poisson_laws[activity][mode]: min_, mean_, max_ = poisson_laws[activity][mode][detail] if uniform_law: rv = randint(low=min_, high=max_ + 1) else: rv = poisson(mean_) self.probas[activity][mode][detail] = { "value": np.arange(min_, max_ + 1, 1), "proba": np.zeros((max_ - min_ + 1)), } for k in range(len(self.probas[activity][mode][detail]["value"])): self.probas[activity][mode][detail]["proba"][k] = rv.pmf( self.probas[activity][mode][detail]["value"][k] ) self.probas[activity][mode][detail]["proba"] /= np.sum( self.probas[activity][mode][detail]["proba"] ) self.probas[activity][mode][detail][ "prob-distribution" ] = rv_discrete( name=str(activity) + "-" + str(mode) + "-" + str(detail), values=( self.probas[activity][mode][detail]["value"], self.probas[activity][mode][detail]["proba"], ), )
[docs] def create_rcpsp_model( self, method_robustification: MethodRobustification ) -> RCPSPModel: model = self.base_rcpsp_model.copy() for activity in self.probas: if activity in { self.base_rcpsp_model.source_task, self.base_rcpsp_model.sink_task, }: continue for mode in self.probas[activity]: for detail in self.probas[activity][mode]: if ( method_robustification.method_base == MethodBaseRobustification.AVERAGE ): model.mode_details[activity][mode][detail] = int( self.probas[activity][mode][detail][ "prob-distribution" ].mean() ) if ( method_robustification.method_base == MethodBaseRobustification.WORST_CASE ): model.mode_details[activity][mode][detail] = self.probas[ activity ][mode][detail]["prob-distribution"].support()[1] if ( method_robustification.method_base == MethodBaseRobustification.BEST_CASE ): model.mode_details[activity][mode][detail] = self.probas[ activity ][mode][detail]["prob-distribution"].support()[0] if ( method_robustification.method_base == MethodBaseRobustification.PERCENTILE ): model.mode_details[activity][mode][detail] = max( int( self.probas[activity][mode][detail][ "prob-distribution" ].isf(q=1 - method_robustification.percentile / 100) ), 1, ) if ( method_robustification.method_base == MethodBaseRobustification.SAMPLE ): model.mode_details[activity][mode][detail] = self.probas[ activity ][mode][detail]["prob-distribution"].rvs(size=1)[0] return model