Source code for discrete_optimization.rcpsp.solver.rcpsp_lp_lns_solver

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import random
from enum import Enum
from typing import Any, Hashable, List, Mapping, Optional

import mip
import numpy as np

from discrete_optimization.generic_tools.callbacks.callback import Callback
from discrete_optimization.generic_tools.cp_tools import ParametersCP
from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    build_evaluate_function_aggregated,
    get_default_objective_setup,
)
from discrete_optimization.generic_tools.lns_mip import (
    LNS_MILP,
    ConstraintHandler,
    InitialSolution,
)
from discrete_optimization.generic_tools.lp_tools import ParametersMilp
from discrete_optimization.generic_tools.ls.local_search import (
    ModeMutation,
    RestartHandlerLimit,
)
from discrete_optimization.generic_tools.ls.simulated_annealing import (
    SimulatedAnnealing,
    TemperatureSchedulingFactor,
)
from discrete_optimization.generic_tools.mutations.mixed_mutation import (
    BasicPortfolioMutation,
)
from discrete_optimization.generic_tools.mutations.mutation_catalog import (
    get_available_mutations,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.rcpsp.mutations.mutation_rcpsp import (
    PermutationMutationRCPSP,
)
from discrete_optimization.rcpsp.rcpsp_model import RCPSPModel
from discrete_optimization.rcpsp.rcpsp_solution import RCPSPSolution
from discrete_optimization.rcpsp.solver import CP_MRCPSP_MZN
from discrete_optimization.rcpsp.solver.rcpsp_lp_solver import (
    LP_MRCPSP,
    LP_MRCPSP_GUROBI,
    LP_RCPSP,
)
from discrete_optimization.rcpsp.solver.rcpsp_pile import (
    GreedyChoice,
    PileSolverRCPSP,
    PileSolverRCPSP_Calendar,
)
from discrete_optimization.rcpsp.solver.rcpsp_solver import SolverRCPSP

logger = logging.getLogger(__name__)


[docs] class InitialMethodRCPSP(Enum): DUMMY = 0 PILE = 1 PILE_CALENDAR = 2 LS = 3 GA = 4 CP = 5
[docs] class InitialSolutionRCPSP(InitialSolution): def __init__( self, problem: RCPSPModel, params_objective_function: ParamsObjectiveFunction = None, initial_method: InitialMethodRCPSP = InitialMethodRCPSP.PILE, ): self.problem = problem self.params_objective_function = params_objective_function if self.params_objective_function is None: self.params_objective_function = get_default_objective_setup( problem=self.problem ) self.aggreg, _ = build_evaluate_function_aggregated( problem=self.problem, params_objective_function=self.params_objective_function, ) self.initial_method = initial_method
[docs] def get_starting_solution(self) -> ResultStorage: if self.initial_method == InitialMethodRCPSP.PILE: logger.info("Compute greedy") greedy_solver = PileSolverRCPSP(self.problem) store_solution = greedy_solver.solve( greedy_choice=GreedyChoice.MOST_SUCCESSORS ) if self.initial_method == InitialMethodRCPSP.PILE_CALENDAR: logger.info("Compute greedy") greedy_solver = PileSolverRCPSP_Calendar(self.problem) store_solution = greedy_solver.solve( greedy_choice=GreedyChoice.MOST_SUCCESSORS ) elif self.initial_method == InitialMethodRCPSP.DUMMY: logger.info("Compute dummy") solution = self.problem.get_dummy_solution() fit = self.aggreg(solution) store_solution = ResultStorage( list_solution_fits=[(solution, fit)], best_solution=solution, mode_optim=self.params_objective_function.sense_function, ) elif self.initial_method == InitialMethodRCPSP.CP: solver = CP_MRCPSP_MZN( problem=self.problem, params_objective_function=self.params_objective_function, ) store_solution = solver.solve(parameters_cp=ParametersCP.default()) elif self.initial_method == InitialMethodRCPSP.LS: dummy = self.problem.get_dummy_solution() _, mutations = get_available_mutations(self.problem, dummy) list_mutation = [ mutate[0].build(self.problem, dummy, **mutate[1]) for mutate in mutations if mutate[0] == PermutationMutationRCPSP ] mixed_mutation = BasicPortfolioMutation( list_mutation, np.ones((len(list_mutation))) ) res = RestartHandlerLimit(500) sa = SimulatedAnnealing( problem=self.problem, mutator=mixed_mutation, restart_handler=res, temperature_handler=TemperatureSchedulingFactor(2, res, 0.9999), mode_mutation=ModeMutation.MUTATE, params_objective_function=self.params_objective_function, store_solution=True, ) store_solution = sa.solve(dummy, nb_iteration_max=10000) return store_solution
[docs] class ConstraintHandlerFixStartTime(ConstraintHandler): def __init__(self, problem: RCPSPModel, fraction_fix_start_time: float = 0.9): self.problem = problem self.fraction_fix_start_time = fraction_fix_start_time
[docs] def adding_constraint_from_results_store( self, milp_solver: LP_RCPSP, result_storage: ResultStorage ) -> Mapping[Hashable, Any]: nb_jobs = self.problem.n_jobs + 2 constraints_dict = {} current_solution, fit = result_storage.get_best_solution_fit() current_solution: RCPSPSolution = current_solution # Starting point : start = [] for j in self.problem.tasks_list: start_time_j = current_solution.rcpsp_schedule[j]["start_time"] for t in milp_solver.index_time: if start_time_j == t: start += [(milp_solver.x[milp_solver.index_in_var[j]][t], 1)] else: start += [(milp_solver.x[milp_solver.index_in_var[j]][t], 0)] milp_solver.model.start = start # Fix start time for a subset of task. jobs_to_fix = set( random.sample( list(current_solution.rcpsp_schedule), int(self.fraction_fix_start_time * nb_jobs), ) ) constraints_dict["fix_start_time"] = [] for job_to_fix in jobs_to_fix: for t in milp_solver.index_time: if current_solution.rcpsp_schedule[job_to_fix]["start_time"] == t: constraints_dict["fix_start_time"].append( milp_solver.model.add_constr( milp_solver.x[milp_solver.index_in_var[job_to_fix]][t] == 1 ) ) else: constraints_dict["fix_start_time"].append( milp_solver.model.add_constr( milp_solver.x[milp_solver.index_in_var[job_to_fix]][t] == 0 ) ) if milp_solver.solver_name == mip.GRB: milp_solver.model.solver.update() return constraints_dict
[docs] def remove_constraints_from_previous_iteration( self, milp_solver: LP_RCPSP, previous_constraints: Mapping[Hashable, Any] ): milp_solver.model.remove(previous_constraints["fix_start_time"]) if milp_solver.solver_name == mip.GRB: milp_solver.model.solver.update()
[docs] class ConstraintHandlerStartTimeInterval(ConstraintHandler): def __init__( self, problem: RCPSPModel, fraction_to_fix: float = 0.9, minus_delta: int = 2, plus_delta: int = 2, ): self.problem = problem self.fraction_to_fix = fraction_to_fix self.minus_delta = minus_delta self.plus_delta = plus_delta
[docs] def adding_constraint_from_results_store( self, milp_solver: LP_RCPSP, result_storage: ResultStorage ) -> Mapping[Hashable, Any]: constraints_dict = {} current_solution, fit = result_storage.get_best_solution_fit() # Starting point : current_solution: RCPSPSolution = current_solution # Starting point : start = [] for j in self.problem.tasks_list: start_time_j = current_solution.rcpsp_schedule[j]["start_time"] for t in milp_solver.index_time: if start_time_j == t: start += [(milp_solver.x[milp_solver.index_in_var[j]][t], 1)] else: start += [(milp_solver.x[milp_solver.index_in_var[j]][t], 0)] milp_solver.model.start = start constraints_dict["range_start_time"] = [] max_time = max( [ current_solution.rcpsp_schedule[x]["end_time"] for x in current_solution.rcpsp_schedule ] ) last_jobs = [ x for x in current_solution.rcpsp_schedule if current_solution.rcpsp_schedule[x]["end_time"] >= max_time - 5 ] nb_jobs = self.problem.n_jobs jobs_to_fix = set( random.sample( list(current_solution.rcpsp_schedule), int(self.fraction_to_fix * nb_jobs), ) ) for lj in last_jobs: if lj in jobs_to_fix: jobs_to_fix.remove(lj) for job in jobs_to_fix: start_time_j = current_solution.rcpsp_schedule[job]["start_time"] min_st = max(start_time_j - self.minus_delta, 0) max_st = min(start_time_j + self.plus_delta, max_time) for t in milp_solver.index_time: if t < min_st or t > max_st: constraints_dict["range_start_time"].append( milp_solver.model.add_constr( milp_solver.x[milp_solver.index_in_var[job]][t] == 0 ) ) if milp_solver.solver_name == mip.GRB: milp_solver.model.solver.update() return constraints_dict
[docs] def remove_constraints_from_previous_iteration( self, milp_solver: LP_RCPSP, previous_constraints: Mapping[Hashable, Any] ): milp_solver.model.remove(previous_constraints["range_start_time"]) if milp_solver.solver_name == mip.GRB: milp_solver.model.solver.update()
[docs] class ConstraintHandlerStartTimeIntervalMRCPSP(ConstraintHandler): def __init__( self, problem: RCPSPModel, fraction_to_fix: float = 0.9, minus_delta: int = 2, plus_delta: int = 2, ): self.problem = problem self.fraction_to_fix = fraction_to_fix self.minus_delta = minus_delta self.plus_delta = plus_delta
[docs] def adding_constraint_from_results_store( self, milp_solver: LP_MRCPSP, result_storage: ResultStorage ) -> Mapping[Hashable, Any]: current_solution, fit = result_storage.get_best_solution_fit() current_solution: RCPSPSolution = current_solution st = milp_solver.start_solution if ( self.problem.evaluate(st)["makespan"] < self.problem.evaluate(current_solution)["makespan"] ): current_solution = st start = [] modes_dict = self.problem.build_mode_dict(current_solution.rcpsp_modes) for j in current_solution.rcpsp_schedule: start_time_j = current_solution.rcpsp_schedule[j]["start_time"] mode_j = modes_dict[j] start += [ ( milp_solver.durations[j], self.problem.mode_details[j][mode_j]["duration"], ) ] for k in milp_solver.variable_per_task[j]: task, mode, time = k if start_time_j == time and mode == mode_j: start += [(milp_solver.x[k], 1)] else: start += [(milp_solver.x[k], 0)] milp_solver.model.start = start constraints_dict = {"range_start_time": []} max_time = max( [ current_solution.rcpsp_schedule[x]["end_time"] for x in current_solution.rcpsp_schedule ] ) last_jobs = [ x for x in current_solution.rcpsp_schedule if current_solution.rcpsp_schedule[x]["end_time"] >= max_time - 5 ] nb_jobs = self.problem.n_jobs jobs_to_fix = set( random.sample( list(current_solution.rcpsp_schedule), int(self.fraction_to_fix * nb_jobs), ) ) for lj in last_jobs: if lj in jobs_to_fix: jobs_to_fix.remove(lj) for job in jobs_to_fix: start_time_j = current_solution.rcpsp_schedule[job]["start_time"] min_st = max(start_time_j - self.minus_delta, 0) max_st = min(start_time_j + self.plus_delta, max_time) for key in milp_solver.variable_per_task[job]: t = key[2] if t < min_st or t > max_st: constraints_dict["range_start_time"].append( milp_solver.model.add_constr(milp_solver.x[key] == 0) ) if milp_solver.solver_name == mip.GRB: milp_solver.model.solver.update() return constraints_dict
[docs] def remove_constraints_from_previous_iteration( self, milp_solver: LP_MRCPSP, previous_constraints: Mapping[Hashable, Any] ): milp_solver.model.remove(previous_constraints["range_start_time"]) if milp_solver.solver_name == mip.GRB: milp_solver.model.solver.update()
[docs] class ConstraintHandlerStartTimeIntervalMRCPSP_GRB(ConstraintHandler): def __init__( self, problem: RCPSPModel, fraction_to_fix: float = 0.9, minus_delta: int = 2, plus_delta: int = 2, ): self.problem = problem self.fraction_to_fix = fraction_to_fix self.minus_delta = minus_delta self.plus_delta = plus_delta
[docs] def adding_constraint_from_results_store( self, milp_solver: LP_MRCPSP_GUROBI, result_storage: ResultStorage ) -> Mapping[Hashable, Any]: current_solution, fit = result_storage.get_best_solution_fit() st = milp_solver.start_solution if ( self.problem.evaluate(st)["makespan"] < self.problem.evaluate(current_solution)["makespan"] ): current_solution = st start = [] modes_dict = self.problem.build_mode_dict(current_solution.rcpsp_modes) for j in current_solution.rcpsp_schedule: start_time_j = current_solution.rcpsp_schedule[j]["start_time"] mode_j = modes_dict[j] start += [ ( milp_solver.durations[j], self.problem.mode_details[j][mode_j]["duration"], ) ] for k in milp_solver.variable_per_task[j]: task, mode, time = k if start_time_j == time and mode == mode_j: milp_solver.x[k].start = 1 milp_solver.starts[j].start = start_time_j else: milp_solver.x[k].start = 0 constraints_dict = {"range_start_time": []} max_time = max( [ current_solution.rcpsp_schedule[x]["end_time"] for x in current_solution.rcpsp_schedule ] ) last_jobs = [ x for x in current_solution.rcpsp_schedule if current_solution.rcpsp_schedule[x]["end_time"] >= max_time - 5 ] nb_jobs = self.problem.n_jobs jobs_to_fix = set( random.sample( list(current_solution.rcpsp_schedule), int(self.fraction_to_fix * nb_jobs), ) ) for lj in last_jobs: if lj in jobs_to_fix: jobs_to_fix.remove(lj) for job in jobs_to_fix: start_time_j = current_solution.rcpsp_schedule[job]["start_time"] min_st = max(start_time_j - self.minus_delta, 0) max_st = min(start_time_j + self.plus_delta, max_time) for key in milp_solver.variable_per_task[job]: t = key[2] if t < min_st or t > max_st: constraints_dict["range_start_time"].append( milp_solver.model.addLConstr(milp_solver.x[key] == 0) ) milp_solver.model.update() return constraints_dict
[docs] def remove_constraints_from_previous_iteration( self, milp_solver: LP_MRCPSP_GUROBI, previous_constraints: Mapping[Hashable, Any], ): milp_solver.model.remove(previous_constraints.get("range_start_time", [])) milp_solver.model.update()
[docs] class LNS_LP_RCPSP_SOLVER(SolverRCPSP): problem: RCPSPModel def __init__( self, problem: RCPSPModel, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs ): super().__init__( problem=problem, params_objective_function=params_objective_function ) solver = LP_MRCPSP(problem=problem, **kwargs) solver.init_model(greedy_start=False) self.parameters_milp = ParametersMilp( time_limit=100, pool_solutions=1000, mip_gap_abs=0.001, mip_gap=0.001, retrieve_all_solution=True, n_solutions_max=100, ) self.constraint_handler = ConstraintHandlerStartTimeIntervalMRCPSP( problem=problem, fraction_to_fix=0.6, minus_delta=5, plus_delta=5 ) self.initial_solution_provider = InitialSolutionRCPSP( problem=problem, initial_method=InitialMethodRCPSP.DUMMY, params_objective_function=self.params_objective_function, ) self.lns_solver = LNS_MILP( problem=problem, milp_solver=solver, initial_solution_provider=self.initial_solution_provider, constraint_handler=self.constraint_handler, params_objective_function=self.params_objective_function, )
[docs] def solve( self, callbacks: Optional[List[Callback]] = None, **kwargs ) -> ResultStorage: return self.lns_solver.solve_lns( parameters_milp=self.parameters_milp, nb_iteration_lns=kwargs.get("nb_iteration_lns", 100), callbacks=callbacks, )