Source code for discrete_optimization.pickup_vrp.solver.ortools_solver
# Copyright (c) 2022 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import print_function
import logging
from enum import Enum
from functools import partial
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from ortools.constraint_solver import (
pywrapcp,
routing_enums_pb2,
routing_parameters_pb2,
)
from ortools.util.optional_boolean_pb2 import BOOL_FALSE, BOOL_TRUE
from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_problem import ParamsObjectiveFunction
from discrete_optimization.generic_tools.exceptions import SolveEarlyStop
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
CategoricalHyperparameter,
EnumHyperparameter,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)
from discrete_optimization.pickup_vrp.gpdp import (
GPDP,
GPDPSolution,
Node,
build_matrix_distance,
build_matrix_time,
)
from discrete_optimization.pickup_vrp.solver.pickup_vrp_solver import SolverPickupVrp
logger = logging.getLogger(__name__)
[docs]
class NodePosition(Enum):
"""Node position inside a trajectory.
Useful e.g. to distinguish between the starting node and ending node
when the trajectory is a loop.
"""
START = "start"
INTERMEDIATE = "intermediate"
END = "end"
[docs]
class ParametersCost:
def __init__(
self,
dimension_name: str,
global_span: bool = True,
sum_over_vehicles: bool = False,
coefficient_vehicles: Union[float, List[float]] = 100,
):
self.dimension_name = dimension_name
self.global_span = global_span
self.sum_over_vehicles = sum_over_vehicles
self.coefficient_vehicles = coefficient_vehicles
self.different_coefficient = (
isinstance(self.coefficient_vehicles, list)
and len(set(self.coefficient_vehicles)) > 1
)
[docs]
@staticmethod
def default() -> "ParametersCost":
return ParametersCost(
dimension_name="Distance",
global_span=True,
sum_over_vehicles=False,
coefficient_vehicles=100,
)
[docs]
def apply_cost(
list_parameters_cost: List[ParametersCost], routing: pywrapcp.RoutingModel
) -> None:
dimension_names = set([p.dimension_name for p in list_parameters_cost])
dimension_dict = {d: routing.GetDimensionOrDie(d) for d in dimension_names}
for p in list_parameters_cost:
if p.global_span:
dimension_dict[p.dimension_name].SetGlobalSpanCostCoefficient(
p.coefficient_vehicles
)
else:
if p.different_coefficient:
for i in range(len(p.coefficient_vehicles)): # type: ignore
dimension_dict[p.dimension_name].SetSpanCostCoefficientForVehicle(
p.coefficient_vehicles[i], i # type: ignore
)
else:
dimension_dict[p.dimension_name].SetSpanCostCoefficientForAllVehicles(
p.coefficient_vehicles[0] # type: ignore
)
LocalSearchMetaheuristic = Enum(
value="LocalSearchMetaheuristic",
names={
name: val.number
for name, val in routing_enums_pb2.LocalSearchMetaheuristic.DESCRIPTOR.enum_values_by_name.items()
},
module=__name__, # to avoid pickle issues
)
"""Enumeration of local search meta-heuristic.
Extracted from ortools Enum-like class.
https://developers.google.com/optimization/routing/routing_options#local_search_options
"""
FirstSolutionStrategy = Enum(
value="FirstSolutionStrategy",
names={
name: val.number
for name, val in routing_enums_pb2.FirstSolutionStrategy.DESCRIPTOR.enum_values_by_name.items()
},
module=__name__, # to avoid pickle issues
)
"""Enumeration of first solution strategies.
Extracted from ortools Enum-like class.
https://developers.google.com/optimization/routing/routing_options#first_solution_strategy
"""
status_description = {
val: key
for key, val in pywrapcp.RoutingModel.__dict__.items()
if key.startswith("ROUTING_")
}
"""Mapping from status integer to description string.
This maps the integer returned by routing_model.status to the corresponding string.
We use the attributes of RoutingModel to construct the dictionary.
https://developers.google.com/optimization/routing/routing_options#search_status
"""
[docs]
class ORToolsGPDP(SolverPickupVrp):
problem: GPDP
hyperparameters = [
EnumHyperparameter(
name="first_solution_strategy",
enum=FirstSolutionStrategy,
default=FirstSolutionStrategy.SAVINGS,
),
EnumHyperparameter(
name="local_search_metaheuristic",
enum=LocalSearchMetaheuristic,
default=LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH,
),
CategoricalHyperparameter(name="use_lns", choices=[True, False], default=True),
CategoricalHyperparameter(name="use_cp", choices=[True, False], default=True),
CategoricalHyperparameter(
name="use_cp_sat", choices=[True, False], default=False
),
]
def __init__(
self,
problem: GPDP,
factor_multiplier_distance: float = 1,
factor_multiplier_time: float = 1,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
super().__init__(
problem=problem, params_objective_function=params_objective_function
)
self.dimension_names: List[str] = []
self.factor_multiplier_distance = factor_multiplier_distance # 10**3
self.factor_multiplier_time = factor_multiplier_time # 10**3
[docs]
def init_model(self, **kwargs: Any) -> None:
kwargs = self.complete_with_default_hyperparameters(kwargs)
include_time_windows = kwargs.get("include_time_windows", False)
include_time_windows_cluster = kwargs.get("include_time_windows_cluster", False)
include_cumulative = kwargs.get("include_cumulative", False)
include_mandatory = kwargs.get("include_mandatory", True)
include_demand = kwargs.get("include_demand", True)
include_resource_dimension = kwargs.get("include_resource_dimension", False)
include_time_dimension = kwargs.get("include_time_dimension", True)
list_parameters_cost = kwargs.get("parameters_cost", [ParametersCost.default()])
one_visit_per_cluster = kwargs.get("one_visit_per_cluster", False)
include_pickup_and_delivery = kwargs.get("include_pickup_and_delivery", False)
include_pickup_and_delivery_per_cluster = kwargs.get(
"include_pickup_and_delivery_per_cluster", False
)
use_matrix = kwargs.get("use_matrix", True)
include_equilibrate_charge = kwargs.get("include_equilibrate_charge", False)
hard_equilibrate = kwargs.get("hard_equilibrate", False)
use_constant_max_slack_time = kwargs.get("use_constant_max_slack_time", True)
constant_max_slack_time = kwargs.get("max_slack_time", 1000)
use_max_slack_time_per_node = kwargs.get("use_max_slack_time_per_node", False)
max_slack_time_per_node = kwargs.get(
"max_slack_time_per_node", self.problem.slack_time_bound_per_node
)
force_start_cumul_time_zero = kwargs.get("force_start_cumul_time_zero", True)
max_time_per_vehicle = kwargs.get("max_time_per_vehicle", 1000000)
max_distance_per_vehicle = kwargs.get("max_distance_per_vehicle", 1000000)
set_transit_cost_by_default = kwargs.get("set_transit_cost_by_default", True)
# use or not the distance callback function to evaluate the cost of each edge.
# Whatever the cost function you want to optimize (via the list_parameters_cost argument),
# these cost are still included in the objective function
if use_matrix:
matrix_distance_int = kwargs.get("matrix", None)
if matrix_distance_int is None:
matrix_distance = build_matrix_distance(self.problem)
matrix_distance_int = np.array(
matrix_distance * self.factor_multiplier_distance, dtype=np.int_
)
if include_time_dimension:
matrix_time = build_matrix_time(self.problem)
matrix_time_int = np.array(
matrix_time * self.factor_multiplier_time, dtype=np.int_
)
capacities_dict = {
r: [
self.problem.capacities[v][r][1]
for v in range(self.problem.number_vehicle)
]
for r in self.problem.resources_set
}
# Neg Capacity version :
neg_capacity_version = kwargs.get("neg_capacity_version", True)
demands: Dict[str, List[float]]
if neg_capacity_version:
demands = {
r: [
max(0.0, -self.problem.resources_flow_node[node].get(r, 0.0))
for node in self.problem.list_nodes
]
for r in self.problem.resources_set
}
else:
demands = {
r: [
self.problem.resources_flow_node[node].get(r, 0.0)
for node in self.problem.list_nodes
]
for r in self.problem.resources_set
}
self.demands = demands
# Create the routing index manager.
manager = pywrapcp.RoutingIndexManager(
len(self.problem.all_nodes_dict),
self.problem.number_vehicle,
[
self.problem.index_nodes[self.problem.origin_vehicle[v]]
for v in range(self.problem.number_vehicle)
],
[
self.problem.index_nodes[self.problem.target_vehicle[v]]
for v in range(self.problem.number_vehicle)
],
)
routing = pywrapcp.RoutingModel(manager)
consider_empty_route_cost = kwargs.get("consider_empty_route_cost", True)
if consider_empty_route_cost:
for v in range(self.problem.number_vehicle):
try:
# Works for <ortools9.2
routing.ConsiderEmptyRouteCostsForVehicle(True, v)
except:
# Works for >=ortools9.2
routing.SetVehicleUsedWhenEmpty(True, v)
logger.info("routing init")
if use_matrix:
# Create and register a transit callback.
def distance_callback(from_index: int, to_index: int) -> int:
"""Returns the distance between the two nodes."""
# Convert from routing variable Index to distance matrix NodeIndex.
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return matrix_distance_int[from_node, to_node]
else:
# Create and register a transit callback.
def distance_callback(from_index: int, to_index: int) -> int:
"""Returns the distance between the two nodes."""
# Convert from routing variable Index to distance matrix NodeIndex.
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return int(
self.problem.distance_delta[
self.problem.nodes_to_index[from_node]
].get(self.problem.nodes_to_index[to_node], 1000000000)
)
if include_equilibrate_charge:
charge = routing.RegisterTransitCallback(lambda i, j: 1)
charge_constraint = kwargs.get("charge_constraint", {})
if all(charge_constraint[i][1] is not None for i in charge_constraint):
max_charge = max([charge_constraint[i][1] for i in charge_constraint])
else:
max_charge = len(self.problem.list_nodes)
routing.AddDimension(
charge,
0,
max_charge,
True,
"charge",
)
charge_dimension = routing.GetDimensionOrDie("charge")
self.dimension_names += ["charge"]
for i in range(self.problem.number_vehicle):
index = routing.End(i)
constraint = charge_constraint[i]
if constraint[0] is not None:
if not hard_equilibrate:
charge_dimension.SetCumulVarSoftLowerBound(
index, constraint[0], 100000000
)
else:
charge_dimension.CumulVar(index).SetMin(constraint[0])
if constraint[1] is not None:
if not hard_equilibrate:
charge_dimension.SetCumulVarSoftUpperBound(
index, constraint[1], 100000000
)
else:
charge_dimension.CumulVar(index).SetMax(constraint[1])
transit_distance_callback_index = routing.RegisterTransitCallback(
distance_callback
)
# Define cost of each arc.
if set_transit_cost_by_default:
routing.SetArcCostEvaluatorOfAllVehicles(transit_distance_callback_index)
logger.info("Distance callback done")
if include_resource_dimension:
def ressource_transition(
from_index: int,
to_index: int,
ressource: str,
problem: GPDP,
vehicle: int,
) -> int:
"""Return the ressource consumption from one node to another."""
# Convert from routing variable Index to demands NodeIndex.
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
l = problem.resources_flow_edges.get(
(
self.problem.nodes_to_index[from_node],
self.problem.nodes_to_index[to_node],
),
{ressource: 0},
)[ressource] + problem.resources_flow_node.get(
to_node, {ressource: 0}
).get(
ressource, 0
)
return int(l)
demand_callback_index_dict = {
r: [
routing.RegisterTransitCallback(
partial(
ressource_transition,
ressource=r,
problem=self.problem,
vehicle=v,
)
)
for v in range(self.problem.number_vehicle)
]
for r in demands
}
for r in demand_callback_index_dict:
routing.AddDimensionWithVehicleTransitAndCapacity(
demand_callback_index_dict[r],
0, # null capacity slack
capacities_dict[r],
# vehicle maximum capacities
True, # start cumul to zero
"Ressource_" + str(r),
)
self.dimension_names += ["Ressource_" + str(r)]
if include_demand:
# Add Capacity constraint.
def demand_callback(from_index: int, ressource: str) -> int:
"""Returns the demand of the node."""
# Convert from routing variable Index to demands NodeIndex.
from_node = manager.IndexToNode(from_index)
return int(demands[ressource][from_node])
demand_callback_index_dict = {
r: routing.RegisterUnaryTransitCallback(
partial(demand_callback, ressource=r)
)
for r in demands
}
for r in demand_callback_index_dict:
routing.AddDimensionWithVehicleCapacity(
demand_callback_index_dict[r],
0, # null capacity slack
capacities_dict[r], # vehicle maximum capacities
True, # start cumul to zero
"Capacity_" + str(r),
)
self.dimension_names += ["Capacity_" + str(r)]
if one_visit_per_cluster:
for cluster in list(self.problem.clusters_to_node):
nodes = [
manager.NodeToIndex(self.problem.index_nodes[n])
for n in self.problem.clusters_to_node[cluster]
if n
not in {
self.problem.origin_vehicle[v]
for v in self.problem.origin_vehicle
}
and n
not in {
self.problem.target_vehicle[v]
for v in self.problem.target_vehicle
}
]
routing.AddDisjunction(nodes, 1000000000, 1)
routing.solver().Add(
routing.solver().Sum([routing.ActiveVar(i) for i in nodes]) == 1
)
slacks = [
int(self.problem.time_delta_node[t] * self.factor_multiplier_time)
for t in self.problem.list_nodes
]
max_slack = max(slacks)
if use_constant_max_slack_time:
max_slack = max(max_slack, constant_max_slack_time)
elif use_max_slack_time_per_node:
max_slack = max(
max_slack,
max([max_slack_time_per_node[n][1] for n in max_slack_time_per_node]),
)
if include_time_dimension:
def time_callback(from_index: int, to_index: int) -> int:
"""Returns the travel time between the two nodes."""
# Convert from routing variable Index to time matrix NodeIndex.
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return matrix_time_int[from_node, to_node]
transit_time_callback_index = routing.RegisterTransitCallback(time_callback)
time = "Time"
routing.AddDimension(
transit_time_callback_index,
max_slack, # allow waiting time
max_time_per_vehicle, # maximum time per vehicle
force_start_cumul_time_zero, # force or not start cumul to zero.
time,
)
self.dimension_names += [time]
time_dimension = routing.GetDimensionOrDie("Time")
for task in self.problem.all_nodes_dict:
index = manager.NodeToIndex(self.problem.index_nodes[task])
if index == -1:
continue
routing.solver().Add(
time_dimension.SlackVar(index)
>= int(
self.problem.time_delta_node[task] * self.factor_multiplier_time
)
)
if include_pickup_and_delivery:
distance = "Distance"
routing.AddDimension(
transit_distance_callback_index,
0, # allow waiting time on this dimension
max_distance_per_vehicle, # maximum distance per vehicle
True, # force distance cumul to zero.
distance,
)
self.dimension_names += [distance]
distance_dimension = routing.GetDimensionOrDie(distance)
for pickup_deliver in self.problem.list_pickup_deliverable:
pickup = pickup_deliver[0]
deliver = pickup_deliver[1]
for p in pickup:
for d in deliver:
pickup_index = manager.NodeToIndex(self.problem.index_nodes[p])
delivery_index = manager.NodeToIndex(
self.problem.index_nodes[d]
)
routing.AddPickupAndDelivery(pickup_index, delivery_index)
routing.solver().Add(
routing.VehicleVar(pickup_index)
== routing.VehicleVar(delivery_index)
)
routing.solver().Add(
distance_dimension.CumulVar(pickup_index)
<= distance_dimension.CumulVar(delivery_index)
)
if include_pickup_and_delivery_per_cluster:
distance = "Distance"
routing.AddDimension(
transit_distance_callback_index,
0, # allow waiting time
max_distance_per_vehicle, # maximum time per vehicle
True, # force start cumul to zero.
distance,
)
self.dimension_names += [distance]
distance_dimension = routing.GetDimensionOrDie(distance)
if self.problem.list_pickup_deliverable_per_cluster is None:
raise ValueError(
"When include_pickup_and_delivery_per_cluster is True, "
"self.problem.list_pickup_deliverable_per_cluster cannot be None."
)
for pickup_deliver in self.problem.list_pickup_deliverable_per_cluster:
pickup = pickup_deliver[0]
deliver = pickup_deliver[1]
for p in pickup:
for d in deliver:
pp = [
manager.NodeToIndex(x)
for x in self.problem.clusters_to_node[p]
]
dd = [
manager.NodeToIndex(x)
for x in self.problem.clusters_to_node[d]
]
pickup = routing.AddDisjunction(pp)
deliver = routing.AddDisjunction(dd)
routing.AddPickupAndDeliverySets(pickup, deliver)
routing.solver().Add(
routing.solver().Sum(
[
routing.ActiveVar(i)
* distance_dimension.CumulVar(i)
for i in pp
]
+ [
-routing.ActiveVar(i)
* distance_dimension.CumulVar(i)
for i in dd
]
)
< 0
)
if include_cumulative:
starts = {}
intervals = {}
time_dimension = routing.GetDimensionOrDie("Time")
i = 0
for (set_of_task, limit) in self.problem.cumulative_constraints:
list_of_tasks = list(set_of_task)
index_tasks = [
manager.NodeToIndex(self.problem.index_nodes[t])
for t in list_of_tasks
]
time_delta = [
self.problem.time_delta_node[t] * self.factor_multiplier_time
for t in list_of_tasks
]
for task, tdelta in zip(index_tasks, time_delta):
if task not in starts:
starts[task] = time_dimension.CumulVar(task)
intervals[task] = routing.solver().IntervalVar(
0,
10000000,
tdelta,
max_slack,
0,
10000000,
False,
f"interval_{task}",
)
routing.solver().Add(
intervals[task].StartExpr() == starts[task]
)
routing.solver().Add(
intervals[task].EndExpr()
== starts[task] + time_dimension.SlackVar(task)
)
routing.solver().Add(
intervals[task].DurationExpr()
== time_dimension.SlackVar(task)
)
routing.solver().Add(
routing.solver().Cumulative(
[intervals[t] for t in index_tasks],
[1 for t in index_tasks],
limit,
"asset_" + str(i),
)
)
i += 1
logger.info("cumulative done")
if include_mandatory:
mandatory_nodes = [
manager.NodeToIndex(self.problem.index_nodes[n])
for n in self.problem.mandatory_node_info
if self.problem.mandatory_node_info[n]
and n
not in {
self.problem.origin_vehicle[v] for v in self.problem.origin_vehicle
}
and n
not in {
self.problem.target_vehicle[v] for v in self.problem.target_vehicle
}
]
if len(mandatory_nodes) > 0:
routing.AddDisjunction(
mandatory_nodes, 1000000000, len(mandatory_nodes)
)
routing.solver().Add(
routing.solver().Sum(
[routing.ActiveVar(i) for i in mandatory_nodes]
)
== len(mandatory_nodes)
)
other_nodes = [
manager.NodeToIndex(self.problem.index_nodes[n])
for n in self.problem.mandatory_node_info
if not self.problem.mandatory_node_info[n]
and n
not in {
self.problem.origin_vehicle[v] for v in self.problem.origin_vehicle
}
and n
not in {
self.problem.target_vehicle[v] for v in self.problem.target_vehicle
}
]
if len(other_nodes) > 0:
routing.AddDisjunction(other_nodes, 0, len(other_nodes))
include_node_vehicle = kwargs.get("include_node_vehicle", True)
if include_node_vehicle:
if self.problem.node_vehicle is not None:
for n in self.problem.node_vehicle:
routing.SetAllowedVehiclesForIndex(
self.problem.node_vehicle[n],
manager.NodeToIndex(self.problem.index_nodes[n]),
)
if "Time" in self.dimension_names:
time_dimension = routing.GetDimensionOrDie("Time")
for i in range(self.problem.number_vehicle):
routing.AddVariableMinimizedByFinalizer(
time_dimension.CumulVar(routing.Start(i))
)
routing.AddVariableMinimizedByFinalizer(
time_dimension.CumulVar(routing.End(i))
)
if "Distance" in self.dimension_names:
dim = routing.GetDimensionOrDie("Distance")
for i in range(self.problem.number_vehicle):
routing.AddVariableMinimizedByFinalizer(dim.CumulVar(routing.Start(i)))
routing.AddVariableMinimizedByFinalizer(dim.CumulVar(routing.End(i)))
origins = {self.problem.origin_vehicle[v] for v in self.problem.origin_vehicle}
targets = {self.problem.target_vehicle[v] for v in self.problem.target_vehicle}
if include_time_windows:
time_dimension = routing.GetDimensionOrDie("Time")
for node in self.problem.all_nodes_dict:
if node in origins:
continue
if node in targets:
continue
index = manager.NodeToIndex(self.problem.index_nodes[node])
if node in self.problem.time_windows_nodes:
mini, maxi = self.problem.time_windows_nodes[node]
if mini is not None and maxi is not None:
time_dimension.SetCumulVarSoftLowerBound(
index, self.factor_multiplier_time * mini, 10000
)
time_dimension.SetCumulVarSoftUpperBound(
index, self.factor_multiplier_time * maxi, 10000
)
elif mini is not None:
time_dimension.SetCumulVarSoftLowerBound(
index, self.factor_multiplier_time * mini, 10000
)
elif maxi is not None:
time_dimension.SetCumulVarSoftUpperBound(
index, self.factor_multiplier_time * maxi, 10000
)
for vehicle in range(self.problem.number_vehicle):
index = routing.Start(vehicle)
if (
self.problem.origin_vehicle[vehicle]
in self.problem.time_windows_nodes
):
mini, maxi = self.problem.time_windows_nodes[
self.problem.origin_vehicle[vehicle]
]
if mini is not None and maxi is not None:
time_dimension.CumulVar(index).SetRange(mini, maxi)
elif mini is not None:
time_dimension.CumulVar(index).SetMin(mini)
elif maxi is not None:
time_dimension.CumulVar(index).SetMax(maxi)
index = routing.End(vehicle)
if (
self.problem.target_vehicle[vehicle]
in self.problem.time_windows_nodes
):
mini, maxi = self.problem.time_windows_nodes[
self.problem.target_vehicle[vehicle]
]
if mini is not None and maxi is not None:
time_dimension.CumulVar(index).SetRange(
self.factor_multiplier_time * mini,
self.factor_multiplier_time * maxi,
)
elif mini is not None:
time_dimension.CumulVar(index).SetMin(
self.factor_multiplier_time * mini
)
elif maxi is not None:
time_dimension.CumulVar(index).SetMax(
self.factor_multiplier_time * maxi
)
if include_time_windows_cluster:
time_dimension = routing.GetDimensionOrDie("Time")
for cluster in list(self.problem.clusters_to_node):
mini, maxi = self.problem.time_windows_cluster[cluster]
if mini is not None or maxi is not None:
nodes = [
manager.NodeToIndex(self.problem.index_nodes[n])
for n in self.problem.clusters_to_node[cluster]
if n
not in {
self.problem.origin_vehicle[v]
for v in self.problem.origin_vehicle
}
and n
not in {
self.problem.target_vehicle[v]
for v in self.problem.target_vehicle
}
]
if mini is not None:
routing.solver().Add(
routing.solver().Sum(
[
routing.ActiveVar(i) * time_dimension.CumulVar(i)
for i in nodes
]
)
>= mini
)
if maxi is not None:
routing.solver().Add(
routing.solver().Sum(
[
routing.ActiveVar(i) * time_dimension.CumulVar(i)
for i in nodes
]
)
<= mini
)
if "Distance" not in self.dimension_names:
dimension_name = "Distance"
routing.AddDimension(
transit_distance_callback_index,
0, # no slack
max_distance_per_vehicle, # vehicle maximum travel distance
True, # start cumul to zero
dimension_name,
)
self.dimension_names += ["Distance"]
apply_cost(list_parameters_cost=list_parameters_cost, routing=routing)
self.manager = manager
self.routing = routing
self.search_parameters = self.build_search_parameters(**kwargs)
logger.info("Routing problem initialized ")
[docs]
def build_search_parameters(
self, **kwargs: Any
) -> routing_parameters_pb2.RoutingSearchParameters:
first_solution_strategy: Union[int, FirstSolutionStrategy] = kwargs[
"first_solution_strategy"
]
if not isinstance(first_solution_strategy, int):
first_solution_strategy = int(first_solution_strategy.value)
local_search_metaheuristic = kwargs["local_search_metaheuristic"]
if not isinstance(local_search_metaheuristic, int):
local_search_metaheuristic = int(local_search_metaheuristic.value)
one_visit_per_cluster = kwargs.get("one_visit_per_cluster", False)
use_lns = kwargs["use_lns"]
use_cp = kwargs["use_cp"]
use_cp_sat = kwargs["use_cp_sat"]
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = first_solution_strategy
search_parameters.local_search_metaheuristic = local_search_metaheuristic
if use_lns:
if one_visit_per_cluster:
search_parameters.local_search_operators.use_inactive_lns = BOOL_TRUE
search_parameters.local_search_operators.use_path_lns = BOOL_TRUE
if one_visit_per_cluster:
search_parameters.local_search_operators.use_extended_swap_active = (
BOOL_TRUE
)
search_parameters.use_cp = BOOL_TRUE if use_cp else BOOL_FALSE
search_parameters.use_cp_sat = BOOL_TRUE if use_cp_sat else BOOL_FALSE
search_parameters.time_limit.seconds = kwargs.get("time_limit", 100)
return search_parameters
[docs]
def solve(
self,
search_parameters: Optional[
routing_parameters_pb2.RoutingSearchParameters
] = None,
callbacks: Optional[List[Callback]] = None,
**kwargs: Any,
) -> ResultStorage:
callbacks_list = CallbackList(callbacks=callbacks)
if search_parameters is None:
search_parameters = self.search_parameters
monitor = RoutingMonitor(self, callback=callbacks_list)
self.routing.AddSearchMonitor(monitor)
try:
if (
"initial_solution" in kwargs
and kwargs.get("initial_solution") is not None
):
initial_sol = self.routing.ReadAssignmentFromRoutes(
kwargs["initial_solution"], True
)
print("laucnhing with assignment")
self.routing.SolveFromAssignmentWithParameters(
initial_sol, search_parameters
)
else:
self.routing.SolveWithParameters(search_parameters)
except SolveEarlyStop as e:
logger.info(e)
return monitor.res
[docs]
class RoutingMonitor(pywrapcp.SearchMonitor):
def __init__(self, do_solver: ORToolsGPDP, callback: Callback):
super().__init__(do_solver.routing.solver())
self.callback = callback
self.model = do_solver.routing
self.problem = do_solver.problem
self.do_solver = do_solver
self._counter = 0
self._best_objective = np.inf
self._counter_limit = 10000000
self.nb_solutions = 0
self.res = ResultStorage(
[],
mode_optim=self.do_solver.params_objective_function.sense_function,
limit_store=False,
)
[docs]
def AtSolution(self) -> bool:
logger.info(
f"New solution found : --Cur objective : {self.model.CostVar().Max()}"
)
logger.debug(status_description[self.model.status()])
if self.model.CostVar().Max() < self._best_objective:
self._best_objective = self.model.CostVar().Max()
self.retrieve_current_solution()
self._counter = 0
else:
if self.nb_solutions % 100 == 0:
self.retrieve_current_solution()
self._counter += 1
if self._counter > self._counter_limit:
self.model.solver().FinishCurrentSearch()
self.nb_solutions += 1
# end of step callback: stopping?
stopping = self.callback.on_step_end(
step=self.nb_solutions, res=self.res, solver=self.do_solver
)
if stopping:
raise SolveEarlyStop(
f"{self.do_solver.__class__.__name__}.solve() stopped by user callback."
)
return not stopping
[docs]
def retrieve_current_solution(self) -> None:
vehicle_count = self.problem.number_vehicle
vehicle_tours: Dict[int, List[int]] = {i: [] for i in range(vehicle_count)}
dimension_output: Dict[
Tuple[int, int, NodePosition], Dict[str, Tuple[float, float, float]]
] = {}
dimensions_names = self.do_solver.dimension_names
dimensions: Dict[str, pywrapcp.RoutingDimension] = {
r: self.model.GetDimensionOrDie(r) for r in dimensions_names
}
objective = 0.0
route_distance = 0.0
for vehicle_id in range(vehicle_count):
index = self.model.Start(vehicle_id)
route_load: Dict[str, float] = {r: 0.0 for r in self.problem.resources_set}
cnt = 0
while not self.model.IsEnd(index) or cnt > 10000:
node_index = self.do_solver.manager.IndexToNode(index)
if cnt == 0:
node_position = NodePosition.START
else:
node_position = NodePosition.INTERMEDIATE
vehicle_tours[vehicle_id] += [node_index]
try:
dimension_output[(vehicle_id, node_index, node_position)] = {
r: (
dimensions[r].CumulVar(index).Min(),
dimensions[r].CumulVar(index).Max(),
dimensions[r].SlackVar(index).Value(),
)
for r in dimensions
}
except Exception as e:
logger.warning(("1,", e))
break
cnt += 1
for r in route_load:
route_load[r] += self.do_solver.demands[r][node_index]
previous_index = index
try:
index = self.model.NextVar(index).Value()
except Exception as e:
logger.warning(("3", e))
break
route_distance += self.model.GetArcCostForVehicle(
previous_index, index, vehicle_id
)
if self.model.IsEnd(index):
vehicle_tours[vehicle_id] += [
self.do_solver.manager.IndexToNode(index)
]
try:
dimension_output[
(
vehicle_id,
self.do_solver.manager.IndexToNode(index),
NodePosition.END,
)
] = {
r: (
dimensions[r].CumulVar(index).Min(),
dimensions[r].CumulVar(index).Max(),
0,
)
for r in dimensions
}
except Exception as e:
logger.warning(("2,", e))
break
sol: Tuple[
Dict[int, List[int]],
Dict[
Tuple[int, int, NodePosition],
Dict[str, Tuple[float, float, float]],
],
float,
float,
float,
] = (
vehicle_tours,
dimension_output,
route_distance,
objective,
self.model.CostVar().Max(),
)
gpdp_sol = convert_to_gpdpsolution(self.problem, sol)
fit = self.do_solver.aggreg_from_sol(gpdp_sol)
self.res.add_solution(solution=gpdp_sol, fitness=fit)
[docs]
def convert_to_gpdpsolution(
problem: GPDP,
sol: Tuple[
Dict[int, List[int]],
Dict[Tuple[int, int, NodePosition], Dict[str, Tuple[float, float, float]]],
float,
float,
float,
],
) -> GPDPSolution:
(
vehicle_tours,
dimension_output,
route_distance,
objective,
cost,
) = sol
times: Dict[Node, float] = {}
trajectories = {
v: [problem.list_nodes[nodeindex] for nodeindex in traj]
for v, traj in vehicle_tours.items()
}
for (v, nodeindex, node_position), output in dimension_output.items():
node = problem.list_nodes[nodeindex]
if (node not in times) or (node_position != NodePosition.START):
if "Time" in output:
times[node] = output["Time"][1]
resource_evolution: Dict[Node, Dict[Node, List[int]]] = {}
return GPDPSolution(
problem=problem,
trajectories=trajectories,
times=times,
resource_evolution=resource_evolution,
)