Source code for discrete_optimization.generic_rcpsp_tools.large_neighborhood_search_scheduling

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

from enum import Enum
from typing import Any, Optional

import discrete_optimization.rcpsp.solver.rcpsp_cp_lns_solver as rcpsp_lns
from discrete_optimization.generic_rcpsp_tools.generic_rcpsp_solver import (
    SolverGenericRCPSP,
)
from discrete_optimization.generic_rcpsp_tools.graph_tools_rcpsp import (
    build_graph_rcpsp_object,
)
from discrete_optimization.generic_rcpsp_tools.ls_solver import (
    LS_SOLVER,
    LS_RCPSP_Solver,
)
from discrete_optimization.generic_rcpsp_tools.neighbor_builder import (
    OptionNeighborRandom,
    build_neighbor_mixing_cut_parts,
    build_neighbor_mixing_methods,
    build_neighbor_random,
    mix_both,
)
from discrete_optimization.generic_rcpsp_tools.neighbor_tools_rcpsp import (
    BasicConstraintBuilder,
    ConstraintHandlerScheduling,
    NeighborBuilderMix,
    NeighborBuilderSubPart,
    NeighborConstraintBreaks,
    NeighborRandomAndNeighborGraph,
    ParamsConstraintBuilder,
)
from discrete_optimization.generic_rcpsp_tools.solution_repair import (
    NeighborRepairProblems,
)
from discrete_optimization.generic_rcpsp_tools.typing import ANY_RCPSP
from discrete_optimization.generic_tools.cp_tools import CPSolverName, MinizincCPSolver
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    CategoricalHyperparameter,
    EnumHyperparameter,
    FloatHyperparameter,
    IntegerHyperparameter,
    SubBrickHyperparameter,
    SubBrickKwargsHyperparameter,
)
from discrete_optimization.generic_tools.lns_cp import LNS_CP, ConstraintHandler
from discrete_optimization.generic_tools.lns_mip import (
    InitialSolution,
    InitialSolutionFromSolver,
    PostProcessSolution,
)
from discrete_optimization.rcpsp.solver.cp_lns_methods_preemptive import (
    PostProLeftShift,
)
from discrete_optimization.rcpsp.solver.cp_solvers import (
    CP_MRCPSP_MZN,
    CP_MRCPSP_MZN_PREEMPTIVE,
    CP_RCPSP_MZN,
    CP_RCPSP_MZN_PREEMPTIVE,
)
from discrete_optimization.rcpsp_multiskill.solvers.cp_solvers import (
    CP_MS_MRCPSP_MZN,
    CP_MS_MRCPSP_MZN_PREEMPTIVE,
)
from discrete_optimization.rcpsp_multiskill.solvers.lns_post_process_rcpsp import (
    PostProMSRCPSP,
)


class ConstraintHandlerType(Enum):
    """Kinds of constraint handler that `build_constraint_handler` can build."""

    # Handler built from a mix of neighborhood builders.
    MIX_SUBPROBLEMS = 0
    # Handler built as a `NeighborRepairProblems` instance.
    SOLUTION_REPAIR = 1
