Source code for discrete_optimization.rcpsp.solver.rcpsp_lp_solver_gantt

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
import random
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

import networkx as nx
from mip import BINARY, MINIMIZE, Model, xsum

from discrete_optimization.generic_tools.do_problem import (
    ModeOptim,
    ParamsObjectiveFunction,
    Solution,
)
from discrete_optimization.generic_tools.lp_tools import (
    GurobiMilpSolver,
    MilpSolver,
    MilpSolverName,
    PymipMilpSolver,
    map_solver,
)
from discrete_optimization.rcpsp.rcpsp_model import RCPSPModel
from discrete_optimization.rcpsp.rcpsp_solution import RCPSPSolution
from discrete_optimization.rcpsp.solver.rcpsp_solver import SolverRCPSP

try:
    import gurobipy
except ImportError:
    gurobi_available = False
else:
    gurobi_available = True
    import gurobipy as gurobi


logger = logging.getLogger(__name__)


[docs] def intersect(i1, i2): if i2[0] >= i1[1] or i1[0] >= i2[1]: return None else: s = max(i1[0], i2[0]) e = min(i1[1], i2[1]) return [s, e]
[docs] class ConstraintTaskIndividual: list_tuple: List[Tuple[str, int, int, bool]] # task, resource, resource_individual, has or has not to do a task # indicates constraint for a given resource individual that has to do a task def __init__(self, list_tuple): self.list_tuple = list_tuple
[docs] class ConstraintWorkDuration: ressource: str individual: int time_bounds: Tuple[int, int] working_time_upper_bound: int def __init__(self, ressource, individual, time_bounds, working_time_upper_bound): self.ressource = ressource self.individual = individual self.time_bounds = time_bounds self.working_time_upper_bound = working_time_upper_bound
class _Base_LP_MRCPSP_GANTT(MilpSolver, SolverRCPSP): problem: RCPSPModel def __init__( self, problem: RCPSPModel, rcpsp_solution: RCPSPSolution, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs, ): if problem.calendar_details is None: raise ValueError( "rcpsp_model.calendar_details cannot be None for this solver" ) super().__init__( problem=problem, params_objective_function=params_objective_function ) self.rcpsp_solution = rcpsp_solution self.jobs = sorted(list(problem.mode_details.keys())) self.modes_dict = { i + 2: self.rcpsp_solution.rcpsp_modes[i] for i in range(len(self.rcpsp_solution.rcpsp_modes)) } self.modes_dict[1] = 1 self.modes_dict[self.jobs[-1]] = 1 self.rcpsp_schedule = self.rcpsp_solution.rcpsp_schedule self.start_times_dict = {} for task in self.rcpsp_schedule: t = self.rcpsp_schedule[task]["start_time"] if t not in self.start_times_dict: self.start_times_dict[t] = set() self.start_times_dict[t].add((task, t)) self.graph_intersection_time = nx.Graph() for t in self.jobs: self.graph_intersection_time.add_node(t) for t in self.jobs: intersected_jobs = [ task for task in self.rcpsp_schedule if intersect( [ self.rcpsp_schedule[task]["start_time"], self.rcpsp_schedule[task]["end_time"], ], [ self.rcpsp_schedule[t]["start_time"], self.rcpsp_schedule[t]["end_time"], ], ) is not None and t != task ] for tt in intersected_jobs: self.graph_intersection_time.add_edge(t, tt) cliques = [c for c in nx.find_cliques(self.graph_intersection_time)] self.cliques = cliques self.sense_optim = ModeOptim.MINIMIZATION self.params_objective_function.sense_function = self.sense_optim self.constraint_additionnal = {} def retrieve_current_solution( self, get_var_value_for_current_solution: Callable[[Any], float], get_obj_value_for_current_solution: Callable[[], float], ) -> Tuple[Dict[Any, Dict[Any, Dict[Any, Any]]], float]: objective = get_obj_value_for_current_solution() resource_id_usage = { k: { individual: { task: get_var_value_for_current_solution(resource_usage) for task, resource_usage in self.ressource_id_usage[k][ individual ].items() } for individual in self.ressource_id_usage[k] } for k in self.ressource_id_usage } return (resource_id_usage, objective)
[docs] class LP_MRCPSP_GANTT(PymipMilpSolver, _Base_LP_MRCPSP_GANTT): def __init__( self, problem: RCPSPModel, rcpsp_solution: RCPSPSolution, lp_solver=MilpSolverName.CBC, params_objective_function: Optional[ParamsObjectiveFunction] = None, **kwargs, ): super().__init__( problem=problem, rcpsp_solution=rcpsp_solution, params_objective_function=params_objective_function, **kwargs, ) self.lp_solver = lp_solver
[docs] def init_model(self, **args): self.model = Model(sense=MINIMIZE, solver_name=map_solver[self.lp_solver]) self.ressource_id_usage = { k: {i: {} for i in range(len(self.problem.calendar_details[k]))} for k in self.problem.calendar_details.keys() } variables_per_task = {} variables_per_individual = {} constraints_ressource_need = {} for task in self.jobs: start = self.rcpsp_schedule[task]["start_time"] end = self.rcpsp_schedule[task]["end_time"] for k in self.ressource_id_usage: # typically worker needed_ressource = ( self.problem.mode_details[task][self.modes_dict[task]][k] > 0 ) if needed_ressource: for individual in self.ressource_id_usage[k]: available = all( self.problem.calendar_details[k][individual][time] for time in range(start, end) ) if available: key_variable = (k, individual, task) self.ressource_id_usage[k][individual][ task ] = self.model.add_var( name=str(key_variable), var_type=BINARY, obj=random.random(), ) if task not in variables_per_task: variables_per_task[task] = set() if k not in variables_per_individual: variables_per_individual[k] = {} if individual not in variables_per_individual[k]: variables_per_individual[k][individual] = set() variables_per_task[task].add(key_variable) variables_per_individual[k][individual].add(key_variable) ressource_needed = self.problem.mode_details[task][ self.modes_dict[task] ][k] if k not in constraints_ressource_need: constraints_ressource_need[k] = {} constraints_ressource_need[k][task] = self.model.add_constr( xsum( [ self.ressource_id_usage[k][key[1]][key[2]] for key in variables_per_task[task] if key[0] == k ] ) == ressource_needed, name="ressource_" + str(k) + "_" + str(task), ) overlaps_constraints = {} for i in range(len(self.cliques)): tasks = set(self.cliques[i]) for k in variables_per_individual: for individual in variables_per_individual[k]: keys_variable = [ variable for variable in variables_per_individual[k][individual] if variable[2] in tasks ] if len(keys_variable) > 0: overlaps_constraints[ (i, k, individual) ] = self.model.add_constr( xsum( [ self.ressource_id_usage[key[0]][key[1]][key[2]] for key in keys_variable ] ) <= 1 )
# gurobi solver which is usefull to get a pool of solution (indeed, using the other one we dont have usually a lot of # solution since we converge rapidly to the "optimum" (we don't have an objective value..)
[docs] class LP_MRCPSP_GANTT_GUROBI(GurobiMilpSolver, _Base_LP_MRCPSP_GANTT):
[docs] def init_model(self, **args): self.model = gurobi.Model("Gantt") self.ressource_id_usage = { k: {i: {} for i in range(len(self.problem.calendar_details[k]))} for k in self.problem.calendar_details.keys() } variables_per_task = {} variables_per_individual = {} constraints_ressource_need = {} for task in self.jobs: start = self.rcpsp_schedule[task]["start_time"] end = self.rcpsp_schedule[task]["end_time"] for k in self.ressource_id_usage: # typically worker needed_ressource = ( self.problem.mode_details[task][self.modes_dict[task]][k] > 0 ) if needed_ressource: for individual in self.ressource_id_usage[k]: available = all( self.problem.calendar_details[k][individual][time] for time in range(start, end) ) if available: key_variable = (k, individual, task) self.ressource_id_usage[k][individual][ task ] = self.model.addVar( name=str(key_variable), vtype=gurobi.GRB.BINARY ) if task not in variables_per_task: variables_per_task[task] = set() if k not in variables_per_individual: variables_per_individual[k] = {} if individual not in variables_per_individual[k]: variables_per_individual[k][individual] = set() variables_per_task[task].add(key_variable) variables_per_individual[k][individual].add(key_variable) ressource_needed = self.problem.mode_details[task][ self.modes_dict[task] ][k] if k not in constraints_ressource_need: constraints_ressource_need[k] = {} constraints_ressource_need[k][task] = self.model.addLConstr( gurobi.quicksum( [ self.ressource_id_usage[k][key[1]][key[2]] for key in variables_per_task[task] if key[0] == k ] ) == ressource_needed, name="ressource_" + str(k) + "_" + str(task), ) overlaps_constraints = {} for i in range(len(self.cliques)): tasks = set(self.cliques[i]) for k in variables_per_individual: for individual in variables_per_individual[k]: keys_variable = [ variable for variable in variables_per_individual[k][individual] if variable[2] in tasks ] if len(keys_variable) > 0: overlaps_constraints[ (i, k, individual) ] = self.model.addLConstr( gurobi.quicksum( [ self.ressource_id_usage[key[0]][key[1]][key[2]] for key in keys_variable ] ) <= 1 ) self.model.modelSense = gurobi.GRB.MINIMIZE
[docs] def adding_constraint( self, constraint_description: Union[ConstraintTaskIndividual, ConstraintWorkDuration], constraint_name: str = "", ): if isinstance(constraint_description, ConstraintTaskIndividual): if constraint_name == "": constraint_name = str(ConstraintTaskIndividual.__name__) for tupl in constraint_description.list_tuple: ressource, ressource_individual, task, has_to_do = tupl if ressource in self.ressource_id_usage: if ressource_individual in self.ressource_id_usage[ressource]: if ( task in self.ressource_id_usage[ressource][ressource_individual] ): if constraint_name not in self.constraint_additionnal: self.constraint_additionnal[constraint_name] = [] self.constraint_additionnal[constraint_name] += [ self.model.addLConstr( self.ressource_id_usage[ressource][ ressource_individual ][task] == has_to_do ) ] if isinstance(constraint_description, ConstraintWorkDuration): if constraint_name == "": constraint_name = str(ConstraintWorkDuration.__name__) if constraint_name not in self.constraint_additionnal: self.constraint_additionnal[constraint_name] = [] tasks_of_interest = [ t for t in self.rcpsp_schedule if t in self.ressource_id_usage.get( constraint_description.ressource, {} ).get(constraint_description.individual, {}) and ( constraint_description.time_bounds[0] <= self.rcpsp_schedule[t]["start_time"] <= constraint_description.time_bounds[1] or constraint_description.time_bounds[0] <= self.rcpsp_schedule[t]["end_time"] <= constraint_description.time_bounds[1] ) ] logger.debug(tasks_of_interest) self.constraint_additionnal[constraint_name] += [ self.model.addLConstr( gurobi.quicksum( [ self.ressource_id_usage[constraint_description.ressource][ constraint_description.individual ][t] * ( min( constraint_description.time_bounds[1], self.rcpsp_schedule[t]["end_time"], ) - max( constraint_description.time_bounds[0], self.rcpsp_schedule[t]["start_time"], ) ) for t in tasks_of_interest ] ) <= constraint_description.working_time_upper_bound ) ] self.model.update()
[docs] def retrieve_current_solution( self, get_var_value_for_current_solution: Callable[[Any], float], get_obj_value_for_current_solution: Callable[[], float], ) -> Tuple[Dict[Any, Dict[Any, Dict[Any, Any]]], float]: objective = get_obj_value_for_current_solution() resource_id_usage = { k: { individual: { task: get_var_value_for_current_solution(resource_usage) for task, resource_usage in self.ressource_id_usage[k][ individual ].items() } for individual in self.ressource_id_usage[k] } for k in self.ressource_id_usage } return (resource_id_usage, objective)
[docs] def build_objective_function_from_a_solution( self, ressource_usage: Dict[str, Dict[int, Dict[int, bool]]], ignore_tuple: Set[Tuple[str, int, int]] = None, ): objective = gurobi.LinExpr(0.0) if ignore_tuple is None: ignore_tuple = set() for k in ressource_usage: for individual in ressource_usage[k]: for task in ressource_usage[k][individual]: if ressource_usage[k][individual][task] >= 0.5: objective.add(1 - self.ressource_id_usage[k][individual][task]) logger.debug("Setting new objectives = Change task objective") self.model.setObjective(objective)