# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from copy import deepcopy
from enum import Enum
from functools import partial
from typing import (
Any,
Callable,
Dict,
Hashable,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
)
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from discrete_optimization.generic_tools.do_problem import (
EncodingRegister,
ModeOptim,
ObjectiveDoc,
ObjectiveHandling,
ObjectiveRegister,
Problem,
Solution,
TupleFitness,
TypeAttribute,
TypeObjective,
)
from discrete_optimization.generic_tools.graph_api import Graph
from discrete_optimization.rcpsp.fast_function_rcpsp import (
compute_mean_ressource,
sgs_fast,
sgs_fast_partial_schedule_incomplete_permutation_tasks,
)
from discrete_optimization.rcpsp.rcpsp_solution import RCPSPSolution
from discrete_optimization.rcpsp.rcpsp_utils import intersect
from discrete_optimization.rcpsp.special_constraints import (
PairModeConstraint,
SpecialConstraintsDescription,
)
logger = logging.getLogger(__name__)
[docs]
class ScheduleGenerationScheme(Enum):
SERIAL_SGS = 0
PARALLEL_SGS = 1
[docs]
class RCPSPModel(Problem):
"""
Attributes:
resources:
non_renewable_resources:
mode_details:
successors:
horizon:
horizon_multiplier:
tasks_list:
source_task:
sink_task:
name_task:
n_jobs (int):
n_jobs_non_dummy (int): excluding dummy activities Start (0) and End (n)
special_constraints:
do_special_constraints (bool):
relax_the_start_at_end (bool): relax some conditions only if do_special_constraints
fixed_permutation (Optional[List[int]]):
fixed_modes (Optional[List[int]]):
Args:
resources: {resource_name: number_of_resource}
non_renewable_resources: [resource_name3, resource_name4]
mode_details: {job_id: {mode_id: {resource_name1: number_of_resources_needed, resource_name2: ...}} one key being "duration"
successors:
horizon:
horizon_multiplier:
tasks_list: {task_id: list of successor task ids}
source_task:
sink_task:
name_task:
special_constraints:
relax_the_start_at_end:
fixed_permutation:
fixed_modes:
**kwargs:
"""
sgs: ScheduleGenerationScheme
def __init__(
self,
resources: Dict[str, Union[int, List[int]]],
non_renewable_resources: List[str],
mode_details: Dict[Hashable, Dict[int, Dict[str, int]]],
successors: Dict[Hashable, List[Hashable]],
horizon: int,
horizon_multiplier: int = 1,
tasks_list: Optional[List[Hashable]] = None,
source_task: Optional[Hashable] = None,
sink_task: Optional[Hashable] = None,
name_task: Optional[Dict[Hashable, str]] = None,
calendar_details: Optional[Dict[str, List[List[int]]]] = None,
special_constraints: Optional[SpecialConstraintsDescription] = None,
relax_the_start_at_end: bool = True,
fixed_permutation: Optional[List[int]] = None,
fixed_modes: Optional[List[int]] = None,
**kwargs: Any,
):
self.resources = resources
self.resources_list = list(self.resources.keys())
self.non_renewable_resources = non_renewable_resources
self.mode_details = mode_details
self.successors = successors
self.horizon = horizon
self.horizon_multiplier = horizon_multiplier
self.calendar_details = calendar_details
if name_task is None:
self.name_task = {x: str(x) for x in self.mode_details}
else:
self.name_task = name_task
if tasks_list is None:
self.tasks_list = list(self.mode_details.keys())
else:
self.tasks_list = tasks_list
self.n_jobs = len(self.mode_details.keys())
self.n_jobs_non_dummy = self.n_jobs - 2
self.index_task = {self.tasks_list[i]: i for i in range(self.n_jobs)}
if source_task is None:
if all((isinstance(t, int) for t in self.tasks_list)):
self.source_task = min(self.tasks_list) # type: ignore
else:
raise ValueError(
"source_task cannot be None if tasks id given in tasks_list are not all integers."
)
else:
self.source_task = source_task
if sink_task is None:
if all((isinstance(t, int) for t in self.tasks_list)):
self.sink_task = max(self.tasks_list) # type: ignore
else:
raise ValueError(
"sink_task cannot be None if tasks id given in tasks_list are not all integers."
)
else:
self.sink_task = sink_task
self.tasks_list_non_dummy = [
t for t in self.tasks_list if t not in {self.source_task, self.sink_task}
]
self.index_task_non_dummy = {
self.tasks_list_non_dummy[i]: i for i in range(self.n_jobs_non_dummy)
}
self.max_number_of_mode = max(
[len(self.mode_details[key1].keys()) for key1 in self.mode_details.keys()]
)
self.is_multimode = self.max_number_of_mode > 1
self.is_calendar = False
if any(isinstance(self.resources[res], Iterable) for res in self.resources):
self.is_calendar = (
max(
(
len(
{self.resources[res]}
if isinstance(self.resources[res], int)
else set(self.resources[res]) # type: ignore
)
for res in self.resources
)
)
> 1
)
if not self.is_calendar:
self.resources = {
r: self.resources[r]
if isinstance(self.resources[r], int)
else self.resources[r][0] # type: ignore
for r in self.resources
}
(
self.func_sgs,
self.func_sgs_2,
self.compute_mean_resource,
) = create_np_data_and_jit_functions(self)
self.costs: Dict[str, bool] = {
"makespan": True,
"mean_resource_reserve": kwargs.get("mean_resource_reserve", False),
}
if special_constraints is None:
self.do_special_constraints = False
self.special_constraints = SpecialConstraintsDescription()
else:
self.do_special_constraints = True
self.special_constraints = special_constraints
predecessors_dict: Dict[Hashable, List[Hashable]] = {
task: [] for task in self.successors
}
for task in self.successors:
for stask in self.successors[task]:
predecessors_dict[stask] += [task]
for t1, t2 in self.special_constraints.start_at_end:
if t2 not in self.successors[t1]:
self.successors[t1].append(t2)
for t1, t2, off in self.special_constraints.start_at_end_plus_offset:
if t2 not in self.successors[t1]:
self.successors[t1].append(t2)
for t1, t2 in self.special_constraints.start_together:
for predt1 in predecessors_dict[t1]:
if t2 not in self.successors[predt1]:
self.successors[predt1] += [t2]
for predt2 in predecessors_dict[t2]:
if t1 not in self.successors[predt2]:
self.successors[predt2] += [t1]
self.graph = self.compute_graph(
compute_predecessors=self.do_special_constraints
)
if self.do_special_constraints:
self.predecessors = self.graph.predecessors_dict
self.relax_the_start_at_end = relax_the_start_at_end
self.fixed_permutation = fixed_permutation
self.fixed_modes = fixed_modes
[docs]
def update_functions(self) -> None:
(
self.func_sgs,
self.func_sgs_2,
self.compute_mean_resource,
) = create_np_data_and_jit_functions(rcpsp_problem=self)
[docs]
def is_rcpsp_multimode(self) -> bool:
return self.is_multimode
[docs]
def is_varying_resource(self) -> bool:
return self.is_calendar
[docs]
def is_preemptive(self) -> bool:
return False
[docs]
def is_multiskill(self) -> bool:
return False
[docs]
def includes_special_constraint(self) -> bool:
return self.do_special_constraints
[docs]
def get_resource_names(self) -> List[str]:
return self.resources_list
[docs]
def get_tasks_list(self) -> List[Hashable]:
return self.tasks_list
[docs]
def get_resource_availability_array(self, res: str) -> List[int]:
if self.is_varying_resource() and not isinstance(self.resources[res], int):
return self.resources[res] # type: ignore
else:
return self.horizon * [self.resources[res]] # type: ignore
[docs]
def compute_graph(self, compute_predecessors: bool = False) -> Graph:
nodes: List[Tuple[Hashable, Dict[str, Any]]] = [
(
n,
{
str(mode): self.mode_details[n][mode]["duration"]
for mode in self.mode_details[n]
},
)
for n in self.tasks_list
]
edges: List[Tuple[Hashable, Hashable, Dict[str, Any]]] = []
for n in self.successors:
for succ in self.successors[n]:
edges += [(n, succ, {})]
return Graph(
nodes, edges, compute_predecessors=compute_predecessors, undirected=False
)
[docs]
def evaluate_function(self, rcpsp_sol: RCPSPSolution) -> Tuple[int, float, int]:
if rcpsp_sol._schedule_to_recompute:
rcpsp_sol.generate_schedule_from_permutation_serial_sgs()
makespan = rcpsp_sol.rcpsp_schedule[self.sink_task]["end_time"]
if self.costs["mean_resource_reserve"]:
obj_mean_resource_reserve = rcpsp_sol.compute_mean_resource_reserve()
else:
obj_mean_resource_reserve = 0.0
if self.do_special_constraints:
penalty = evaluate_constraints(
solution=rcpsp_sol, constraints=self.special_constraints
)
else:
penalty = 0
return makespan, obj_mean_resource_reserve, penalty
[docs]
def evaluate_from_encoding(
self, int_vector: List[int], encoding_name: str
) -> Dict[str, float]:
if encoding_name == "rcpsp_permutation":
if self.fixed_modes is None:
rcpsp_modes = [1 for i in range(self.n_jobs_non_dummy)]
else:
rcpsp_modes = self.fixed_modes
rcpsp_sol = RCPSPSolution(
problem=self, rcpsp_permutation=int_vector, rcpsp_modes=rcpsp_modes
)
elif encoding_name == "rcpsp_modes":
if self.fixed_permutation is not None:
rcpsp_sol = RCPSPSolution(
problem=self,
rcpsp_permutation=self.fixed_permutation,
rcpsp_modes=int_vector,
)
else:
raise RuntimeError(
"Encoding rcpsp_modes possible "
"only if self.fixed_permutation is not None"
)
else:
raise NotImplementedError(f"Encoding {encoding_name} not implemented")
objectives = self.evaluate(rcpsp_sol)
return objectives
[docs]
def evaluate(self, rcpsp_sol: RCPSPSolution) -> Dict[str, float]: # type: ignore
obj_makespan, obj_mean_resource_reserve, penalty = self.evaluate_function(
rcpsp_sol
)
return {
"makespan": float(obj_makespan),
"mean_resource_reserve": obj_mean_resource_reserve,
"constraint_penalty": float(penalty),
}
[docs]
def evaluate_mobj(self, rcpsp_sol: RCPSPSolution) -> TupleFitness: # type: ignore
return self.evaluate_mobj_from_dict(self.evaluate(rcpsp_sol))
[docs]
def evaluate_mobj_from_dict(self, dict_values: Dict[str, float]) -> TupleFitness:
return TupleFitness(
np.array([-dict_values["makespan"], dict_values["mean_resource_reserve"]]),
2,
)
[docs]
def build_mode_dict(
self, rcpsp_modes_from_solution: List[int]
) -> Dict[Hashable, int]:
modes_dict = {
self.tasks_list_non_dummy[i]: rcpsp_modes_from_solution[i]
for i in range(self.n_jobs_non_dummy)
}
modes_dict[self.source_task] = 1
modes_dict[self.sink_task] = 1
return modes_dict
[docs]
def build_mode_array(self, rcpsp_modes_from_solution: List[int]) -> List[int]:
modes_dict = self.build_mode_dict(
rcpsp_modes_from_solution=rcpsp_modes_from_solution
)
return [modes_dict[t] for t in self.tasks_list]
[docs]
def return_index_task(self, task: Hashable, offset: int = 0) -> int:
return self.index_task[task] + offset
[docs]
def satisfy(self, rcpsp_sol: RCPSPSolution) -> bool: # type: ignore
if rcpsp_sol.rcpsp_schedule_feasible is False:
logger.debug("Schedule flagged as infeasible when generated")
return False
if self.do_special_constraints:
if not check_solution_with_special_constraints(
problem=self,
solution=rcpsp_sol,
relax_the_start_at_end=self.relax_the_start_at_end,
):
return False
modes_dict = self.build_mode_dict(
rcpsp_modes_from_solution=rcpsp_sol.rcpsp_modes
)
start_times = [
rcpsp_sol.rcpsp_schedule[t]["start_time"] for t in rcpsp_sol.rcpsp_schedule
]
for t in start_times:
resource_usage = {}
for res in self.resources_list:
resource_usage[res] = 0
for act_id in rcpsp_sol.rcpsp_schedule:
start = rcpsp_sol.rcpsp_schedule[act_id]["start_time"]
end = rcpsp_sol.rcpsp_schedule[act_id]["end_time"]
mode = modes_dict[act_id]
for res in self.resources_list:
if start <= t < end:
resource_usage[res] += self.mode_details[act_id][mode].get(
res, 0
)
for res in self.resources.keys():
if resource_usage[res] > self.get_resource_available(res, t):
logger.debug(
[
act
for act in rcpsp_sol.rcpsp_schedule
if rcpsp_sol.rcpsp_schedule[act]["start_time"]
<= t
< rcpsp_sol.rcpsp_schedule[act]["end_time"]
]
)
logger.debug(
f"Time step resource violation: time: {t} "
f"res {res} res_usage: {resource_usage[res]}"
f"res_avail: {self.resources[res]}"
)
return False
# Check for non-renewable resource violation
for res in self.non_renewable_resources:
usage = 0
for act_id in rcpsp_sol.rcpsp_schedule:
mode = modes_dict[act_id]
usage += self.mode_details[act_id][mode][res]
if usage > self.get_max_resource_capacity(res):
logger.debug(
f"Non-renewable resource violation: act_id: {act_id}"
f"res {res} res_usage: {usage} res_avail: {self.resources[res]}"
)
return False
# Check precedences / successors
for act_id in list(self.successors.keys()):
for succ_id in self.successors[act_id]:
start_succ = rcpsp_sol.rcpsp_schedule[succ_id]["start_time"]
end_pred = rcpsp_sol.rcpsp_schedule[act_id]["end_time"]
if start_succ < end_pred:
logger.debug(
f"Precedence relationship broken: {act_id} end at {end_pred} "
f"while {succ_id} start at {start_succ}"
)
return False
return True
def __str__(self) -> str:
val = (
"I'm a RCPSP model with "
+ str(self.n_jobs)
+ " tasks.."
+ " and ressources ="
+ str(self.resources_list)
)
return val
[docs]
def get_solution_type(self) -> Type[Solution]:
return RCPSPSolution
[docs]
def get_attribute_register(self) -> EncodingRegister:
dict_register = {
"rcpsp_permutation": {
"name": "rcpsp_permutation",
"type": [TypeAttribute.PERMUTATION, TypeAttribute.PERMUTATION_RCPSP],
"range": range(self.n_jobs_non_dummy),
"n": self.n_jobs_non_dummy,
}
}
max_number_modes = max([len(self.mode_details[x]) for x in self.mode_details])
dict_register["rcpsp_modes"] = {
"name": "rcpsp_modes",
"type": [TypeAttribute.LIST_INTEGER],
"n": self.n_jobs_non_dummy,
"low": 1, # integer.
"up": max_number_modes, # integer.
"arity": max_number_modes,
}
mode_arity = [
len(self.mode_details[task]) for task in self.tasks_list_non_dummy
]
dict_register["rcpsp_modes_arity_fix"] = {
"name": "rcpsp_modes",
"type": [TypeAttribute.LIST_INTEGER_SPECIFIC_ARITY],
"n": self.n_jobs_non_dummy,
"low": 1,
"up": mode_arity,
"arities": mode_arity,
}
return EncodingRegister(dict_register)
[docs]
def get_objective_register(self) -> ObjectiveRegister:
objective_handling = ObjectiveHandling.SINGLE
dict_objective = {
"makespan": ObjectiveDoc(type=TypeObjective.OBJECTIVE, default_weight=-1.0)
}
# "mean_resource_reserve": {"type": TypeObjective.OBJECTIVE, "default_weight": 1}}
if self.do_special_constraints:
objective_handling = ObjectiveHandling.AGGREGATE
dict_objective["constraint_penalty"] = ObjectiveDoc(
type=TypeObjective.PENALTY, default_weight=-100.0
)
return ObjectiveRegister(
objective_sense=ModeOptim.MAXIMIZATION,
objective_handling=objective_handling,
dict_objective_to_doc=dict_objective,
)
[docs]
def compute_resource_consumption(
self, rcpsp_sol: RCPSPSolution
) -> npt.NDArray[np.int_]:
modes_dict = self.build_mode_dict(rcpsp_sol.rcpsp_modes)
makespan = rcpsp_sol.rcpsp_schedule[self.sink_task]["end_time"]
consumptions = np.zeros((len(self.resources), makespan + 1), dtype=np.int_)
for act_id in rcpsp_sol.rcpsp_schedule:
for ir in range(len(self.resources)):
consumptions[
ir,
rcpsp_sol.rcpsp_schedule[act_id]["start_time"]
+ 1 : rcpsp_sol.rcpsp_schedule[act_id]["end_time"]
+ 1,
] += self.mode_details[act_id][modes_dict[act_id]][
self.resources_list[ir]
]
return consumptions
[docs]
def plot_ressource_view(self, rcpsp_sol: RCPSPSolution) -> None:
consumption = self.compute_resource_consumption(rcpsp_sol=rcpsp_sol)
fig, ax = plt.subplots(nrows=len(self.resources_list), sharex=True)
for i in range(len(self.resources_list)):
ax[i].axhline(
y=self.resources[self.resources_list[i]], label=self.resources_list[i]
)
ax[i].plot(consumption[i, :])
ax[i].legend()
[docs]
def copy(self) -> "RCPSPModel":
model = RCPSPModel(
resources=self.resources,
tasks_list=self.tasks_list,
source_task=self.source_task,
sink_task=self.sink_task,
non_renewable_resources=self.non_renewable_resources,
mode_details=deepcopy(self.mode_details),
successors=deepcopy(self.successors),
horizon=self.horizon,
horizon_multiplier=self.horizon_multiplier,
name_task=self.name_task,
mean_resource_reserve=self.costs.get("mean_resource_reserve", False),
fixed_modes=self.fixed_modes,
fixed_permutation=self.fixed_permutation,
)
return model
[docs]
def get_dummy_solution(self) -> RCPSPSolution:
sol = RCPSPSolution(
problem=self,
rcpsp_permutation=list(range(self.n_jobs_non_dummy)),
rcpsp_modes=[1 for i in range(self.n_jobs_non_dummy)],
)
return sol
[docs]
def get_resource_available(self, res: str, time: int) -> int:
if self.is_calendar:
return self.resources.get(res, [0])[time] # type: ignore
return self.resources.get(res, 0) # type: ignore
[docs]
def get_max_resource_capacity(self, res: str) -> int:
if self.is_calendar:
return max(self.resources.get(res, [0])) # type: ignore
return self.resources.get(res, 0) # type: ignore
[docs]
def set_fixed_attributes(self, encoding_str: str, sol: RCPSPSolution) -> None:
att = self.get_attribute_register().dict_attribute_to_type[encoding_str]["name"]
if att == "rcpsp_modes":
self.set_fixed_modes(sol.rcpsp_modes)
elif att == "rcpsp_permutation":
self.set_fixed_permutation(sol.rcpsp_permutation)
[docs]
def set_fixed_modes(self, fixed_modes: List[int]) -> None:
self.fixed_modes = fixed_modes
[docs]
def set_fixed_permutation(self, fixed_permutation: List[int]) -> None:
self.fixed_permutation = fixed_permutation
[docs]
def create_np_data_and_jit_functions(
rcpsp_problem: RCPSPModel,
) -> Tuple[
Callable[
...,
Tuple[Dict[int, Tuple[int, int]], bool],
],
Callable[
...,
Tuple[Dict[int, Tuple[int, int]], bool],
],
Callable[
...,
float,
],
]:
consumption_array = np.zeros(
(
rcpsp_problem.n_jobs,
rcpsp_problem.max_number_of_mode,
len(rcpsp_problem.resources_list),
),
dtype=np.int_,
)
duration_array = np.zeros(
(rcpsp_problem.n_jobs, rcpsp_problem.max_number_of_mode), dtype=np.int_
)
predecessors = np.zeros((rcpsp_problem.n_jobs, rcpsp_problem.n_jobs), dtype=np.int_)
successors = np.zeros((rcpsp_problem.n_jobs, rcpsp_problem.n_jobs), dtype=np.int_)
horizon = rcpsp_problem.horizon
ressource_available = np.zeros(
(len(rcpsp_problem.resources_list), horizon), dtype=np.int_
)
ressource_renewable = np.ones((len(rcpsp_problem.resources_list)), dtype=bool)
minimum_starting_time_array = np.zeros(rcpsp_problem.n_jobs, dtype=np.int_)
for i in range(len(rcpsp_problem.tasks_list)):
task = rcpsp_problem.tasks_list[i]
index_mode = 0
for mode in sorted(
rcpsp_problem.mode_details[rcpsp_problem.tasks_list[i]].keys()
):
for k in range(len(rcpsp_problem.resources_list)):
consumption_array[i, index_mode, k] = rcpsp_problem.mode_details[task][
mode
].get(rcpsp_problem.resources_list[k], 0)
duration_array[i, index_mode] = rcpsp_problem.mode_details[task][mode][
"duration"
]
index_mode += 1
task_index = {rcpsp_problem.tasks_list[i]: i for i in range(rcpsp_problem.n_jobs)}
for k in range(len(rcpsp_problem.resources_list)):
if rcpsp_problem.is_varying_resource():
ressource_available[k, :] = rcpsp_problem.resources[ # type: ignore
rcpsp_problem.resources_list[k]
][: ressource_available.shape[1]]
else:
ressource_available[k, :] = np.full(
ressource_available.shape[1],
rcpsp_problem.resources[rcpsp_problem.resources_list[k]],
dtype=np.int_,
)
if rcpsp_problem.resources_list[k] in rcpsp_problem.non_renewable_resources:
ressource_renewable[k] = False
for i in range(len(rcpsp_problem.tasks_list)):
task = rcpsp_problem.tasks_list[i]
for s in rcpsp_problem.successors[task]:
index_s = task_index[s]
predecessors[index_s, i] = 1
successors[i, index_s] = 1
if "special_constraints" in rcpsp_problem.__dict__.keys():
for t in rcpsp_problem.special_constraints.start_times_window:
if rcpsp_problem.special_constraints.start_times_window[t][0] is not None:
minimum_starting_time_array[
rcpsp_problem.index_task[t]
] = rcpsp_problem.special_constraints.start_times_window[t][0]
func_sgs = partial(
sgs_fast,
consumption_array=consumption_array,
duration_array=duration_array,
predecessors=predecessors,
successors=successors,
horizon=horizon,
ressource_available=ressource_available,
ressource_renewable=ressource_renewable,
minimum_starting_time_array=minimum_starting_time_array,
)
func_sgs_2 = partial(
sgs_fast_partial_schedule_incomplete_permutation_tasks,
consumption_array=consumption_array,
duration_array=duration_array,
predecessors=predecessors,
successors=successors,
horizon=horizon,
ressource_available=ressource_available,
ressource_renewable=ressource_renewable,
minimum_starting_time_array=minimum_starting_time_array,
)
func_compute_mean_resource = partial(
compute_mean_ressource,
consumption_array=consumption_array,
ressource_available=ressource_available,
ressource_renewable=ressource_renewable,
)
return func_sgs, func_sgs_2, func_compute_mean_resource
[docs]
def evaluate_constraints(
solution: RCPSPSolution,
constraints: SpecialConstraintsDescription,
) -> int:
list_constraints_not_respected = compute_constraints_details(solution, constraints)
return sum([x[-1] for x in list_constraints_not_respected])
[docs]
def compute_constraints_details(
solution: RCPSPSolution,
constraints: SpecialConstraintsDescription,
) -> List[Tuple[str, Hashable, Hashable, Optional[int], Optional[int], int]]:
if not solution.rcpsp_schedule_feasible:
return []
start_together = constraints.start_together
start_at_end = constraints.start_at_end
start_at_end_plus_offset = constraints.start_at_end_plus_offset
start_after_nunit = constraints.start_after_nunit
disjunctive = constraints.disjunctive_tasks
list_constraints_not_respected: List[
Tuple[str, Hashable, Hashable, Optional[int], Optional[int], int]
] = []
for (t1, t2) in start_together:
time1 = solution.get_start_time(t1)
time2 = solution.get_start_time(t2)
b = time1 == time2
if not b:
list_constraints_not_respected += [
("start_together", t1, t2, time1, time2, abs(time2 - time1))
]
for (t1, t2) in start_at_end:
time1 = solution.get_end_time(t1)
time2 = solution.get_start_time(t2)
b = time1 == time2
if not b:
list_constraints_not_respected += [
("start_at_end", t1, t2, time1, time2, abs(time2 - time1))
]
for (t1, t2, off) in start_at_end_plus_offset:
time1 = solution.get_end_time(t1) + off
time2 = solution.get_start_time(t2)
b = time2 >= time1
if not b:
list_constraints_not_respected += [
("start_at_end_plus_offset", t1, t2, time1, time2, abs(time2 - time1))
]
for (t1, t2, off) in start_after_nunit:
time1 = solution.get_start_time(t1) + off
time2 = solution.get_start_time(t2)
b = time2 >= time1
if not b:
list_constraints_not_respected += [
("start_after_nunit", t1, t2, time1, time2, abs(time2 - time1))
]
for t1, t2 in disjunctive:
segt = intersect(
[solution.get_start_time(t1), solution.get_end_time(t1)],
[solution.get_start_time(t2), solution.get_end_time(t2)],
)
if segt is not None:
list_constraints_not_respected += [
("disjunctive", t1, t2, None, None, segt[1] - segt[0])
]
for t in constraints.start_times_window:
if constraints.start_times_window[t][0] is not None:
if solution.get_start_time(t) < constraints.start_times_window[t][0]: # type: ignore
list_constraints_not_respected += [
(
"start_window_0",
t,
t,
None,
None,
constraints.start_times_window[t][0] # type: ignore
- solution.get_start_time(t),
)
]
if constraints.start_times_window[t][1] is not None:
if solution.get_start_time(t) > constraints.start_times_window[t][1]: # type: ignore
list_constraints_not_respected += [
(
"start_window_1",
t,
t,
None,
None,
-constraints.start_times_window[t][1] # type: ignore
+ solution.get_start_time(t),
)
]
for t in constraints.end_times_window:
if constraints.end_times_window[t][0] is not None:
if solution.get_end_time(t) < constraints.end_times_window[t][0]: # type: ignore
list_constraints_not_respected += [
(
"end_window_0",
t,
t,
None,
None,
constraints.end_times_window[t][0] - solution.get_end_time(t), # type: ignore
)
]
if constraints.end_times_window[t][1] is not None:
if solution.get_end_time(t) > constraints.end_times_window[t][1]: # type: ignore
list_constraints_not_respected += [
(
"end_window_1",
t,
t,
None,
None,
-constraints.end_times_window[t][1] + solution.get_end_time(t), # type: ignore
)
]
if constraints.pair_mode_constraint is not None:
list_constraints_not_respected += compute_details_mode_constraint(
solution=solution, pair_mode_constraint=constraints.pair_mode_constraint
)
return list_constraints_not_respected
[docs]
def check_solution_with_special_constraints(
problem: RCPSPModel,
solution: RCPSPSolution,
relax_the_start_at_end: bool = True,
) -> bool:
if not solution.rcpsp_schedule_feasible:
return False
start_together = problem.special_constraints.start_together
start_at_end = problem.special_constraints.start_at_end
start_at_end_plus_offset = problem.special_constraints.start_at_end_plus_offset
start_after_nunit = problem.special_constraints.start_after_nunit
disjunctive = problem.special_constraints.disjunctive_tasks
for (t1, t2) in start_together:
if not relax_the_start_at_end:
b = solution.get_start_time(t1) == solution.get_start_time(t2)
if not b:
return False
for (t1, t2) in start_at_end:
if relax_the_start_at_end:
b = solution.get_start_time(t2) >= solution.get_end_time(t1)
else:
b = solution.get_start_time(t2) == solution.get_end_time(t1)
if not b:
return False
for (t1, t2, off) in start_at_end_plus_offset:
b = solution.get_start_time(t2) >= solution.get_end_time(t1) + off
if not b:
logger.debug(("start_at_end_plus_offset NOT respected: ", t1, t2, off))
logger.debug(
(
solution.get_start_time(t2),
" >= ",
solution.get_end_time(t1),
"+",
off,
)
)
return False
for (t1, t2, off) in start_after_nunit:
b = solution.get_start_time(t2) >= solution.get_start_time(t1) + off
if not b:
logger.debug(("start_after_nunit NOT respected: ", t1, t2, off))
return False
for t1, t2 in disjunctive:
if (
intersect(
[solution.get_start_time(t1), solution.get_end_time(t1)],
[solution.get_start_time(t2), solution.get_end_time(t2)],
)
is not None
):
return False
for t in problem.special_constraints.start_times_window:
if problem.special_constraints.start_times_window[t][0] is not None:
if (
solution.get_start_time(t) # type: ignore
< problem.special_constraints.start_times_window[t][0]
):
logger.debug(
(
"start time 0, ",
t,
solution.get_start_time(t),
problem.special_constraints.start_times_window[t][0],
)
)
return False
if problem.special_constraints.start_times_window[t][1] is not None:
if (
solution.get_start_time(t) # type: ignore
> problem.special_constraints.start_times_window[t][1]
):
logger.debug(
(
"start time 1, ",
t,
solution.get_start_time(t),
problem.special_constraints.start_times_window[t][1],
)
)
return False
for t in problem.special_constraints.end_times_window:
if problem.special_constraints.end_times_window[t][0] is not None:
if (
solution.get_end_time(t) # type: ignore
< problem.special_constraints.end_times_window[t][0]
):
logger.debug(
(
"end time 0, ",
t,
solution.get_end_time(t),
problem.special_constraints.end_times_window[t][0],
)
)
return False
if problem.special_constraints.end_times_window[t][1] is not None:
if (
solution.get_end_time(t) # type: ignore
> problem.special_constraints.end_times_window[t][1]
):
logger.debug(
(
"end time 1, ",
t,
solution.get_end_time(t),
problem.special_constraints.end_times_window[t][1],
)
)
return False
if problem.special_constraints.pair_mode_constraint is not None:
b = check_pair_mode_constraint(
solution=solution,
pair_mode_constraint=problem.special_constraints.pair_mode_constraint,
)
if not b:
return False
return True
[docs]
def check_pair_mode_constraint(
solution: RCPSPSolution, pair_mode_constraint: PairModeConstraint
):
if pair_mode_constraint.allowed_mode_assignment is not None:
for ac1, ac2 in pair_mode_constraint.allowed_mode_assignment:
mode_ac1 = solution.get_mode(ac1)
mode_ac2 = solution.get_mode(ac2)
if (mode_ac1, mode_ac2) not in pair_mode_constraint.allowed_mode_assignment[
ac1, ac2
]:
return False
return True
if pair_mode_constraint.same_score_mode is not None:
for ac1, ac2 in pair_mode_constraint.same_score_mode:
score_ac1 = pair_mode_constraint.score_mode[ac1, solution.get_mode(ac1)]
score_ac2 = pair_mode_constraint.score_mode[ac2, solution.get_mode(ac2)]
if score_ac1 != score_ac2:
return False
return True
[docs]
def compute_details_mode_constraint(
solution: RCPSPSolution, pair_mode_constraint: PairModeConstraint
):
list_constraints_not_respected: List[
Tuple[str, Hashable, Hashable, Optional[int], Optional[int], int]
] = []
if pair_mode_constraint.allowed_mode_assignment is not None:
for ac1, ac2 in pair_mode_constraint.allowed_mode_assignment:
mode_ac1 = solution.get_mode(ac1)
mode_ac2 = solution.get_mode(ac2)
if (mode_ac1, mode_ac2) not in pair_mode_constraint.allowed_mode_assignment[
ac1, ac2
]:
list_constraints_not_respected.append(
("pair_mode_assignment", ac1, ac2, mode_ac1, mode_ac2, 100)
)
return list_constraints_not_respected
if pair_mode_constraint.same_score_mode is not None:
for ac1, ac2 in pair_mode_constraint.same_score_mode:
score_ac1 = pair_mode_constraint.score_mode[ac1, solution.get_mode(ac1)]
score_ac2 = pair_mode_constraint.score_mode[ac2, solution.get_mode(ac2)]
if score_ac1 != score_ac2:
list_constraints_not_respected.append(
("pair_mode_score", ac1, ac2, score_ac1, score_ac2, 100)
)
return list_constraints_not_respected