Source code for driftai.run.runner

from abc import ABC, abstractmethod
from pathlib import Path
import warnings

from .run_manage import RunPool, RunGenerator
from driftai.result_report import Result
from driftai.utils import print_progress_bar

import numpy as np

[docs]class AbstractRunner(ABC):
[docs] @abstractmethod def run(self, approach, resume=False): """ Runs an approach """ pass
[docs]class SingleRunner(AbstractRunner): """ Runs an approach in a single machine """ def _load_runs(self, runnable_approach, resume): if not resume: # Remove old runs print("Removing previous runs...") runnable_approach.approach.runs.clear() runnable_approach.approach.update() print("Generating runs...") runs = RunGenerator.from_runnable_approach(runnable_approach) runnable_approach.approach.runs = runs print("Saving new runs...") runnable_approach.approach.update() else: print("Resuming runs...") print("Reading runs...") runs = runnable_approach.approach.runs return runs
[docs] def run(self, runnable_approach, resume=False): # Generate or load the runs runs = self._load_runs(runnable_approach, resume) if len(runs) == 0: print("Cannot load runs. Did you generated them?") return # Count finished and left runs n_done_runs = len([r for r in runs if r.status == "finished"]) n_left_runs = len(runs) - n_done_runs # If resume is True and there aren't runs to run warn the user if resume and n_left_runs == 0: warnings.warn("All runs are finished. Regenerate the runs or run with resume=False") return print("Running...") print_progress_bar(n_done_runs, len(runs)) # Execute the runs for run in RunPool(runs, resume).iteruns(): run.status = "running" run.update() # Get the data which will be using to train and validate train_data = run.get_train_data() test_data = run.get_test_data() parameters = run.run_parameters # Fit and inference model = runnable_approach.learn(train_data, parameters) predictions = runnable_approach.inference(model, test_data) # Convert predictions to python list in order to serialize them if isinstance(predictions, np.ndarray): predictions = predictions.tolist() # Set the results and store them run.results = Result(None, result=predictions) run.update() # Update the progress bar n_done_runs += 1 print_progress_bar(n_done_runs, len(runs))
[docs]class DaskRunner(AbstractRunner): """ Runs an approach in a single machine using dask parallelization """
[docs] def run(self, approach, resume=False): # TODO: See test/resources/my_run_dask.py pass
[docs]class CloudRunner(AbstractRunner): """ Runs an approach in a single machine using dask parallelization """
[docs] def run(self, approach, resume=False): # TODO: See test/resources/my_run_dask.py pass