def build_default_cp_model(rcpsp_problem: ANY_RCPSP, partial_solution=None, **kwargs):
    """Build and initialize the default MiniZinc CP solver for a scheduling problem.

    The solver class is selected from three predicates of the problem:
    ``is_preemptive()``, ``is_multiskill()`` and ``is_rcpsp_multimode()``.
    The chosen solver is instantiated, its ``init_model`` is called, and the
    solver is returned.

    Args:
        rcpsp_problem: problem to build the CP model for.
        partial_solution: forwarded as-is to ``init_model``.
        **kwargs: options. The following keys are read explicitly:
            ``cp_solver_name`` (default ``CPSolverName.CHUFFED``),
            ``add_partial_solution_hard_constraint`` (default ``False``),
            ``nb_preemptive`` (default 13), ``max_preempted`` (default 100),
            ``fake_tasks`` (default ``True``), ``unit_usage_preemptive``
            (default ``True``) and ``one_ressource_per_task`` (default
            ``rcpsp_problem.one_unit_per_task_max``). Remaining entries are
            forwarded to ``init_model``.

    Returns:
        The initialized CP solver.
    """
    cp_solver_name = kwargs.get("cp_solver_name", CPSolverName.CHUFFED)
    hard_constraint = kwargs.get("add_partial_solution_hard_constraint", False)

    if rcpsp_problem.is_preemptive():
        # Keys passed explicitly to init_model below must not be forwarded a
        # second time through the ** passthrough, otherwise Python raises
        # "got multiple values for keyword argument".
        reserved = {
            "nb_preemptive",
            "max_preempted",
            "fake_tasks",
            "add_partial_solution_hard_constraint",
        }
        if rcpsp_problem.is_multiskill():
            # Fix: this key is passed explicitly in this branch, so it has to
            # be filtered out of the forwarded kwargs as well.
            reserved.add("unit_usage_preemptive")
            extra = {key: kwargs[key] for key in kwargs if key not in reserved}
            solver = CP_MS_MRCPSP_MZN_PREEMPTIVE(
                problem=rcpsp_problem,
                cp_solver_name=cp_solver_name,
            )
            solver.init_model(
                output_type=True,
                model_type="ms_rcpsp_preemptive",
                nb_preemptive=kwargs.get("nb_preemptive", 13),
                max_preempted=kwargs.get("max_preempted", 100),
                partial_solution=partial_solution,
                possibly_preemptive=[
                    rcpsp_problem.preemptive_indicator[t]
                    for t in rcpsp_problem.tasks_list
                ],
                unit_usage_preemptive=kwargs.get("unit_usage_preemptive", True),
                exact_skills_need=False,
                add_calendar_constraint_unit=False,
                add_partial_solution_hard_constraint=hard_constraint,
                **extra,
            )
            return solver

        # Preemptive, not multiskill: multi-mode and single-mode only differ
        # by solver class and model type.
        if rcpsp_problem.is_rcpsp_multimode():
            solver_cls, model_type = CP_MRCPSP_MZN_PREEMPTIVE, "multi-preemptive"
        else:
            solver_cls, model_type = CP_RCPSP_MZN_PREEMPTIVE, "single-preemptive"
        extra = {key: kwargs[key] for key in kwargs if key not in reserved}
        solver = solver_cls(problem=rcpsp_problem, cp_solver_name=cp_solver_name)
        solver.init_model(
            output_type=True,
            model_type=model_type,
            nb_preemptive=kwargs.get("nb_preemptive", 13),
            max_preempted=kwargs.get("max_preempted", 100),
            fake_tasks=kwargs.get("fake_tasks", True),
            partial_solution=partial_solution,
            possibly_preemptive=[
                rcpsp_problem.preemptive_indicator[t]
                for t in rcpsp_problem.tasks_list
            ],
            add_partial_solution_hard_constraint=hard_constraint,
            **extra,
        )
        return solver

    # Non-preemptive problems.
    extra = {
        key: kwargs[key]
        for key in kwargs
        if key != "add_partial_solution_hard_constraint"
    }
    if rcpsp_problem.is_multiskill():
        solver = CP_MS_MRCPSP_MZN(
            problem=rcpsp_problem,
            cp_solver_name=cp_solver_name,
            one_ressource_per_task=kwargs.get(
                "one_ressource_per_task", rcpsp_problem.one_unit_per_task_max
            ),
        )
        solver.init_model(
            output_type=True,
            exact_skills_need=False,
            partial_solution=partial_solution,
            add_partial_solution_hard_constraint=hard_constraint,
            **extra,
        )
        return solver

    if rcpsp_problem.is_rcpsp_multimode():
        solver = CP_MRCPSP_MZN(problem=rcpsp_problem, cp_solver_name=cp_solver_name)
    else:
        solver = CP_RCPSP_MZN(problem=rcpsp_problem, cp_solver_name=cp_solver_name)
    solver.init_model(
        output_type=True,
        partial_solution=partial_solution,
        add_partial_solution_hard_constraint=hard_constraint,
        **extra,
    )
    return solver
