Source code for discrete_optimization.rcpsp.plots.rcpsp_utils_preemptive

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import logging
from typing import List, Union

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.collections import PatchCollection
from matplotlib.font_manager import FontProperties
from matplotlib.patches import Polygon as pp
from shapely.geometry import Polygon

from discrete_optimization.generic_tools.plot_utils import (
    get_cmap,
    get_cmap_with_nb_colors,
)
from discrete_optimization.rcpsp.rcpsp_model_preemptive import (
    RCPSPModelPreemptive,
    RCPSPSolutionPreemptive,
)

logger = logging.getLogger(__name__)


def compute_resource_consumption(
    rcpsp_model: RCPSPModelPreemptive,
    rcpsp_sol: RCPSPSolutionPreemptive,
    list_resources: List[Union[int, str]] = None,
    future_view=True,
):
    """Accumulate the resource usage of a preemptive schedule per timestep.

    Args:
        rcpsp_model: problem definition; provides the task list, the mode
            details and the default resource list.
        rcpsp_sol: solution whose (possibly split) task intervals are read.
        list_resources: resources to report, one output row each. Defaults to
            ``rcpsp_model.resources_list``.
        future_view: when truthy, a task part running on ``[s, e)`` is charged
            to indices ``s + 1 .. e``; otherwise to indices ``s .. e - 1``.

    Returns:
        A tuple ``(consumptions, times)`` where ``consumptions`` has shape
        ``(len(list_resources), makespan + 1)`` and ``times`` is
        ``np.arange(0, makespan + 1)``. The makespan is the end time of the
        sink task.
    """
    mode_of_task = rcpsp_model.get_modes_dict(rcpsp_sol)
    # Source and sink are forced to mode 1.
    mode_of_task[rcpsp_model.source_task] = 1
    mode_of_task[rcpsp_model.sink_task] = 1
    horizon = rcpsp_sol.get_end_time(rcpsp_model.sink_task)
    if list_resources is None:
        list_resources = rcpsp_model.resources_list
    # A single offset replaces the two slicing branches: 1 -> future, 0 -> past.
    shift = 1 if future_view else 0
    usage = np.zeros((len(list_resources), horizon + 1))
    for task in rcpsp_model.get_tasks_list():
        for row, resource in enumerate(list_resources):
            demand = rcpsp_model.mode_details[task][mode_of_task[task]].get(
                resource, 0
            )
            parts = zip(
                rcpsp_sol.get_start_times_list(task),
                rcpsp_sol.get_end_times_list(task),
            )
            for begin, finish in parts:
                usage[row, begin + shift : finish + shift] += demand
    return usage, np.arange(0, horizon + 1, 1)
def compute_nice_resource_consumption(
    rcpsp_model: RCPSPModelPreemptive,
    rcpsp_sol: RCPSPSolutionPreemptive,
    list_resources: List[Union[int, str]] = None,
):
    """Interleave the future and past consumption views of a schedule.

    Every timestep is emitted twice in a row, paired first with the
    ``future_view=True`` consumption and then with the ``future_view=False``
    consumption at that timestep.

    Args:
        rcpsp_model: problem definition.
        rcpsp_sol: solution to analyse.
        list_resources: resources to report. Defaults to
            ``rcpsp_model.resources_list``.

    Returns:
        A tuple ``(merged_times, merged_cons)`` of dicts keyed by the position
        of the resource in ``list_resources``; each value is a 1-D array of
        length ``2 * len(times)``.
    """
    if list_resources is None:
        list_resources = rcpsp_model.resources_list
    future_cons, times = compute_resource_consumption(
        rcpsp_model, rcpsp_sol, list_resources=list_resources, future_view=True
    )
    past_cons, times = compute_resource_consumption(
        rcpsp_model, rcpsp_sol, list_resources=list_resources, future_view=False
    )
    merged_times = {}
    merged_cons = {}
    for row in range(len(list_resources)):
        # t0, t0, t1, t1, ...
        merged_times[row] = np.repeat(times, 2)
        # future0, past0, future1, past1, ...
        merged_cons[row] = np.column_stack(
            (future_cons[row], past_cons[row])
        ).ravel()
    return merged_times, merged_cons
