Source code for discrete_optimization.rcpsp_multiskill.rcpsp_multiskill_parser

#  Copyright (c) 2022 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.

import os
from typing import Dict, Optional, Tuple

from discrete_optimization.datasets import get_data_home
from discrete_optimization.rcpsp_multiskill.rcpsp_multiskill import (
    Employee,
    MS_RCPSPModel,
    SkillDetail,
)


[docs] def get_data_available( data_folder: Optional[str] = None, data_home: Optional[str] = None ): """Get datasets available for rcpsp_multiskill. Params: data_folder: folder where datasets for rcpsp_multiskill whould be find. If None, we look in "rcpsp_multiskill" subdirectory of `data_home`. data_home: root directory for all datasets. Is None, set by default to "~/discrete_optimization_data " """ if data_folder is None: data_home = get_data_home(data_home=data_home) data_folder = f"{data_home}/rcpsp_multiskill" try: files = [f for f in os.listdir(data_folder) if f.endswith(".def")] except FileNotFoundError: files = [] return [os.path.abspath(os.path.join(data_folder, f)) for f in files]
[docs] def parse_imopse( input_data, max_horizon=None, one_unit_per_task=True, preemptive=False ): # parse the input lines = input_data.split("\n") # "General characteristics: # Tasks: 161 # Resources: 10 # Precedence relations: 321 # Number of skill types: 9 # ==================================================================================================================== # ResourceID Salary Skills # 1 14.2 Q2: 0 Q3: 2 Q1: 0 Q4: 2 Q7: 1 Q8: 2 # 2 31.2 Q0: 0 Q4: 2 Q7: 1 Q3: 1 Q8: 2 Q2: 0 # 3 34.4 Q4: 0 Q2: 1 Q6: 2 Q3: 1 Q0: 1 Q5: 0 # 4 26.0 Q5: 2 Q1: 1 Q4: 1 Q8: 2 Q0: 2 Q2: 2 # 5 30.8 Q8: 0 Q7: 1 Q3: 1 Q1: 2 Q4: 1 Q5: 1 # 6 17.3 Q6: 1 Q3: 2 Q4: 2 Q2: 0 Q7: 2 Q1: 0 # 7 19.8 Q1: 2 Q4: 2 Q5: 0 Q7: 1 Q3: 1 Q6: 2 # 8 35.8 Q2: 1 Q0: 1 Q3: 2 Q6: 0 Q7: 0 Q8: 1 # 9 37.6 Q7: 0 Q5: 2 Q2: 0 Q1: 0 Q0: 1 Q3: 1 # 10 23.5 Q8: 1 Q5: 1 Q1: 2 Q6: 0 Q4: 0 Q3: 2 " nb_task = None nb_worker = None nb_precedence_relation = None nb_skills = None resource_zone = False task_zone = False resource_dict = {} task_dict = {} real_skills_found = set() for line in lines: words = line.split() if len(words) == 2 and words[0] == "Tasks:": nb_task = int(words[1]) continue if len(words) == 2 and words[0] == "Resources:": nb_worker = int(words[1]) continue if len(words) == 3 and words[0] == "Precedence" and words[1] == "relations:": nb_precedence_relation = int(words[2]) continue if len(words) == 5 and words[0] == "Number" and words[1] == "of": nb_skills = int(words[4]) continue if len(words) == 0: continue if words[0] == "ResourceID": resource_zone = True continue if words[0] == "TaskID": task_zone = True continue if resource_zone: if words[0][0] == "=": resource_zone = False continue else: id_worker = words[0] resource_dict[id_worker] = {"salary": float(words[1])} for word in words[2:]: if word[0] == "Q": current_skill = word[:-1] continue resource_dict[id_worker][current_skill] = int(word) + 1 real_skills_found.add(current_skill) if task_zone: if words[0][0] == "=": task_zone = False continue else: task_id = int(words[0]) if task_id not in task_dict: task_dict[task_id] = {"id": task_id, "successors": [], "skills": {}} task_dict[task_id]["duration"] = int(words[1]) i = 2 while i < len(words): if words[i][0] == "Q": current_skill = words[i][:-1] task_dict[task_id]["skills"][current_skill] = int(words[i + 1]) + 1 real_skills_found.add(current_skill) i = i + 2 continue else: if "precedence" not in task_dict[task_id]: task_dict[task_id]["precedence"] = [] task_dict[task_id]["precedence"] += [int(words[i])] if int(words[i]) not in task_dict: task_dict[int(words[i])] = { "id": int(words[i]), "successors": [], "skills": {}, } if "successors" not in task_dict[int(words[i])]: task_dict[int(words[i])]["successors"] = [] task_dict[int(words[i])]["successors"] += [task_id] i += 1 sorted_task_names = sorted(task_dict.keys()) task_id_to_new_name = { sorted_task_names[i]: i + 2 for i in range(len(sorted_task_names)) } new_tame_to_original_task_id = { task_id_to_new_name[ind]: ind for ind in task_id_to_new_name } mode_details = { task_id_to_new_name[task_id]: {1: {"duration": task_dict[task_id]["duration"]}} for task_id in task_dict } resource_dict = {int(i): resource_dict[i] for i in resource_dict} skills = real_skills_found for task_id in task_dict: for skill in skills: req_squill = task_dict[task_id]["skills"].get(skill, 0.0) mode_details[task_id_to_new_name[task_id]][1][skill] = req_squill mode_details[1] = {1: {"duration": 0}} for skill in skills: mode_details[1][1][skill] = int(0) max_t = max(mode_details) mode_details[max_t + 1] = {1: {"duration": 0}} for skill in skills: mode_details[max_t + 1][1][skill] = int(0) successors = { task_id_to_new_name[task_id]: [ task_id_to_new_name[t] for t in task_dict[task_id]["successors"] ] + [max_t + 1] for task_id in task_dict } successors[max_t + 1] = [] successors[1] = [k for k in successors] horizon_large = 2 * sum([task_dict[task_id]["duration"] for task_id in task_dict]) max_horizon = horizon_large if max_horizon is None else max_horizon return ( MS_RCPSPModel( skills_set=set(real_skills_found), resources_set=set(), non_renewable_resources=set(), resources_availability={}, employees={ res: Employee( dict_skill={ skill: SkillDetail( skill_value=resource_dict[res][skill], efficiency_ratio=1.0, experience=1.0, ) for skill in resource_dict[res] if skill != "salary" }, salary=resource_dict[res]["salary"], calendar_employee=[True] * max_horizon, ) for res in resource_dict }, employees_availability=[len(resource_dict)] * max_horizon, mode_details=mode_details, successors=successors, horizon=max_horizon, source_task=1, sink_task=max_t + 1, one_unit_per_task_max=one_unit_per_task, preemptive=preemptive, ), new_tame_to_original_task_id, )
[docs] def parse_file( file_path, max_horizon=None, one_unit_per_task=True, preemptive=False ) -> Tuple[MS_RCPSPModel, Dict]: with open(file_path, "r", encoding="utf-8") as input_data_file: input_data = input_data_file.read() rcpsp_model, new_tame_to_original_task_id = parse_imopse( input_data, max_horizon, one_unit_per_task, preemptive=preemptive ) return rcpsp_model, new_tame_to_original_task_id