def build_default_postpro(rcpsp_problem: ANY_RCPSP, partial_solution=None, **kwargs):
    """Pick the default post-processing step for the given problem.

    Args:
        rcpsp_problem: problem whose ``is_multiskill()`` / ``is_preemptive()``
            predicates drive the choice.
        partial_solution: when not ``None`` on a non-multiskill,
            non-preemptive problem, no post-processing is returned.
        **kwargs: only ``do_ls`` (default ``False``) is read, for the
            preemptive non-multiskill case.

    Returns:
        A post-processing object, or ``None`` when no default applies.
    """
    if rcpsp_problem.is_multiskill():
        # Multiskill: only the non-preemptive variant has a default.
        if rcpsp_problem.is_preemptive():
            return None
        return PostProMSRCPSP(problem=rcpsp_problem, params_objective_function=None)

    if rcpsp_problem.is_preemptive():
        return PostProLeftShift(
            problem=rcpsp_problem,
            params_objective_function=None,
            do_ls=kwargs.get("do_ls", False),
        )

    if partial_solution is not None:
        return None

    return rcpsp_lns.PostProcessLeftShift(
        rcpsp_problem=rcpsp_problem, partial_solution=partial_solution
    )
def build_default_initial_solution(rcpsp_problem: ANY_RCPSP, **kwargs):
    """Return the default initial-solution provider for the problem.

    The provider wraps an ``LS_RCPSP_Solver`` configured with
    ``LS_SOLVER.SA``; it is given ``nb_iteration_max=500`` for multiskill
    problems and ``nb_iteration_max=200`` otherwise.

    Args:
        rcpsp_problem: problem to build the provider for.
        **kwargs: accepted for call-site compatibility; not read here.

    Returns:
        An ``InitialSolutionFromSolver`` instance.
    """
    iteration_budget = 500 if rcpsp_problem.is_multiskill() else 200
    local_search = LS_RCPSP_Solver(problem=rcpsp_problem, ls_solver=LS_SOLVER.SA)
    return InitialSolutionFromSolver(local_search, nb_iteration_max=iteration_budget)
def _build_params_list(kwargs, default_params):
    """Resolve the list of constraint-builder params from ``kwargs``.

    If ``kwargs`` has a ``"params_list"`` entry it is returned unchanged.
    Otherwise one params object is built per entry of ``default_params``:
    for index ``i``, ``kwargs["params_<i>_kwargs"]`` (if present and not
    ``None``) is expanded into ``kwargs["params_<i>_cls"]`` (falling back to
    ``ParamsConstraintBuilder`` when missing or ``None``); else a
    ``ParamsConstraintBuilder`` is built from ``default_params[i]``.
    """
    if "params_list" in kwargs:
        return kwargs["params_list"]
    params_list = []
    for index, default_kwargs in enumerate(default_params):
        custom_kwargs = kwargs.get("params_{}_kwargs".format(index))
        if custom_kwargs is None:
            params_list.append(ParamsConstraintBuilder(**default_kwargs))
            continue
        params_cls = kwargs.get("params_{}_cls".format(index))
        if params_cls is None:
            params_cls = ParamsConstraintBuilder
        params_list.append(params_cls(**custom_kwargs))
    return params_list


