# -*- mode: python; coding: utf-8 -*-
# Copyright 2017-2018 Peter Williams and collaborators.
# Licensed under the MIT License.
"""The main neural network code.
"""
from __future__ import absolute_import, division, print_function
__all__ = '''
NSModel
PhysicalApproximator
'''.split()
import os.path
from six.moves import range
import numpy as np
from pwkit import cgs
from pwkit.numutil import broadcastize
from keras import models, layers, optimizers
from . import DomainRange
hardcoded_nu_ref = 1.0
hardcoded_ne_ref = 1.0
class NSModel(models.Sequential):
"""Neuro-Synchro Model -- just keras.models.Sequential extended with some
helpers specific to our data structures. If you run the `ns_setup` method
you can train the neural net in our system.
"""
# NOTE: __init__() must take no arguments in order for keras to be able to
# deserialize NSModels from the HDF5 format.
def ns_setup(self, result_index, data):
self.result_index = int(result_index)
self.domain_range = data.domain_range
self.data = data
assert self.result_index < self.domain_range.n_results
return self # convenience
def ns_fit(self, **kwargs):
"""Train this ANN model on the data in `self.data`. This function just
takes care of extracting the right parameter and avoiding NaNs.
"""
nres = self.data.norm_results[:,self.result_index]
ok = np.isfinite(nres)
nres = nres[ok].reshape((-1, 1))
npar = self.data.norm_params[ok]
return self.fit(npar, nres, **kwargs)
def ns_validate(self, filter=True, to_phys=True):
"""Test this network by having it predict all of the values in our training
sample. Returns `(params, actual, nn)`, where `params` is shape `(N,
self.data.n_params)` and is the input parameters, `actual` is shape
`(N,)` and is the actual values returned by the calculator, and `nn`
is shape `(N,)` and is the values predicted by the neural net.
If `filter` is true, the results will be filtered such that neither
`actual` nor `nn` contain non-finite values.
If `to_phys` is true, the values will be returned in the physical
coordinate system. Otherwise they will be returned in the normalized
coordinate system.
"""
if to_phys:
par = self.data.phys_params
res = self.data.phys_results[:,self.result_index]
else:
par = self.data.norm_params
res = self.data.norm_results[:,self.result_index]
npred = self.predict(self.data.norm_params)[:,0]
if filter:
ok = np.isfinite(res) & np.isfinite(npred)
par = par[ok]
res = res[ok]
npred = npred[ok]
if to_phys:
pred, _ = self.domain_range.rmaps[self.result_index].norm_to_phys(npred)
else:
pred = npred
return par, res, pred
def ns_sigma_clip(self, n_norm_sigma):
"""Assuming that self is already a decent approximation of the input data, try
to improve things by NaN-ing out any measurements that are extremely
discrepant with our approximation -- under the assumption that these
are cases where the calculator went haywire.
Note that this destructively modifies `self.data`.
`n_norm_sigma` is the threshold above which discrepant values are
flagged. It is evaluated using the differences between the neural net
prediction and the training data in the *normalized* coordinate
system.
Returns the number of flagged points.
"""
nres = self.data.norm_results[:,self.result_index]
npred = self.predict(self.data.norm_params)[:,0]
err = npred - nres
m = np.nanmean(err)
s = np.nanstd(err)
bad = (np.abs((err - m) / s) > n_norm_sigma)
self.data.phys[bad,self.domain_range.n_params+self.result_index] = np.nan
self.data.norm[bad,self.domain_range.n_params+self.result_index] = np.nan
return bad.sum()
def ns_plot(self, param_index, plot_err=False, to_phys=False, thin=100):
"""Make a diagnostic plot comparing the approximation to the "actual" results
from the calculator.
"""
import omega as om
par, act, nn = self.ns_validate(filter=True, to_phys=to_phys)
if plot_err:
err = nn - act
p = om.quickXY(par[::thin,param_index], err[::thin], 'Error', lines=0)
else:
p = om.quickXY(par[::thin,param_index], act[::thin], 'Full calc', lines=0)
p.addXY(par[::thin,param_index], nn[::thin], 'Neural', lines=0)
return p
[docs]class PhysicalApproximator(object):
"""This class approximates the eight nontrivial RT coefficients using a
physically-based parameterization.
See :ref:`how-to-use` for detailed documentation of how to prepare and
train the neural networks used by this class.
**Constructor argument**
**nn_dir**
The path to a directory containing trained neural network data. This
directory should contain the configuration file ``nn_config.toml`` and
serialized neural network weights in files with names like
``rho_Q_sign.h5``.
"""
results = 'j_I alpha_I rho_Q rho_V j_frac_pol alpha_frac_pol j_V_share alpha_V_share rho_Q_sign'.split()
def __init__(self, nn_dir):
self.domain_range = DomainRange.from_serialized(os.path.join(nn_dir, 'nn_config.toml'))
for i, r in enumerate(self.results):
assert self.domain_range.rmaps[i].name == r
m = models.load_model(
os.path.join(nn_dir, '%s.h5' % r),
custom_objects = {'NSModel': NSModel}
)
m.result_index = i
m.domain_range = self.domain_range
setattr(self, r, m)
[docs] @broadcastize(4, ret_spec=None)
def compute_all_nontrivial(self, nu, B, n_e, theta, **kwargs):
"""Compute the nontrivial radiative transfer coefficients.
**Arguments**
*nu*
The observing frequency, in Hz. This and all parameters may be
scalars or arrays; they are broadcast to a common shape before
performing the computations.
*B*
The local magnetic field strength, in G.
*n_e*
The local density of synchrotron-emitting particles, in cm^-3.
*theta*
The angle between the line of sight and the local magnetic field,
in radians.
``**kwargs``
Other arguments to the synchrotron model; these can vary depending
on which particle distribution was used.
**Return values**
A tuple of ``(coeffs, oos)``:
*coeffs*
The radiative transfer coefficients in the Stokes basis where the
Stokes U axis is aligned with the magnetic field. This is an array
of shape ``S + (8,)`` where *S* is the shape of the broadcasted
input parameters. Along the inner axis of the array, the coefficients
are: ``(j_I, alpha_I, j_Q, alpha_Q, j_V, alpha_V, rho_Q, rho_V)``.
*oos*
An array of integers reporting where the calculations encountered
out-of-sample values, that is, inputs or outputs beyond the range in
which the neural networks were trained. The shape of this array is
the same as that of the broadcased input parameters, or a scalar if
the inputs were all scalars. For each set of input parameters, the
least significant bit is set to 1 if the first input parameter was
out-of-sample, where "first" is defined by the order in which these
parameters are listed in the ``nn_config.toml`` file. The next more
significant bit is set if the second input parameter was out of
sample, and so on. After all of the input parameters, there are 9
flag bits indicating whether any of the *output* results were
out-of-sample, relative to the range of normalized values
encountered in the training set. The order in which these parameters
are processed is ``j_I``, ``alpha_I``, ``rho_Q``, ``rho_V``,
``j_frac_pol``, ``alpha_frac_pol``, ``j_V_share``,
``alpha_V_share``, ``rho_Q_sign``. Therefore if the synchrotron
model takes 4 input parameters and the ``rho_Q_sign`` output is the
only one to have been out-of-sample, the resulting ``oos`` value
will be ``0x1000``.
"""
# Turn the standard parameters into the ones used in our computations
no_B = np.logical_not(B > 0)
nu_cyc = cgs.e * B / (2 * np.pi * cgs.me * cgs.c)
nu_cyc[no_B] = 1e7 # fake to avoid div-by-0 for now
kwargs['s'] = nu / nu_cyc
# Normalize theta (assuming it could take on any value
theta = theta % (2 * np.pi)
w = (theta > np.pi)
theta[w] = 2 * np.pi - theta[w]
flip = (theta > 0.5 * np.pi)
theta[flip] = np.pi - theta[flip]
kwargs['theta'] = theta
# Normalize inputs and check for out-of-sample.
oos_flags = 0
norm = np.empty(nu.shape + (self.domain_range.n_params,))
for i, mapping in enumerate(self.domain_range.pmaps):
norm[...,i], flag = mapping.phys_to_norm(kwargs[mapping.name])
if flag:
oos_flags |= (1 << i)
# Compute base outputs.
j_I, flag = self.domain_range.rmaps[0].norm_to_phys(self.j_I.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 0))
alpha_I, flag = self.domain_range.rmaps[1].norm_to_phys(self.alpha_I.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 1))
rho_Q, flag = self.domain_range.rmaps[2].norm_to_phys(self.rho_Q.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 2))
rho_V, flag = self.domain_range.rmaps[3].norm_to_phys(self.rho_V.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 3))
j_frac_pol, flag = self.domain_range.rmaps[4].norm_to_phys(self.j_frac_pol.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 4))
alpha_frac_pol, flag = self.domain_range.rmaps[5].norm_to_phys(self.alpha_frac_pol.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 5))
j_V_share, flag = self.domain_range.rmaps[6].norm_to_phys(self.j_V_share.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 6))
alpha_V_share, flag = self.domain_range.rmaps[7].norm_to_phys(self.alpha_V_share.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 7))
rho_Q_sign, flag = self.domain_range.rmaps[8].norm_to_phys(self.rho_Q_sign.predict(norm)[...,0])
if flag:
oos_flags |= (1 << (self.domain_range.n_params + 8))
# Patch up B = 0 in the obvious way. (Although if we ever have to deal
# with nontrivial cold plasma densities, zones of zero B might affect
# the RT if they cause refraction or what-have-you.)
j_I[no_B] = 0.
alpha_I[no_B] = 0.
# Un-transform, baking in the invariant that our Q parameters are
# always negative and the V parameters are always positive (given our
# theta normalization).
j_P = j_frac_pol * j_I
j_V = j_V_share * j_P
j_Q = -np.sqrt(1 - j_V_share**2) * j_P
alpha_P = alpha_frac_pol * alpha_I
alpha_V = alpha_V_share * alpha_P
alpha_Q = -np.sqrt(1 - alpha_V_share**2) * alpha_P
rho_Q = rho_Q * rho_Q_sign
# Now apply the known scalings.
n_e_scale = n_e / hardcoded_ne_ref
j_I *= n_e_scale
alpha_I *= n_e_scale
j_Q *= n_e_scale
alpha_Q *= n_e_scale
j_V *= n_e_scale
alpha_V *= n_e_scale
rho_Q *= n_e_scale
rho_V *= n_e_scale
freq_scale = nu / hardcoded_nu_ref
j_I *= freq_scale
alpha_I /= freq_scale
j_Q *= freq_scale
alpha_Q /= freq_scale
j_V *= freq_scale
alpha_V /= freq_scale
rho_Q /= freq_scale
rho_V /= freq_scale
theta_sign_term = np.ones(n_e.shape, dtype=np.int)
theta_sign_term[flip] = -1
j_V *= theta_sign_term
alpha_V *= theta_sign_term
rho_V *= theta_sign_term
# Pack it up and we're done.
result = np.empty(n_e.shape + (8,))
result[...,0] = j_I
result[...,1] = alpha_I
result[...,2] = j_Q
result[...,3] = alpha_Q
result[...,4] = j_V
result[...,5] = alpha_V
result[...,6] = rho_Q
result[...,7] = rho_V
return result, oos_flags