Source code for discrete_optimization.rcpsp_multiskill.solvers.ms_rcpsp_lp_lns_solver

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import random
from typing import Any, Hashable, Mapping

from discrete_optimization.generic_tools.do_problem import (
    ParamsObjectiveFunction,
    build_evaluate_function_aggregated,
    get_default_objective_setup,
)
from discrete_optimization.generic_tools.lns_mip import (
    ConstraintHandler,
    InitialSolution,
)
from discrete_optimization.generic_tools.lp_tools import MilpSolverName
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.rcpsp.rcpsp_solution import RCPSPSolution
from discrete_optimization.rcpsp.solver.rcpsp_lp_lns_solver import (
    InitialMethodRCPSP,
    InitialSolutionRCPSP,
)
from discrete_optimization.rcpsp_multiskill.rcpsp_multiskill import (
    MS_RCPSPModel,
    MS_RCPSPSolution,
)
from discrete_optimization.rcpsp_multiskill.solvers.lp_model import LP_Solver_MRSCPSP


[docs] class InitialSolutionMS_RCPSP(InitialSolution): def __init__( self, problem: MS_RCPSPModel, params_objective_function: ParamsObjectiveFunction = None, initial_method: InitialMethodRCPSP = InitialMethodRCPSP.PILE, ): self.problem = problem self.params_objective_function = params_objective_function if self.params_objective_function is None: self.params_objective_function = get_default_objective_setup( problem=self.problem ) self.aggreg, _ = build_evaluate_function_aggregated( problem=self.problem, params_objective_function=self.params_objective_function, ) self.initial_method = initial_method
[docs] def get_starting_solution(self) -> ResultStorage: multi_skill_rcpsp = self.problem.build_multimode_rcpsp_calendar_representative() init_solution = InitialSolutionRCPSP( problem=multi_skill_rcpsp, params_objective_function=self.params_objective_function, initial_method=self.initial_method, ) s = init_solution.get_starting_solution() list_solution_fits = [] class_solution = self.problem.get_solution_type() if class_solution is None: class_solution = self.problem.to_variant_model().get_solution_type() for sol, fit in s.list_solution_fits: sol: RCPSPSolution = sol mode = sol.rcpsp_modes modes = { multi_skill_rcpsp.tasks_list_non_dummy[i]: mode[i] for i in range(len(mode)) } modes[self.problem.source_task] = 1 modes[self.problem.sink_task] = 1 ms_rcpsp_solution = class_solution( problem=self.problem, priority_list_task=sol.rcpsp_permutation, modes_vector=sol.rcpsp_modes, priority_worker_per_task=[ [w for w in self.problem.employees] for i in range(self.problem.n_jobs_non_dummy) ], ) list_solution_fits += [(ms_rcpsp_solution, self.aggreg(ms_rcpsp_solution))] return ResultStorage( list_solution_fits=list_solution_fits, mode_optim=self.params_objective_function.sense_function, )
[docs] class ConstraintHandlerFixStartTime(ConstraintHandler): def __init__(self, problem: MS_RCPSPModel, fraction_fix_start_time: float = 0.9): self.problem = problem self.fraction_fix_start_time = fraction_fix_start_time
[docs] def adding_constraint_from_results_store( self, milp_solver: LP_Solver_MRSCPSP, result_storage: ResultStorage ) -> Mapping[Hashable, Any]: nb_jobs = self.problem.nb_tasks constraints_dict = {} current_solution, fit = result_storage.get_best_solution_fit() start = [] for j in current_solution.schedule: start_time_j = current_solution.schedule[j]["start_time"] mode = current_solution.modes[j] start += [(milp_solver.start_times_task[j], start_time_j)] start += [(milp_solver.modes[j][mode], 1)] for m in milp_solver.modes[j]: start += [(milp_solver.modes[j][m], 1 if mode == m else 0)] milp_solver.model.start = start # Fix start time for a subset of task. jobs_to_fix = set( random.sample( list(current_solution.rcpsp_schedule), int(self.fraction_fix_start_time * nb_jobs), ) ) constraints_dict["fix_start_time"] = [] for job_to_fix in jobs_to_fix: constraints_dict["fix_start_time"].append( milp_solver.model.add_constr( milp_solver.start_times_task[job_to_fix] - current_solution.schedule[job_to_fix]["start_time"] == 0 ) ) if milp_solver.lp_solver == MilpSolverName.GRB: milp_solver.model.solver.update() return constraints_dict
[docs] def remove_constraints_from_previous_iteration( self, milp_solver: LP_Solver_MRSCPSP, previous_constraints: Mapping[Hashable, Any], ): milp_solver.model.remove(previous_constraints["fix_start_time"]) if milp_solver.lp_solver == MilpSolverName.GRB: milp_solver.model.solver.update()
[docs] class ConstraintHandlerStartTimeIntervalMRCPSP(ConstraintHandler): def __init__( self, problem: MS_RCPSPModel, fraction_to_fix: float = 0.9, minus_delta: int = 2, plus_delta: int = 2, ): self.problem = problem self.fraction_to_fix = fraction_to_fix self.minus_delta = minus_delta self.plus_delta = plus_delta
[docs] def adding_constraint_from_results_store( self, milp_solver: LP_Solver_MRSCPSP, result_storage: ResultStorage ) -> Mapping[Hashable, Any]: current_solution: MS_RCPSPSolution = result_storage.get_best_solution() start = [] for j in current_solution.schedule: start_time_j = current_solution.schedule[j]["start_time"] mode = current_solution.modes[j] start += [(milp_solver.start_times_task[j], start_time_j)] start += [(milp_solver.modes[j][mode], 1)] for m in milp_solver.modes[j]: start += [(milp_solver.modes[j][m], 1 if mode == m else 0)] milp_solver.model.start = start constraints_dict = {"range_start_time": []} max_time = max( [ current_solution.schedule[x]["end_time"] for x in current_solution.schedule ] ) last_jobs = [ x for x in current_solution.schedule if current_solution.schedule[x]["end_time"] >= max_time - 5 ] nb_jobs = self.problem.nb_tasks jobs_to_fix = set( random.sample( list(current_solution.schedule), int(self.fraction_to_fix * nb_jobs) ) ) for lj in last_jobs: if lj in jobs_to_fix: jobs_to_fix.remove(lj) for job in jobs_to_fix: start_time_j = current_solution.schedule[job]["start_time"] min_st = max(start_time_j - self.minus_delta, 0) max_st = min(start_time_j + self.plus_delta, max_time) constraints_dict["range_start_time"].append( milp_solver.model.add_constr( milp_solver.start_times_task[job] <= max_st ) ) constraints_dict["range_start_time"].append( milp_solver.model.add_constr( milp_solver.start_times_task[job] >= min_st ) ) if milp_solver.lp_solver == MilpSolverName.GRB: milp_solver.model.solver.update() return constraints_dict
[docs] def remove_constraints_from_previous_iteration( self, milp_solver: LP_Solver_MRSCPSP, previous_constraints: Mapping[Hashable, Any], ): milp_solver.model.remove(previous_constraints["range_start_time"]) if milp_solver.lp_solver == MilpSolverName.GRB: milp_solver.model.solver.update()