def build_constraint_handler(rcpsp_problem: ANY_RCPSP, graph, **kwargs):
    """Build the constraint handler selected by ``constraint_handler_type``.

    Args:
        rcpsp_problem: problem the handler operates on.
        graph: graph object forwarded to the neighbor builders.
        **kwargs: options, all optional:
            ``constraint_handler_type`` (default
            ``ConstraintHandlerType.MIX_SUBPROBLEMS``), ``nb_cut_part``
            (default 10), ``fraction_subproblem`` (default 0.05),
            ``preemptive`` / ``multiskill`` (default ``False``),
            ``use_makespan_of_subtasks`` (default ``False``),
            ``params_list``, or ``params_0_cls`` / ``params_0_kwargs`` /
            ``params_1_cls`` / ``params_1_kwargs``. Missing ``params_*``
            entries no longer raise ``KeyError``; built-in defaults are used.

    Returns:
        A ``ConstraintHandlerScheduling`` for ``MIX_SUBPROBLEMS``, a
        ``NeighborRepairProblems`` for ``SOLUTION_REPAIR``, and ``None`` for
        any other value of ``constraint_handler_type``.
    """
    constraint_handler_type = kwargs.get(
        "constraint_handler_type", ConstraintHandlerType.MIX_SUBPROBLEMS
    )
    constraint_handler = None
    if constraint_handler_type == ConstraintHandlerType.MIX_SUBPROBLEMS:
        n1 = NeighborBuilderSubPart(
            problem=rcpsp_problem,
            graph=graph,
            nb_cut_part=kwargs.get("nb_cut_part", 10),
        )
        n2 = NeighborRandomAndNeighborGraph(
            problem=rcpsp_problem,
            graph=graph,
            fraction_subproblem=kwargs.get("fraction_subproblem", 0.05),
        )
        if rcpsp_problem.includes_special_constraint():
            # A third neighborhood is added only when the problem reports
            # special constraints; weights are rebalanced accordingly.
            n3 = NeighborConstraintBreaks(
                problem=rcpsp_problem,
                graph=graph,
                fraction_subproblem=kwargs.get("fraction_subproblem", 0.05),
                other_constraint_handler=n1,
            )
            n_mix = NeighborBuilderMix(
                list_neighbor=[n1, n2, n3], weight_neighbor=[0.2, 0.5, 0.3]
            )
        else:
            n_mix = NeighborBuilderMix(
                list_neighbor=[n1, n2], weight_neighbor=[0.5, 0.5]
            )
        basic_constraint_builder = BasicConstraintBuilder(
            neighbor_builder=n_mix,
            preemptive=kwargs.get("preemptive", False),
            multiskill=kwargs.get("multiskill", False),
        )
        params_list = _build_params_list(
            kwargs,
            default_params=[
                dict(
                    minus_delta_primary=6000,
                    plus_delta_primary=6000,
                    minus_delta_secondary=400,
                    plus_delta_secondary=400,
                    constraint_max_time_to_current_solution=False,
                ),
                dict(
                    minus_delta_primary=6000,
                    plus_delta_primary=6000,
                    minus_delta_secondary=0,
                    plus_delta_secondary=0,
                    constraint_max_time_to_current_solution=False,
                ),
            ],
        )
        constraint_handler = ConstraintHandlerScheduling(
            problem=rcpsp_problem,
            basic_constraint_builder=basic_constraint_builder,
            params_list=params_list,
            use_makespan_of_subtasks=kwargs.get("use_makespan_of_subtasks", False),
        )
    elif constraint_handler_type == ConstraintHandlerType.SOLUTION_REPAIR:
        params_list = _build_params_list(
            kwargs,
            default_params=[
                dict(
                    minus_delta_primary=6000,
                    plus_delta_primary=6000,
                    minus_delta_secondary=400,
                    plus_delta_secondary=400,
                    constraint_max_time_to_current_solution=False,
                ),
                dict(
                    minus_delta_primary=5000,
                    plus_delta_primary=5000,
                    minus_delta_secondary=2,
                    plus_delta_secondary=2,
                    constraint_max_time_to_current_solution=False,
                ),
            ],
        )
        constraint_handler = NeighborRepairProblems(
            problem=rcpsp_problem, params_list=params_list
        )
    return constraint_handler