def plot_ressource_view(
    rcpsp_model: RCPSPModelPreemptive,
    rcpsp_sol: RCPSPSolutionPreemptive,
    list_resource: List[Union[int, str]] = None,
    title_figure="",
    x_lim=None,
    fig=None,
    ax=None,
):
    """Plot, per resource, the stacked task rectangles and the usage curve.

    One subplot is used per resource. Each part of each task that consumes the
    resource is drawn as a rectangle of height equal to its consumption,
    placed at the lowest vertical offset where it does not overlap (with
    positive area) a rectangle already placed. The consumption curve and the
    resource limit are drawn on top.

    Args:
        rcpsp_model: problem definition.
        rcpsp_sol: solution to draw.
        list_resource: resources to draw, one subplot each. Defaults to
            ``rcpsp_model.resources_list``.
        title_figure: figure title. It is applied when the figure is created
            here, or when it is non-empty and a figure is available.
        x_lim: optional x-axis limits applied to every subplot.
        fig: optional existing figure; returned unchanged if ``ax`` is given.
        ax: optional existing axes: either a sequence with one entry per
            resource, or a single ``Axes`` when only one resource is drawn.

    Returns:
        The figure (``None`` if ``ax`` was supplied without ``fig``).
    """
    modes_dict = rcpsp_model.get_modes_dict(rcpsp_sol)
    with_calendar = rcpsp_model.is_varying_resource()
    if list_resource is None:
        list_resource = rcpsp_model.resources_list
    nb_resources = len(list_resource)

    created = ax is None
    if created:
        fig, ax = plt.subplots(nrows=nb_resources, figsize=(10, 5), sharex=True)
    # plt.subplots returns a bare Axes for a single row, and a caller may also
    # pass one; the rest of the function indexes ``ax[i]``.
    if not isinstance(ax, (list, tuple, np.ndarray)):
        ax = [ax]
    if fig is not None and (created or title_figure):
        fig.suptitle(title_figure)

    polygons_ax = {i: [] for i in range(nb_resources)}
    # Highest stacking offset tried per resource; computed lazily, once.
    stacking_bound = {}
    sorted_activities = sorted(
        rcpsp_model.get_tasks_list(), key=lambda x: rcpsp_sol.get_start_time(x)
    )
    for j in sorted_activities:
        time_starts = rcpsp_sol.get_start_times_list(j)
        time_ends = rcpsp_sol.get_end_times_list(j)
        for time_start, time_end in zip(time_starts, time_ends):
            for i, resource in enumerate(list_resource):
                cons = rcpsp_model.mode_details[j][modes_dict[j]].get(resource, 0)
                if cons == 0:
                    continue
                if i not in stacking_bound:
                    stacking_bound[i] = (
                        max(rcpsp_model.get_resource_availability_array(resource))
                        if with_calendar
                        else rcpsp_model.get_resource_available(resource, 0)
                    )
                for k in range(0, stacking_bound[i] + 5):
                    polygon = Polygon(
                        [
                            (time_start, k),
                            (time_end, k),
                            (time_end, k + cons),
                            (time_start, k + cons),
                            (time_start, k),
                        ]
                    )
                    # Stop at the first real overlap instead of measuring
                    # the intersection with every placed rectangle.
                    if any(
                        p.intersection(polygon).area > 0 for p in polygons_ax[i]
                    ):
                        continue
                    polygons_ax[i].append(polygon)
                    ax[i].annotate(
                        j,
                        xy=((time_start + time_end) / 2, (k + cons / 2)),
                        font_properties=FontProperties(size=6, weight="bold"),
                        verticalalignment="center",
                        horizontalalignment="left",
                        color="k",
                        clip_on=True,
                    )
                    break
                # If no free offset was found the part is simply not drawn.

    for i in range(nb_resources):
        patches = []
        for polygon in polygons_ax[i]:
            x, y = polygon.exterior.xy
            ax[i].plot(x, y, zorder=-1, color="b")
            patches.append(pp(xy=polygon.exterior.coords))
        p = PatchCollection(patches, cmap=get_cmap("Blues"), alpha=0.4)
        ax[i].add_collection(p)

    merged_times, merged_cons = compute_nice_resource_consumption(
        rcpsp_model, rcpsp_sol, list_resources=list_resource
    )
    for i, resource in enumerate(list_resource):
        ax[i].plot(
            merged_times[i],
            merged_cons[i],
            color="r",
            linewidth=2,
            label="Consumption " + str(resource),
            zorder=1,
        )
        if not with_calendar:
            ax[i].axhline(
                y=rcpsp_model.get_resource_available(resource, 0),
                linestyle="--",
                label="Limit : " + str(resource),
                zorder=0,
            )
        else:
            ax[i].plot(
                merged_times[i],
                [
                    rcpsp_model.get_resource_available(resource, m)
                    for m in merged_times[i]
                ],
                linestyle="--",
                label="Limit : " + str(resource),
                zorder=0,
            )
        ax[i].legend(fontsize=5)
        lims = ax[i].get_xlim()
        if x_lim is None:
            ax[i].set_xlim([lims[0], 1.0 * lims[1]])
        else:
            ax[i].set_xlim(x_lim)
    ax[-1].set_xlabel("Timestep")
    return fig
