Source code for neurosynchro

# -*- mode: python; coding: utf-8 -*-
# Copyright 2017-2018 Peter Williams and collaborators.
# Licensed under the MIT License.

"""Train and use artificial neural network approximations ("regressions") of
synchrotron radiative transfer coefficients as a function of various physical
input parameters.

The meat of the neural network code is in the ``impl`` module to avoid
importing Keras unless needed.

"""
from __future__ import absolute_import, division, print_function

__all__ = '''
AbsLogMapping
DirectMapping
DomainRange
LogMapping
LogitMapping
Mapping
NegLogMapping
NinthRootMapping
SampleData
SignMapping
basic_load
detrivialize_stokes_basis
mapping_from_dict
mapping_from_samples
'''.split()

from collections import OrderedDict
from six.moves import range
import numpy as np
import pandas as pd
from pwkit.io import Path


[docs]class Mapping(object): """An abstract base class for parameter transformations. Mapping classes are used to translate the physical parameters fed into *neurosynchro* into normalized values that are easier to work with numerically. You will not normally need to use them directly. """ desc = None # set by subclasses trainer = None phys_bounds_mode = 'empirical' "Blah blah" normalization_mode = 'gaussian' out_of_sample = 'ignore' def __init__(self, name): self.name = name
[docs] @classmethod def from_info_and_samples(cls, info, phys_samples): """Create a new :class:`Mapping` from a dictionary of information and a set of samples. **Call signature** *info* A dictionary of attributes, passed into :meth:`Mapping.from_dict`. *phys_samples* A 1D Numpy array of samples of this parameter, in no particular order. Return value A new instance of :class:`Mapping` (or one of its subclasses) with initialized bounds parameters. """ inst = cls.from_dict(info, load_bounds=False) valid = np.isfinite(phys_samples) & inst._is_valid(phys_samples) n_rej = phys_samples.size - valid.sum() print('%s: rejecting %d samples out of %d' % (inst.name, n_rej, phys_samples.size)) phys_samples = phys_samples[valid] if phys_samples.size < 3: raise Exception('not enough valid samples for %s' % inst.name) if inst.phys_bounds_mode == 'empirical': inst.p_min = phys_samples.min() inst.p_max = phys_samples.max() elif inst.phys_bounds_mode == 'theta': inst.p_min = 0. inst.p_max = 0.5 * np.pi else: raise ValueError('unrecognized phys_bounds_mode value %r for %s' % (inst.phys_bounds_mode, inst.name)) # Pluggable "transform" transformed = inst._to_xform(phys_samples) if inst.normalization_mode == 'gaussian': inst.x_mean = transformed.mean() inst.x_std = transformed.std() elif inst.normalization_mode == 'unit_interval': # Maps the physical values to the unit interval [0, 1]. inst.x_mean = transformed.min() inst.x_std = transformed.max() - inst.x_mean if inst.x_std == 0: inst.x_std = 1. else: raise ValueError('unrecognized normalization_mode value %r for %s' % (inst.normalization_mode, inst.name)) # Normalize normed = (transformed - inst.x_mean) / inst.x_std inst.n_min = normed.min() inst.n_max = normed.max() return inst
def __repr__(self): return '<Mapping %s %s mean=%r sd=%r>' % (self.name, self.desc, self.x_mean, self.x_std)
[docs] def phys_to_norm(self, phys): """Map "physical" parameters to normalized values **Argument** *phys* An array of "physical" input values (see :ref:`transformations`). **Return values** This method returns a tuple ``(normalized, oos)``. *normalized* The normalized versions of the input data. *oos* An array of booleans of the same shape as the input data. True values indicate inputs that were out of the sample that was used to define the mapping. """ # note: using prefix ~ instead of np.logical_not fails for scalars oos = np.logical_not((phys >= self.p_min) & (phys <= self.p_max)) # catches NaNs any_oos = np.any(oos) if any_oos: if self.out_of_sample == 'ignore': pass elif self.out_of_sample == 'clip': phys = np.clip(phys, self.p_min, self.p_max) elif self.out_of_sample == 'nan': phys = phys.copy() phys[oos] = np.nan else: raise Exception('unrecognized out-of-sample behavior %r' % self.out_of_sample) return (self._to_xform(phys) - self.x_mean) / self.x_std, any_oos
[docs] def norm_to_phys(self, norm): """Map "normalized" parameters to "physical" values **Argument** *norm* An array of "normalized" input values (see :ref:`transformations`). **Return values** This method returns a tuple ``(phys, oos)``. *phys* The physical versions of the input data. *oos* An array of booleans of the same shape as the input data. True values indicate inputs that were out of the sample that was used to define the mapping. """ oos = np.logical_not((norm >= self.n_min) & (norm <= self.n_max)) # catches NaNs any_oos = np.any(oos) if any_oos: if self.out_of_sample == 'ignore': pass elif self.out_of_sample == 'clip': norm = np.clip(norm, self.n_min, self.n_max) elif self.out_of_sample == 'nan': norm = norm.copy() norm[oos] = np.nan else: raise Exception('unrecognized out-of-sample behavior %r' % self.out_of_sample) return self._from_xform(norm * self.x_std + self.x_mean), any_oos
[docs] def to_dict(self): """Serialize this :class:`Mapping` into an ordered dictionary.""" d = OrderedDict() d['name'] = self.name d['maptype'] = self.desc if self.phys_bounds_mode is not None: d['phys_bounds_mode'] = self.phys_bounds_mode if self.normalization_mode is not None: d['normalization_mode'] = self.normalization_mode if self.trainer is not None: d['trainer'] = self.trainer if self.out_of_sample is not None: d['out_of_sample'] = self.out_of_sample d['x_mean'] = self.x_mean d['x_std'] = self.x_std d['phys_min'] = self.p_min d['phys_max'] = self.p_max d['norm_min'] = self.n_min d['norm_max'] = self.n_max return d
[docs] @classmethod def from_dict(cls, info, load_bounds=True): """Deserialize an ordered dictionary into a new :class:`Mapping` instance. **Call signature** *info* A dictionary of parameters, as generated by :meth:`Mapping.to_dict`. *load_bounds* (default: :const:`True`) If true, deserialize bounds information such as the maximum and minimum observed physical values. If :const:`False`, these are left uninitialized. Return value A new :class:`Mapping` instance. """ if str(info['maptype']) != cls.desc: raise ValueError('info is for maptype %s but this class is %s' % (info['maptype'], cls.desc)) inst = cls(str(info['name'])) if 'phys_bounds_mode' in info: inst.phys_bounds_mode = info['phys_bounds_mode'] if 'normalization_mode' in info: inst.normalization_mode = info['normalization_mode'] if 'trainer' in info: inst.trainer = info['trainer'] if 'out_of_sample' in info: inst.out_of_sample = info['out_of_sample'] if load_bounds: inst.x_mean = float(info['x_mean']) inst.x_std = float(info['x_std']) inst.p_min = float(info['phys_min']) inst.p_max = float(info['phys_max']) inst.n_min = float(info['norm_min']) inst.n_max = float(info['norm_max']) return inst
class AbsLogMapping(Mapping): desc = 'abs_log' def _to_xform(self, p): return np.log10(np.abs(p)) def _from_xform(self, x): return 10**x # XXX not invertible! def _is_valid(self, p): return p != 0 class DirectMapping(Mapping): desc = 'direct' def _to_xform(self, p): return p def _from_xform(self, x): return x def _is_valid(self, p): return np.ones(p.shape, dtype=np.bool) class LogMapping(Mapping): desc = 'log' def _to_xform(self, p): return np.log10(p) def _from_xform(self, x): return 10**x def _is_valid(self, p): return (p > 0) class LogitMapping(Mapping): desc = 'logit' def _to_xform(self, p): return np.log(p / (1. - p)) def _from_xform(self, x): return np.exp(x) / (np.exp(x) + 1) def _is_valid(self, p): # Infinities are hard to deal with so we don't allow p = 0 or p = 1. return (p > 0) & (p < 1) class NegLogMapping(Mapping): desc = 'neg_log' def _to_xform(self, p): return np.log10(-p) def _from_xform(self, x): return -(10**x) def _is_valid(self, p): return (p < 0) class NinthRootMapping(Mapping): desc = 'ninth_root' def _to_xform(self, p): return np.cbrt(np.cbrt(p)) def _from_xform(self, x): return x**9 def _is_valid(self, p): return np.ones(p.shape, dtype=np.bool) class SignMapping(Mapping): desc = 'sign' def _to_xform(self, p): return np.sign(p) def _from_xform(self, x): return np.sign(x) # XXX not reversible! def _is_valid(self, p): return (p != 0.) _mappings = { 'abs_log': AbsLogMapping, 'direct': DirectMapping, 'log': LogMapping, 'logit': LogitMapping, 'neg_log': NegLogMapping, 'ninth_root': NinthRootMapping, 'sign': SignMapping, } class Passthrough(object): def __init__(self, d): self.d = d def to_dict(self): return self.d
[docs]def mapping_from_info_and_samples(info, phys_samples): """Create a :class:`Mapping` subclass from configuration data and sample data. **Call signature** *info* A dictionary of configuration info, loaded from ``nn_config.toml`` or created by :meth:`Mapping.to_dict`. *phys_samples* A 1D array of training data used to initialize the bounds of this mapping. Return value A new :class:`Mapping` instance. The particular subclass used depends on the ``maptype`` setting, as documented in the :ref:`map-types`. """ cls = _mappings[info['maptype']] return cls.from_info_and_samples(info, phys_samples)
[docs]def mapping_from_dict(info): """Create a :class:`Mapping` subclass from configuration data. **Call signature** *info* A dictionary of configuration info, loaded from ``nn_config.toml`` or created by :meth:`Mapping.to_dict`. Return value A new :class:`Mapping` instance. The particular subclass used depends on the ``maptype`` setting, as documented in the :ref:`map-types`. Unlike :func:`mapping_from_info_and_samples`, the bounds data are not initialized if they are not already defined in the configuration dictionary. """ maptype = str(info['maptype']) cls = _mappings[maptype] return cls.from_dict(info)
[docs]def basic_load(datadir, drop_metadata=True): """Load a directory of textual tables (such as training set data). **Call signature** *datadir* A path to a directory of textual tables; format described below. *drop_metadata* (default ``True``) If true, columns marked as metadata will be dropped from the returned table. Return value A :class:`pandas.DataFrame` of data, concatenating all of the input tables. The data format is documented in :ref:`make-training-set`. Briefly, each file in *datadir* whose name ends with ``.txt`` will be loaded as a table using :func:`pandas.read_table`. The recommended format is tab-separated values with a single header row. Column names should end in type identifiers such as ``(lin)`` to identify their roles, although this function ignores this information except to drop columns whose names end in ``(meta)`` if so directed. """ datadir = Path(datadir) chunks = [] for item in datadir.glob('*.txt'): chunks.append(pd.read_table(str(item))) data = pd.concat(chunks, ignore_index=True) if drop_metadata: # Drop `foo(mtea)` columns for col in data.columns: if col.endswith('(meta)'): del data[col] return data
class DomainRange(object): n_params = None n_results = None pmaps = None rmaps = None @classmethod def from_info_and_samples(cls, info, df): inst = cls() inst.n_params = len(info['params']) inst.n_results = len(info['results']) inst.pmaps = [] inst.rmaps = [] for i, pinfo in enumerate(info['params']): if not df.columns[i].startswith(pinfo['name'] + '('): raise Exception('alignment error: expect data column %d to be %r; got %r' % (i, pinfo['name'], df.columns[i])) inst.pmaps.append(mapping_from_info_and_samples(pinfo, df[df.columns[i]].values)) n = inst.n_params for i, rinfo in enumerate(info['results']): if not df.columns[n+i].startswith(rinfo['name'] + '(res)'): raise Exception('alignment error: expect data column %d to be %r; got %r' % (n + i, rinfo['name'], df.columns[n + i])) inst.rmaps.append(mapping_from_info_and_samples(rinfo, df[df.columns[n+i]].values)) return inst @classmethod def from_serialized(cls, config_path, result_to_extract=None): """`result_to_extract` is a total lazy hack for the training tool.""" import pytoml with Path(config_path).open('rt') as f: info = pytoml.load(f) inst = cls() inst.pmaps = [] inst.rmaps = [] extracted_info = None for subinfo in info['params']: inst.pmaps.append(mapping_from_dict(subinfo)) for i, subinfo in enumerate(info['results']): if result_to_extract is not None and subinfo['name'] == result_to_extract: extracted_info = subinfo extracted_info['_index'] = i inst.rmaps.append(mapping_from_dict(subinfo)) inst.n_params = len(inst.pmaps) inst.n_results = len(inst.rmaps) if result_to_extract is not None: return inst, extracted_info return inst def __repr__(self): return '\n'.join( ['<%s n_p=%d n_r=%d' % (self.__class__.__name__, self.n_params, self.n_results)] + [' P%d=%r,' % (i, m) for i, m in enumerate(self.pmaps)] + [' R%d=%r,' % (i, m) for i, m in enumerate(self.rmaps)] + ['>']) def into_info(self, info): info['params'] = [m.to_dict() for m in self.pmaps] info['results'] = [m.to_dict() for m in self.rmaps] def load_and_normalize(self, datadir): df = basic_load(datadir) for i, pmap in enumerate(self.pmaps): if not df.columns[i].startswith(pmap.name + '('): raise Exception('alignment error: expect data column %d to be %s(...); got %s' % (i, pmap.name, df.columns[i])) n = self.n_params for i, rmap in enumerate(self.rmaps): if df.columns[n+i] != rmap.name + '(res)': raise Exception('alignment error: expect data column %d to be %s(res); got %s' % (n + i, rmap.name, df.columns[n + i])) return SampleData(self, df) class SampleData(object): df = None domain_range = None phys = None norm = None oos_flags = 0 def __init__(self, domain_range, df): self.df = df self.domain_range = domain_range self.phys = df.values self.norm = np.empty_like(self.phys) for i in range(self.domain_range.n_params): self.norm[:,i], flag = self.domain_range.pmaps[i].phys_to_norm(self.phys[:,i]) if flag: self.oos_flags |= (1 << i) for i in range(self.domain_range.n_results): j = i + self.domain_range.n_params self.norm[:,j], flag = self.domain_range.rmaps[i].phys_to_norm(self.phys[:,j]) if flag: self.oos_flags |= (1 << j) @property def phys_params(self): return self.phys[:,:self.domain_range.n_params] @property def phys_results(self): return self.phys[:,self.domain_range.n_params:] @property def norm_params(self): return self.norm[:,:self.domain_range.n_params] @property def norm_results(self): return self.norm[:,self.domain_range.n_params:]
[docs]def detrivialize_stokes_basis(coeffs, psi): """Re-express coefficients in a basis in which the magnetic field can rotate on the sky. **Arguments** *coeffs* Radiative transfer coefficients in the Stokes basis where the Stokes U axis is aligned with the magnetic field. This is an array of shape ``(S, 8)`` where *S* is the shape of *psi*. Along the inner axis of the array, the coefficients are: ``(j_I, alpha_I, j_Q, alpha_Q, j_V, alpha_V, rho_Q, rho_V)``. This is the representation returned by :meth:`neurosynchro.impl.PhysicalApproximator.compute_all_nontrivial`. *psi* The angle(s) between the magnetic field as projected on the sky and some invariant Stokes U axis, in radians. XXX: sign convention? **Return value** An array of radiative transfer coefficients in which the Stokes U terms are no longer trivial. The shape is ``(S, 11)``. Along the inner axis of the array, the coefficients are: ``(j_I, alpha_I, j_Q, alpha_Q, j_U, alpha_U, j_V, alpha_V, rho_Q, rho_U, rho_V)``. **Details** Synchrotron calculations are generally performed in a basis in which the Stokes U axis is aligned with the magnetic field, which means that the corresponding radiative transfer coefficients are zero ("trivial"). In actual work, however, the magnetic field orientation is not guaranteed to be constant along the direction of propagation. If the Stokes U axis is held fixed along the integration, the Stokes U coefficients become nontrivial. This function transforms coefficients from the former basis to the latter. See Shcherbakov & Huang (2011MNRAS.410.1052S), equations 50-51. Note that the linear polarization axis rotates at twice the rate that psi does, because linear polarization is an *orientation* not an *angle*. """ ji = coeffs[...,0] ai = coeffs[...,1] jq = coeffs[...,2] aq = coeffs[...,3] jv = coeffs[...,4] av = coeffs[...,5] rq = coeffs[...,6] rv = coeffs[...,7] twochi = 2 * (np.pi - psi) # note the sign convention! s = np.sin(twochi) c = np.cos(twochi) xformed = np.empty(coeffs.shape[:-1] + (11,)) xformed[...,0] = ji # j_I xformed[...,1] = ai # alpha_I xformed[...,2] = c * jq # j_Q xformed[...,3] = c * aq # alpha_Q xformed[...,4] = s * jq # j_U xformed[...,5] = s * aq # alpha_U xformed[...,6] = jv # j_V xformed[...,7] = av # alpha_V xformed[...,8] = c * rq # rho_Q xformed[...,9] = s * rq # rho_U xformed[...,10] = rv # rho_V return xformed