def build_constraint_handler_helper(rcpsp_problem: ANY_RCPSP, graph, **kwargs):
    """Build a constraint handler through one of the ``neighbor_builder`` helpers.

    The helper is chosen by ``kwargs["option_neighbor_operator"]`` (default 2):
    0 -> ``build_neighbor_random``, 1 -> ``mix_both``,
    2 -> ``build_neighbor_mixing_methods``,
    3 -> ``build_neighbor_mixing_cut_parts``.

    Args:
        rcpsp_problem: problem passed to the helper as ``rcpsp_model``.
        graph: graph object passed to helpers 1, 2 and 3.
        **kwargs: besides ``option_neighbor_operator``, the keys
            ``option_neighbor_random``, ``fraction_subproblem``, ``cut_part``
            and ``params_list`` are read when relevant to the chosen helper.

    Returns:
        The handler produced by the chosen helper, or ``None`` if the option
        matches none of 0-3.
    """

    def default_params_list():
        # Fallback used when the caller supplies no "params_list".
        return [
            ParamsConstraintBuilder(
                minus_delta_primary=6000,
                plus_delta_primary=6000,
                minus_delta_secondary=400,
                plus_delta_secondary=400,
                constraint_max_time_to_current_solution=False,
            ),
            ParamsConstraintBuilder(
                minus_delta_primary=6000,
                plus_delta_primary=6000,
                minus_delta_secondary=0,
                plus_delta_secondary=0,
                constraint_max_time_to_current_solution=False,
            ),
        ]

    option = kwargs.get("option_neighbor_operator", 2)

    if option == 0:
        return build_neighbor_random(
            option_neighbor=kwargs.get(
                "option_neighbor_random", OptionNeighborRandom.MIX_ALL
            ),
            rcpsp_model=rcpsp_problem,
        )
    if option == 1:
        return mix_both(
            rcpsp_model=rcpsp_problem,
            option_neighbor_random=kwargs.get(
                "option_neighbor_random", OptionNeighborRandom.MIX_ALL
            ),
            graph=graph,
            fraction_subproblem=kwargs.get("fraction_subproblem", 0.05),
            cut_part=kwargs.get("cut_part", 10),
            params_list=kwargs.get("params_list", default_params_list()),
        )
    if option == 2:
        return build_neighbor_mixing_methods(
            rcpsp_model=rcpsp_problem,
            graph=graph,
            fraction_subproblem=kwargs.get("fraction_subproblem", 0.05),
            cut_part=kwargs.get("cut_part", 10),
            params_list=kwargs.get("params_list", default_params_list()),
        )
    if option == 3:
        return build_neighbor_mixing_cut_parts(
            rcpsp_model=rcpsp_problem,
            graph=graph,
            params_list=kwargs.get("params_list", default_params_list()),
        )
    return None
