Source code for schedy.experiments

# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, unicode_literals
from builtins import *
from six import raise_from

import requests
from requests.compat import urljoin
import functools
import logging

from . import errors, encoding
from .random import _DISTRIBUTION_TYPES
from .jobs import Job, _make_job, _job_from_response
from .pagination import PageObjectsIterator
from .compat import json_dumps

logger = logging.getLogger(__name__)

def _check_status(status):
    return status in (Experiment.RUNNING, Experiment.DONE)

[docs]class Experiment(object): #: Status of a running experiment. RUNNING = 'RUNNING' #: Status of a completed (or paused) experiment. DONE = 'DONE' def __init__(self, name, status=RUNNING): ''' Base-class for all experiments. Args: name (str): Name of the experiment. An experiment is uniquely identified by its name. status (str): Status of the experiment. See :ref:`experiment_status`. ''' = name self.status = status self._db = None
[docs] def add_job(self, **kwargs): ''' Adds a new job to this experiment. Args: hyperparameters (dict): A dictionnary of hyperparameters values. status (str): Job status. See :ref:`job_status`. Default: QUEUED. results (dict): A dictionnary of result values. Default: No results (empty dictionary). Returns: schedy.Job: The instance of the new job. ''' partial_job = Job( job_id=None, experiment=None, **kwargs) assert self._db is not None, 'Experiment was not added to a database' url = self._jobs_url() map_def = partial_job._to_map_definition() data = json_dumps(map_def, cls=encoding.SchedyJSONEncoder) response = self._db._authenticated_request('POST', url, data=data) errors._handle_response_errors(response) return _job_from_response(self, response)
[docs] def next_job(self): ''' Returns a new job to be worked on. This job will be set in the ``RUNNING`` state. This function handles everything so that two workers never start working on the same job. Returns: schedy.Job: The instance of the requested job. ''' assert self._db is not None, 'Experiment was not added to a database' url = urljoin(self._db._experiment_url(, 'nextjob/') job = None # Try obtaining a job and running it until we manage to get hold of a # job we can indeed run (concurrent trials to run a job can cause us to # fail, so try and try again) while job is None: response = self._db._authenticated_request('GET', url) if response.status_code == raise errors.NoJobError('No job left for experiment {}.'.format(, None) errors._handle_response_errors(response) job = _job_from_response(self, response) try: job.try_run() except errors.UnsafeUpdateError: job = None logger.debug('Two workers tried to start working on the same job, retrying.', exc_info=True) return job
[docs] def all_jobs(self): ''' Retrieves all the jobs belonging to this experiment. Returns: iterator of :py:class:`schedy.Job`: An iterator over all the jobs of this experiment. ''' assert self._db is not None, 'Experiment was not added to a database' url = self._jobs_url() return PageObjectsIterator( reqfunc=functools.partial(self._db._authenticated_request, 'GET', url), obj_creation_func=functools.partial(_make_job, self), )
[docs] def get_job(self, job_id): ''' Retrieves a job by id. Args: job_id (str): Id of the job to retrieve. Returns: schedy.Job: Instance of the requested job. ''' assert self._db is not None, 'Experiment was not added to a database' url = self._db._job_url(, job_id) response = self._db._authenticated_request('GET', url) errors._handle_response_errors(response) job = _job_from_response(self, response) return job
def __str__(self): try: return '{}(name={!r}, params={})'.format(self.__class__.__name__,, self._get_params()) except NotImplementedError: return '{}(name={!r})'.format(self.__class__.__name__,
[docs] def push_updates(self): ''' Push all the updates made to this experiment to the service. ''' assert self._db is not None, 'Experiment was not added to a database' url = self._db._experiment_url( content = self._to_map_definition() data = json_dumps(content, cls=encoding.SchedyJSONEncoder) response = self._db._authenticated_request('PUT', url, data=data) errors._handle_response_errors(response)
[docs] def delete(self, ensure=True): ''' Deletes this experiment. Args: ensure (bool): If true, an exception will be raised if the experiment was deleted before this call. ''' assert self._db is not None, 'Experiment was not added to a database' url = self._db._experiment_url( if ensure: headers = {'If-Match': '*'} else: headers = dict() response = self._db._authenticated_request('DELETE', url, headers=headers) errors._handle_response_errors(response)
@classmethod def _create_from_params(cls, name, status, params): raise NotImplementedError() def _get_params(self): raise NotImplementedError() def _to_map_definition(self): try: scheduler = self._SCHEDULER_NAME except AttributeError as e: raise_from(AttributeError('Experiment implementations should define a _SCHEDULER_NAME attribute'), e) return { 'status': self.status, 'scheduler': {scheduler: self._get_params()}, } @staticmethod def _from_map_definition(schedulers, map_def): try: name = str(map_def['name']) status = str(map_def['status']) scheduler_def_map = dict(map_def['scheduler']) if len(scheduler_def_map) != 1: raise ValueError('Invalid scheduler definition: {}.'.format(scheduler_def_map)) sched_def_key, sched_def_val = next(iter(scheduler_def_map.items())) scheduler = str(sched_def_key) params = sched_def_val except (ValueError, KeyError) as e: raise_from(ValueError('Invalid map definition for experiment.'), e) if not _check_status(status): raise ValueError('Invalid or unknown status value: {}.'.format(status)) try: exp_type = schedulers[scheduler] except KeyError as e: raise ValueError('Invalid or unregistered scheduler name: {}.'.format(scheduler)) return exp_type._create_from_params( name=name, status=status, params=params) def _jobs_url(self): return urljoin(self._db._experiment_url(, 'jobs/')
[docs]class ManualSearch(Experiment): ''' Represents a manual search, that is to say an experiment for which the only jobs returned by :py:meth:`schedy.Experiment.next_job` are jobs that were queued beforehand (by using :py:meth:`schedy.Experiment.add_job` for example). ''' _SCHEDULER_NAME = 'Manual' @classmethod def _create_from_params(cls, name, status, params): if params is not None: raise ValueError('Expected no parameters for manual search, found {}.'.format(type(params))) return cls(name=name, status=status) def _get_params(self): return None
[docs]class RandomSearch(Experiment): _SCHEDULER_NAME = 'RandomSearch' def __init__(self, name, distributions, status=Experiment.RUNNING): ''' Represents a random search, that is to say en experiment that returns jobs with random hyperparameters when no job was queued manually using :py:meth:`schedy.Experiment.add_job`. If you create a job manually for this experiment, it must have only and all the hyperparameters specified in the ``distributions`` parameter. Args: name (str): Name of the experiment. An experiment is uniquely identified by its name. distributions (dict): A dictionary of distributions (see :py:mod:`schedy.random`), whose keys are the names of the hyperparameters. status (str): Status of the experiment. See :ref:`experiment_status`. ''' super().__init__(name, status) self.distributions = distributions @classmethod def _create_from_params(cls, name, status, params): try: items = params.items() except AttributeError as e: raise ValueError('Expected parameters as a dict, found {}.'.format(type(params))) distributions = dict() for key, dist_def in items: try: dist_name_raw, dist_args = next(iter(dist_def.items())) dist_name = str(dist_name_raw) except (KeyError, TypeError) as e: raise_from(ValueError('Invalid distribution definition.'), e) try: dist_type = _DISTRIBUTION_TYPES[dist_name] except KeyError as e: raise ValueError('Invalid or unknown distribution type: {}.'.format(dist_name)) distributions[key] = dist_type._from_args(dist_args) return cls(name=name, distributions=distributions, status=status) def _get_params(self): return {key: {dist._FUNC_NAME: dist._args()} for key, dist in self.distributions.items()}
[docs]class PopulationBasedTraining(Experiment): _SCHEDULER_NAME = 'PBT' def __init__(self, name, objective, result_name, exploit, explore=dict(), initial_distributions=dict(), population_size=None, status=Experiment.RUNNING, max_generations=None): ''' Implements Population Based Training (see `paper <>`_). You have two ways to specify the initial jobs for Population Based training. You can create them manually using :py:meth:`schedy.Experiment.add_job`, or you can specify the ``initial_distributions`` and ``population_size`` parameters. If you create a job manually for this experiment, it must have at least the hyperparameters specified in the ``explore`` parameter. Args: name (str): Name of the experiment. An experiment is uniquely identified by its name. objective (str): The objective of the training, either :py:attr:`schedy.pbt.MINIMIZE` (to minimize a result) or :py:attr:`schedy.pbt.MAXIMIZE` (to maximize a result). result_name (str): The name of the result to optimize. This result must be present in the results of all :py:attr:`~schedy.Experiment.RUNNING` jobs of this experiment. exploit (schedy.pbt.ExploitStrategy): Strategy to use to exploit the results (i.e. to focus on the most promising jobs). explore (dict): Strategy to use to explore new hyperparameter values. The keys of the dictionary are the name of the hyperparameters (str), and the values are the strategy associated with the hyperparameter (:py:class:`schedy.pbt.ExploreStrategy`). Values for the omitted hyperparameters will not be explored. This parameter is optional: if you do not specify any explore strategy, only exploitation will be used. initial_distributions (dict): The initial distributions for the hyperparameters, as dictionary of distributions (see :py:mod:`schedy.random`) whose keys are the names of the hyperparameters. This parameter optional, you can also create the initial jobs manually. If you use this parameter, make sure to use ``population_size`` as well. population_size (int): Number of initial jobs to create, before starting to exploit/explore (i.e. size of the population). It does **not** have to be the number of jobs you can process in parallel. The original paper used values between 10 and 80. status (str): Status of the experiment. See :ref:`experiment_status`. max_generations (int): Maximum number of generations to run before marking the experiment the experiments as done (:py:ref:`experiment_status`). When the maximum number of generations is reached, subsequent calls to :py:meth:`schedy.Experiment.next_job` will raise :py:class:`schedy.errors.NoJobError`, to indicate that the job queue is empty. ''' super().__init__(name, status) self.objective = objective self.result_name = result_name self.exploit = exploit self.explore = explore self.initial_distributions = initial_distributions self.population_size = population_size self.max_generations = max_generations @classmethod def _create_from_params(cls, name, status, params): kwargs = { 'name': name, 'status': status, 'objective': params['objective'], 'result_name': params['qualityResultName'], } exploit_type, exploit_params = next(iter(params['exploit'].items())) kwargs['exploit'] = _EXPLOIT_STRATEGIES[exploit_type]._from_params(exploit_params) population_size = params.get('numParallel') if population_size is not None: kwargs['population_size'] = population_size initial_distributions = params.get('init') if initial_distributions is not None: init_param = dict() for hp, dist_map in initial_distributions.items(): dist_name, dist_params = next(iter(dist_map.items())) init_param[hp] = _DISTRIBUTION_TYPES[dist_name]._from_args(dist_params) kwargs['initial_distributions'] = init_param explore = params.get('explore') if explore is not None: explore_map = dict() for hp, strat_map in explore.items(): strat_name, strat_params = next(iter(strat_map.items())) explore_map[hp] = _EXPLORE_STRATEGIES[strat_name]._from_params(strat_params) kwargs['explore'] = explore_map max_generations = params.get('max_generations') if max_generations is not None: kwargs['max_generations'] = max_generations return cls(**kwargs) def _get_params(self): params = { 'objective': self.objective, 'qualityResultName': self.result_name, } if self.population_size: params['numParallel'] = self.population_size if self.initial_distributions: params['init'] = { name: {dist._FUNC_NAME: dist._args()} for name, dist in self.initial_distributions.items() } params['exploit'] = {self.exploit._EXPLOIT_STRATEGY_NAME: self.exploit._get_params()} if self.explore: params['explore'] = { name: {strat._EXPLORE_STRATEGY_NAME: strat._get_params()} for name, strat in self.explore.items() } if self.max_generations: params['maxGenerations'] = self.max_generations return params
def _make_experiment(db, data): try: exp_data = dict(data) except ValueError as e: raise_from(errors.UnhandledResponseError('Expected experiment data as a dict, received {}.'.format(type(data)), None), e) try: exp = Experiment._from_map_definition(db._schedulers, exp_data) except ValueError as e: raise_from(errors.UnhandledResponseError('Response contains an invalid experiment', None), e) exp._db = db return exp