# Once for All: Train One Network and Specialize it for Efficient Deployment # Han Cai, Chuang Gan, Tianzhe Wang, Zhekai Zhang, Song Han # International Conference on Learning Representations (ICLR), 2020. import copy import random import numpy as np from tqdm import tqdm __all__ = ["EvolutionFinder"] class EvolutionFinder: def __init__(self, efficiency_predictor, accuracy_predictor, Robustness_predictor, **kwargs): self.efficiency_predictor = efficiency_predictor self.accuracy_predictor = accuracy_predictor self.robustness_predictor = Robustness_predictor # evolution hyper-parameters self.arch_mutate_prob = kwargs.get("arch_mutate_prob", 0.1) self.resolution_mutate_prob = kwargs.get("resolution_mutate_prob", 0.5) self.population_size = kwargs.get("population_size", 100) self.max_time_budget = kwargs.get("max_time_budget", 500) self.parent_ratio = kwargs.get("parent_ratio", 0.25) self.mutation_ratio = kwargs.get("mutation_ratio", 0.5) @property def arch_manager(self): return self.accuracy_predictor.arch_encoder def update_hyper_params(self, new_param_dict): self.__dict__.update(new_param_dict) def random_valid_sample(self, constraint): while True: sample = self.arch_manager.random_sample_arch() efficiency = self.efficiency_predictor.get_efficiency(sample) if efficiency <= constraint: return sample, efficiency def mutate_sample(self, sample, constraint): while True: new_sample = copy.deepcopy(sample) self.arch_manager.mutate_resolution(new_sample, self.resolution_mutate_prob) self.arch_manager.mutate_arch(new_sample, self.arch_mutate_prob) efficiency = self.efficiency_predictor.get_efficiency(new_sample) if efficiency <= constraint: return new_sample, efficiency def crossover_sample(self, sample1, sample2, constraint): while True: new_sample = copy.deepcopy(sample1) for key in new_sample.keys(): if not isinstance(new_sample[key], list): new_sample[key] = random.choice([sample1[key], sample2[key]]) else: for i in range(len(new_sample[key])): new_sample[key][i] = random.choice( [sample1[key][i], sample2[key][i]] ) efficiency = self.efficiency_predictor.get_efficiency(new_sample) if efficiency <= constraint: return new_sample, efficiency def run_evolution_search(self, constraint, verbose=False, **kwargs): """Run a single roll-out of regularized evolution to a fixed time budget.""" self.update_hyper_params(kwargs) mutation_numbers = int(round(self.mutation_ratio * self.population_size)) parents_size = int(round(self.parent_ratio * self.population_size)) best_valids = [-100] population = [] # (validation, robustness, sample, latency) tuples child_pool = [] efficiency_pool = [] best_info = None if verbose: print("Generate random population...") for _ in range(self.population_size): sample, efficiency = self.random_valid_sample(constraint) child_pool.append(sample) efficiency_pool.append(efficiency) accs = self.accuracy_predictor.predict_acc(child_pool) robs = self.robustness_predictor.predict_rob(child_pool) for i in range(self.population_size): population.append((accs[i].item(), robs[i].item(), child_pool[i], efficiency_pool[i])) if verbose: print("Start Evolution...") # After the population is seeded, proceed with evolving the population. with tqdm( total=self.max_time_budget, desc="Searching with constraint (%s)" % constraint, disable=(not verbose), ) as t: for i in range(self.max_time_budget): parents = sorted(population, key=lambda x: x[0])[::-1][:parents_size] acc = parents[0][0] rob = parents[0][1] t.set_postfix({"acc": parents[0][0] , "rob":parents[0][1]}) if not verbose and (i + 1) % 100 == 0: print("Iter: {} Acc: {} Rob: {}".format(i + 1, parents[0][0],parents[0][1])) if acc > best_valids[-1]: best_valids.append(acc) best_info = parents[0] else: best_valids.append(best_valids[-1]) population = parents child_pool = [] efficiency_pool = [] for j in range(mutation_numbers): par_sample = population[np.random.randint(parents_size)][2] # Mutate new_sample, efficiency = self.mutate_sample(par_sample, constraint) child_pool.append(new_sample) efficiency_pool.append(efficiency) for j in range(self.population_size - mutation_numbers): par_sample1 = population[np.random.randint(parents_size)][2] par_sample2 = population[np.random.randint(parents_size)][2] # Crossover new_sample, efficiency = self.crossover_sample( par_sample1, par_sample2, constraint ) child_pool.append(new_sample) efficiency_pool.append(efficiency) accs = self.accuracy_predictor.predict_acc(child_pool) robs = self.robustness_predictor.predict_rob(child_pool) for j in range(self.population_size): population.append( (accs[j].item(), robs[j].item(), child_pool[j], efficiency_pool[j]) ) t.update(1) return best_valids, best_info