# Copyright (C) 2012, Christof Buchbender
# BSD License (License.txt)
import math
import os
import string
import sys
import ConfigParser
import numpy as np
from pysqlite2 import dbapi2 as sqlite
from scipy.ndimage import gaussian_filter
from generaltools import log_tools
import astrolyze.functions.constants as const
from astrolyze.functions import units
[docs]class Map(object):
'''
``Map`` is the parent Class for the ``maps``-package. It contains all
functions that are common to all supported map-formats, i.e. Fits,
GILDAS and Miriad. This class is only supposed to be called through
the FitsMap, GildasMap, and MiriadMap classes.
Parameters
----------
map_name : string
The name and path of the file that is to be initialized to the maps
package.
'''
def __init__(self, map_name, **kwargs):
'''
Initialize a map to maps.
'''
USER = os.getenv("USER")
config_path = "/home/{}/.astrolyze/".format(USER)
config_file = "astrolyze.cfg"
self.config = ConfigParser.ConfigParser()
self.config.read("{}{}".format(config_path, config_file))
# Definition of the unit nomenclatures.
self.log = log_tools.init_logger(
directory="/home/{}/.astrolyze/".format(USER),
name="astrolyze"
)
USER = os.getenv("USER")
self.database_prefix = self.config.get("General", "database_prefix")
self.database_prefix = self.database_prefix.format(USER)
# Load configuration from config_file
self.jansky_beam_names = self.config.get(
"Units", "jansky_beam_names"
).split(',')
self.jansky_pixel_names = self.config.get(
"Units", "jansky_pixel_names"
).split(',')
self.tmb_names = self.config.get(
"Units", "tmb_names"
).split(',')
self.MJy_per_sterad_names = self.config.get(
"Units", "MJy_per_sterad_names"
).split(',')
self.erg_sec_pixel_names = self.config.get(
"Units", "erg_sec_pixel_names"
).split(',')
self.erg_sec_beam_names = self.config.get(
"Units", "erg_sec_beam_names"
).split(',')
self.erg_sec_sterad_names = self.config.get(
"Units", "erg_sec_sterad_names"
).split(',')
self.known_units = (self.jansky_beam_names + self.jansky_pixel_names +
self.tmb_names + self.MJy_per_sterad_names +
self.erg_sec_beam_names +
self.erg_sec_sterad_names +
self.erg_sec_pixel_names)
# Definition of possible data format endings for the different
# programs.
self.gildas_formats = self.config.get(
"Formats", "gildas_formats"
).split(',')
self.fits_formats = self.config.get(
"Formats", "fits_formats"
).split(',')
self.miriad_formats = ['',None]
self.class_formats = self.config.get(
"Formats", "class_formats"
).split(',')
# name_convention is not needed anymore. Only kept for backward
# compatibality.
self.map_name = map_name
# Test if the file exists. Directory for Miriad.
# File for Fits and GILDAS.
if ((not os.path.isdir(self.map_name)
and not os.path.isfile(self.map_name))):
self.log.critical(
'Exiting: {} does not exist'.format(self.map_name)
)
raise SystemExit
# Get Informations from the file name.
self.map_nameList = map_name.split('/')[-1].split('_')
if len(self.map_nameList) < 5:
self.log.critical(
"This map does not follow the Naming Convention for Astrolyze"
)
raise SystemExit
self.map_nameList = map_name.split('/')[-1].split('_')
self.prefix_list = map_name.split('/')[0:-1]
self.comments = []
self.source = self.map_nameList[0].split('/')[-1]
if len(self.prefix_list) > 0:
self.prefix = string.join(self.prefix_list, '/') + '/'
elif len(self.prefix_list) == 0:
self.prefix = ''
self.telescope = self.map_nameList[1]
self.species = self._resolveSpecies()
self.fluxUnit = self.map_nameList[3]
# Check dataFormat.
if self.map_name.endswith('.fits'):
self.dataFormat = 'fits'
self.map_nameList[-1] = self.map_nameList[-1].replace(
'.fits', ''
)
for i in self.gildas_formats:
if self.map_name.endswith('.' + i):
self.dataFormat = i
self.map_nameList[-1] = self.map_nameList[-1].replace(
'.' + i, ''
)
for i in self.class_formats:
if self.map_name.endswith('.' + i):
self.dataFormat = i
self.map_nameList[-1] = self.map_nameList[-1].replace(
'.' + i, ''
)
if os.path.isdir(self.map_name):
# Miriad Data Format uses directories
self.dataFormat = None
self.resolution = self._resolveResolution()
# Entries after the fifth are considered comments.
if len(self.map_nameList) > 5:
for i in range(len(self.map_nameList) - 6):
self.comments += [self.map_nameList[i + 5]]
self.comments += [self.map_nameList[-1]]
#!!!!! TODO: Bas implementation. Try should only contain very short
# parts of the program otherwise errors in the program are camouflaged.
try:
self.connection = sqlite.connect(str(self.database_prefix) +
'parameter.db')
self.cursor = self.connection.cursor()
self.cursor.execute("SELECT * FROM Galaxies WHERE Name = ?",
(self.source.upper(),))
self.params = self.cursor.fetchall()[0]
self.type = self.params[2]
self.distance = self.params[3]
self.vlsr = self.params[4]
self.centralPosition = self.params[5]
self.pa = self.params[6]
self.inclination = self.params[7]
self.R25 = self.params[8]
self.cursor.close()
self.connection.close()
except:
self.params = None
self.type = None
self.distance = None
self.vlsr = None
self.centralPosition = None
self.pa = None
self.inclination = None
self.R25 = None
try:
self.connection = sqlite.connect(str(self.database_prefix) +
'parameter.db')
self.cursor = self.connection.cursor()
self.cursor.execute("SELECT * FROM Lines WHERE Name = ?",
(self.species.upper(),))
self.params = self.cursor.fetchall()[0]
self.frequency = self.params[2]
self.wavelength = self.params[3]
self.cursor.close()
self.connection.close()
except:
pass
try:
self.connection = sqlite.connect(str(self.database_prefix) +
'parameter.db')
self.cursor = self.connection.cursor()
self.cursor.execute("SELECT * FROM cal_error WHERE Telescope = "
" ? AND Species = ?", (self.telescope.upper(),
self.species.upper()))
self.params = self.cursor.fetchall()[0]
self.calibrationError = self.params[3]
self.cursor.close()
self.connection.close()
except:
self.calibrationError = np.nan
self.get_beam_size()
def _resolveSpecies(self):
'''
Gets the frequency from a database on basis of the map name if
possible.
'''
species = self.map_nameList[2]
if 'mum' in species:
try:
self.wavelength = float(species.replace('mum', '')) * 1e-6
self.frequency = 299792356 / self.wavelength
except:
self.frequency = np.nan
self.wavelength = np.nan
elif 'mm' in species:
try:
self.wavelength = float(species.replace('mm', '')) * 1e-3
self.frequency = 299792356 / self.wavelength
except:
self.frequency = np.nan
self.wavelength = np.nan
elif 'GHz' in species:
try:
self.frequency = float(species.replace('GHz', '')) * 1e9
self.wavelength = 299792356 / self.frequency
except:
self.frequency = np.nan
self.wavelength = np.nan
else:
self.frequency = np.nan
self.wavelength = np.nan
return species
def _resolveResolution(self):
'''
Reads the resolution string from the map name.
'''
# TODO: include handling of 'uk'
string = self.map_nameList[4]
# Test if there is a digit after the last point.
# To exclude file endings like .fits.gdf.
# In this the dataFormat would be 'gdf'.
# but self.map_nameList[4] storing the resolution
# Still does contain points only due to numbers.
test = string.split('.')
x = True
while x:
if 'uk' in string:
break
try:
float(test[-1][:1])
x = False
except KeyboardInterrupt:
sys.exit()
except:
string = string.replace('.' + test[-1], '')
test = test[0:-1]
if 'uk' in string:
return string
# Resolve the resolution naming scheme explained above.
if 'x' in string and 'a' in string:
major = float(string.split('x')[0])
minor = float(string.split('x')[1].split('a')[0])
pa = float(string.split('x')[1].split('a')[1])
if 'x' in string and 'a' not in string:
major = float(string.split('x')[0])
minorData = string.split('x')[1]
pa = 0.0
if 'x' not in string and 'a' in string:
major = float(string.split('a')[0])
minor = float(string.split('a')[0])
paData = string.split('a')[1]
if 'x' not in string and 'a' not in string:
major = float(string)
minor = float(string)
pa = 0
return [major, minor, pa]
[docs] def resolutionToString(self, resolution=None):
r""" Converts the resolution list to a string to be printed and
included in the file names.
"""
if resolution is None:
if float(self.resolution[2]) == 0.0:
if float(self.resolution[0]) == float(self.resolution[1]):
string = "%1.2f" % self.resolution[0]
if float(self.resolution[0]) != float(self.resolution[1]):
string = ("%1.2f" % self.resolution[0] + 'x' +
"%1.2f" % self.resolution[1])
if float(self.resolution[2]) != 0.0:
if float(self.resolution[0]) == float(self.resolution[1]):
string = ("%1.2f" % self.resolution[0] + 'a' +
"%1.1f" % self.resolution[2])
if float(self.resolution[0]) != float(self.resolution[1]):
string = ("%1.2f" % self.resolution[0] + 'x' +
"%1.2f" % self.resolution[1] + 'a' +
"%1.1f" % self.resolution[2])
if resolution is not None and type(resolution) is not str:
if float(resolution[2]) == 0.0:
if float(resolution[0]) == float(resolution[1]):
string = "%1.2f" % resolution[0]
if float(resolution[0]) != float(resolution[1]):
string = ("%1.2f" % resolution[0] + 'x' +
"%1.2f" % resolution[1])
if float(resolution[2]) != 0.0:
if float(resolution[0]) == float(resolution[1]):
string = ("%1.2f" % resolution[0] + 'a' +
"%1.1f" % resolution[2])
if float(resolution[0]) != float(resolution[1]):
string = ("%1.2f" % resolution[0] + 'x' +
"%1.2f" % resolution[1] + 'a' +
"%1.1f" % resolution[2])
if type(resolution) is str:
string = resolution
return string
[docs] def get_beam_size(self):
r""" Calculates the beam-size in steradians and in m^2. Fot the latter
the distance to the source has to be given.
Returns
-------
Initialization if the variables:
self.beamSizeSterad and self.beamSizeM2
Notes
-----
The formula used is:
.. math:
\Omega = 1.133 * FWHM(rad)^2 \cdot (Distance(m)^2)
"""
if self.resolution != 'uk':
self.beamSizeSterad = (1.133 * const.a2r ** 2 * self.resolution[0]
* self.resolution[1])
if self.distance is not None:
self.beamSizeM2 = (1.133 * (self.distance * const.a2r *
const.pcInM) ** 2 *
self.resolution[0] * self.resolution[1])
else:
self.beamSize = np.nan
[docs] def change_map_name(self, source=None, telescope=None, species=None,
fluxUnit=None, resolution=None, comments=None,
dataFormat=False, prefix=None):
'''
This function can be used to change the names of the maps and make a
copy of the file to the new name and/or location.
'''
source = source or self.source
telescope = telescope or self.telescope
species = species or self.species
fluxUnit = fluxUnit or self.fluxUnit
resolution = resolution or self.resolution
if dataFormat is False:
dataFormat = self.dataFormat
prefix = prefix or self.prefix
if comments is None:
comments = comments or self.comments
# Now update the variables
self.source = source
self.telescope = telescope
self.species = species
self.fluxUnit = fluxUnit
self.resolution = resolution
if dataFormat is not False:
self.dataFormat = dataFormat
self.prefix = prefix
if comments is not None:
comments = self.comments + comments
self.comments = self.comments + comments
comment_string = _comment_to_string()
target_file_name = "{}{}_{}_{}_{}_{}{}.{}".format(
prefix,
source, telescope, species, fluxUnit,
self.resolutionToString(self.resolution),
comment_string,
dataFormat
)
if len(self.comments) == 0:
if str(self.map_name) != (str(prefix) + str(source) + '_' +
str(telescope) + '_' + str(species) +
'_' + str(fluxUnit) + '_' +
str(resolution) + '.' +
str(dataFormat)):
copy_command = "cp {} {}".format(self.map_name,
target_file_name)
subprocess.call(copy_command, shell=True)
self.map_name = target_file_name
if len(self.comments) != 0:
if ((str(self.map_name) != str(prefix) + str(source) + '_' +
str(telescope) + '_' + str(species) + '_' + str(fluxUnit) +
'_' + str(resolution) + '_' + '_'.join(self.comments) + '.' +
str(dataFormat))):
os.system('cp ' + str(self.map_name) + ' ' + str(prefix) +
str(source) + '_' + str(telescope) + '_' +
str(species) + '_' + str(fluxUnit) + '_' +
self.resolutionToString(self.resolution) + '_' +
'_'.join(self.comments) + '.' + str(dataFormat))
self.map_name = (str(prefix) + str(source) + '_' +
str(telescope) + '_' + str(species) + '_' +
str(fluxUnit) + '_' +
self.resolutionToString(self.resolution) +
'_' + '_'.join(self.comments) +
'.' + str(dataFormat))
def _comment_to_string(self):
""" Converts the comment list to a string
"""
comment_string = "_".join(self.comments)
if comment_string != "":
comment_string = "_{}".format(comment_string)
return comment_string
[docs] def returnName(self, source=None, telescope=None, species=None,
fluxUnit=None, resolution=None, comments=None,
dataFormat=False, prefix=None):
'''
Returns the Name corresponding to the Name convention. Single keywords
can be changed.
This function is useful to generate a writeout name for a changed file
without overwriting the current ``self.map_name``.
Parameters
----------
All possible parameters from the "Naming Convention" plus the new
prefix.
'''
source = source or self.source
telescope = telescope or self.telescope
species = species or self.species
fluxUnit = fluxUnit or self.fluxUnit
resolution = resolution or self.resolution
prefix = prefix or self.prefix
if dataFormat is False:
dataFormat = self.dataFormat
if comments is None:
comments = self.comments
elif comments is not None:
comments = self.comments + comments
if len(comments) == 0:
if dataFormat is not None:
return (str(prefix) + str(source) + '_' + str(telescope) +
'_' + str(species) + '_' + str(fluxUnit) + '_' +
self.resolutionToString(resolution) + '.' +
str(dataFormat))
if dataFormat is None:
return (str(prefix) + str(source) + '_' + str(telescope) +
'_' + str(species) + '_' + str(fluxUnit) + '_' +
self.resolutionToString(resolution))
if len(comments) != 0:
if dataFormat is not None:
return (str(prefix) + str(source) + '_' + str(telescope) +
'_' + str(species) + '_' + str(fluxUnit) + '_' +
self.resolutionToString(resolution) + '_' +
'_'.join(comments) + '.' + str(dataFormat))
if dataFormat is None:
return (str(prefix) + str(source) + '_' + str(telescope) +
'_' + str(species) + '_' + str(fluxUnit) + '_' +
self.resolutionToString(resolution) + '_' +
'_'.join(comments))
[docs] def flux_conversion(self, x=None, major=None, minor=None,
nu_or_lambda='nu', direction=None):
r"""
Calulates conversion between K.km/s and Jy/beam and vise versa.
Parameters
----------
x : float [GHz]
Wavelength/frequency. Defaults to the frequency of the loaded map,
i.e. self.frequency
major : float
Major Axis Beam (arcsec). Default None, i.e. using self.resolution.
minor : float
Minor Axis Beam(arcsec). Default None, i.e. using self.resolution.
nu_or_lambda : string
Choose type of x: frequency = ``'nu'`` or wavelength =
``'lambda'``.
direction : string
choose conversion direction ``'kelvin_to_jansky'``
means Kelvin to Jansky; ``'jansky_to_kelvin'`` Jansky to Kelvin.
Notes
-----
If self.frequency and self.resolution are correctly set, this functions
does not need any input. Otherwise this has to be given explicitly.
"""
if direction is not None and (direction != 'kelvin_to_jansky'
or direction != 'jansky_to_kelvin'):
print ('Keyword Error direction has to be kelvin_to_jansky or'
'jansky_to_kelvin -> Exit!')
if self.fluxUnit in ['JyB', 'Jy/Beam'] and direction is None:
direction = 'jansky_to_kelvin'
if self.fluxUnit in ['Tmb', 'T', 'Kkms'] and direction is None:
direction = 'kelvin_to_jansky'
print self.fluxUnit
if ((self.fluxUnit not in ['JyB', 'Jy/Beam'] and self.fluxUnit
not in ['Tmb', 'T', 'Kkms'])):
print ('Map is not in the right units has to be Jy/beam or '
'Kelvin something. -> Exit!')
sys.exit()
if nu_or_lambda == 'lambda':
if direction == 'jansky_to_kelvin':
def fcon(x, major, minor):
return units.jansky_to_kelvin(x, major,
minor, nu_or_lambda='lambda')
if direction == 'kelvin_to_jansky':
def fcon(x, major, minor):
return units.kelvin_to_jansky(x, major,
minor, nu_or_lambda='lambda')
if nu_or_lambda == 'nu':
if direction == 'jansky_to_kelvin':
def fcon(frequency, major, minor):
return units.jansky_to_kelvin(x, major,
minor, nu_or_lambda='nu')
if direction == 'kelvin_to_jansky':
def fcon(frequency, major, minor):
return units.kelvin_to_jansky(x, major,
minor, nu_or_lambda='nu')
if x is None:
if self.frequency is not np.nan:
x = self.frequency / 1e9
elif self.frequency is np.nan:
print 'No frequency information present. Can not proceed.'
if major is None:
major = self.resolution[0]
if minor is None:
minor = self.resolution[1]
return fcon(x, major, minor)