class LargeNeighborhoodSearchScheduling(LNS_CP, SolverGenericRCPSP):
    """LNS-with-CP solver whose four bricks are built from defaults when omitted.

    The four bricks are the CP solver, the constraint handler, the
    post-processing step and the initial-solution provider. Each can be
    passed directly, built from a ``<brick>_cls`` / ``<brick>_kwargs`` pair
    found in ``kwargs``, or built by the module's default builder.
    """

    hyperparameters = [
        EnumHyperparameter(
            name="cp_solver_name", enum=CPSolverName, default=CPSolverName.CHUFFED
        ),
        CategoricalHyperparameter(name="do_ls", choices=[True, False], default=False),
        EnumHyperparameter(
            name="constraint_handler_type",
            enum=ConstraintHandlerType,
            default=ConstraintHandlerType.MIX_SUBPROBLEMS,
        ),
        FloatHyperparameter(
            name="fraction_subproblem", default=0.05, low=0.0, high=1.0
        ),
        IntegerHyperparameter(name="nb_cut_part", default=10, low=0, high=100),
        CategoricalHyperparameter(
            name="use_makespan_of_subtasks", choices=[True, False], default=False
        ),
        SubBrickHyperparameter(
            name="params_0_cls",
            choices=[ParamsConstraintBuilder],
            default=ParamsConstraintBuilder,
        ),
        SubBrickKwargsHyperparameter(
            name="params_0_kwargs", subbrick_hyperparameter="params_0_cls"
        ),
        SubBrickHyperparameter(
            name="params_1_cls",
            choices=[ParamsConstraintBuilder],
            default=ParamsConstraintBuilder,
        ),
        SubBrickKwargsHyperparameter(
            name="params_1_kwargs", subbrick_hyperparameter="params_1_cls"
        ),
    ]

    def __init__(
        self,
        problem: ANY_RCPSP,
        partial_solution=None,
        cp_solver: Optional[MinizincCPSolver] = None,
        initial_solution_provider: Optional[InitialSolution] = None,
        constraint_handler: Optional[ConstraintHandler] = None,
        post_process_solution: Optional[PostProcessSolution] = None,
        params_objective_function: Optional[ParamsObjectiveFunction] = None,
        **kwargs: Any,
    ):
        """Assemble the solver, building any brick that was not supplied.

        Args:
            problem: scheduling problem to solve.
            partial_solution: forwarded to the default CP-model and
                post-processing builders.
            cp_solver: ready-made CP solver; built when ``None``.
            initial_solution_provider: ready-made provider; built when ``None``.
            constraint_handler: ready-made handler; built when ``None``.
            post_process_solution: ready-made post-processing; built when
                ``None``.
            params_objective_function: forwarded to
                ``SolverGenericRCPSP.__init__``.
            **kwargs: hyperparameters, completed with their defaults. For each
                brick, ``<brick>_kwargs`` (if set) replaces the whole kwargs
                dict for that brick, and ``<brick>_cls`` (if set) replaces the
                default builder.
        """
        SolverGenericRCPSP.__init__(
            self, problem=problem, params_objective_function=params_objective_function
        )
        graph = build_graph_rcpsp_object(self.problem)
        kwargs = self.complete_with_default_hyperparameters(kwargs)

        # --- CP solver -----------------------------------------------------
        if cp_solver is None:
            cp_solver_kwargs = kwargs.get("cp_solver_kwargs")
            if cp_solver_kwargs is None:
                cp_solver_kwargs = kwargs
            cp_solver_cls = kwargs.get("cp_solver_cls")
            if cp_solver_cls is None:
                cp_solver = build_default_cp_model(
                    rcpsp_problem=problem,
                    partial_solution=partial_solution,
                    **cp_solver_kwargs,
                )
            else:
                cp_solver = cp_solver_cls(problem=self.problem, **cp_solver_kwargs)
                cp_solver.init_model(**cp_solver_kwargs)
        self.cp_solver = cp_solver

        # --- Constraint handler --------------------------------------------
        if constraint_handler is None:
            constraint_handler_kwargs = kwargs.get("constraint_handler_kwargs")
            if constraint_handler_kwargs is None:
                constraint_handler_kwargs = kwargs
            constraint_handler_cls = kwargs.get("constraint_handler_cls")
            if constraint_handler_cls is None:
                constraint_handler = build_constraint_handler(
                    rcpsp_problem=self.problem,
                    graph=graph,
                    multiskill=self.problem.is_multiskill(),
                    preemptive=self.problem.is_preemptive(),
                    **constraint_handler_kwargs,
                )
            else:
                constraint_handler = constraint_handler_cls(
                    problem=self.problem, **constraint_handler_kwargs
                )
        self.constraint_handler = constraint_handler

        # --- Post-processing -----------------------------------------------
        if post_process_solution is None:
            post_process_solution_kwargs = kwargs.get("post_process_solution_kwargs")
            if post_process_solution_kwargs is None:
                post_process_solution_kwargs = kwargs
            post_process_solution_cls = kwargs.get("post_process_solution_cls")
            if post_process_solution_cls is None:
                post_process_solution = build_default_postpro(
                    rcpsp_problem=self.problem,
                    partial_solution=partial_solution,
                    **post_process_solution_kwargs,
                )
            else:
                post_process_solution = post_process_solution_cls(
                    problem=self.problem,
                    params_objective_function=self.params_objective_function,
                    **post_process_solution_kwargs,
                )
        self.post_process_solution = post_process_solution

        # --- Initial solution provider -------------------------------------
        if initial_solution_provider is None:
            initial_solution_provider_kwargs = kwargs.get(
                "initial_solution_provider_kwargs"
            )
            if initial_solution_provider_kwargs is None:
                initial_solution_provider_kwargs = kwargs
            initial_solution_provider_cls = kwargs.get("initial_solution_provider_cls")
            if initial_solution_provider_cls is None:
                initial_solution_provider = build_default_initial_solution(
                    rcpsp_problem=self.problem,
                    **initial_solution_provider_kwargs,
                )
            else:
                initial_solution_provider = initial_solution_provider_cls(
                    problem=self.problem,
                    params_objective_function=self.params_objective_function,
                    **initial_solution_provider_kwargs,
                )
        self.initial_solution_provider = initial_solution_provider