def plot_task_gantt(
    rcpsp_model: RCPSPModelPreemptive,
    rcpsp_sol: RCPSPSolutionPreemptive,
    fig=None,
    ax=None,
    x_lim=None,
    title=None,
    current_t=None,
):
    """Draw a Gantt chart with one row per task of a preemptive schedule.

    Every executed part of a task (``starts[k]``, ``ends[k]``) is drawn as a
    box on the task's row; the task label is written at the middle of its
    first part.

    Args:
        rcpsp_model: problem definition; ``tasks_list`` fixes the row order.
        rcpsp_sol: solution; ``rcpsp_schedule`` provides the parts.
        fig: existing figure. A new figure and axes are created unless both
            ``fig`` and ``ax`` are given.
        ax: existing axes to draw on.
        x_lim: optional x-axis limits; defaults to the schedule's time span.
        title: axes title; defaults to ``"Gantt Task"``.
        current_t: optional time at which a dashed red vertical line is
            drawn, in the same way as ``plot_resource_individual_gantt``.

    Returns:
        The figure holding the chart.
    """
    if fig is None or ax is None:
        fig, ax = plt.subplots(1, figsize=(7, 7))
    ax.set_title("Gantt Task" if title is None else title)

    tasks = rcpsp_model.tasks_list
    nb_task = len(tasks)
    schedule = rcpsp_sol.rcpsp_schedule
    # Only the extreme times are needed, so no sort (and no composite sort key).
    max_time = max(schedule[t]["ends"][-1] for t in schedule)
    min_time = min(schedule[t]["starts"][0] for t in schedule)

    # At least one colour, otherwise the modulo below divides by zero when
    # there is a single task.
    nb_colors = max(1, nb_task // 2)
    colors = get_cmap_with_nb_colors("hsv", nb_colors)
    patches = []
    for j, task in enumerate(tasks):
        starts = schedule[task]["starts"]
        ends = schedule[task]["ends"]
        for start, end in zip(starts, ends):
            polygon = Polygon(
                [
                    (start, j - 0.25),
                    (end, j - 0.25),
                    (end, j + 0.25),
                    (start, j + 0.25),
                    (start, j - 0.25),
                ]
            )
            x, y = polygon.exterior.xy
            ax.plot(x, y, zorder=-1, color="b")
            patches.append(
                pp(xy=polygon.exterior.coords, facecolor=colors((j - 1) % nb_colors))
            )
        ax.annotate(
            task,
            xy=((starts[0] + ends[0]) / 2, j),
            font_properties=FontProperties(size=7, weight="bold"),
            verticalalignment="center",
            horizontalalignment="left",
            color="k",
            clip_on=True,
        )
    p = PatchCollection(patches, match_original=True, alpha=0.4)
    ax.add_collection(p)
    if x_lim is None:
        ax.set_xlim((min_time, max_time))
    else:
        ax.set_xlim(x_lim)
    ax.set_ylim((-0.5, nb_task))
    ax.set_yticks(range(nb_task))
    ax.set_yticklabels(tuple(str(task) for task in tasks), fontdict={"size": 5})
    ax.set_ylabel("Task number")
    ax.set_xlabel("Timestep")
    if current_t is not None:
        ax.axvline(x=current_t, label="pyplot vertical line", color="r", ls="--")
    return fig
def compute_schedule_per_resource_individual(
    rcpsp_model: RCPSPModelPreemptive,
    rcpsp_sol: RCPSPSolutionPreemptive,
    resource_types_to_consider: List[str] = None,
):
    """Greedily assign every task part to individual units of each resource.

    Tasks are processed by increasing first start time. For each non-empty
    part and each resource it consumes, the required number of units is
    picked one by one among the units that are free at the part's start
    time, always taking the unit with the smallest accumulated working time.
    If no unit is free, a warning is logged and the remaining demand for that
    resource is dropped.

    Args:
        rcpsp_model: problem definition (``resources``, ``mode_details``,
            ``index_task``, ...).
        rcpsp_sol: solution providing ``rcpsp_schedule`` and ``rcpsp_modes``.
        resource_types_to_consider: resources to process. Defaults to
            ``rcpsp_model.resources_list``.

    Returns:
        A dict keyed by resource. Each value is a dict with:

        * ``"activity"``: ``(time, unit)`` array holding the code of the task
          run by the unit (task index for string task ids, the id itself
          otherwise);
        * ``"binary_activity"``: ``(time, unit)`` array, 1 where the unit works;
        * ``"total_activity"``: accumulated working time per unit;
        * ``"activity_last_n_hours"``: ``binary_activity`` convolved per unit
          with a window kernel;
        * ``"boxes_time"``: list of 5-point boxes
          ``(unit +/- 0.25, time, task index)`` used for plotting.
    """
    modes_dict = rcpsp_model.build_mode_dict(
        rcpsp_modes_from_solution=rcpsp_sol.rcpsp_modes
    )
    if resource_types_to_consider is None:
        resources = rcpsp_model.resources_list
    else:
        resources = resource_types_to_consider
    schedule = rcpsp_sol.rcpsp_schedule
    # Processing order: first start time, ties broken by task index.
    sorted_task_by_start = sorted(
        schedule,
        key=lambda x: 100000 * schedule[x]["starts"][0] + rcpsp_model.index_task[x],
    )
    max_time = max(schedule[t]["ends"][-1] for t in schedule)
    min_time = min(schedule[t]["starts"][0] for t in schedule)
    total_time = max_time - min_time + 1
    with_calendar = rcpsp_model.is_varying_resource()

    array_ressource_usage = {}
    for r in resources:
        # Number of individual units: the calendar peak, or the flat capacity.
        capacity = (
            max(rcpsp_model.resources[r])
            if with_calendar
            else rcpsp_model.resources[r]
        )
        array_ressource_usage[r] = {
            "activity": np.zeros((total_time, capacity)),
            "binary_activity": np.zeros((total_time, capacity)),
            "total_activity": np.zeros(capacity),
            "activity_last_n_hours": np.zeros((total_time, capacity)),
            "boxes_time": [],
        }

    nhour = int(min(8, total_time / 2 - 1))
    window_kernel = np.array([1] * nhour + [0] + [0] * nhour)
    time_to_index = {min_time + i: i for i in range(total_time)}

    for activity in sorted_task_by_start:
        mode = modes_dict[activity]
        task_index = rcpsp_model.index_task[activity]
        activity_code = task_index if isinstance(activity, str) else activity
        start_times = schedule[activity]["starts"]
        end_times = schedule[activity]["ends"]
        for start_time, end_time in zip(start_times, end_times):
            if end_time == start_time:
                continue
            index_start = time_to_index[start_time]
            index_end = time_to_index[end_time]
            resources_needed = {
                r: rcpsp_model.mode_details[activity][mode].get(r, 0)
                for r in resources
            }
            for r, rneeded in resources_needed.items():
                usage = array_ressource_usage[r]
                if not with_calendar:
                    range_interest = range(usage["activity"].shape[1])
                else:
                    # NOTE(review): the calendar is indexed with the offset
                    # from min_time, which equals the absolute time only when
                    # the schedule starts at 0 -- confirm this is intended.
                    range_interest = range(rcpsp_model.resources[r][index_start])
                while rneeded > 0:
                    # Availability is read from the 0/1 matrix: the task-code
                    # matrix cannot tell "free" from "running task code 0".
                    availables_people_r = [
                        i
                        for i in range_interest
                        if usage["binary_activity"][index_start, i] == 0
                    ]
                    logger.debug("%s people available : ", len(availables_people_r))
                    if availables_people_r:
                        # Greedy choice: the unit that worked the least so far.
                        resource = min(
                            availables_people_r,
                            key=lambda x: usage["total_activity"][x],
                        )
                        usage["activity"][index_start:index_end, resource] = (
                            activity_code
                        )
                        usage["binary_activity"][index_start:index_end, resource] = 1
                        usage["total_activity"][resource] += end_time - start_time
                        usage["activity_last_n_hours"][:, resource] = np.convolve(
                            usage["binary_activity"][:, resource],
                            window_kernel,
                            mode="same",
                        )
                        # Closed box, slightly shrunk in time, for plotting.
                        usage["boxes_time"].append(
                            [
                                (resource - 0.25, start_time + 0.01, task_index),
                                (resource - 0.25, end_time - 0.01, task_index),
                                (resource + 0.25, end_time - 0.01, task_index),
                                (resource + 0.25, start_time + 0.01, task_index),
                                (resource - 0.25, start_time + 0.01, task_index),
                            ]
                        )
                        rneeded -= 1
                    else:
                        logger.debug("r_needed %s", rneeded)
                        logger.debug("Ressource needed : %s", resources_needed)
                        logger.debug("ressource : %s", r)
                        logger.debug("activity : %s", activity)
                        logger.warning("Problem, can't build schedule")
                        logger.debug(usage["activity"])
                        rneeded = 0
    return array_ressource_usage
def plot_resource_individual_gantt(
    rcpsp_model: RCPSPModelPreemptive,
    rcpsp_sol: RCPSPSolutionPreemptive,
    resource_types_to_consider: List[str] = None,
    title_figure="",
    x_lim=None,
    fig=None,
    ax=None,
    current_t=None,
):
    """Draw, per resource, a Gantt chart with one row per individual unit.

    The assignment of task parts to units comes from
    ``compute_schedule_per_resource_individual``; each of its ``boxes_time``
    entries is drawn as a box coloured by task index.

    Args:
        rcpsp_model: problem definition.
        rcpsp_sol: solution to draw.
        resource_types_to_consider: resources to draw, one subplot each.
            Defaults to the model's resource list.
        title_figure: figure title. It is applied when the figure is created
            here, or when it is non-empty.
        x_lim: optional x-axis limits; defaults to the schedule's time span.
        fig: existing figure. A new figure and axes are created unless both
            ``fig`` and ``ax`` are given.
        ax: existing axes: a sequence with one entry per resource, or a
            single ``Axes`` when only one resource is drawn.
        current_t: optional time at which a dashed red vertical line is drawn
            on every subplot.

    Returns:
        The figure holding the charts.
    """
    array_ressource_usage = compute_schedule_per_resource_individual(
        rcpsp_model, rcpsp_sol, resource_types_to_consider=resource_types_to_consider
    )
    schedule = rcpsp_sol.rcpsp_schedule
    # Only the extreme times are needed, so no sort is required.
    max_time = max(schedule[t]["ends"][-1] for t in schedule)
    min_time = min(schedule[t]["starts"][0] for t in schedule)
    resources_list = list(array_ressource_usage.keys())

    created = fig is None or ax is None
    if created:
        fig, ax = plt.subplots(len(array_ressource_usage), figsize=(10, 5))
    # plt.subplots returns a bare Axes for a single row, and a caller may also
    # pass one; the rest of the function indexes ``ax[i]``.
    if not isinstance(ax, (list, tuple, np.ndarray)):
        ax = [ax]
    if created or title_figure:
        fig.suptitle(title_figure)

    # At least one colour, otherwise the modulo below divides by zero.
    nb_colors = max(1, len(schedule) // 2)
    colors = get_cmap_with_nb_colors("hsv", nb_colors)
    for i, resource in enumerate(resources_list):
        patches = []
        for boxe in array_ressource_usage[resource]["boxes_time"]:
            # Boxes are stored as (unit, time, task); plot as (time, unit).
            polygon = Polygon([(b[1], b[0]) for b in boxe])
            activity = boxe[0][2]
            x, y = polygon.exterior.xy
            ax[i].plot(x, y, zorder=-1, color="b")
            patches.append(
                pp(
                    xy=polygon.exterior.coords,
                    facecolor=colors((activity - 1) % nb_colors),
                )
            )
        p = PatchCollection(
            patches,
            match_original=True,
            alpha=0.4,
        )
        ax[i].add_collection(p)
        ax[i].set_title(resource)
        if x_lim is None:
            ax[i].set_xlim((min_time, max_time))
        else:
            ax[i].set_xlim(x_lim)
        # A resource is either a flat integer capacity or a calendar of
        # capacities; in the latter case the rows go up to the calendar peak.
        nb_units = rcpsp_model.resources[resource]
        if not isinstance(nb_units, (int, np.integer)):
            nb_units = max(nb_units)
        ax[i].set_ylim((-0.5, nb_units))
        ax[i].set_yticks(range(nb_units))
        ax[i].set_yticklabels(tuple(range(nb_units)), fontdict={"size": 7})
        ax[i].grid(True)
        if current_t is not None:
            ax[i].axvline(x=current_t, label="pyplot vertical line", color="r", ls="--")
    ax[-1].set_xlabel("Timestep")
    return fig