Source code for discrete_optimization.knapsack.solvers.knapsack_decomposition

#  Copyright (c) 2023 AIRBUS and its affiliates.
#  This source code is licensed under the MIT license found in the
#  LICENSE file in the root directory of this source tree.
import logging
import os
import random
from typing import Any, Dict, List, Optional, Set, Type

from discrete_optimization.generic_tools.callbacks.callback import (
    Callback,
    CallbackList,
)
from discrete_optimization.generic_tools.hyperparameters.hyperparameter import (
    FloatHyperparameter,
    IntegerHyperparameter,
    SubBrickHyperparameter,
    SubBrickKwargsHyperparameter,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
    ResultStorage,
)
from discrete_optimization.knapsack.knapsack_model import (
    KnapsackModel,
    KnapsackSolution,
    create_subknapsack_model,
)
from discrete_optimization.knapsack.knapsack_solvers import solve
from discrete_optimization.knapsack.solvers.cp_solvers import (
    CPKnapsackMZN,
    CPKnapsackMZN2,
)
from discrete_optimization.knapsack.solvers.dyn_prog_knapsack import KnapsackDynProg
from discrete_optimization.knapsack.solvers.greedy_solvers import GreedyBest
from discrete_optimization.knapsack.solvers.knapsack_asp_solver import KnapsackASPSolver
from discrete_optimization.knapsack.solvers.knapsack_solver import SolverKnapsack
from discrete_optimization.knapsack.solvers.lp_solvers import (
    KnapsackORTools,
    LPKnapsack,
    LPKnapsackCBC,
    LPKnapsackGurobi,
)

cur_folder = os.path.abspath(os.path.dirname(__file__))
logger = logging.getLogger(__name__)


subsolvers = [
    KnapsackORTools,
    LPKnapsack,
    LPKnapsackCBC,
    LPKnapsackGurobi,
    KnapsackASPSolver,
    KnapsackDynProg,
    CPKnapsackMZN,
    CPKnapsackMZN2,
]


[docs] class KnapsackDecomposedSolver(SolverKnapsack): """ This solver is based on the current observation. From a given knapsack model and one current solution, if we decide to freeze the decision variable for a subset of items, the remaining problem to solve is also a knapsack problem, with fewer items and smaller capacity. A solution to this subproblem can be found by any knapsack solver and a full solution to the original problem, can be rebuilt. KnapsackDecomposedSolver is a basic iterative solver that starts from a given solution, then freeze random items, solve subproblem with a custom root solver, rebuild original solution and repeat the process. """ hyperparameters = [ FloatHyperparameter( name="proportion_to_remove", low=0.0, high=1.0, default=0.7 ), IntegerHyperparameter(name="nb_iteration", low=0, high=int(10e6), default=100), SubBrickHyperparameter( name="initial_solver", choices=subsolvers, default=GreedyBest ), SubBrickKwargsHyperparameter( name="initial_solver_kwargs", subbrick_hyperparameter="initial_solver" ), SubBrickHyperparameter( name="root_solver", choices=subsolvers, default=GreedyBest ), SubBrickKwargsHyperparameter( name="root_solver_kwargs", subbrick_hyperparameter="root_solver" ), ]
[docs] def rebuild_sol( self, sol: KnapsackSolution, original_knapsack_model: KnapsackModel, original_solution: KnapsackSolution, indexes_to_remove: Set[int], ): """ Rebuild a knapsack solution object from a partial solution. :param sol: solution to a sub-knapsack problem :param original_knapsack_model: original knapsack model to solve :param original_solution: original base solution :param indexes_to_remove: indexes of item removed when building the sub-knapsack problem. :return: A new solution object for the original problem. """ list_taken = [0 for i in range(original_knapsack_model.nb_items)] for i in indexes_to_remove: list_taken[i] = original_solution.list_taken[i] for j in range(len(sol.list_taken)): original_index = original_knapsack_model.index_to_index_list[ sol.problem.list_items[j].index ] list_taken[original_index] = sol.list_taken[j] solution = KnapsackSolution( problem=original_knapsack_model, list_taken=list_taken ) return solution
[docs] def solve( self, callbacks: Optional[List[Callback]] = None, **kwargs: Any ) -> ResultStorage: # wrap all callbacks in a single one callbacks_list = CallbackList(callbacks=callbacks) # start of solve callback callbacks_list.on_solve_start(solver=self) kwargs = self.complete_with_default_hyperparameters(kwargs) initial_solver: Type[SolverKnapsack] = kwargs["initial_solver"] if kwargs["initial_solver_kwargs"] is None: initial_solver_kwargs: Dict[str, Any] = {} else: initial_solver_kwargs = kwargs["initial_solver_kwargs"] sub_solver: Type[SolverKnapsack] = kwargs["root_solver"] if kwargs["root_solver_kwargs"] is None: sub_solver_kwargs: Dict[str, Any] = {} else: sub_solver_kwargs = kwargs["root_solver_kwargs"] nb_iteration = kwargs["nb_iteration"] proportion_to_remove = kwargs["proportion_to_remove"] initial_results = solve( method=initial_solver, problem=self.problem, **initial_solver_kwargs ) results_storage = ResultStorage( list_solution_fits=initial_results.list_solution_fits, mode_optim=self.params_objective_function.sense_function, ) logger.info( f"Initial solution fitness : {results_storage.get_best_solution_fit()[1]}" ) all_indexes = set(range(self.problem.nb_items)) for j in range(nb_iteration): sol, fit = results_storage.get_best_solution_fit() indexes_to_remove = set( random.sample( list(all_indexes), int(proportion_to_remove * self.problem.nb_items), ) ) sub_model = create_subknapsack_model( knapsack_model=self.problem, solution=sol, indexes_to_remove=indexes_to_remove, ) res = solve(method=sub_solver, problem=sub_model, **sub_solver_kwargs) best_sol, fit = res.get_best_solution_fit() reb_sol = self.rebuild_sol( sol=best_sol, original_solution=sol, original_knapsack_model=self.problem, indexes_to_remove=indexes_to_remove, ) fit = self.aggreg_from_sol(reb_sol) logger.info(f"Iteration {j}/{nb_iteration} : --- Current fitness {fit}") results_storage.list_solution_fits += [(reb_sol, fit)] stopping = callbacks_list.on_step_end( step=j, res=results_storage, solver=self ) if stopping: break # end of solve callback callbacks_list.on_solve_end(res=results_storage, solver=self) return results_storage