Skip to content

source.py

Class to describe a VAST astrophysical source.

Source

This is a class representation of a catalogued source position

Attributes:

Name Type Description
pipeline bool

Set to True if the source is generated from a VAST Pipeline run.

coord astropy.coordinates.SkyCoord

The coordinate of the source as a SkyCoord object. Planets can sometimes have a SkyCoord containing more than one coordinate.

name str

The name of the source.

epochs List[str]

The epochs the source contains.

fields List[str]

The fields the source contains.

stokes str

The Stokes parameter of the source.

crossmatch_radius astropy.coordinates.Angle

Angle of the crossmatch. This will not be valid for pipeline sources.

measurements pandas.core.frame.DataFrame

The individual measurements of the source.

islands bool

Set to True if islands have been used for the source creation.

outdir str

Path that will be appended to any files that are saved.

base_folder str

The directory where the data (fits files) is held.

image_type str

'TILES' or 'COMBINED'.

tiles bool

True if image_type == TILES.

corrected_data bool

Access the corrected data. Only relevant if tiles is True. Defaults to True.

detections int

The number of selavy detections the source contains.

limits int

The number of upper limits the source contains. Will be set to None for pipeline sources.

forced_fits int

The number of forced fits the source contains.

norms astropy.visualization.ImageNormalize

Contains the normalization value to use for consistent normalization across the measurements for png representation.

planet bool

Set to True if the source has been defined as a planet.

Source code in vasttools/source.py
class Source:
    """
    This is a class representation of a catalogued source position

    Attributes:
        pipeline (bool): Set to `True` if the source is generated from a VAST
            Pipeline run.
        coord (astropy.coordinates.SkyCoord):
            The coordinate of the source as a SkyCoord object.
            Planets can sometimes have a SkyCoord containing more than
            one coordinate.
        name (str): The name of the source.
        epochs (List[str]): The epochs the source contains.
        fields (List[str]): The fields the source contains.
        stokes (str): The Stokes parameter of the source.
        crossmatch_radius (astropy.coordinates.Angle):
            Angle of the crossmatch. This will not be valid for
            pipeline sources.
        measurements (pandas.core.frame.DataFrame):
            The individual measurements of the source.
        islands (bool): Set to `True` if islands have been used for the
            source creation.
        outdir (str): Path that will be appended to any files that are saved.
        base_folder (str): The directory where the data (fits files) is held.
        image_type (str): 'TILES' or 'COMBINED'.
        tiles (bool): `True` if `image_type` == `TILES`.
        corrected_data (bool): Access the corrected data. Only relevant if
            `tiles` is `True`. Defaults to `True`.
        detections (int): The number of selavy detections the source contains.
        limits (int):
            The number of upper limits the source contains. Will be set to
            `None` for pipeline sources.
        forced_fits (int): The number of forced fits the source contains.
        norms (astropy.visualization.ImageNormalize):
            Contains the normalization value to use for consistent
            normalization across the measurements for png representation.
        planet (bool): Set to `True` if the source has been defined
            as a planet.
    """

    def __init__(
        self,
        coord: SkyCoord,
        name: str,
        epochs: List[str],
        fields: List[str],
        stokes: str,
        primary_field: str,
        crossmatch_radius: Angle,
        measurements: pd.DataFrame,
        base_folder: str,
        image_type: str = "COMBINED",
        islands: bool = False,
        outdir: str = ".",
        planet: bool = False,
        pipeline: bool = False,
        tiles: bool = False,
        corrected_data: bool = True,
        forced_fits: bool = False,
    ) -> None:
        """
        Constructor method

        Args:
            coord: Source coordinates.
            name: The name of the source.
            epochs: The epochs that the source contains.
            fields: The fields that the source contains.
            stokes: The stokes parameter of the source.
            primary_field: The primary VAST Pilot field of the source.
            crossmatch_radius: The crossmatch radius used to find the
                measurements.
            measurements: DataFrame containing the measurements.
            base_folder: Path to base folder in default directory structure
            image_type: The string representation of the image type,
                either 'COMBINED' or 'TILES', defaults to "COMBINED".
            islands: Is `True` if islands has been used instead of
                components, defaults to `False`.
            outdir: The directory where any media outputs will be written
                to, defaults to ".".
            planet: Set to `True` if the source is a planet, defaults
                to `False`.
            pipeline: Set to `True` if the source has been loaded from a
                VAST Pipeline run, defaults to `False`.
            tiles: Set to 'True` if the source is from a tile images,
                defaults to `False`.
            corrected_data: Access the corrected data. Only relevant if
                `tiles` is `True`. Defaults to `True`.
            forced_fits: Set to `True` if forced fits are included in the
                source measurements, defaults to `False`.

        Returns:
            None
        """
        self.logger = logging.getLogger('vasttools.source.Source')
        self.logger.debug('Created Source instance')
        self.pipeline = pipeline
        self.coord = coord
        self.name = name
        self.epochs = epochs
        self.fields = fields
        self.stokes = stokes
        self.primary_field = primary_field
        self.crossmatch_radius = crossmatch_radius
        self.measurements = measurements.infer_objects()
        self.measurements.dateobs = pd.to_datetime(
            self.measurements.dateobs
        )
        self.islands = islands
        if self.islands:
            self.cat_type = 'islands'
        else:
            self.cat_type = 'components'

        self.outdir = outdir

        self.base_folder = base_folder
        self.image_type = image_type
        if image_type == 'TILES':
            self.tiles = True
        else:
            self.tiles = False

        self.corrected_data = corrected_data
        if self.pipeline:
            self.detections = self.measurements[
                self.measurements.forced == False
            ].shape[0]

            self.forced = self.measurements[
                self.measurements.forced == False
            ].shape[0]

            self.limits = None
            self.forced_fits = False
        else:
            self.detections = self.measurements[
                self.measurements.detection
            ].shape[0]

            self.limits = self.measurements[
                self.measurements.detection == False
            ].shape[0]

            self.forced = None
            self.forced_fits = forced_fits

        self._cutouts_got = False

        self.norms = None
        self._checked_norms = False

        self.planet = planet

    def write_measurements(
        self, simple: bool = False, outfile: Optional[str] = None
    ) -> None:
        """Write the measurements to a CSV file.

        Args:
            simple: Only include flux density and uncertainty in returned
                table, defaults to `False`.
            outfile: File to write measurements to, defaults to None.

        Returns:
            None
        """
        if simple:
            if self.pipeline:
                cols = [
                    'source',
                    'ra',
                    'dec',
                    'component_id',
                    'flux_peak',
                    'flux_peak_err',
                    'flux_int',
                    'flux_int_err',
                    'rms',
                ]
            else:
                cols = [
                    'name',
                    'ra_deg_cont',
                    'dec_deg_cont',
                    'component_id',
                    'flux_peak',
                    'flux_peak_err',
                    'flux_int',
                    'flux_int_err',
                    'rms_image',
                ]

            measurements_to_write = self.measurements[cols]

        else:
            cols = [
                'fields',
                'skycoord',
                'selavy',
                'image',
                'rms',
            ]

            if self.pipeline:
                cols[0] = 'field'

            measurements_to_write = self.measurements.drop(
                labels=cols, axis=1
            )

        # drop any empty values
        if not self.pipeline and not self.forced_fits:
            measurements_to_write = measurements_to_write[
                measurements_to_write['rms_image'] != -99
            ]

        if measurements_to_write.empty:
            self.logger.warning(
                "%s has no measurements! No file will be written.",
                self.name
            )
            return

        if outfile is None:
            outfile = "{}_measurements.csv".format(self.name.replace(
                " ", "_"
            ).replace(
                "/", "_"
            ))

        elif not outfile.endswith(".csv"):
            outfile += ".csv"

        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )

        measurements_to_write.to_csv(outfile, index=False)

        self.logger.debug("Wrote {}.".format(outfile))

    def plot_lightcurve(
        self,
        sigma_thresh: int = 5,
        figsize: Tuple[int, int] = (8, 4),
        min_points: int = 2,
        min_detections: int = 0,
        mjd: bool = False,
        # TODO: Is this a pd.Timestamp or a datetime?
        start_date: Optional[pd.Timestamp] = None,
        grid: bool = False,
        yaxis_start: str = "0",
        peak_flux: bool = True,
        save: bool = False,
        outfile: Optional[str] = None,
        use_forced_for_limits: bool = False,
        use_forced_for_all: bool = False,
        hide_legend: bool = False,
        plot_dpi: int = 150
    ) -> Union[None, plt.Figure]:
        """
        Plot source lightcurves and save to file

        Args:
            sigma_thresh: Threshold to use for upper limits, defaults to 5.
            figsize: Figure size, defaults to (8, 4).
            min_points: Minimum number of points for plotting, defaults
                to 2.
            min_detections:  Minimum number of detections for plotting,
                defaults to 0.
            mjd: Plot x-axis in MJD rather than datetime, defaults to False.
            start_date: Plot in days from start date, defaults to None.
            grid: Turn on matplotlib grid, defaults to False.
            yaxis_start: Define where the y-axis begins from, either 'auto'
                or '0', defaults to "0".
            peak_flux: Uses peak flux instead of integrated flux,
                defaults to `True`.
            save: When `True` the plot is saved rather than displayed,
                defaults to `False`.
            outfile: The filename to save when using, defaults to None which
                will use '<souce_name>_lc.png'.
            use_forced_for_limits: Use the forced extractions instead of
                upper limits for non-detections., defaults to `False`.
            use_forced_for_all: Use the forced fits for all the datapoints,
                defaults to `False`.
            hide_legend: Hide the legend, defaults to `False`.
            plot_dpi: Specify the DPI of saved figures, defaults to 150.

        Returns:
            None if save is `True` or the matplotlib figure if save is
            `False`.

        Raises:
            SourcePlottingError: Source does not have any forced fits when the
                'use_forced_for_all' or 'use_forced_for_limits' options have
                been selected.
            SourcePlottingError: Number of detections lower than the
                minimum required.
            SourcePlottingError: Number of datapoints lower than the
                minimum required.
            SourcePlottingError: If measurements dataframe is empty.
        """
        if use_forced_for_all or use_forced_for_limits:
            if not self.forced_fits:
                raise SourcePlottingError(
                    "Source does not have any forced fits points to plot."
                )

        if self.detections < min_detections:
            msg = (
                f"Number of detections ({self.detections}) lower "
                f"than minimum required ({min_detections})"
            )
            self.logger.error(msg)
            raise SourcePlottingError(msg)

        if self.measurements.shape[0] < min_points:
            msg = (
                f"Number of datapoints ({self.measurements.shape[0]}) lower "
                f"than minimum required ({min_points})"
            )
            self.logger.error(msg)
            raise SourcePlottingError(msg)

        if mjd and start_date is not None:
            msg = (
                "The 'mjd' and 'start date' options "
                "cannot be used at the same time!"
            )
            self.logger.error(msg)
            raise SourcePlottingError(msg)

        # remove empty values
        measurements_df = self.measurements
        if not self.pipeline and not (
            use_forced_for_limits or use_forced_for_all
        ):
            measurements_df = self.measurements[
                self.measurements['rms_image'] != -99
            ]

        if measurements_df.empty:
            msg = f"{self.name} has no measurements!"
            self.logger.error(msg)
            raise SourcePlottingError(msg)

        # Build figure and labels
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111)
        plot_title = self.name
        if self.islands:
            plot_title += " (island)"
        ax.set_title(plot_title)

        if peak_flux:
            label = 'Peak Flux Density (mJy/beam)'
            flux_col = "flux_peak"
        else:
            label = 'Integrated Flux Density (mJy)'
            flux_col = "flux_int"

        if use_forced_for_all:
            label = "Forced " + label
            flux_col = "f_" + flux_col

        if self.stokes != "I":
            label = "Absolute " + label
            measurements_df[flux_col] = measurements_df[flux_col].abs()

        ax.set_ylabel(label)

        freq_col = 'frequency'

        grouped_df = measurements_df.groupby(freq_col)
        freqs = list(grouped_df.groups.keys())

        # Colours for each frequency
        freq_cmap = plt.cm.get_cmap('viridis')
        cNorm = matplotlib.colors.Normalize(
            vmin=min(freqs), vmax=max(freqs) * 1.1)
        scalarMap = matplotlib.cm.ScalarMappable(norm=cNorm, cmap=freq_cmap)
        sm = scalarMap
        sm._A = []

        # Markers for each frequency
        markers = ['o', 'D', '*', 'X', 's', 'd', 'p']

        self.logger.debug("Frequencies: {}".format(freqs))
        for i, (freq, measurements) in enumerate(grouped_df):
            self.logger.debug("Plotting {} MHz data".format(freq))
            marker = markers[i % len(markers)]
            marker_colour = sm.to_rgba(freq)
            plot_dates = measurements['dateobs']
            self.logger.debug(plot_dates)
            if mjd:
                plot_dates = Time(plot_dates.to_numpy()).mjd
            elif start_date:
                plot_dates -= start_date
                plot_dates /= pd.Timedelta(1, unit='d')

            self.logger.debug("Plotting upper limit")
            if self.pipeline:
                upper_lim_mask = measurements.forced
            else:
                upper_lim_mask = measurements.detection == False
                if use_forced_for_all:
                    upper_lim_mask = np.array([False for i in upper_lim_mask])
            upper_lims = measurements[
                upper_lim_mask
            ]
            if self.pipeline:
                if peak_flux:
                    value_col = 'flux_peak'
                    err_value_col = 'flux_peak_err'
                else:
                    value_col = 'flux_int'
                    err_value_col = 'flux_int_err'
                uplims = False
                sigma_thresh = 1.0
                label = 'Forced'
                markerfacecolor = 'w'
            else:
                if use_forced_for_limits:
                    value_col = 'f_flux_peak'
                    err_value_col = 'f_flux_peak_err'
                    uplims = False
                    sigma_thresh = 1.0
                    markerfacecolor = 'w'
                    label = "Forced"
                else:
                    value_col = err_value_col = 'rms_image'
                    uplims = True
                    markerfacecolor = marker_colour
                    label = 'Upper limit'
            if upper_lim_mask.any():
                upperlim_points = ax.errorbar(
                    plot_dates[upper_lim_mask],
                    sigma_thresh *
                    upper_lims[value_col],
                    yerr=upper_lims[err_value_col],
                    uplims=uplims,
                    lolims=False,
                    marker=marker,
                    c=marker_colour,
                    linestyle="none",
                    markerfacecolor=markerfacecolor
                )

            self.logger.debug("Plotting detection")

            if use_forced_for_all:
                detections = measurements
            else:
                detections = measurements[
                    ~upper_lim_mask
                ]

            if self.pipeline:
                if peak_flux:
                    err_value_col = 'flux_peak_err'
                else:
                    err_value_col = 'flux_int_err'
            else:
                if use_forced_for_all:
                    err_value_col = flux_col + '_err'
                else:
                    err_value_col = 'rms_image'

            if use_forced_for_all:
                markerfacecolor = 'w'
                label = 'Forced'
            else:
                markerfacecolor = marker_colour
                label = 'Selavy'
            if (~upper_lim_mask).any():
                detection_points = ax.errorbar(
                    plot_dates[~upper_lim_mask],
                    detections[flux_col],
                    yerr=detections[err_value_col],
                    marker=marker,
                    c=marker_colour,
                    linestyle="none",
                    markerfacecolor=markerfacecolor
                )

        self.logger.debug("Plotting finished.")
        if self.pipeline:
            upper_lim_mask = measurements_df.forced
        else:
            upper_lim_mask = measurements_df.detection == False

        if use_forced_for_all:
            detections = measurements_df
        else:
            detections = measurements_df[
                ~upper_lim_mask
            ]
        if yaxis_start == "0":
            max_det = detections.loc[:, [flux_col, err_value_col]].sum(axis=1)
            if use_forced_for_limits or self.pipeline:
                max_y = np.nanmax(
                    max_det.tolist() +
                    upper_lims[value_col].tolist()
                )
            elif use_forced_for_all:
                max_y = np.nanmax(max_det.tolist())
            else:
                max_y = np.nanmax(
                    max_det.tolist() +
                    (sigma_thresh * upper_lims[err_value_col]).tolist()
                )
            ax.set_ylim(
                bottom=0,
                top=max_y * 1.1
            )

        if mjd:
            ax.set_xlabel('Date (MJD)')
        elif start_date:
            ax.set_xlabel('Days since {}'.format(start_date))
        else:
            fig.autofmt_xdate()
            ax.set_xlabel('Date')

            date_form = mdates.DateFormatter("%Y-%m-%d")
            ax.xaxis.set_major_formatter(date_form)
            ax.xaxis.set_major_locator(mdates.AutoDateLocator(maxticks=15))

        ax.grid(grid)

        if not hide_legend:
            # Manually create legend artists for consistency.
            # Using dummy points throws a matplotlib warning.
            handles = []
            labels = []
            for i, freq in enumerate(freqs):
                line = Line2D([],
                              [],
                              ls="",
                              marker=markers[i],
                              color=sm.to_rgba(freq)
                              )
                barline = LineCollection(np.empty((2, 2, 2)))

                err = ErrorbarContainer((line, None, [barline]),
                                        has_yerr=True
                                        )
                handles.append(err)
                labels.append('{} MHz'.format(freq))

            ax.legend(handles=handles, labels=labels)

        if save:
            if outfile is None:
                outfile = "{}_lc.png".format(self.name.replace(
                    " ", "_"
                ).replace(
                    "/", "_"
                ))

            elif not outfile.endswith(".png"):
                outfile += ".png"

            if self.outdir != ".":
                outfile = os.path.join(
                    self.outdir,
                    outfile
                )

            plt.savefig(outfile, bbox_inches='tight', dpi=plot_dpi)
            plt.close()

            return

        else:

            return fig

    def get_cutout_data(self, size: Optional[Angle] = None) -> None:
        """
        Function to fetch the cutout data for that source
        required for producing all the media output.

        If size is not provided then the default size of 5 arcmin will be
        used.

        Args:
            size: The angular size of the cutouts, defaults to None.

        Returns:
            None
        """
        if size is None:
            args = None
        else:
            args = (size,)

        self.cutout_df = self.measurements.apply(
            self._get_cutout,
            args=args,
            axis=1,
            result_type='expand'
        ).rename(columns={
            0: "data",
            1: "wcs",
            2: "header",
            3: "selavy_overlay",
            4: "beam"
        })
        self._cutouts_got = True

    def _analyse_norm_level(
        self,
        percentile: float = 99.9,
        zscale: bool = False,
        z_contrast: float = 0.2,
        cutout_data: Optional[pd.DataFrame] = None,
        return_norm: bool = False
    ) -> Union[None, ImageNormalize]:
        """
        Selects the appropriate image to use as the normalization
        value for each image.

        Either the first `detection` image is used, or the first image
        in time if there are no detections.

        Args:
            percentile: The value passed to the percentile
                normalization function, defaults to 99.9.
            zscale: Uses ZScale normalization instead of PercentileInterval,
                defaults to `False`
            z_contrast: Contast value passed to the ZScaleInterval
                function when zscale is selected, defaults to 0.2.
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.
            return_norm: If `True` the calculated norm is returned
                by the function, defaults to False.

        Returns:
            None if return_norm is `False` or the normalization if `True`.

        Raises:
            ValueError: If the cutout data is yet to be obtained.
        """
        if cutout_data is None:
            if not self._cutouts_got:
                self.logger.warning(
                    "Fetch cutout data before running this function!"
                )

                raise ValueError(
                    "Fetch cutout data before running this function!"
                )
            else:
                cutout_data = self.cutout_df

        if self.detections > 0:
            scale_index = self.measurements[
                self.measurements.detection
            ].index.values[0]
        else:
            scale_index = 0

        scale_data = cutout_data.loc[scale_index].data * 1.e3

        if zscale:
            norms = ImageNormalize(
                scale_data, interval=ZScaleInterval(
                    contrast=z_contrast))
        else:
            norms = ImageNormalize(
                scale_data,
                interval=PercentileInterval(percentile),
                stretch=LinearStretch())

        if return_norm:
            return norms
        else:
            self.norms = norms

        self._checked_norms = True

    def _get_cutout(
        self, row: pd.Series, size: Angle = Angle(5. * u.arcmin)
    ) -> Tuple[np.ndarray, WCS, fits.Header, pd.DataFrame, Beam]:
        """Does the actual fetching of the cutout data.

        Args:
            row: The row in the measurements df for which
                media will be fetched.
            size: The size of the cutout, defaults to Angle(5.*u.arcmin)

        Returns:
            Tuple containing the cutout data.
        """

        self._size = size

        if self.pipeline:
            image = Image(
                row.field, row.epoch, self.stokes, self.base_folder,
                path=row.image, rmspath=row.rms,
                corrected_data=self.corrected_data
            )
            image.get_img_data()
        else:
            e = row.epoch
            if "-" in e:
                e = e.split("-")[0]
            image = Image(
                row.field, e, self.stokes,
                self.base_folder, tiles=self.tiles,
                sbid=row.sbid, corrected_data=self.corrected_data
            )
            image.get_img_data()

        cutout = Cutout2D(
            image.data,
            position=row.skycoord,
            size=size,
            wcs=image.wcs
        )

        cutout_data = copy.deepcopy(cutout.data)
        cutout_wcs = copy.deepcopy(cutout.wcs)

        header = copy.deepcopy(image.header)
        header.update(cutout.wcs.to_header())

        beam = image.beam

        del cutout
        del image

        if self.pipeline:
            selavy_components = pd.read_parquet(
                row.selavy,
                columns=[
                    'island_id',
                    'ra',
                    'dec',
                    'bmaj',
                    'bmin',
                    'pa'
                ]
            ).rename(
                columns={
                    'ra': 'ra_deg_cont',
                    'dec': 'dec_deg_cont',
                    'bmaj': 'maj_axis',
                    'bmin': 'min_axis',
                    'pa': 'pos_ang'
                }
            )
        else:
            selavy_components = read_selavy(row.selavy, cols=[
                'island_id',
                'ra_deg_cont',
                'dec_deg_cont',
                'maj_axis',
                'min_axis',
                'pos_ang'
            ])

        selavy_coords = SkyCoord(
            selavy_components.ra_deg_cont.values,
            selavy_components.dec_deg_cont.values,
            unit=(u.deg, u.deg)
        )

        selavy_components = filter_selavy_components(
            selavy_components,
            selavy_coords,
            size,
            row.skycoord
        )

        del selavy_coords

        return (
            cutout_data, cutout_wcs, header, selavy_components, beam
        )

    def show_png_cutout(
        self,
        index: int,
        selavy: bool = True,
        percentile: float = 99.9,
        zscale: bool = False,
        contrast: float = 0.2,
        no_islands: bool = True,
        label: str = "Source",
        no_colorbar: bool = False,
        title: Optional[str] = None,
        crossmatch_overlay: bool = False,
        hide_beam: bool = False,
        size: Optional[Angle] = None,
        force: bool = False,
        offset_axes: bool = True,
    ) -> plt.Figure:
        """
        Wrapper for make_png to make nicer interactive function.
        No access to save.

        Args:
            index: Index of the observation to show.
            selavy: If `True` then selavy overlay are shown,
                 defaults to `True`.
            percentile: The value passed to the percentile
                normalization function, defaults to 99.9.
            zscale: Uses ZScale normalization instead of
                PercentileInterval, defaults to `False`.
            contrast: Contrast value passed to the ZScaleInterval
                function when zscale is selected, defaults to 0.2.
            no_islands: Hide island name labels, defaults to `True`.
            label: legend label for source, defaults to "Source".
            no_colorbar: Hides the colorbar, defaults to `False`.
            title: Sets the plot title, defaults to None.
            crossmatch_overlay: Plots a circle that represents the
                crossmatch radius, defaults to `False`.
            hide_beam: Hide the beam on the plot, defaults to `False`.
            size: Size of the cutout, defaults to None.
            force: Whether to force the re-fetching of the cutout data,
                defaults to `False`.
            offset_axes: Use offset, rather than absolute, axis labels.

        Returns:
            Figure object.
        """

        fig = self.make_png(
            index,
            selavy=selavy,
            percentile=percentile,
            zscale=zscale,
            contrast=contrast,
            no_islands=no_islands,
            label=label,
            no_colorbar=no_colorbar,
            title=title,
            crossmatch_overlay=crossmatch_overlay,
            hide_beam=hide_beam,
            size=size,
            force=force,
            offset_axes=offset_axes
        )

        return fig

    def save_png_cutout(
        self,
        index: int,
        selavy: bool = True,
        percentile: float = 99.9,
        zscale: bool = False,
        contrast: float = 0.2,
        no_islands: bool = True,
        label: str = "Source",
        no_colorbar: bool = False,
        title: Optional[str] = None,
        crossmatch_overlay: bool = False,
        hide_beam: bool = False,
        size: Optional[Angle] = None,
        force: bool = False,
        outfile: Optional[str] = None,
        plot_dpi: int = 150,
        offset_axes: bool = True
    ) -> None:
        """
        Wrapper for make_png to make nicer interactive function.
        Always save.

        Args:
            index: Index of the observation to show.
            selavy: If `True` then selavy overlay are shown,
                 defaults to `True`.
            percentile: The value passed to the percentile
                normalization function, defaults to 99.9.
            zscale: Uses ZScale normalization instead of
                PercentileInterval, defaults to `False`.
            contrast: Contrast value passed to the ZScaleInterval
                function when zscale is selected, defaults to 0.2.
            no_islands: Hide island name labels, defaults to `True`.
            label: legend label for source, defaults to "Source".
            no_colorbar: Hides the colorbar, defaults to `False`.
            title: Sets the plot title, defaults to None.
            crossmatch_overlay: Plots a circle that represents the
                crossmatch radius, defaults to `False`.
            hide_beam: Hide the beam on the plot, defaults to `False`.
            size: Size of the cutout, defaults to None.
            force: Whether to force the re-fetching
                of the cutout data, defaults to `False`.
            outfile: Name to give the file, if None then the name is
                automatically generated, defaults to None.
            plot_dpi: Specify the DPI of saved figures, defaults to 150.
            offset_axes: Use offset, rather than absolute, axis labels.

        Returns:
            None
        """
        fig = self.make_png(
            index,
            selavy=selavy,
            percentile=percentile,
            zscale=zscale,
            contrast=contrast,
            no_islands=no_islands,
            label=label,
            no_colorbar=no_colorbar,
            title=title,
            crossmatch_overlay=crossmatch_overlay,
            hide_beam=hide_beam,
            size=size,
            force=force,
            outfile=outfile,
            save=True,
            plot_dpi=plot_dpi,
            offset_axes=offset_axes
        )

        return

    def _get_save_name(self,
                       index: int,
                       ext: str
                       ) -> str:
        """
        Generate name of file to save to.

        Args:
            index: Index of the requested data
            ext: File extension

        Returns:
            Name of file to save.
        """

        row = self.measurements.iloc[index]

        if not ext.startswith("."):
            ext = f".{ext}"

        source_name = self.name.replace(" ", "_").replace("/", "_")

        if self.pipeline:
            outfile = f"{source_name}_{index}{ext}"
        else:
            field_name = row.field
            sbid = row.sbid

            outfile = f"{source_name}_{field_name}_SB{sbid}{ext}"

        return outfile

    def save_fits_cutout(
        self,
        index: int,
        outfile: Optional[str] = None,
        size: Optional[Angle] = None,
        force: bool = False,
        cutout_data: Optional[pd.DataFrame] = None
    ) -> None:
        """
        Saves the FITS file cutout of the requested observation.

        Args:
            index: The index of the requested observation.
            outfile: File to save to, defaults to None.
            size: Size of the cutout, defaults to None.
            force: Whether to force the re-fetching
                of the cutout data, defaults to `False`.
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.

        Returns:
            None

        Raises:
            ValueError: If the source does not contain the requested index.
        """

        if (self._cutouts_got is False) or (force):
            if cutout_data is None:
                self.get_cutout_data(size)

        if outfile is None:
            outfile = self._get_save_name(index, ".fits")

        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )
        if cutout_data is None:
            cutout_row = self.cutout_df.iloc[index]
        else:
            cutout_row = cutout_data.iloc[index]

        hdu_stamp = fits.PrimaryHDU(
            data=cutout_row.data,
            header=cutout_row.header
        )

        # Write the cutout to a new FITS file
        hdu_stamp.writeto(outfile, overwrite=True)
        self.logger.debug(f"Wrote to {outfile}")

        del hdu_stamp

    def _save_noisemap_cutout(
        self,
        index: int,
        cutout_data: pd.DataFrame,
        noisemap_type: str,
        outfile: Optional[str] = None,
    ) -> None:
        """
        Saves the FITS file cutout of the requested observation.

        Args:
            index: The index of the requested observation.
            cutout_data: The external cutout data to be used.
            noisemap_type: The type of noisemap to use. Must be 'rms' or 'bkg'.
            outfile: File to save to, defaults to None.

        Returns:
            None

        Raises:
            ValueError: If the noisemap_type is not 'rms' or 'bkg'
        """

        if noisemap_type not in ['rms', 'bkg']:
            raise ValueError("noisemap_type must be 'rms' or 'bkg'")
        if outfile is None:
            outfile = self._get_save_name(index, f"{noisemap_type}.fits")

        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )
        cutout_row = cutout_data.iloc[index]

        hdu_stamp = fits.PrimaryHDU(
            data=cutout_row[f'{noisemap_type}_data'],
            header=cutout_row[f'{noisemap_type}_header']
        )

        # Write the cutout to a new FITS file
        hdu_stamp.writeto(outfile, overwrite=True)
        self.logger.debug(f"Wrote to {outfile}")

        del hdu_stamp

    def save_all_ann(
        self,
        crossmatch_overlay: bool = False,
        cutout_data: Optional[pd.DataFrame] = None
    ) -> None:
        """
        Save kvis annotation file corresponding to the source

        Args:
            crossmatch_overlay: Include the crossmatch radius,
                defaults to `False`
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.

        Returns:
            None
        """
        indices = self.measurements.index.to_series()
        indices.apply(
            self.write_ann,
            args=(
                None,
                crossmatch_overlay,
                None,
                False,
                cutout_data
            )
        )

    def save_all_reg(
        self,
        crossmatch_overlay: bool = False,
        cutout_data: Optional[pd.DataFrame] = None
    ) -> None:
        """
        Save DS9 region file corresponding to the source

        Args:
            crossmatch_overlay: Include the crossmatch radius,
                defaults to `False`
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.

        Returns:
            None
        """

        indices = self.measurements.index.to_series()
        indices.apply(
            self.write_reg,
            args=(
                None,
                crossmatch_overlay,
                None,
                False,
                cutout_data
            )
        )

    def save_all_fits_cutouts(
        self,
        size: Optional[Angle] = None,
        force: bool = False,
        cutout_data: Optional[pd.DataFrame] = None
    ) -> None:
        """
        Save all cutouts of the source to fits file

        Args:
            size: Size of the cutouts, defaults to None.
            force: Whether to force the re-fetching
                of the cutout data, defaults to `False`.
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.

        Returns:
            None
        """
        if (self._cutouts_got is False) or (force):
            if cutout_data is None:
                self.get_cutout_data(size)

        self.logger.debug("Saving fits cutouts...")

        if cutout_data is None:
            indices = self.measurements.index
        else:
            indices = cutout_data.index

        for i in indices:
            self.save_fits_cutout(i, cutout_data=cutout_data)

    def _save_all_noisemap_cutouts(
        self,
        cutout_data: pd.DataFrame,
        rms: bool = True,
        bkg: bool = True,
    ) -> None:
        """
        Save all cutouts of the source to fits file

        Args:
            cutout_data: The external data to be used.
            rms: Create rms map cutouts.
            bkg: Create bkg map cutouts.

        Returns:
            None
        """

        self.logger.debug("Saving noisemap cutouts...")

        for i in cutout_data.index:
            if rms:
                self._save_noisemap_cutout(i, cutout_data, 'rms')
            if bkg:
                self._save_noisemap_cutout(i, cutout_data, 'bkg')

    def save_all_png_cutouts(
        self,
        selavy: bool = True,
        percentile: float = 99.9,
        zscale: bool = False,
        contrast: float = 0.2,
        no_islands: bool = True,
        no_colorbar: bool = False,
        crossmatch_overlay: bool = False,
        hide_beam: bool = False,
        size: Optional[Angle] = None,
        disable_autoscaling: bool = False,
        cutout_data: Optional[pd.DataFrame] = None,
        calc_script_norms: bool = False,
        plot_dpi: int = 150,
        offset_axes: bool = True
    ) -> None:
        """
        Wrapper function to save all the png cutouts
        for all epochs.

        Args:
            selavy: If `True` then selavy overlay are shown,
                 defaults to `True`.
            percentile: The value passed to the percentile
                normalization function, defaults to 99.9.
            zscale: Uses ZScale normalization instead of
                PercentileInterval, defaults to `False`.
            contrast: Contrast value passed to the ZScaleInterval
                function when zscale is selected, defaults to 0.2.
            no_islands: Hide island name labels, defaults to `True`.
            no_colorbar: Hides the colorbar, defaults to `False`.
            crossmatch_overlay: Plots a circle that represents the
                crossmatch radius, defaults to `False`.
            hide_beam: Hide the beam on the plot, defaults to `False`.
            size: Size of the cutout, defaults to None.
            disable_autoscaling: Do not use the consistent normalization
                values but calculate norms separately for each epoch,
                defaults to `False`.
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.
            calc_script_norms: When passing cutout data this parameter
                can be set to True to pass this cutout data to the analyse
                norms function, defaults to False.
            plot_dpi: Specify the DPI of saved figures, defaults to 150.
            offset_axes: Use offset, rather than absolute, axis labels.

        Returns:
            None
        """
        if self._cutouts_got is False:
            if cutout_data is None:
                self.get_cutout_data(size)

        if not calc_script_norms:
            if not self._checked_norms:
                self._analyse_norm_level(
                    percentile=percentile,
                    zscale=zscale,
                    z_contrast=contrast
                )
            norms = None
        else:
            norms = self._analyse_norm_level(
                percentile=percentile,
                zscale=zscale,
                z_contrast=contrast,
                cutout_data=cutout_data
            )

        indices = self.measurements.index.to_series()
        indices.apply(
            self.make_png,
            args=(
                selavy,
                percentile,
                zscale,
                contrast,
                None,
                no_islands,
                "Source",
                no_colorbar,
                None,
                crossmatch_overlay,
                hide_beam,
                True,
                None,
                False,
                disable_autoscaling,
                cutout_data,
                norms,
                plot_dpi,
                offset_axes
            )
        )

    def show_all_png_cutouts(
        self,
        columns: int = 4,
        percentile: float = 99.9,
        zscale: bool = False,
        contrast: float = 0.1,
        outfile: Optional[str] = None,
        save: bool = False,
        size: Optional[Angle] = None,
        figsize: Tuple[int, int] = (10, 5),
        force: bool = False,
        no_selavy: bool = False,
        disable_autoscaling: bool = False,
        hide_epoch_labels: bool = False,
        plot_dpi: int = 150,
        offset_axes: bool = True
    ) -> Union[None, plt.figure]:
        """
        Creates a grid plot showing the source in each epoch.

        Args:
            columns: Number of columns to use for the grid plot,
                defaults to 4.
            percentile: The value passed to the percentile
                normalization function, defaults to 99.9.
            zscale: Uses ZScale normalization instead of
                PercentileInterval, defaults to `False`.
            contrast: Contast value passed to the ZScaleInterval
                function when zscale is selected, defaults to 0.2.
            outfile: Name of the output file, if None then the name
                 is automatically generated, defaults to None.
            save: Save the plot instead of displaying,
                defaults to `False`.
            size: Size of the cutout, defaults to None.
            figsize: Size of the matplotlib.pyplot figure,
                defaults to (10, 5).
            force: Whether to force the re-fetching
                of the cutout data, defaults to `False`.
            no_selavy: When `True` the selavy overlay
                is hidden, defaults to `False`.
            disable_autoscaling: Turn off the consistent normalization and
                 calculate the normalizations separately for each epoch,
                defaults to `False`.
            hide_epoch_labels: Turn off the epoch number label (found in
                top left corner of image).
            plot_dpi: Specify the DPI of saved figures, defaults to 150.
            offset_axes: Use offset, rather than absolute, axis labels.

        Returns:
            None is save is `True` or the Figure if `False`.
        """

        if (self._cutouts_got is False) or (force):
            self.get_cutout_data(size)

        num_plots = self.measurements.shape[0]
        nrows = int(np.ceil(num_plots / columns))

        fig = plt.figure(figsize=figsize)

        fig.tight_layout()

        plots = {}

        if not self._checked_norms or force:
            self._analyse_norm_level(
                percentile=percentile,
                zscale=zscale,
                z_contrast=contrast
            )
        img_norms = self.norms

        for i in range(num_plots):
            cutout_row = self.cutout_df.iloc[i]
            measurement_row = self.measurements.iloc[i]
            target_coords = np.array(
                ([[
                    measurement_row.ra,
                    measurement_row.dec
                ]])
            )
            i += 1
            plots[i] = fig.add_subplot(
                nrows,
                columns,
                i,
                projection=cutout_row.wcs
            )

            if disable_autoscaling:
                if zscale:
                    img_norms = ImageNormalize(
                        cutout_row.data * 1.e3,
                        interval=ZScaleInterval(
                            contrast=contrast
                        )
                    )
                else:
                    img_norms = ImageNormalize(
                        cutout_row.data * 1.e3,
                        interval=PercentileInterval(percentile),
                        stretch=LinearStretch())

            im = plots[i].imshow(
                cutout_row.data * 1.e3, norm=img_norms, cmap="gray_r"
            )

            epoch_time = measurement_row.dateobs
            epoch = measurement_row.epoch

            plots[i].set_title('{}'.format(
                epoch_time.strftime("%Y-%m-%d %H:%M:%S")
            ))

            if not hide_epoch_labels:
                plots[i].text(
                    0.05, 0.9, f"{epoch}", transform=plots[i].transAxes
                )

            cross_target_coords = cutout_row.wcs.wcs_world2pix(
                target_coords, 0
            )
            crosshair_lines = self._create_crosshair_lines(
                cross_target_coords,
                0.15,
                0.15,
                cutout_row.data.shape
            )

            if (not cutout_row['selavy_overlay'].empty) and (not no_selavy):
                plots[i].set_autoscale_on(False)
                (
                    collection,
                    patches,
                    island_names
                ) = self._gen_overlay_collection(
                    cutout_row
                )
                plots[i].add_collection(collection, autolim=False)

            if self.forced_fits:
                (
                    collection,
                    patches,
                    island_names
                ) = self._gen_overlay_collection(
                    cutout_row, f_source=measurement_row
                )
                plots[i].add_collection(collection, autolim=False)
                del collection

            [plots[i].plot(
                l[0], l[1], color="C3", zorder=10, lw=1.5, alpha=0.6
            ) for l in crosshair_lines]

            lon = plots[i].coords[0]
            lat = plots[i].coords[1]

            lon.set_ticks_visible(False)
            lon.set_ticklabel_visible(False)
            lat.set_ticks_visible(False)
            lat.set_ticklabel_visible(False)

        if save:
            if outfile is None:
                outfile = "{}_cutouts.png".format(self.name.replace(
                    " ", "_"
                ).replace(
                    "/", "_"
                ))

            elif not outfile.endswith(".png"):
                outfile += ".png"

            if self.outdir != ".":
                outfile = os.path.join(
                    self.outdir,
                    outfile
                )

            plt.savefig(outfile, bbox_inches='tight', dpi=plot_dpi)

            plt.close()

            return

        else:

            return fig

    def _gen_overlay_collection(
        self,
        cutout_row: pd.Series,
        f_source: Optional[pd.DataFrame] = None
    ) -> Tuple[PatchCollection, Patch, List[str]]:
        """
        Generates the ellipse collection for selavy sources to add
        to the matplotlib axis.

        Args:
            cutout_row: The row containing the selavy data
                to make the ellipses from.
            f_source: Forced fit extraction to create the
                 forced fit ellipse, defaults to None.

        Returns:
            Tuple of the ellipse collection, patches, and the island names.
        """

        wcs = cutout_row.wcs
        selavy_sources = cutout_row.selavy_overlay
        pix_scale = proj_plane_pixel_scales(wcs)
        sx = pix_scale[0]
        sy = pix_scale[1]
        degrees_per_pixel = np.sqrt(sx * sy)

        # define ellipse properties for clarity, selavy cut will have
        # already been created.
        if f_source is None:
            ww = selavy_sources["maj_axis"]
            hh = selavy_sources["min_axis"]
            aa = selavy_sources["pos_ang"]
            x = selavy_sources["ra_deg_cont"]
            y = selavy_sources["dec_deg_cont"]
        else:
            ww = np.array([f_source["f_maj_axis"]])
            hh = np.array([f_source["f_min_axis"]])
            aa = np.array([f_source["f_pos_ang"]])
            x = np.array([f_source["ra"]])
            y = np.array([f_source["dec"]])

        ww = ww.astype(float) / 3600.
        hh = hh.astype(float) / 3600.
        ww /= degrees_per_pixel
        hh /= degrees_per_pixel
        aa = aa.astype(float)
        x = x.astype(float)
        y = y.astype(float)

        coordinates = np.column_stack((x, y))

        coordinates = wcs.wcs_world2pix(coordinates, 0)

        # Create ellipses, collect them, add to axis.
        # Also where correction is applied to PA to account for how selavy
        # defines it vs matplotlib
        if f_source is None:
            island_names = selavy_sources["island_id"].apply(
                self._remove_sbid
            )
            colors = ["C2" if c.startswith(
                "n") else "C1" for c in island_names]
        else:
            island_names = [f_source["f_island_id"], ]
            colors = ["C3" for c in island_names]

        patches = [Ellipse(
            coordinates[i], hh[i], ww[i],
            aa[i]) for i in range(len(coordinates))]
        collection = PatchCollection(
            patches,
            facecolors="None",
            edgecolors=colors,
            lw=1.5)

        return collection, patches, island_names

    def skyview_contour_plot(
        self,
        index: int,
        survey: str,
        contour_levels: List[float] = [3., 5., 10., 15.],
        percentile: float = 99.9,
        zscale: bool = False,
        contrast: float = 0.2,
        outfile: Optional[str] = None,
        no_colorbar: bool = False,
        title: Optional[str] = None,
        save: bool = False,
        size: Optional[Angle] = None,
        force: bool = False,
        plot_dpi: int = 150,
    ) -> Union[None, plt.figure]:
        """
        Fetches a FITS file from SkyView of the requested survey at
        the source location and overlays ASKAP contours.

        Args:
            index: Index of the requested ASKAP observation.
            survey: Survey requested to be fetched using SkyView.
            contour_levels: Contour levels to plot which are multiples
                 of the local rms, defaults to [3., 5., 10., 15.].
            percentile: The value passed to the percentile
                normalization function, defaults to 99.9.
            zscale: Uses ZScale normalization instead of
                PercentileInterval, defaults to `False`.
            contrast: Contrast value passed to the ZScaleInterval
                function when zscale is selected, defaults to 0.2.
            outfile: Name to give the file, if None then the name is
                automatically generated, defaults to None.
            no_colorbar: Hides the colorbar, defaults to `False`.
            title: Plot title, defaults to None.
            save: Saves the file instead of returing the figure,
                defaults to `False`.
            size: Size of the cutout, defaults to None.
            force: Whether to force the re-fetching
                of the cutout data, defaults to `False`.
            plot_dpi: Specify the DPI of saved figures, defaults to 150.

        Returns:
            None if save is `True` or the figure object if `False`

        Raises:
            ValueError: If the index is out of range.
            ValueError: If the requested survey is not valid.
        """

        if (self._cutouts_got is False) or (force):
            self.get_cutout_data(size)

        size = self._size

        surveys = list(SkyView.survey_dict.values())
        survey_list = [item for sublist in surveys for item in sublist]

        if survey not in survey_list:
            raise ValueError(f"{survey} is not a valid SkyView survey name")

        if index > len(self.measurements):
            raise ValueError(f"Cannot access {index}th measurement.")
            return

        if outfile is None:
            outfile = self._get_save_name(index, ".png")

        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )

        try:
            paths = SkyView.get_images(
                position=self.measurements.iloc[index]['skycoord'],
                survey=[survey], radius=size
            )
            path_fits = paths[0][0]

            path_wcs = WCS(path_fits.header)

        except Exception as e:
            warnings.warn("SkyView fetch failed!")
            warnings.warn(e)
            return

        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(111, projection=path_wcs)

        mean_vast, median_vast, rms_vast = sigma_clipped_stats(
            self.cutout_df.iloc[index].data
        )

        levels = [
            i * rms_vast for i in contour_levels
        ]

        if zscale:
            norm = ImageNormalize(
                path_fits.data,
                interval=ZScaleInterval(
                    contrast=contrast
                )
            )
        else:
            norm = ImageNormalize(
                path_fits.data,
                interval=PercentileInterval(percentile),
                stretch=LinearStretch()
            )

        im = ax.imshow(path_fits.data, norm=norm, cmap='gray_r')

        ax.contour(
            self.cutout_df.iloc[index].data,
            levels=levels,
            transform=ax.get_transform(self.cutout_df.iloc[index].wcs),
            colors='C0',
            zorder=10,
        )

        if title is None:
            obs_time = self.measurements.iloc[index].dateobs
            title = "{} {}".format(
                self.name,
                obs_time.strftime(
                    "%Y-%m-%d %H:%M:%S"
                )
            )

        ax.set_title(title)

        lon = ax.coords[0]
        lat = ax.coords[1]
        lon.set_axislabel("Right Ascension (J2000)")
        lat.set_axislabel("Declination (J2000)")

        if not no_colorbar:
            divider = make_axes_locatable(ax)
            cax = divider.append_axes(
                "right", size="3%", pad=0.1, axes_class=maxes.Axes)
            cb = fig.colorbar(im, cax=cax)

        if save:
            plt.savefig(outfile, bbox_inches="tight", dpi=plot_dpi)
            self.logger.debug("Saved {}".format(outfile))

            plt.close(fig)

            return
        else:
            return fig

    def make_png(
        self,
        index: int,
        selavy: bool = True,
        percentile: float = 99.9,
        zscale: bool = False,
        contrast: float = 0.2,
        outfile: Optional[str] = None,
        no_islands: bool = True,
        label: str = "Source",
        no_colorbar: bool = False,
        title: Optional[str] = None,
        crossmatch_overlay: bool = False,
        hide_beam: bool = False,
        save: bool = False,
        size: Optional[Angle] = None,
        force: bool = False,
        disable_autoscaling: bool = False,
        cutout_data: Optional[pd.DataFrame] = None,
        norms: Optional[ImageNormalize] = None,
        plot_dpi: int = 150,
        offset_axes: bool = True
    ) -> Union[None, plt.figure]:
        """
        Save a PNG of the image postagestamp.

        Args:
            index: The index correpsonding to the requested observation.
            selavy: `True` to overlay selavy components, `False` otherwise.
            percentile: The value passed to the percentile
                normalization function, defaults to 99.9.
            zscale: Uses ZScale normalization instead of
                PercentileInterval, defaults to `False`.
            contrast: Contrast value passed to the ZScaleInterval
                function when zscale is selected, defaults to 0.2.
            outfile: Name to give the file, if None then the name is
                automatically generated, defaults to None.
            no_islands: Disable island lables on the png, defaults to
                `False`.
            label: Figure title (usually the name of the source of
                interest), defaults to "Source".
            no_colorbar: If `True`, do not show the colorbar on the png,
                defaults to `False`.
            title: String to set as title,
                defaults to None where a default title will be used.
            crossmatch_overlay: If 'True' then a circle is added to the png
                plot representing the crossmatch radius, defaults to `False`.
            hide_beam: If 'True' then the beam is not plotted onto the png
                plot, defaults to `False`.
            save: If `True` the plot is saved rather than the figure being
                returned, defaults to `False`.
            size: Size of the cutout, defaults to None.
            force: Whether to force the re-fetching of the cutout data,
                defaults to `False`.
            disable_autoscaling: Turn off the consistent normalization and
                calculate the normalizations separately for each observation,
                defaults to `False`.
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.
            norms: Pass external normalization to be used
                instead of internal calculations.
            plot_dpi: Specify the DPI of saved figures, defaults to 150.
            offset_axes: Use offset, rather than absolute, axis labels.

        Returns:
            None if save is `True` or the figure object if `False`

        Raises:
            ValueError: If the index is out of range.
        """

        if (self._cutouts_got is False) or (force):
            if cutout_data is None:
                self.get_cutout_data(size)

        if index > len(self.measurements):
            raise ValueError(f"Cannot access {index}th measurement.")

        if outfile is None:
            outfile = self._get_save_name(index, ".png")

        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )

        if cutout_data is None:
            cutout_row = self.cutout_df.iloc[index]
        else:
            cutout_row = cutout_data.iloc[index]

        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(111, projection=cutout_row.wcs)
        # Get the Image Normalisation from zscale, user contrast.
        if not disable_autoscaling:
            if norms is not None:
                img_norms = norms
            else:
                if not self._checked_norms:
                    self._analyse_norm_level(
                        percentile=percentile,
                        zscale=zscale,
                        z_contrast=contrast
                    )
                img_norms = self.norms
        else:
            if zscale:
                img_norms = ImageNormalize(
                    cutout_row.data * 1.e3,
                    interval=ZScaleInterval(
                        contrast=contrast
                    ))
            else:
                img_norms = ImageNormalize(
                    cutout_row.data * 1.e3,
                    interval=PercentileInterval(percentile),
                    stretch=LinearStretch())

        im = ax.imshow(
            cutout_row.data * 1.e3,
            norm=img_norms,
            cmap="gray_r"
        )

        # insert crosshair of target
        target_coords = np.array(
            ([[
                self.measurements.iloc[index].ra,
                self.measurements.iloc[index].dec
            ]])
        )

        target_coords = cutout_row.wcs.wcs_world2pix(
            target_coords, 0
        )

        crosshair_lines = self._create_crosshair_lines(
            target_coords,
            0.03,
            0.03,
            cutout_row.data.shape
        )

        [ax.plot(
            l[0], l[1], color="C3", zorder=10, lw=1.5, alpha=0.6
        ) for l in crosshair_lines]
        # the commented lines below are to use the crosshair
        # marker directly.
        # ax.scatter(
        #     [self.src_coord.ra.deg], [self.src_coord.dec.deg],
        #     transform=ax.get_transform('world'), marker="c",
        #     color="C3", zorder=10, label=label, s=1000, lw=1.5,
        #     alpha=0.5
        # )
        if crossmatch_overlay:
            try:
                crossmatch_patch = SphericalCircle(
                    (
                        self.measurements.iloc[index].skycoord.ra,
                        self.measurements.iloc[index].skycoord.dec
                    ),
                    self.crossmatch_radius,
                    transform=ax.get_transform('world'),
                    label="Crossmatch radius ({:.1f} arcsec)".format(
                        self.crossmatch_radius.arcsec
                    ), edgecolor='C4', facecolor='none', alpha=0.8)
                ax.add_patch(crossmatch_patch)
            except Exception as e:
                self.logger.warning(
                    "Crossmatch circle png overlay failed!"
                    " Has the source been crossmatched?")
                crossmatch_overlay = False

        if (not cutout_row['selavy_overlay'].empty) and selavy:
            ax.set_autoscale_on(False)
            collection, patches, island_names = self._gen_overlay_collection(
                cutout_row
            )
            ax.add_collection(collection, autolim=False)
            del collection

            # Add island labels, haven't found a better way other than looping
            # at the moment.
            if not no_islands and not self.islands:
                for i, val in enumerate(patches):
                    ax.annotate(
                        island_names[i],
                        val.center,
                        annotation_clip=True,
                        color="C0",
                        weight="bold")
        else:
            self.logger.debug(
                "PNG: No selavy selected or selavy catalogue failed. (%s)",
                self.name
            )

        if self.forced_fits:
            collection, patches, island_names = self._gen_overlay_collection(
                cutout_row,
                f_source=self.measurements.iloc[index]
            )
            ax.add_collection(collection, autolim=False)
            del collection

        legend_elements = [
            Line2D(
                [0], [0], marker='c', color='C3', label=label,
                markerfacecolor='g', ls="none", markersize=8
            )
        ]

        if selavy:
            legend_elements.append(
                Line2D(
                    [0], [0], marker='o', color='C1',
                    label="Selavy {}".format(self.cat_type),
                    markerfacecolor='none', ls="none", markersize=10
                )
            )

        if crossmatch_overlay:
            legend_elements.append(
                Line2D(
                    [0], [0], marker='o', color='C4',
                    label="Crossmatch radius ({:.1f} arcsec)".format(
                        self.crossmatch_radius.arcsec
                    ),
                    markerfacecolor='none', ls="none",
                    markersize=10
                )
            )

        if self.forced_fits:
            legend_elements.append(
                Line2D(
                    [0], [0], marker='o', color='C3',
                    label="Forced Fit",
                    markerfacecolor='none', ls="none",
                    markersize=10
                )
            )

        ax.legend(handles=legend_elements)
        lon = ax.coords[0]
        lat = ax.coords[1]
        lon.set_axislabel("Right Ascension (J2000)")
        lat.set_axislabel("Declination (J2000)")

        if not no_colorbar:
            divider = make_axes_locatable(ax)
            cax = divider.append_axes(
                "right", size="3%", pad=0.1, axes_class=maxes.Axes)
            cb = fig.colorbar(im, cax=cax)
            cb.set_label("mJy/beam")

        if title is None:
            obs_time = self.measurements.iloc[index].dateobs
            title = "{} {}".format(
                self.name,
                obs_time.strftime(
                    "%Y-%m-%d %H:%M:%S"
                )
            )

        ax.set_title(title)

        if cutout_row.beam is not None and hide_beam is False:
            img_beam = cutout_row.beam
            if cutout_row.wcs.is_celestial:
                major = img_beam.major.value
                minor = img_beam.minor.value
                pa = img_beam.pa.value
                pix_scale = proj_plane_pixel_scales(
                    cutout_row.wcs
                )
                sx = pix_scale[0]
                sy = pix_scale[1]
                degrees_per_pixel = np.sqrt(sx * sy)
                minor /= degrees_per_pixel
                major /= degrees_per_pixel

                png_beam = AnchoredEllipse(
                    ax.transData, width=minor,
                    height=major, angle=pa, loc="lower right",
                    pad=0.5, borderpad=0.4,
                    frameon=False)
                png_beam.ellipse.set_edgecolor("k")
                png_beam.ellipse.set_facecolor("w")
                png_beam.ellipse.set_linewidth(1.5)

                ax.add_artist(png_beam)
        else:
            self.logger.debug("Hiding beam.")

        if offset_axes:
            axis_units = u.arcmin

            if size is None and cutout_row.wcs.is_celestial:
                pix_scale = proj_plane_pixel_scales(
                    cutout_row.wcs
                )
                sx = pix_scale[0]
                sy = pix_scale[1]
                xlims = ax.get_xlim()
                ylims = ax.get_ylim()

                xsize = sx * (xlims[1] - xlims[0])
                ysize = sy * (ylims[1] - ylims[0])
                size = max([xsize, ysize]) * u.deg

            if size is not None:
                if size < 2 * u.arcmin:
                    axis_units = u.arcsec
                elif size > 2 * u.deg:
                    axis_units = u.deg

            offset_postagestamp_axes(ax,
                                     self.coord,
                                     ra_units=axis_units,
                                     dec_units=axis_units
                                     )

        if save:
            plt.savefig(outfile, bbox_inches="tight", dpi=plot_dpi)
            self.logger.debug("Saved {}".format(outfile))

            plt.close(fig)
            return

        else:
            return fig

    def write_ann(
        self,
        index: int,
        outfile: str = None,
        crossmatch_overlay: bool = False,
        size: Optional[Angle] = None,
        force: bool = False,
        cutout_data: Optional[pd.DataFrame] = None
    ) -> None:
        """
        Write a kvis annotation file containing all selavy sources
        within the image.

        Args:
            index: The index correpsonding to the requested observation.
            outfile: Name of the file to write, defaults to None.
            crossmatch_overlay: If True, a circle is added to the
                annotation file output denoting the crossmatch radius,
                defaults to False.
            size: Size of the cutout, defaults to None.
            force: Whether to force the re-fetching
                of the cutout data, defaults to `False`
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.

        Returns:
            None
        """
        if (self._cutouts_got is False) or (force):
            if cutout_data is None:
                self.get_cutout_data(size)

        if outfile is None:
            outfile = self._get_save_name(index, ".ann")
        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )

        neg = False
        with open(outfile, 'w') as f:
            f.write("COORD W\n")
            f.write("PA SKY\n")
            f.write("FONT hershey14\n")
            f.write("COLOR BLUE\n")
            f.write("CROSS {0} {1} {2} {2}\n".format(
                self.measurements.iloc[index].ra,
                self.measurements.iloc[index].dec,
                3. / 3600.
            ))
            if crossmatch_overlay:
                try:
                    f.write("CIRCLE {} {} {}\n".format(
                        self.measurements.iloc[index].ra,
                        self.measurements.iloc[index].dec,
                        self.crossmatch_radius.deg
                    ))
                except Exception as e:
                    self.logger.warning(
                        "Crossmatch circle overlay failed!"
                        " Has the source been crossmatched?")
            f.write("COLOR GREEN\n")

            if cutout_data is None:
                selavy_cat_cut = self.cutout_df.iloc[index].selavy_overlay
            else:
                selavy_cat_cut = cutout_data.iloc[index].selavy_overlay

            for i, row in selavy_cat_cut.iterrows():
                if row["island_id"].startswith("n"):
                    neg = True
                    f.write("COLOR RED\n")
                ra = row["ra_deg_cont"]
                dec = row["dec_deg_cont"]
                f.write(
                    "ELLIPSE {} {} {} {} {}\n".format(
                        ra,
                        dec,
                        float(
                            row["maj_axis"]) /
                        3600. /
                        2.,
                        float(
                            row["min_axis"]) /
                        3600. /
                        2.,
                        float(
                            row["pos_ang"])))
                f.write(
                    "TEXT {} {} {}\n".format(
                        ra, dec, self._remove_sbid(
                            row["island_id"])))
                if neg:
                    f.write("COLOR GREEN\n")
                    neg = False

        self.logger.debug("Wrote annotation file {}.".format(outfile))

    def write_reg(
        self,
        index: int,
        outfile: Optional[str] = None,
        crossmatch_overlay: bool = False,
        size: Optional[Angle] = None,
        force: bool = False,
        cutout_data: Optional[pd.DataFrame] = None
    ) -> None:
        """
        Write a DS9 region file containing all selavy sources within the image

        Args:
            index: The index correpsonding to the requested observation.
            outfile: Name of the file to write, defaults to None.
            crossmatch_overlay: If True, a circle is added to the
                annotation file output denoting the crossmatch radius,
                defaults to False.
            size: Size of the cutout, defaults to None.
            force: Whether to force the re-fetching
                of the cutout data, defaults to `False`.
            cutout_data: Pass external cutout_data to be used
                instead of fetching the data, defaults to None.

        Returns:
            None
        """
        if (self._cutouts_got is False) or (force):
            if cutout_data is None:
                self.get_cutout_data(size)

        if outfile is None:
            outfile = self._get_save_name(index, ".reg")
        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )

        with open(outfile, 'w') as f:
            f.write("# Region file format: DS9 version 4.0\n")
            f.write("global color=green font=\"helvetica 10 normal\" "
                    "select=1 highlite=1 edit=1 "
                    "move=1 delete=1 include=1 "
                    "fixed=0 source=1\n")
            f.write("fk5\n")
            f.write(
                "point({} {}) # point=x color=blue\n".format(
                    self.measurements.iloc[index].ra,
                    self.measurements.iloc[index].dec,
                ))
            if crossmatch_overlay:
                try:
                    f.write("circle({} {} {}) # color=blue\n".format(
                        self.measurements.iloc[index].ra,
                        self.measurements.iloc[index].dec,
                        self.crossmatch_radius.deg
                    ))
                except Exception as e:
                    self.logger.warning(
                        "Crossmatch circle overlay failed!"
                        " Has the source been crossmatched?")

            if cutout_data is None:
                selavy_cat_cut = self.cutout_df.iloc[index].selavy_overlay
            else:
                selavy_cat_cut = cutout_data.iloc[index].selavy_overlay

            for i, row in selavy_cat_cut.iterrows():
                if row["island_id"].startswith("n"):
                    color = "red"
                else:
                    color = "green"
                ra = row["ra_deg_cont"]
                dec = row["dec_deg_cont"]
                f.write(
                    "ellipse({} {} {} {} {}) # color={}\n".format(
                        ra,
                        dec,
                        float(
                            row["maj_axis"]) /
                        3600. /
                        2.,
                        float(
                            row["min_axis"]) /
                        3600. /
                        2.,
                        float(
                            row["pos_ang"]) +
                        90.,
                        color))
                f.write(
                    "text({} {} \"{}\") # color={}\n".format(
                        ra - (10. / 3600.), dec, self._remove_sbid(
                            row["island_id"]), color))

        self.logger.debug("Wrote region file {}.".format(outfile))

    def _remove_sbid(self, island: str) -> str:
        """Removes the SBID component of the island name.

        Takes into account negative 'n' label for negative sources.

        Args:
            island: island name.

        Returns:
            Truncated island name.
        """

        temp = island.split("_")
        new_val = "_".join(temp[-2:])
        if temp[0].startswith("n"):
            new_val = "n" + new_val
        return new_val

    def _create_crosshair_lines(
        self,
        target: np.ndarray,
        pixel_buff: float,
        length: float,
        img_size: Tuple[int, int]
    ) -> List[List[float]]:
        """
        Takes the target pixel coordinates and creates the plots
        that are required to plot a 'crosshair' marker.

        To keep the crosshair consistent between scales, the values are
        provided as percentages of the image size.

        Args:
            target: The target in pixel coordinates.
            pixel_buff: Percentage of image size that is the buffer
                of the crosshair, i.e. the distance between the target and
                beginning of the line.
            length: Size of the line of the crosshair, again as a
                percentage of the image size.
            img_size: Tuple size of the image array.

        Returns:
            List of pairs of pixel coordinates to plot using scatter.
        """
        x = target[0][0]
        y = target[0][1]

        min_size = np.min(img_size)

        pixel_buff = int(min_size * pixel_buff)
        length = int(min_size * length)

        plots = []

        plots.append([[x, x], [y + pixel_buff, y + pixel_buff + length]])
        plots.append([[x, x], [y - pixel_buff, y - pixel_buff - length]])
        plots.append([[x + pixel_buff, x + pixel_buff + length], [y, y]])
        plots.append([[x - pixel_buff, x - pixel_buff - length], [y, y]])

        return plots

    def simbad_search(
        self, radius: Angle = Angle(20. * u.arcsec)
    ) -> Union[None, Table]:
        """
        Searches SIMBAD for object coordinates and returns matches.

        Args:
            radius: Radius to search, defaults to Angle(20. * u.arcsec)

        Returns:
            Table of matches or None if no matches

        Raises:
            ValueError: Error in performing the SIMBAD region search.
        """
        Simbad.add_votable_fields('ra(d)', 'dec(d)')

        try:
            result_table = Simbad.query_region(self.coord, radius=radius)
            if result_table is None:
                return None

            return result_table

        except Exception as e:
            raise ValueError(
                "Error in performing the SIMBAD region search! Error: %s", e
            )
            return None

    def ned_search(
        self, radius: Angle = Angle(20. * u.arcsec)
    ) -> Union[None, Table]:
        """
        Searches NED for object coordinates and returns matches.

        Args:
            radius: Radius to search, defaults to Angle(20. * u.arcsec).

        Returns:
            Table of matches or None if no matches

        Raises:
            ValueError: Error in performing the NED region search.
        """
        try:
            result_table = Ned.query_region(self.coord, radius=radius)

            return result_table

        except Exception as e:
            raise ValueError(
                "Error in performing the NED region search! Error: %s", e
            )
            return None

    def casda_search(
        self,
        radius: Angle = Angle(20. * u.arcsec),
        filter_out_unreleased: bool = False,
        show_all: bool = False
    ) -> Union[None, Table]:
        """
        Searches CASDA for object coordinates and returns matches.

        Args:
            radius: Radius to search, defaults to Angle(20. * u.arcsec).
            filter_out_unreleased: Remove unreleased data,
                defaults to `False`.
            show_all: Show all available data, defaults to `False`.

        Returns:
            Table of matches or None if no matches

        Raises:
            ValueError: Error in performing the CASDA region search.
        """
        try:
            result_table = Casda.query_region(self.coord, radius=radius)

            if filter_out_unreleased:
                result_table = Casda.filter_out_unreleased(result_table)
            if not show_all:
                mask = result_table[
                    'dataproduct_subtype'
                ] == 'cont.restored.t0'
                result_table = result_table[mask]
                mask = [(
                    ("image.i" in i) & ("taylor.0.res" in i)
                ) for i in result_table[
                    'filename'
                ]]
                result_table = result_table[mask]

            return result_table

        except Exception as e:
            raise ValueError(
                "Error in performing the CASDA region search! Error: %s", e
            )
            return None

    def _get_fluxes_and_errors(
        self,
        suffix: str,
        forced_fits: bool
    ) -> Tuple[pd.Series, pd.Series]:
        """
        Selects the correct fluxes, upper limits or forced fits
        to calculate the metrics

        Args:
            suffix: 'peak' or 'int'.
            forced_fits: Set to `True` if forced fits should be used.

        Returns:
            The fluxes and errors to use.
        """
        if self.pipeline:
            non_detect_label = 'flux_{}'.format(suffix)
            non_detect_label_err = 'flux_{}_err'.format(suffix)
            scale = 1.
            detection_label = 'forced'
            detection_value = False
        else:
            detection_label = 'detection'
            detection_value = True
            if forced_fits:
                non_detect_label = 'f_flux_{}'.format(suffix)
                non_detect_label_err = 'f_flux_{}_err'.format(suffix)
                scale = 1.
            else:
                scale = 5.
                non_detect_label = 'rms_image'
                non_detect_label_err = 'rms_image'

        detect_mask = self.measurements[detection_label] == detection_value

        detect_fluxes = (
            self.measurements[detect_mask]['flux_{}'.format(suffix)]
        )
        detect_errors = (
            self.measurements[detect_mask]['flux_{}_err'.format(
                suffix
            )]
        )

        non_detect_fluxes = (
            self.measurements[~detect_mask][non_detect_label] * scale
        )
        non_detect_errors = (
            self.measurements[~detect_mask][non_detect_label_err]
        )

        fluxes = pd.concat([detect_fluxes, non_detect_fluxes])
        errors = pd.concat([detect_errors, non_detect_errors])

        return fluxes, errors

    def calc_eta_metric(
        self,
        use_int: bool = False,
        forced_fits: bool = False
    ) -> float:
        """
        Calculate the eta variability metric

        Args:
            use_int: Calculate using integrated (rather than peak) flux,
                defaults to `False`
            forced_fits: Use forced fits, defaults to `False`

        Returns:
            Eta variability metric.

        Raises:
            Exception: No forced fits are present when forced fits have been
                selected.
        """
        if self.measurements.shape[0] == 1:
            return 0.

        suffix = 'int' if use_int else 'peak'

        if forced_fits and not self.forced_fits:
            raise Exception(
                "Forced fits selected but no forced fits are present!"
            )

        fluxes, errors = self._get_fluxes_and_errors(suffix, forced_fits)
        n_src = fluxes.shape[0]

        weights = 1. / errors**2
        eta = (n_src / (n_src - 1)) * (
            (weights * fluxes**2).mean() - (
                (weights * fluxes).mean()**2 / weights.mean()
            )
        )

        return eta

    def calc_v_metric(
        self,
        use_int: bool = False,
        forced_fits: bool = False
    ) -> float:
        """
        Calculate the V variability metric.

        Args:
            use_int: Calculate using integrated (rather than peak) flux,
                defaults to `False`.
            forced_fits: Use forced fits, defaults to `False`.

        Returns:
            V variability metric.

        Raises:
            Exception: No forced fits are present when forced fits have been
                selected.
        """
        if self.measurements.shape[0] == 1:
            return 0.

        suffix = 'int' if use_int else 'peak'

        if forced_fits and not self.forced_fits:
            raise Exception(
                "Forced fits selected but no forced fits are present!"
            )

        fluxes, _ = self._get_fluxes_and_errors(suffix, forced_fits)
        v = fluxes.std() / fluxes.mean()

        return v

    def calc_eta_and_v_metrics(
        self,
        use_int: bool = False,
        forced_fits: bool = False
    ) -> Tuple[float, float]:
        """
        Calculate both variability metrics

        Args:
            use_int: Calculate using integrated (rather than peak) flux,
                defaults to `False`.
            forced_fits: Use forced fits, defaults to `False`.

        Returns:
            Variability metrics eta and v.
        """

        eta = self.calc_eta_metric(use_int=use_int, forced_fits=forced_fits)
        v = self.calc_v_metric(use_int=use_int, forced_fits=forced_fits)

        return eta, v

__init__(self, coord, name, epochs, fields, stokes, primary_field, crossmatch_radius, measurements, base_folder, image_type='COMBINED', islands=False, outdir='.', planet=False, pipeline=False, tiles=False, corrected_data=True, forced_fits=False) special

Constructor method

Parameters:

Name Type Description Default
coord SkyCoord

Source coordinates.

required
name str

The name of the source.

required
epochs List[str]

The epochs that the source contains.

required
fields List[str]

The fields that the source contains.

required
stokes str

The stokes parameter of the source.

required
primary_field str

The primary VAST Pilot field of the source.

required
crossmatch_radius Angle

The crossmatch radius used to find the measurements.

required
measurements DataFrame

DataFrame containing the measurements.

required
base_folder str

Path to base folder in default directory structure

required
image_type str

The string representation of the image type, either 'COMBINED' or 'TILES', defaults to "COMBINED".

'COMBINED'
islands bool

Is True if islands has been used instead of components, defaults to False.

False
outdir str

The directory where any media outputs will be written to, defaults to ".".

'.'
planet bool

Set to True if the source is a planet, defaults to False.

False
pipeline bool

Set to True if the source has been loaded from a VAST Pipeline run, defaults to False.

False
tiles bool

Set to 'Trueif the source is from a tile images, defaults toFalse`.

False
corrected_data bool

Access the corrected data. Only relevant if tiles is True. Defaults to True.

True
forced_fits bool

Set to True if forced fits are included in the source measurements, defaults to False.

False

Returns:

Type Description
None

None

Source code in vasttools/source.py
def __init__(
    self,
    coord: SkyCoord,
    name: str,
    epochs: List[str],
    fields: List[str],
    stokes: str,
    primary_field: str,
    crossmatch_radius: Angle,
    measurements: pd.DataFrame,
    base_folder: str,
    image_type: str = "COMBINED",
    islands: bool = False,
    outdir: str = ".",
    planet: bool = False,
    pipeline: bool = False,
    tiles: bool = False,
    corrected_data: bool = True,
    forced_fits: bool = False,
) -> None:
    """
    Constructor method

    Args:
        coord: Source coordinates.
        name: The name of the source.
        epochs: The epochs that the source contains.
        fields: The fields that the source contains.
        stokes: The stokes parameter of the source.
        primary_field: The primary VAST Pilot field of the source.
        crossmatch_radius: The crossmatch radius used to find the
            measurements.
        measurements: DataFrame containing the measurements.
        base_folder: Path to base folder in default directory structure
        image_type: The string representation of the image type,
            either 'COMBINED' or 'TILES', defaults to "COMBINED".
        islands: Is `True` if islands has been used instead of
            components, defaults to `False`.
        outdir: The directory where any media outputs will be written
            to, defaults to ".".
        planet: Set to `True` if the source is a planet, defaults
            to `False`.
        pipeline: Set to `True` if the source has been loaded from a
            VAST Pipeline run, defaults to `False`.
        tiles: Set to 'True` if the source is from a tile images,
            defaults to `False`.
        corrected_data: Access the corrected data. Only relevant if
            `tiles` is `True`. Defaults to `True`.
        forced_fits: Set to `True` if forced fits are included in the
            source measurements, defaults to `False`.

    Returns:
        None
    """
    self.logger = logging.getLogger('vasttools.source.Source')
    self.logger.debug('Created Source instance')
    self.pipeline = pipeline
    self.coord = coord
    self.name = name
    self.epochs = epochs
    self.fields = fields
    self.stokes = stokes
    self.primary_field = primary_field
    self.crossmatch_radius = crossmatch_radius
    self.measurements = measurements.infer_objects()
    self.measurements.dateobs = pd.to_datetime(
        self.measurements.dateobs
    )
    self.islands = islands
    if self.islands:
        self.cat_type = 'islands'
    else:
        self.cat_type = 'components'

    self.outdir = outdir

    self.base_folder = base_folder
    self.image_type = image_type
    if image_type == 'TILES':
        self.tiles = True
    else:
        self.tiles = False

    self.corrected_data = corrected_data
    if self.pipeline:
        self.detections = self.measurements[
            self.measurements.forced == False
        ].shape[0]

        self.forced = self.measurements[
            self.measurements.forced == False
        ].shape[0]

        self.limits = None
        self.forced_fits = False
    else:
        self.detections = self.measurements[
            self.measurements.detection
        ].shape[0]

        self.limits = self.measurements[
            self.measurements.detection == False
        ].shape[0]

        self.forced = None
        self.forced_fits = forced_fits

    self._cutouts_got = False

    self.norms = None
    self._checked_norms = False

    self.planet = planet

calc_eta_and_v_metrics(self, use_int=False, forced_fits=False)

Calculate both variability metrics

Parameters:

Name Type Description Default
use_int bool

Calculate using integrated (rather than peak) flux, defaults to False.

False
forced_fits bool

Use forced fits, defaults to False.

False

Returns:

Type Description
Tuple[float, float]

Variability metrics eta and v.

Source code in vasttools/source.py
def calc_eta_and_v_metrics(
    self,
    use_int: bool = False,
    forced_fits: bool = False
) -> Tuple[float, float]:
    """
    Calculate both variability metrics

    Args:
        use_int: Calculate using integrated (rather than peak) flux,
            defaults to `False`.
        forced_fits: Use forced fits, defaults to `False`.

    Returns:
        Variability metrics eta and v.
    """

    eta = self.calc_eta_metric(use_int=use_int, forced_fits=forced_fits)
    v = self.calc_v_metric(use_int=use_int, forced_fits=forced_fits)

    return eta, v

calc_eta_metric(self, use_int=False, forced_fits=False)

Calculate the eta variability metric

Parameters:

Name Type Description Default
use_int bool

Calculate using integrated (rather than peak) flux, defaults to False

False
forced_fits bool

Use forced fits, defaults to False

False

Returns:

Type Description
float

Eta variability metric.

Exceptions:

Type Description
Exception

No forced fits are present when forced fits have been selected.

Source code in vasttools/source.py
def calc_eta_metric(
    self,
    use_int: bool = False,
    forced_fits: bool = False
) -> float:
    """
    Calculate the eta variability metric

    Args:
        use_int: Calculate using integrated (rather than peak) flux,
            defaults to `False`
        forced_fits: Use forced fits, defaults to `False`

    Returns:
        Eta variability metric.

    Raises:
        Exception: No forced fits are present when forced fits have been
            selected.
    """
    if self.measurements.shape[0] == 1:
        return 0.

    suffix = 'int' if use_int else 'peak'

    if forced_fits and not self.forced_fits:
        raise Exception(
            "Forced fits selected but no forced fits are present!"
        )

    fluxes, errors = self._get_fluxes_and_errors(suffix, forced_fits)
    n_src = fluxes.shape[0]

    weights = 1. / errors**2
    eta = (n_src / (n_src - 1)) * (
        (weights * fluxes**2).mean() - (
            (weights * fluxes).mean()**2 / weights.mean()
        )
    )

    return eta

calc_v_metric(self, use_int=False, forced_fits=False)

Calculate the V variability metric.

Parameters:

Name Type Description Default
use_int bool

Calculate using integrated (rather than peak) flux, defaults to False.

False
forced_fits bool

Use forced fits, defaults to False.

False

Returns:

Type Description
float

V variability metric.

Exceptions:

Type Description
Exception

No forced fits are present when forced fits have been selected.

Source code in vasttools/source.py
def calc_v_metric(
    self,
    use_int: bool = False,
    forced_fits: bool = False
) -> float:
    """
    Calculate the V variability metric.

    Args:
        use_int: Calculate using integrated (rather than peak) flux,
            defaults to `False`.
        forced_fits: Use forced fits, defaults to `False`.

    Returns:
        V variability metric.

    Raises:
        Exception: No forced fits are present when forced fits have been
            selected.
    """
    if self.measurements.shape[0] == 1:
        return 0.

    suffix = 'int' if use_int else 'peak'

    if forced_fits and not self.forced_fits:
        raise Exception(
            "Forced fits selected but no forced fits are present!"
        )

    fluxes, _ = self._get_fluxes_and_errors(suffix, forced_fits)
    v = fluxes.std() / fluxes.mean()

    return v

Searches CASDA for object coordinates and returns matches.

Parameters:

Name Type Description Default
radius Angle

Radius to search, defaults to Angle(20. * u.arcsec).

<Angle 20. arcsec>
filter_out_unreleased bool

Remove unreleased data, defaults to False.

False
show_all bool

Show all available data, defaults to False.

False

Returns:

Type Description
Optional[astropy.table.table.Table]

Table of matches or None if no matches

Exceptions:

Type Description
ValueError

Error in performing the CASDA region search.

Source code in vasttools/source.py
def casda_search(
    self,
    radius: Angle = Angle(20. * u.arcsec),
    filter_out_unreleased: bool = False,
    show_all: bool = False
) -> Union[None, Table]:
    """
    Searches CASDA for object coordinates and returns matches.

    Args:
        radius: Radius to search, defaults to Angle(20. * u.arcsec).
        filter_out_unreleased: Remove unreleased data,
            defaults to `False`.
        show_all: Show all available data, defaults to `False`.

    Returns:
        Table of matches or None if no matches

    Raises:
        ValueError: Error in performing the CASDA region search.
    """
    try:
        result_table = Casda.query_region(self.coord, radius=radius)

        if filter_out_unreleased:
            result_table = Casda.filter_out_unreleased(result_table)
        if not show_all:
            mask = result_table[
                'dataproduct_subtype'
            ] == 'cont.restored.t0'
            result_table = result_table[mask]
            mask = [(
                ("image.i" in i) & ("taylor.0.res" in i)
            ) for i in result_table[
                'filename'
            ]]
            result_table = result_table[mask]

        return result_table

    except Exception as e:
        raise ValueError(
            "Error in performing the CASDA region search! Error: %s", e
        )
        return None

get_cutout_data(self, size=None)

Function to fetch the cutout data for that source required for producing all the media output.

If size is not provided then the default size of 5 arcmin will be used.

Parameters:

Name Type Description Default
size Optional[astropy.coordinates.angles.Angle]

The angular size of the cutouts, defaults to None.

None

Returns:

Type Description
None

None

Source code in vasttools/source.py
def get_cutout_data(self, size: Optional[Angle] = None) -> None:
    """
    Function to fetch the cutout data for that source
    required for producing all the media output.

    If size is not provided then the default size of 5 arcmin will be
    used.

    Args:
        size: The angular size of the cutouts, defaults to None.

    Returns:
        None
    """
    if size is None:
        args = None
    else:
        args = (size,)

    self.cutout_df = self.measurements.apply(
        self._get_cutout,
        args=args,
        axis=1,
        result_type='expand'
    ).rename(columns={
        0: "data",
        1: "wcs",
        2: "header",
        3: "selavy_overlay",
        4: "beam"
    })
    self._cutouts_got = True

make_png(self, index, selavy=True, percentile=99.9, zscale=False, contrast=0.2, outfile=None, no_islands=True, label='Source', no_colorbar=False, title=None, crossmatch_overlay=False, hide_beam=False, save=False, size=None, force=False, disable_autoscaling=False, cutout_data=None, norms=None, plot_dpi=150, offset_axes=True)

Save a PNG of the image postagestamp.

Parameters:

Name Type Description Default
index int

The index correpsonding to the requested observation.

required
selavy bool

True to overlay selavy components, False otherwise.

True
percentile float

The value passed to the percentile normalization function, defaults to 99.9.

99.9
zscale bool

Uses ZScale normalization instead of PercentileInterval, defaults to False.

False
contrast float

Contrast value passed to the ZScaleInterval function when zscale is selected, defaults to 0.2.

0.2
outfile Optional[str]

Name to give the file, if None then the name is automatically generated, defaults to None.

None
no_islands bool

Disable island lables on the png, defaults to False.

True
label str

Figure title (usually the name of the source of interest), defaults to "Source".

'Source'
no_colorbar bool

If True, do not show the colorbar on the png, defaults to False.

False
title Optional[str]

String to set as title, defaults to None where a default title will be used.

None
crossmatch_overlay bool

If 'True' then a circle is added to the png plot representing the crossmatch radius, defaults to False.

False
hide_beam bool

If 'True' then the beam is not plotted onto the png plot, defaults to False.

False
save bool

If True the plot is saved rather than the figure being returned, defaults to False.

False
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
force bool

Whether to force the re-fetching of the cutout data, defaults to False.

False
disable_autoscaling bool

Turn off the consistent normalization and calculate the normalizations separately for each observation, defaults to False.

False
cutout_data Optional[pandas.core.frame.DataFrame]

Pass external cutout_data to be used instead of fetching the data, defaults to None.

None
norms Optional[astropy.visualization.mpl_normalize.ImageNormalize]

Pass external normalization to be used instead of internal calculations.

None
plot_dpi int

Specify the DPI of saved figures, defaults to 150.

150
offset_axes bool

Use offset, rather than absolute, axis labels.

True

Returns:

Type Description
Optional[figure]

None if save is True or the figure object if False

Exceptions:

Type Description
ValueError

If the index is out of range.

Source code in vasttools/source.py
def make_png(
    self,
    index: int,
    selavy: bool = True,
    percentile: float = 99.9,
    zscale: bool = False,
    contrast: float = 0.2,
    outfile: Optional[str] = None,
    no_islands: bool = True,
    label: str = "Source",
    no_colorbar: bool = False,
    title: Optional[str] = None,
    crossmatch_overlay: bool = False,
    hide_beam: bool = False,
    save: bool = False,
    size: Optional[Angle] = None,
    force: bool = False,
    disable_autoscaling: bool = False,
    cutout_data: Optional[pd.DataFrame] = None,
    norms: Optional[ImageNormalize] = None,
    plot_dpi: int = 150,
    offset_axes: bool = True
) -> Union[None, plt.figure]:
    """
    Save a PNG of the image postagestamp.

    Args:
        index: The index correpsonding to the requested observation.
        selavy: `True` to overlay selavy components, `False` otherwise.
        percentile: The value passed to the percentile
            normalization function, defaults to 99.9.
        zscale: Uses ZScale normalization instead of
            PercentileInterval, defaults to `False`.
        contrast: Contrast value passed to the ZScaleInterval
            function when zscale is selected, defaults to 0.2.
        outfile: Name to give the file, if None then the name is
            automatically generated, defaults to None.
        no_islands: Disable island lables on the png, defaults to
            `False`.
        label: Figure title (usually the name of the source of
            interest), defaults to "Source".
        no_colorbar: If `True`, do not show the colorbar on the png,
            defaults to `False`.
        title: String to set as title,
            defaults to None where a default title will be used.
        crossmatch_overlay: If 'True' then a circle is added to the png
            plot representing the crossmatch radius, defaults to `False`.
        hide_beam: If 'True' then the beam is not plotted onto the png
            plot, defaults to `False`.
        save: If `True` the plot is saved rather than the figure being
            returned, defaults to `False`.
        size: Size of the cutout, defaults to None.
        force: Whether to force the re-fetching of the cutout data,
            defaults to `False`.
        disable_autoscaling: Turn off the consistent normalization and
            calculate the normalizations separately for each observation,
            defaults to `False`.
        cutout_data: Pass external cutout_data to be used
            instead of fetching the data, defaults to None.
        norms: Pass external normalization to be used
            instead of internal calculations.
        plot_dpi: Specify the DPI of saved figures, defaults to 150.
        offset_axes: Use offset, rather than absolute, axis labels.

    Returns:
        None if save is `True` or the figure object if `False`

    Raises:
        ValueError: If the index is out of range.
    """

    if (self._cutouts_got is False) or (force):
        if cutout_data is None:
            self.get_cutout_data(size)

    if index > len(self.measurements):
        raise ValueError(f"Cannot access {index}th measurement.")

    if outfile is None:
        outfile = self._get_save_name(index, ".png")

    if self.outdir != ".":
        outfile = os.path.join(
            self.outdir,
            outfile
        )

    if cutout_data is None:
        cutout_row = self.cutout_df.iloc[index]
    else:
        cutout_row = cutout_data.iloc[index]

    fig = plt.figure(figsize=(8, 8))
    ax = fig.add_subplot(111, projection=cutout_row.wcs)
    # Get the Image Normalisation from zscale, user contrast.
    if not disable_autoscaling:
        if norms is not None:
            img_norms = norms
        else:
            if not self._checked_norms:
                self._analyse_norm_level(
                    percentile=percentile,
                    zscale=zscale,
                    z_contrast=contrast
                )
            img_norms = self.norms
    else:
        if zscale:
            img_norms = ImageNormalize(
                cutout_row.data * 1.e3,
                interval=ZScaleInterval(
                    contrast=contrast
                ))
        else:
            img_norms = ImageNormalize(
                cutout_row.data * 1.e3,
                interval=PercentileInterval(percentile),
                stretch=LinearStretch())

    im = ax.imshow(
        cutout_row.data * 1.e3,
        norm=img_norms,
        cmap="gray_r"
    )

    # insert crosshair of target
    target_coords = np.array(
        ([[
            self.measurements.iloc[index].ra,
            self.measurements.iloc[index].dec
        ]])
    )

    target_coords = cutout_row.wcs.wcs_world2pix(
        target_coords, 0
    )

    crosshair_lines = self._create_crosshair_lines(
        target_coords,
        0.03,
        0.03,
        cutout_row.data.shape
    )

    [ax.plot(
        l[0], l[1], color="C3", zorder=10, lw=1.5, alpha=0.6
    ) for l in crosshair_lines]
    # the commented lines below are to use the crosshair
    # marker directly.
    # ax.scatter(
    #     [self.src_coord.ra.deg], [self.src_coord.dec.deg],
    #     transform=ax.get_transform('world'), marker="c",
    #     color="C3", zorder=10, label=label, s=1000, lw=1.5,
    #     alpha=0.5
    # )
    if crossmatch_overlay:
        try:
            crossmatch_patch = SphericalCircle(
                (
                    self.measurements.iloc[index].skycoord.ra,
                    self.measurements.iloc[index].skycoord.dec
                ),
                self.crossmatch_radius,
                transform=ax.get_transform('world'),
                label="Crossmatch radius ({:.1f} arcsec)".format(
                    self.crossmatch_radius.arcsec
                ), edgecolor='C4', facecolor='none', alpha=0.8)
            ax.add_patch(crossmatch_patch)
        except Exception as e:
            self.logger.warning(
                "Crossmatch circle png overlay failed!"
                " Has the source been crossmatched?")
            crossmatch_overlay = False

    if (not cutout_row['selavy_overlay'].empty) and selavy:
        ax.set_autoscale_on(False)
        collection, patches, island_names = self._gen_overlay_collection(
            cutout_row
        )
        ax.add_collection(collection, autolim=False)
        del collection

        # Add island labels, haven't found a better way other than looping
        # at the moment.
        if not no_islands and not self.islands:
            for i, val in enumerate(patches):
                ax.annotate(
                    island_names[i],
                    val.center,
                    annotation_clip=True,
                    color="C0",
                    weight="bold")
    else:
        self.logger.debug(
            "PNG: No selavy selected or selavy catalogue failed. (%s)",
            self.name
        )

    if self.forced_fits:
        collection, patches, island_names = self._gen_overlay_collection(
            cutout_row,
            f_source=self.measurements.iloc[index]
        )
        ax.add_collection(collection, autolim=False)
        del collection

    legend_elements = [
        Line2D(
            [0], [0], marker='c', color='C3', label=label,
            markerfacecolor='g', ls="none", markersize=8
        )
    ]

    if selavy:
        legend_elements.append(
            Line2D(
                [0], [0], marker='o', color='C1',
                label="Selavy {}".format(self.cat_type),
                markerfacecolor='none', ls="none", markersize=10
            )
        )

    if crossmatch_overlay:
        legend_elements.append(
            Line2D(
                [0], [0], marker='o', color='C4',
                label="Crossmatch radius ({:.1f} arcsec)".format(
                    self.crossmatch_radius.arcsec
                ),
                markerfacecolor='none', ls="none",
                markersize=10
            )
        )

    if self.forced_fits:
        legend_elements.append(
            Line2D(
                [0], [0], marker='o', color='C3',
                label="Forced Fit",
                markerfacecolor='none', ls="none",
                markersize=10
            )
        )

    ax.legend(handles=legend_elements)
    lon = ax.coords[0]
    lat = ax.coords[1]
    lon.set_axislabel("Right Ascension (J2000)")
    lat.set_axislabel("Declination (J2000)")

    if not no_colorbar:
        divider = make_axes_locatable(ax)
        cax = divider.append_axes(
            "right", size="3%", pad=0.1, axes_class=maxes.Axes)
        cb = fig.colorbar(im, cax=cax)
        cb.set_label("mJy/beam")

    if title is None:
        obs_time = self.measurements.iloc[index].dateobs
        title = "{} {}".format(
            self.name,
            obs_time.strftime(
                "%Y-%m-%d %H:%M:%S"
            )
        )

    ax.set_title(title)

    if cutout_row.beam is not None and hide_beam is False:
        img_beam = cutout_row.beam
        if cutout_row.wcs.is_celestial:
            major = img_beam.major.value
            minor = img_beam.minor.value
            pa = img_beam.pa.value
            pix_scale = proj_plane_pixel_scales(
                cutout_row.wcs
            )
            sx = pix_scale[0]
            sy = pix_scale[1]
            degrees_per_pixel = np.sqrt(sx * sy)
            minor /= degrees_per_pixel
            major /= degrees_per_pixel

            png_beam = AnchoredEllipse(
                ax.transData, width=minor,
                height=major, angle=pa, loc="lower right",
                pad=0.5, borderpad=0.4,
                frameon=False)
            png_beam.ellipse.set_edgecolor("k")
            png_beam.ellipse.set_facecolor("w")
            png_beam.ellipse.set_linewidth(1.5)

            ax.add_artist(png_beam)
    else:
        self.logger.debug("Hiding beam.")

    if offset_axes:
        axis_units = u.arcmin

        if size is None and cutout_row.wcs.is_celestial:
            pix_scale = proj_plane_pixel_scales(
                cutout_row.wcs
            )
            sx = pix_scale[0]
            sy = pix_scale[1]
            xlims = ax.get_xlim()
            ylims = ax.get_ylim()

            xsize = sx * (xlims[1] - xlims[0])
            ysize = sy * (ylims[1] - ylims[0])
            size = max([xsize, ysize]) * u.deg

        if size is not None:
            if size < 2 * u.arcmin:
                axis_units = u.arcsec
            elif size > 2 * u.deg:
                axis_units = u.deg

        offset_postagestamp_axes(ax,
                                 self.coord,
                                 ra_units=axis_units,
                                 dec_units=axis_units
                                 )

    if save:
        plt.savefig(outfile, bbox_inches="tight", dpi=plot_dpi)
        self.logger.debug("Saved {}".format(outfile))

        plt.close(fig)
        return

    else:
        return fig

Searches NED for object coordinates and returns matches.

Parameters:

Name Type Description Default
radius Angle

Radius to search, defaults to Angle(20. * u.arcsec).

<Angle 20. arcsec>

Returns:

Type Description
Optional[astropy.table.table.Table]

Table of matches or None if no matches

Exceptions:

Type Description
ValueError

Error in performing the NED region search.

Source code in vasttools/source.py
def ned_search(
    self, radius: Angle = Angle(20. * u.arcsec)
) -> Union[None, Table]:
    """
    Searches NED for object coordinates and returns matches.

    Args:
        radius: Radius to search, defaults to Angle(20. * u.arcsec).

    Returns:
        Table of matches or None if no matches

    Raises:
        ValueError: Error in performing the NED region search.
    """
    try:
        result_table = Ned.query_region(self.coord, radius=radius)

        return result_table

    except Exception as e:
        raise ValueError(
            "Error in performing the NED region search! Error: %s", e
        )
        return None

plot_lightcurve(self, sigma_thresh=5, figsize=(8, 4), min_points=2, min_detections=0, mjd=False, start_date=None, grid=False, yaxis_start='0', peak_flux=True, save=False, outfile=None, use_forced_for_limits=False, use_forced_for_all=False, hide_legend=False, plot_dpi=150)

Plot source lightcurves and save to file

Parameters:

Name Type Description Default
sigma_thresh int

Threshold to use for upper limits, defaults to 5.

5
figsize Tuple[int, int]

Figure size, defaults to (8, 4).

(8, 4)
min_points int

Minimum number of points for plotting, defaults to 2.

2
min_detections int

Minimum number of detections for plotting, defaults to 0.

0
mjd bool

Plot x-axis in MJD rather than datetime, defaults to False.

False
start_date Optional[pandas._libs.tslibs.timestamps.Timestamp]

Plot in days from start date, defaults to None.

None
grid bool

Turn on matplotlib grid, defaults to False.

False
yaxis_start str

Define where the y-axis begins from, either 'auto' or '0', defaults to "0".

'0'
peak_flux bool

Uses peak flux instead of integrated flux, defaults to True.

True
save bool

When True the plot is saved rather than displayed, defaults to False.

False
outfile Optional[str]

The filename to save when using, defaults to None which will use '_lc.png'.

None
use_forced_for_limits bool

Use the forced extractions instead of upper limits for non-detections., defaults to False.

False
use_forced_for_all bool

Use the forced fits for all the datapoints, defaults to False.

False
hide_legend bool

Hide the legend, defaults to False.

False
plot_dpi int

Specify the DPI of saved figures, defaults to 150.

150

Returns:

Type Description
Optional[matplotlib.figure.Figure]

None if save is True or the matplotlib figure if save is False.

Exceptions:

Type Description
SourcePlottingError

Source does not have any forced fits when the 'use_forced_for_all' or 'use_forced_for_limits' options have been selected.

SourcePlottingError

Number of detections lower than the minimum required.

SourcePlottingError

Number of datapoints lower than the minimum required.

SourcePlottingError

If measurements dataframe is empty.

Source code in vasttools/source.py
def plot_lightcurve(
    self,
    sigma_thresh: int = 5,
    figsize: Tuple[int, int] = (8, 4),
    min_points: int = 2,
    min_detections: int = 0,
    mjd: bool = False,
    # TODO: Is this a pd.Timestamp or a datetime?
    start_date: Optional[pd.Timestamp] = None,
    grid: bool = False,
    yaxis_start: str = "0",
    peak_flux: bool = True,
    save: bool = False,
    outfile: Optional[str] = None,
    use_forced_for_limits: bool = False,
    use_forced_for_all: bool = False,
    hide_legend: bool = False,
    plot_dpi: int = 150
) -> Union[None, plt.Figure]:
    """
    Plot source lightcurves and save to file

    Args:
        sigma_thresh: Threshold to use for upper limits, defaults to 5.
        figsize: Figure size, defaults to (8, 4).
        min_points: Minimum number of points for plotting, defaults
            to 2.
        min_detections:  Minimum number of detections for plotting,
            defaults to 0.
        mjd: Plot x-axis in MJD rather than datetime, defaults to False.
        start_date: Plot in days from start date, defaults to None.
        grid: Turn on matplotlib grid, defaults to False.
        yaxis_start: Define where the y-axis begins from, either 'auto'
            or '0', defaults to "0".
        peak_flux: Uses peak flux instead of integrated flux,
            defaults to `True`.
        save: When `True` the plot is saved rather than displayed,
            defaults to `False`.
        outfile: The filename to save when using, defaults to None which
            will use '<souce_name>_lc.png'.
        use_forced_for_limits: Use the forced extractions instead of
            upper limits for non-detections., defaults to `False`.
        use_forced_for_all: Use the forced fits for all the datapoints,
            defaults to `False`.
        hide_legend: Hide the legend, defaults to `False`.
        plot_dpi: Specify the DPI of saved figures, defaults to 150.

    Returns:
        None if save is `True` or the matplotlib figure if save is
        `False`.

    Raises:
        SourcePlottingError: Source does not have any forced fits when the
            'use_forced_for_all' or 'use_forced_for_limits' options have
            been selected.
        SourcePlottingError: Number of detections lower than the
            minimum required.
        SourcePlottingError: Number of datapoints lower than the
            minimum required.
        SourcePlottingError: If measurements dataframe is empty.
    """
    if use_forced_for_all or use_forced_for_limits:
        if not self.forced_fits:
            raise SourcePlottingError(
                "Source does not have any forced fits points to plot."
            )

    if self.detections < min_detections:
        msg = (
            f"Number of detections ({self.detections}) lower "
            f"than minimum required ({min_detections})"
        )
        self.logger.error(msg)
        raise SourcePlottingError(msg)

    if self.measurements.shape[0] < min_points:
        msg = (
            f"Number of datapoints ({self.measurements.shape[0]}) lower "
            f"than minimum required ({min_points})"
        )
        self.logger.error(msg)
        raise SourcePlottingError(msg)

    if mjd and start_date is not None:
        msg = (
            "The 'mjd' and 'start date' options "
            "cannot be used at the same time!"
        )
        self.logger.error(msg)
        raise SourcePlottingError(msg)

    # remove empty values
    measurements_df = self.measurements
    if not self.pipeline and not (
        use_forced_for_limits or use_forced_for_all
    ):
        measurements_df = self.measurements[
            self.measurements['rms_image'] != -99
        ]

    if measurements_df.empty:
        msg = f"{self.name} has no measurements!"
        self.logger.error(msg)
        raise SourcePlottingError(msg)

    # Build figure and labels
    fig = plt.figure(figsize=figsize)
    ax = fig.add_subplot(111)
    plot_title = self.name
    if self.islands:
        plot_title += " (island)"
    ax.set_title(plot_title)

    if peak_flux:
        label = 'Peak Flux Density (mJy/beam)'
        flux_col = "flux_peak"
    else:
        label = 'Integrated Flux Density (mJy)'
        flux_col = "flux_int"

    if use_forced_for_all:
        label = "Forced " + label
        flux_col = "f_" + flux_col

    if self.stokes != "I":
        label = "Absolute " + label
        measurements_df[flux_col] = measurements_df[flux_col].abs()

    ax.set_ylabel(label)

    freq_col = 'frequency'

    grouped_df = measurements_df.groupby(freq_col)
    freqs = list(grouped_df.groups.keys())

    # Colours for each frequency
    freq_cmap = plt.cm.get_cmap('viridis')
    cNorm = matplotlib.colors.Normalize(
        vmin=min(freqs), vmax=max(freqs) * 1.1)
    scalarMap = matplotlib.cm.ScalarMappable(norm=cNorm, cmap=freq_cmap)
    sm = scalarMap
    sm._A = []

    # Markers for each frequency
    markers = ['o', 'D', '*', 'X', 's', 'd', 'p']

    self.logger.debug("Frequencies: {}".format(freqs))
    for i, (freq, measurements) in enumerate(grouped_df):
        self.logger.debug("Plotting {} MHz data".format(freq))
        marker = markers[i % len(markers)]
        marker_colour = sm.to_rgba(freq)
        plot_dates = measurements['dateobs']
        self.logger.debug(plot_dates)
        if mjd:
            plot_dates = Time(plot_dates.to_numpy()).mjd
        elif start_date:
            plot_dates -= start_date
            plot_dates /= pd.Timedelta(1, unit='d')

        self.logger.debug("Plotting upper limit")
        if self.pipeline:
            upper_lim_mask = measurements.forced
        else:
            upper_lim_mask = measurements.detection == False
            if use_forced_for_all:
                upper_lim_mask = np.array([False for i in upper_lim_mask])
        upper_lims = measurements[
            upper_lim_mask
        ]
        if self.pipeline:
            if peak_flux:
                value_col = 'flux_peak'
                err_value_col = 'flux_peak_err'
            else:
                value_col = 'flux_int'
                err_value_col = 'flux_int_err'
            uplims = False
            sigma_thresh = 1.0
            label = 'Forced'
            markerfacecolor = 'w'
        else:
            if use_forced_for_limits:
                value_col = 'f_flux_peak'
                err_value_col = 'f_flux_peak_err'
                uplims = False
                sigma_thresh = 1.0
                markerfacecolor = 'w'
                label = "Forced"
            else:
                value_col = err_value_col = 'rms_image'
                uplims = True
                markerfacecolor = marker_colour
                label = 'Upper limit'
        if upper_lim_mask.any():
            upperlim_points = ax.errorbar(
                plot_dates[upper_lim_mask],
                sigma_thresh *
                upper_lims[value_col],
                yerr=upper_lims[err_value_col],
                uplims=uplims,
                lolims=False,
                marker=marker,
                c=marker_colour,
                linestyle="none",
                markerfacecolor=markerfacecolor
            )

        self.logger.debug("Plotting detection")

        if use_forced_for_all:
            detections = measurements
        else:
            detections = measurements[
                ~upper_lim_mask
            ]

        if self.pipeline:
            if peak_flux:
                err_value_col = 'flux_peak_err'
            else:
                err_value_col = 'flux_int_err'
        else:
            if use_forced_for_all:
                err_value_col = flux_col + '_err'
            else:
                err_value_col = 'rms_image'

        if use_forced_for_all:
            markerfacecolor = 'w'
            label = 'Forced'
        else:
            markerfacecolor = marker_colour
            label = 'Selavy'
        if (~upper_lim_mask).any():
            detection_points = ax.errorbar(
                plot_dates[~upper_lim_mask],
                detections[flux_col],
                yerr=detections[err_value_col],
                marker=marker,
                c=marker_colour,
                linestyle="none",
                markerfacecolor=markerfacecolor
            )

    self.logger.debug("Plotting finished.")
    if self.pipeline:
        upper_lim_mask = measurements_df.forced
    else:
        upper_lim_mask = measurements_df.detection == False

    if use_forced_for_all:
        detections = measurements_df
    else:
        detections = measurements_df[
            ~upper_lim_mask
        ]
    if yaxis_start == "0":
        max_det = detections.loc[:, [flux_col, err_value_col]].sum(axis=1)
        if use_forced_for_limits or self.pipeline:
            max_y = np.nanmax(
                max_det.tolist() +
                upper_lims[value_col].tolist()
            )
        elif use_forced_for_all:
            max_y = np.nanmax(max_det.tolist())
        else:
            max_y = np.nanmax(
                max_det.tolist() +
                (sigma_thresh * upper_lims[err_value_col]).tolist()
            )
        ax.set_ylim(
            bottom=0,
            top=max_y * 1.1
        )

    if mjd:
        ax.set_xlabel('Date (MJD)')
    elif start_date:
        ax.set_xlabel('Days since {}'.format(start_date))
    else:
        fig.autofmt_xdate()
        ax.set_xlabel('Date')

        date_form = mdates.DateFormatter("%Y-%m-%d")
        ax.xaxis.set_major_formatter(date_form)
        ax.xaxis.set_major_locator(mdates.AutoDateLocator(maxticks=15))

    ax.grid(grid)

    if not hide_legend:
        # Manually create legend artists for consistency.
        # Using dummy points throws a matplotlib warning.
        handles = []
        labels = []
        for i, freq in enumerate(freqs):
            line = Line2D([],
                          [],
                          ls="",
                          marker=markers[i],
                          color=sm.to_rgba(freq)
                          )
            barline = LineCollection(np.empty((2, 2, 2)))

            err = ErrorbarContainer((line, None, [barline]),
                                    has_yerr=True
                                    )
            handles.append(err)
            labels.append('{} MHz'.format(freq))

        ax.legend(handles=handles, labels=labels)

    if save:
        if outfile is None:
            outfile = "{}_lc.png".format(self.name.replace(
                " ", "_"
            ).replace(
                "/", "_"
            ))

        elif not outfile.endswith(".png"):
            outfile += ".png"

        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )

        plt.savefig(outfile, bbox_inches='tight', dpi=plot_dpi)
        plt.close()

        return

    else:

        return fig

save_all_ann(self, crossmatch_overlay=False, cutout_data=None)

Save kvis annotation file corresponding to the source

Parameters:

Name Type Description Default
crossmatch_overlay bool

Include the crossmatch radius, defaults to False

False
cutout_data Optional[pandas.core.frame.DataFrame]

Pass external cutout_data to be used instead of fetching the data, defaults to None.

None

Returns:

Type Description
None

None

Source code in vasttools/source.py
def save_all_ann(
    self,
    crossmatch_overlay: bool = False,
    cutout_data: Optional[pd.DataFrame] = None
) -> None:
    """
    Save kvis annotation file corresponding to the source

    Args:
        crossmatch_overlay: Include the crossmatch radius,
            defaults to `False`
        cutout_data: Pass external cutout_data to be used
            instead of fetching the data, defaults to None.

    Returns:
        None
    """
    indices = self.measurements.index.to_series()
    indices.apply(
        self.write_ann,
        args=(
            None,
            crossmatch_overlay,
            None,
            False,
            cutout_data
        )
    )

save_all_fits_cutouts(self, size=None, force=False, cutout_data=None)

Save all cutouts of the source to fits file

Parameters:

Name Type Description Default
size Optional[astropy.coordinates.angles.Angle]

Size of the cutouts, defaults to None.

None
force bool

Whether to force the re-fetching of the cutout data, defaults to False.

False
cutout_data Optional[pandas.core.frame.DataFrame]

Pass external cutout_data to be used instead of fetching the data, defaults to None.

None

Returns:

Type Description
None

None

Source code in vasttools/source.py
def save_all_fits_cutouts(
    self,
    size: Optional[Angle] = None,
    force: bool = False,
    cutout_data: Optional[pd.DataFrame] = None
) -> None:
    """
    Save all cutouts of the source to fits file

    Args:
        size: Size of the cutouts, defaults to None.
        force: Whether to force the re-fetching
            of the cutout data, defaults to `False`.
        cutout_data: Pass external cutout_data to be used
            instead of fetching the data, defaults to None.

    Returns:
        None
    """
    if (self._cutouts_got is False) or (force):
        if cutout_data is None:
            self.get_cutout_data(size)

    self.logger.debug("Saving fits cutouts...")

    if cutout_data is None:
        indices = self.measurements.index
    else:
        indices = cutout_data.index

    for i in indices:
        self.save_fits_cutout(i, cutout_data=cutout_data)

save_all_png_cutouts(self, selavy=True, percentile=99.9, zscale=False, contrast=0.2, no_islands=True, no_colorbar=False, crossmatch_overlay=False, hide_beam=False, size=None, disable_autoscaling=False, cutout_data=None, calc_script_norms=False, plot_dpi=150, offset_axes=True)

Wrapper function to save all the png cutouts for all epochs.

Parameters:

Name Type Description Default
selavy bool

If True then selavy overlay are shown, defaults to True.

True
percentile float

The value passed to the percentile normalization function, defaults to 99.9.

99.9
zscale bool

Uses ZScale normalization instead of PercentileInterval, defaults to False.

False
contrast float

Contrast value passed to the ZScaleInterval function when zscale is selected, defaults to 0.2.

0.2
no_islands bool

Hide island name labels, defaults to True.

True
no_colorbar bool

Hides the colorbar, defaults to False.

False
crossmatch_overlay bool

Plots a circle that represents the crossmatch radius, defaults to False.

False
hide_beam bool

Hide the beam on the plot, defaults to False.

False
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
disable_autoscaling bool

Do not use the consistent normalization values but calculate norms separately for each epoch, defaults to False.

False
cutout_data Optional[pandas.core.frame.DataFrame]

Pass external cutout_data to be used instead of fetching the data, defaults to None.

None
calc_script_norms bool

When passing cutout data this parameter can be set to True to pass this cutout data to the analyse norms function, defaults to False.

False
plot_dpi int

Specify the DPI of saved figures, defaults to 150.

150
offset_axes bool

Use offset, rather than absolute, axis labels.

True

Returns:

Type Description
None

None

Source code in vasttools/source.py
def save_all_png_cutouts(
    self,
    selavy: bool = True,
    percentile: float = 99.9,
    zscale: bool = False,
    contrast: float = 0.2,
    no_islands: bool = True,
    no_colorbar: bool = False,
    crossmatch_overlay: bool = False,
    hide_beam: bool = False,
    size: Optional[Angle] = None,
    disable_autoscaling: bool = False,
    cutout_data: Optional[pd.DataFrame] = None,
    calc_script_norms: bool = False,
    plot_dpi: int = 150,
    offset_axes: bool = True
) -> None:
    """
    Wrapper function to save all the png cutouts
    for all epochs.

    Args:
        selavy: If `True` then selavy overlay are shown,
             defaults to `True`.
        percentile: The value passed to the percentile
            normalization function, defaults to 99.9.
        zscale: Uses ZScale normalization instead of
            PercentileInterval, defaults to `False`.
        contrast: Contrast value passed to the ZScaleInterval
            function when zscale is selected, defaults to 0.2.
        no_islands: Hide island name labels, defaults to `True`.
        no_colorbar: Hides the colorbar, defaults to `False`.
        crossmatch_overlay: Plots a circle that represents the
            crossmatch radius, defaults to `False`.
        hide_beam: Hide the beam on the plot, defaults to `False`.
        size: Size of the cutout, defaults to None.
        disable_autoscaling: Do not use the consistent normalization
            values but calculate norms separately for each epoch,
            defaults to `False`.
        cutout_data: Pass external cutout_data to be used
            instead of fetching the data, defaults to None.
        calc_script_norms: When passing cutout data this parameter
            can be set to True to pass this cutout data to the analyse
            norms function, defaults to False.
        plot_dpi: Specify the DPI of saved figures, defaults to 150.
        offset_axes: Use offset, rather than absolute, axis labels.

    Returns:
        None
    """
    if self._cutouts_got is False:
        if cutout_data is None:
            self.get_cutout_data(size)

    if not calc_script_norms:
        if not self._checked_norms:
            self._analyse_norm_level(
                percentile=percentile,
                zscale=zscale,
                z_contrast=contrast
            )
        norms = None
    else:
        norms = self._analyse_norm_level(
            percentile=percentile,
            zscale=zscale,
            z_contrast=contrast,
            cutout_data=cutout_data
        )

    indices = self.measurements.index.to_series()
    indices.apply(
        self.make_png,
        args=(
            selavy,
            percentile,
            zscale,
            contrast,
            None,
            no_islands,
            "Source",
            no_colorbar,
            None,
            crossmatch_overlay,
            hide_beam,
            True,
            None,
            False,
            disable_autoscaling,
            cutout_data,
            norms,
            plot_dpi,
            offset_axes
        )
    )

save_all_reg(self, crossmatch_overlay=False, cutout_data=None)

Save DS9 region file corresponding to the source

Parameters:

Name Type Description Default
crossmatch_overlay bool

Include the crossmatch radius, defaults to False

False
cutout_data Optional[pandas.core.frame.DataFrame]

Pass external cutout_data to be used instead of fetching the data, defaults to None.

None

Returns:

Type Description
None

None

Source code in vasttools/source.py
def save_all_reg(
    self,
    crossmatch_overlay: bool = False,
    cutout_data: Optional[pd.DataFrame] = None
) -> None:
    """
    Save DS9 region file corresponding to the source

    Args:
        crossmatch_overlay: Include the crossmatch radius,
            defaults to `False`
        cutout_data: Pass external cutout_data to be used
            instead of fetching the data, defaults to None.

    Returns:
        None
    """

    indices = self.measurements.index.to_series()
    indices.apply(
        self.write_reg,
        args=(
            None,
            crossmatch_overlay,
            None,
            False,
            cutout_data
        )
    )

save_fits_cutout(self, index, outfile=None, size=None, force=False, cutout_data=None)

Saves the FITS file cutout of the requested observation.

Parameters:

Name Type Description Default
index int

The index of the requested observation.

required
outfile Optional[str]

File to save to, defaults to None.

None
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
force bool

Whether to force the re-fetching of the cutout data, defaults to False.

False
cutout_data Optional[pandas.core.frame.DataFrame]

Pass external cutout_data to be used instead of fetching the data, defaults to None.

None

Returns:

Type Description
None

None

Exceptions:

Type Description
ValueError

If the source does not contain the requested index.

Source code in vasttools/source.py
def save_fits_cutout(
    self,
    index: int,
    outfile: Optional[str] = None,
    size: Optional[Angle] = None,
    force: bool = False,
    cutout_data: Optional[pd.DataFrame] = None
) -> None:
    """
    Saves the FITS file cutout of the requested observation.

    Args:
        index: The index of the requested observation.
        outfile: File to save to, defaults to None.
        size: Size of the cutout, defaults to None.
        force: Whether to force the re-fetching
            of the cutout data, defaults to `False`.
        cutout_data: Pass external cutout_data to be used
            instead of fetching the data, defaults to None.

    Returns:
        None

    Raises:
        ValueError: If the source does not contain the requested index.
    """

    if (self._cutouts_got is False) or (force):
        if cutout_data is None:
            self.get_cutout_data(size)

    if outfile is None:
        outfile = self._get_save_name(index, ".fits")

    if self.outdir != ".":
        outfile = os.path.join(
            self.outdir,
            outfile
        )
    if cutout_data is None:
        cutout_row = self.cutout_df.iloc[index]
    else:
        cutout_row = cutout_data.iloc[index]

    hdu_stamp = fits.PrimaryHDU(
        data=cutout_row.data,
        header=cutout_row.header
    )

    # Write the cutout to a new FITS file
    hdu_stamp.writeto(outfile, overwrite=True)
    self.logger.debug(f"Wrote to {outfile}")

    del hdu_stamp

save_png_cutout(self, index, selavy=True, percentile=99.9, zscale=False, contrast=0.2, no_islands=True, label='Source', no_colorbar=False, title=None, crossmatch_overlay=False, hide_beam=False, size=None, force=False, outfile=None, plot_dpi=150, offset_axes=True)

Wrapper for make_png to make nicer interactive function. Always save.

Parameters:

Name Type Description Default
index int

Index of the observation to show.

required
selavy bool

If True then selavy overlay are shown, defaults to True.

True
percentile float

The value passed to the percentile normalization function, defaults to 99.9.

99.9
zscale bool

Uses ZScale normalization instead of PercentileInterval, defaults to False.

False
contrast float

Contrast value passed to the ZScaleInterval function when zscale is selected, defaults to 0.2.

0.2
no_islands bool

Hide island name labels, defaults to True.

True
label str

legend label for source, defaults to "Source".

'Source'
no_colorbar bool

Hides the colorbar, defaults to False.

False
title Optional[str]

Sets the plot title, defaults to None.

None
crossmatch_overlay bool

Plots a circle that represents the crossmatch radius, defaults to False.

False
hide_beam bool

Hide the beam on the plot, defaults to False.

False
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
force bool

Whether to force the re-fetching of the cutout data, defaults to False.

False
outfile Optional[str]

Name to give the file, if None then the name is automatically generated, defaults to None.

None
plot_dpi int

Specify the DPI of saved figures, defaults to 150.

150
offset_axes bool

Use offset, rather than absolute, axis labels.

True

Returns:

Type Description
None

None

Source code in vasttools/source.py
def save_png_cutout(
    self,
    index: int,
    selavy: bool = True,
    percentile: float = 99.9,
    zscale: bool = False,
    contrast: float = 0.2,
    no_islands: bool = True,
    label: str = "Source",
    no_colorbar: bool = False,
    title: Optional[str] = None,
    crossmatch_overlay: bool = False,
    hide_beam: bool = False,
    size: Optional[Angle] = None,
    force: bool = False,
    outfile: Optional[str] = None,
    plot_dpi: int = 150,
    offset_axes: bool = True
) -> None:
    """
    Wrapper for make_png to make nicer interactive function.
    Always save.

    Args:
        index: Index of the observation to show.
        selavy: If `True` then selavy overlay are shown,
             defaults to `True`.
        percentile: The value passed to the percentile
            normalization function, defaults to 99.9.
        zscale: Uses ZScale normalization instead of
            PercentileInterval, defaults to `False`.
        contrast: Contrast value passed to the ZScaleInterval
            function when zscale is selected, defaults to 0.2.
        no_islands: Hide island name labels, defaults to `True`.
        label: legend label for source, defaults to "Source".
        no_colorbar: Hides the colorbar, defaults to `False`.
        title: Sets the plot title, defaults to None.
        crossmatch_overlay: Plots a circle that represents the
            crossmatch radius, defaults to `False`.
        hide_beam: Hide the beam on the plot, defaults to `False`.
        size: Size of the cutout, defaults to None.
        force: Whether to force the re-fetching
            of the cutout data, defaults to `False`.
        outfile: Name to give the file, if None then the name is
            automatically generated, defaults to None.
        plot_dpi: Specify the DPI of saved figures, defaults to 150.
        offset_axes: Use offset, rather than absolute, axis labels.

    Returns:
        None
    """
    fig = self.make_png(
        index,
        selavy=selavy,
        percentile=percentile,
        zscale=zscale,
        contrast=contrast,
        no_islands=no_islands,
        label=label,
        no_colorbar=no_colorbar,
        title=title,
        crossmatch_overlay=crossmatch_overlay,
        hide_beam=hide_beam,
        size=size,
        force=force,
        outfile=outfile,
        save=True,
        plot_dpi=plot_dpi,
        offset_axes=offset_axes
    )

    return

show_all_png_cutouts(self, columns=4, percentile=99.9, zscale=False, contrast=0.1, outfile=None, save=False, size=None, figsize=(10, 5), force=False, no_selavy=False, disable_autoscaling=False, hide_epoch_labels=False, plot_dpi=150, offset_axes=True)

Creates a grid plot showing the source in each epoch.

Parameters:

Name Type Description Default
columns int

Number of columns to use for the grid plot, defaults to 4.

4
percentile float

The value passed to the percentile normalization function, defaults to 99.9.

99.9
zscale bool

Uses ZScale normalization instead of PercentileInterval, defaults to False.

False
contrast float

Contast value passed to the ZScaleInterval function when zscale is selected, defaults to 0.2.

0.1
outfile Optional[str]

Name of the output file, if None then the name is automatically generated, defaults to None.

None
save bool

Save the plot instead of displaying, defaults to False.

False
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
figsize Tuple[int, int]

Size of the matplotlib.pyplot figure, defaults to (10, 5).

(10, 5)
force bool

Whether to force the re-fetching of the cutout data, defaults to False.

False
no_selavy bool

When True the selavy overlay is hidden, defaults to False.

False
disable_autoscaling bool

Turn off the consistent normalization and calculate the normalizations separately for each epoch, defaults to False.

False
hide_epoch_labels bool

Turn off the epoch number label (found in top left corner of image).

False
plot_dpi int

Specify the DPI of saved figures, defaults to 150.

150
offset_axes bool

Use offset, rather than absolute, axis labels.

True

Returns:

Type Description
Optional[figure]

None is save is True or the Figure if False.

Source code in vasttools/source.py
def show_all_png_cutouts(
    self,
    columns: int = 4,
    percentile: float = 99.9,
    zscale: bool = False,
    contrast: float = 0.1,
    outfile: Optional[str] = None,
    save: bool = False,
    size: Optional[Angle] = None,
    figsize: Tuple[int, int] = (10, 5),
    force: bool = False,
    no_selavy: bool = False,
    disable_autoscaling: bool = False,
    hide_epoch_labels: bool = False,
    plot_dpi: int = 150,
    offset_axes: bool = True
) -> Union[None, plt.figure]:
    """
    Creates a grid plot showing the source in each epoch.

    Args:
        columns: Number of columns to use for the grid plot,
            defaults to 4.
        percentile: The value passed to the percentile
            normalization function, defaults to 99.9.
        zscale: Uses ZScale normalization instead of
            PercentileInterval, defaults to `False`.
        contrast: Contast value passed to the ZScaleInterval
            function when zscale is selected, defaults to 0.2.
        outfile: Name of the output file, if None then the name
             is automatically generated, defaults to None.
        save: Save the plot instead of displaying,
            defaults to `False`.
        size: Size of the cutout, defaults to None.
        figsize: Size of the matplotlib.pyplot figure,
            defaults to (10, 5).
        force: Whether to force the re-fetching
            of the cutout data, defaults to `False`.
        no_selavy: When `True` the selavy overlay
            is hidden, defaults to `False`.
        disable_autoscaling: Turn off the consistent normalization and
             calculate the normalizations separately for each epoch,
            defaults to `False`.
        hide_epoch_labels: Turn off the epoch number label (found in
            top left corner of image).
        plot_dpi: Specify the DPI of saved figures, defaults to 150.
        offset_axes: Use offset, rather than absolute, axis labels.

    Returns:
        None is save is `True` or the Figure if `False`.
    """

    if (self._cutouts_got is False) or (force):
        self.get_cutout_data(size)

    num_plots = self.measurements.shape[0]
    nrows = int(np.ceil(num_plots / columns))

    fig = plt.figure(figsize=figsize)

    fig.tight_layout()

    plots = {}

    if not self._checked_norms or force:
        self._analyse_norm_level(
            percentile=percentile,
            zscale=zscale,
            z_contrast=contrast
        )
    img_norms = self.norms

    for i in range(num_plots):
        cutout_row = self.cutout_df.iloc[i]
        measurement_row = self.measurements.iloc[i]
        target_coords = np.array(
            ([[
                measurement_row.ra,
                measurement_row.dec
            ]])
        )
        i += 1
        plots[i] = fig.add_subplot(
            nrows,
            columns,
            i,
            projection=cutout_row.wcs
        )

        if disable_autoscaling:
            if zscale:
                img_norms = ImageNormalize(
                    cutout_row.data * 1.e3,
                    interval=ZScaleInterval(
                        contrast=contrast
                    )
                )
            else:
                img_norms = ImageNormalize(
                    cutout_row.data * 1.e3,
                    interval=PercentileInterval(percentile),
                    stretch=LinearStretch())

        im = plots[i].imshow(
            cutout_row.data * 1.e3, norm=img_norms, cmap="gray_r"
        )

        epoch_time = measurement_row.dateobs
        epoch = measurement_row.epoch

        plots[i].set_title('{}'.format(
            epoch_time.strftime("%Y-%m-%d %H:%M:%S")
        ))

        if not hide_epoch_labels:
            plots[i].text(
                0.05, 0.9, f"{epoch}", transform=plots[i].transAxes
            )

        cross_target_coords = cutout_row.wcs.wcs_world2pix(
            target_coords, 0
        )
        crosshair_lines = self._create_crosshair_lines(
            cross_target_coords,
            0.15,
            0.15,
            cutout_row.data.shape
        )

        if (not cutout_row['selavy_overlay'].empty) and (not no_selavy):
            plots[i].set_autoscale_on(False)
            (
                collection,
                patches,
                island_names
            ) = self._gen_overlay_collection(
                cutout_row
            )
            plots[i].add_collection(collection, autolim=False)

        if self.forced_fits:
            (
                collection,
                patches,
                island_names
            ) = self._gen_overlay_collection(
                cutout_row, f_source=measurement_row
            )
            plots[i].add_collection(collection, autolim=False)
            del collection

        [plots[i].plot(
            l[0], l[1], color="C3", zorder=10, lw=1.5, alpha=0.6
        ) for l in crosshair_lines]

        lon = plots[i].coords[0]
        lat = plots[i].coords[1]

        lon.set_ticks_visible(False)
        lon.set_ticklabel_visible(False)
        lat.set_ticks_visible(False)
        lat.set_ticklabel_visible(False)

    if save:
        if outfile is None:
            outfile = "{}_cutouts.png".format(self.name.replace(
                " ", "_"
            ).replace(
                "/", "_"
            ))

        elif not outfile.endswith(".png"):
            outfile += ".png"

        if self.outdir != ".":
            outfile = os.path.join(
                self.outdir,
                outfile
            )

        plt.savefig(outfile, bbox_inches='tight', dpi=plot_dpi)

        plt.close()

        return

    else:

        return fig

show_png_cutout(self, index, selavy=True, percentile=99.9, zscale=False, contrast=0.2, no_islands=True, label='Source', no_colorbar=False, title=None, crossmatch_overlay=False, hide_beam=False, size=None, force=False, offset_axes=True)

Wrapper for make_png to make nicer interactive function. No access to save.

Parameters:

Name Type Description Default
index int

Index of the observation to show.

required
selavy bool

If True then selavy overlay are shown, defaults to True.

True
percentile float

The value passed to the percentile normalization function, defaults to 99.9.

99.9
zscale bool

Uses ZScale normalization instead of PercentileInterval, defaults to False.

False
contrast float

Contrast value passed to the ZScaleInterval function when zscale is selected, defaults to 0.2.

0.2
no_islands bool

Hide island name labels, defaults to True.

True
label str

legend label for source, defaults to "Source".

'Source'
no_colorbar bool

Hides the colorbar, defaults to False.

False
title Optional[str]

Sets the plot title, defaults to None.

None
crossmatch_overlay bool

Plots a circle that represents the crossmatch radius, defaults to False.

False
hide_beam bool

Hide the beam on the plot, defaults to False.

False
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
force bool

Whether to force the re-fetching of the cutout data, defaults to False.

False
offset_axes bool

Use offset, rather than absolute, axis labels.

True

Returns:

Type Description
Figure

Figure object.

Source code in vasttools/source.py
def show_png_cutout(
    self,
    index: int,
    selavy: bool = True,
    percentile: float = 99.9,
    zscale: bool = False,
    contrast: float = 0.2,
    no_islands: bool = True,
    label: str = "Source",
    no_colorbar: bool = False,
    title: Optional[str] = None,
    crossmatch_overlay: bool = False,
    hide_beam: bool = False,
    size: Optional[Angle] = None,
    force: bool = False,
    offset_axes: bool = True,
) -> plt.Figure:
    """
    Wrapper for make_png to make nicer interactive function.
    No access to save.

    Args:
        index: Index of the observation to show.
        selavy: If `True` then selavy overlay are shown,
             defaults to `True`.
        percentile: The value passed to the percentile
            normalization function, defaults to 99.9.
        zscale: Uses ZScale normalization instead of
            PercentileInterval, defaults to `False`.
        contrast: Contrast value passed to the ZScaleInterval
            function when zscale is selected, defaults to 0.2.
        no_islands: Hide island name labels, defaults to `True`.
        label: legend label for source, defaults to "Source".
        no_colorbar: Hides the colorbar, defaults to `False`.
        title: Sets the plot title, defaults to None.
        crossmatch_overlay: Plots a circle that represents the
            crossmatch radius, defaults to `False`.
        hide_beam: Hide the beam on the plot, defaults to `False`.
        size: Size of the cutout, defaults to None.
        force: Whether to force the re-fetching of the cutout data,
            defaults to `False`.
        offset_axes: Use offset, rather than absolute, axis labels.

    Returns:
        Figure object.
    """

    fig = self.make_png(
        index,
        selavy=selavy,
        percentile=percentile,
        zscale=zscale,
        contrast=contrast,
        no_islands=no_islands,
        label=label,
        no_colorbar=no_colorbar,
        title=title,
        crossmatch_overlay=crossmatch_overlay,
        hide_beam=hide_beam,
        size=size,
        force=force,
        offset_axes=offset_axes
    )

    return fig

Searches SIMBAD for object coordinates and returns matches.

Parameters:

Name Type Description Default
radius Angle

Radius to search, defaults to Angle(20. * u.arcsec)

<Angle 20. arcsec>

Returns:

Type Description
Optional[astropy.table.table.Table]

Table of matches or None if no matches

Exceptions:

Type Description
ValueError

Error in performing the SIMBAD region search.

Source code in vasttools/source.py
def simbad_search(
    self, radius: Angle = Angle(20. * u.arcsec)
) -> Union[None, Table]:
    """
    Searches SIMBAD for object coordinates and returns matches.

    Args:
        radius: Radius to search, defaults to Angle(20. * u.arcsec)

    Returns:
        Table of matches or None if no matches

    Raises:
        ValueError: Error in performing the SIMBAD region search.
    """
    Simbad.add_votable_fields('ra(d)', 'dec(d)')

    try:
        result_table = Simbad.query_region(self.coord, radius=radius)
        if result_table is None:
            return None

        return result_table

    except Exception as e:
        raise ValueError(
            "Error in performing the SIMBAD region search! Error: %s", e
        )
        return None

skyview_contour_plot(self, index, survey, contour_levels=[3.0, 5.0, 10.0, 15.0], percentile=99.9, zscale=False, contrast=0.2, outfile=None, no_colorbar=False, title=None, save=False, size=None, force=False, plot_dpi=150)

Fetches a FITS file from SkyView of the requested survey at the source location and overlays ASKAP contours.

Parameters:

Name Type Description Default
index int

Index of the requested ASKAP observation.

required
survey str

Survey requested to be fetched using SkyView.

required
contour_levels List[float]

Contour levels to plot which are multiples of the local rms, defaults to [3., 5., 10., 15.].

[3.0, 5.0, 10.0, 15.0]
percentile float

The value passed to the percentile normalization function, defaults to 99.9.

99.9
zscale bool

Uses ZScale normalization instead of PercentileInterval, defaults to False.

False
contrast float

Contrast value passed to the ZScaleInterval function when zscale is selected, defaults to 0.2.

0.2
outfile Optional[str]

Name to give the file, if None then the name is automatically generated, defaults to None.

None
no_colorbar bool

Hides the colorbar, defaults to False.

False
title Optional[str]

Plot title, defaults to None.

None
save bool

Saves the file instead of returing the figure, defaults to False.

False
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
force bool

Whether to force the re-fetching of the cutout data, defaults to False.

False
plot_dpi int

Specify the DPI of saved figures, defaults to 150.

150

Returns:

Type Description
Optional[figure]

None if save is True or the figure object if False

Exceptions:

Type Description
ValueError

If the index is out of range.

ValueError

If the requested survey is not valid.

Source code in vasttools/source.py
def skyview_contour_plot(
    self,
    index: int,
    survey: str,
    contour_levels: List[float] = [3., 5., 10., 15.],
    percentile: float = 99.9,
    zscale: bool = False,
    contrast: float = 0.2,
    outfile: Optional[str] = None,
    no_colorbar: bool = False,
    title: Optional[str] = None,
    save: bool = False,
    size: Optional[Angle] = None,
    force: bool = False,
    plot_dpi: int = 150,
) -> Union[None, plt.figure]:
    """
    Fetches a FITS file from SkyView of the requested survey at
    the source location and overlays ASKAP contours.

    Args:
        index: Index of the requested ASKAP observation.
        survey: Survey requested to be fetched using SkyView.
        contour_levels: Contour levels to plot which are multiples
             of the local rms, defaults to [3., 5., 10., 15.].
        percentile: The value passed to the percentile
            normalization function, defaults to 99.9.
        zscale: Uses ZScale normalization instead of
            PercentileInterval, defaults to `False`.
        contrast: Contrast value passed to the ZScaleInterval
            function when zscale is selected, defaults to 0.2.
        outfile: Name to give the file, if None then the name is
            automatically generated, defaults to None.
        no_colorbar: Hides the colorbar, defaults to `False`.
        title: Plot title, defaults to None.
        save: Saves the file instead of returing the figure,
            defaults to `False`.
        size: Size of the cutout, defaults to None.
        force: Whether to force the re-fetching
            of the cutout data, defaults to `False`.
        plot_dpi: Specify the DPI of saved figures, defaults to 150.

    Returns:
        None if save is `True` or the figure object if `False`

    Raises:
        ValueError: If the index is out of range.
        ValueError: If the requested survey is not valid.
    """

    if (self._cutouts_got is False) or (force):
        self.get_cutout_data(size)

    size = self._size

    surveys = list(SkyView.survey_dict.values())
    survey_list = [item for sublist in surveys for item in sublist]

    if survey not in survey_list:
        raise ValueError(f"{survey} is not a valid SkyView survey name")

    if index > len(self.measurements):
        raise ValueError(f"Cannot access {index}th measurement.")
        return

    if outfile is None:
        outfile = self._get_save_name(index, ".png")

    if self.outdir != ".":
        outfile = os.path.join(
            self.outdir,
            outfile
        )

    try:
        paths = SkyView.get_images(
            position=self.measurements.iloc[index]['skycoord'],
            survey=[survey], radius=size
        )
        path_fits = paths[0][0]

        path_wcs = WCS(path_fits.header)

    except Exception as e:
        warnings.warn("SkyView fetch failed!")
        warnings.warn(e)
        return

    fig = plt.figure(figsize=(8, 8))
    ax = fig.add_subplot(111, projection=path_wcs)

    mean_vast, median_vast, rms_vast = sigma_clipped_stats(
        self.cutout_df.iloc[index].data
    )

    levels = [
        i * rms_vast for i in contour_levels
    ]

    if zscale:
        norm = ImageNormalize(
            path_fits.data,
            interval=ZScaleInterval(
                contrast=contrast
            )
        )
    else:
        norm = ImageNormalize(
            path_fits.data,
            interval=PercentileInterval(percentile),
            stretch=LinearStretch()
        )

    im = ax.imshow(path_fits.data, norm=norm, cmap='gray_r')

    ax.contour(
        self.cutout_df.iloc[index].data,
        levels=levels,
        transform=ax.get_transform(self.cutout_df.iloc[index].wcs),
        colors='C0',
        zorder=10,
    )

    if title is None:
        obs_time = self.measurements.iloc[index].dateobs
        title = "{} {}".format(
            self.name,
            obs_time.strftime(
                "%Y-%m-%d %H:%M:%S"
            )
        )

    ax.set_title(title)

    lon = ax.coords[0]
    lat = ax.coords[1]
    lon.set_axislabel("Right Ascension (J2000)")
    lat.set_axislabel("Declination (J2000)")

    if not no_colorbar:
        divider = make_axes_locatable(ax)
        cax = divider.append_axes(
            "right", size="3%", pad=0.1, axes_class=maxes.Axes)
        cb = fig.colorbar(im, cax=cax)

    if save:
        plt.savefig(outfile, bbox_inches="tight", dpi=plot_dpi)
        self.logger.debug("Saved {}".format(outfile))

        plt.close(fig)

        return
    else:
        return fig

write_ann(self, index, outfile=None, crossmatch_overlay=False, size=None, force=False, cutout_data=None)

Write a kvis annotation file containing all selavy sources within the image.

Parameters:

Name Type Description Default
index int

The index correpsonding to the requested observation.

required
outfile str

Name of the file to write, defaults to None.

None
crossmatch_overlay bool

If True, a circle is added to the annotation file output denoting the crossmatch radius, defaults to False.

False
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
force bool

Whether to force the re-fetching of the cutout data, defaults to False

False
cutout_data Optional[pandas.core.frame.DataFrame]

Pass external cutout_data to be used instead of fetching the data, defaults to None.

None

Returns:

Type Description
None

None

Source code in vasttools/source.py
def write_ann(
    self,
    index: int,
    outfile: str = None,
    crossmatch_overlay: bool = False,
    size: Optional[Angle] = None,
    force: bool = False,
    cutout_data: Optional[pd.DataFrame] = None
) -> None:
    """
    Write a kvis annotation file containing all selavy sources
    within the image.

    Args:
        index: The index correpsonding to the requested observation.
        outfile: Name of the file to write, defaults to None.
        crossmatch_overlay: If True, a circle is added to the
            annotation file output denoting the crossmatch radius,
            defaults to False.
        size: Size of the cutout, defaults to None.
        force: Whether to force the re-fetching
            of the cutout data, defaults to `False`
        cutout_data: Pass external cutout_data to be used
            instead of fetching the data, defaults to None.

    Returns:
        None
    """
    if (self._cutouts_got is False) or (force):
        if cutout_data is None:
            self.get_cutout_data(size)

    if outfile is None:
        outfile = self._get_save_name(index, ".ann")
    if self.outdir != ".":
        outfile = os.path.join(
            self.outdir,
            outfile
        )

    neg = False
    with open(outfile, 'w') as f:
        f.write("COORD W\n")
        f.write("PA SKY\n")
        f.write("FONT hershey14\n")
        f.write("COLOR BLUE\n")
        f.write("CROSS {0} {1} {2} {2}\n".format(
            self.measurements.iloc[index].ra,
            self.measurements.iloc[index].dec,
            3. / 3600.
        ))
        if crossmatch_overlay:
            try:
                f.write("CIRCLE {} {} {}\n".format(
                    self.measurements.iloc[index].ra,
                    self.measurements.iloc[index].dec,
                    self.crossmatch_radius.deg
                ))
            except Exception as e:
                self.logger.warning(
                    "Crossmatch circle overlay failed!"
                    " Has the source been crossmatched?")
        f.write("COLOR GREEN\n")

        if cutout_data is None:
            selavy_cat_cut = self.cutout_df.iloc[index].selavy_overlay
        else:
            selavy_cat_cut = cutout_data.iloc[index].selavy_overlay

        for i, row in selavy_cat_cut.iterrows():
            if row["island_id"].startswith("n"):
                neg = True
                f.write("COLOR RED\n")
            ra = row["ra_deg_cont"]
            dec = row["dec_deg_cont"]
            f.write(
                "ELLIPSE {} {} {} {} {}\n".format(
                    ra,
                    dec,
                    float(
                        row["maj_axis"]) /
                    3600. /
                    2.,
                    float(
                        row["min_axis"]) /
                    3600. /
                    2.,
                    float(
                        row["pos_ang"])))
            f.write(
                "TEXT {} {} {}\n".format(
                    ra, dec, self._remove_sbid(
                        row["island_id"])))
            if neg:
                f.write("COLOR GREEN\n")
                neg = False

    self.logger.debug("Wrote annotation file {}.".format(outfile))

write_measurements(self, simple=False, outfile=None)

Write the measurements to a CSV file.

Parameters:

Name Type Description Default
simple bool

Only include flux density and uncertainty in returned table, defaults to False.

False
outfile Optional[str]

File to write measurements to, defaults to None.

None

Returns:

Type Description
None

None

Source code in vasttools/source.py
def write_measurements(
    self, simple: bool = False, outfile: Optional[str] = None
) -> None:
    """Write the measurements to a CSV file.

    Args:
        simple: Only include flux density and uncertainty in returned
            table, defaults to `False`.
        outfile: File to write measurements to, defaults to None.

    Returns:
        None
    """
    if simple:
        if self.pipeline:
            cols = [
                'source',
                'ra',
                'dec',
                'component_id',
                'flux_peak',
                'flux_peak_err',
                'flux_int',
                'flux_int_err',
                'rms',
            ]
        else:
            cols = [
                'name',
                'ra_deg_cont',
                'dec_deg_cont',
                'component_id',
                'flux_peak',
                'flux_peak_err',
                'flux_int',
                'flux_int_err',
                'rms_image',
            ]

        measurements_to_write = self.measurements[cols]

    else:
        cols = [
            'fields',
            'skycoord',
            'selavy',
            'image',
            'rms',
        ]

        if self.pipeline:
            cols[0] = 'field'

        measurements_to_write = self.measurements.drop(
            labels=cols, axis=1
        )

    # drop any empty values
    if not self.pipeline and not self.forced_fits:
        measurements_to_write = measurements_to_write[
            measurements_to_write['rms_image'] != -99
        ]

    if measurements_to_write.empty:
        self.logger.warning(
            "%s has no measurements! No file will be written.",
            self.name
        )
        return

    if outfile is None:
        outfile = "{}_measurements.csv".format(self.name.replace(
            " ", "_"
        ).replace(
            "/", "_"
        ))

    elif not outfile.endswith(".csv"):
        outfile += ".csv"

    if self.outdir != ".":
        outfile = os.path.join(
            self.outdir,
            outfile
        )

    measurements_to_write.to_csv(outfile, index=False)

    self.logger.debug("Wrote {}.".format(outfile))

write_reg(self, index, outfile=None, crossmatch_overlay=False, size=None, force=False, cutout_data=None)

Write a DS9 region file containing all selavy sources within the image

Parameters:

Name Type Description Default
index int

The index correpsonding to the requested observation.

required
outfile Optional[str]

Name of the file to write, defaults to None.

None
crossmatch_overlay bool

If True, a circle is added to the annotation file output denoting the crossmatch radius, defaults to False.

False
size Optional[astropy.coordinates.angles.Angle]

Size of the cutout, defaults to None.

None
force bool

Whether to force the re-fetching of the cutout data, defaults to False.

False
cutout_data Optional[pandas.core.frame.DataFrame]

Pass external cutout_data to be used instead of fetching the data, defaults to None.

None

Returns:

Type Description
None

None

Source code in vasttools/source.py
def write_reg(
    self,
    index: int,
    outfile: Optional[str] = None,
    crossmatch_overlay: bool = False,
    size: Optional[Angle] = None,
    force: bool = False,
    cutout_data: Optional[pd.DataFrame] = None
) -> None:
    """
    Write a DS9 region file containing all selavy sources within the image

    Args:
        index: The index correpsonding to the requested observation.
        outfile: Name of the file to write, defaults to None.
        crossmatch_overlay: If True, a circle is added to the
            annotation file output denoting the crossmatch radius,
            defaults to False.
        size: Size of the cutout, defaults to None.
        force: Whether to force the re-fetching
            of the cutout data, defaults to `False`.
        cutout_data: Pass external cutout_data to be used
            instead of fetching the data, defaults to None.

    Returns:
        None
    """
    if (self._cutouts_got is False) or (force):
        if cutout_data is None:
            self.get_cutout_data(size)

    if outfile is None:
        outfile = self._get_save_name(index, ".reg")
    if self.outdir != ".":
        outfile = os.path.join(
            self.outdir,
            outfile
        )

    with open(outfile, 'w') as f:
        f.write("# Region file format: DS9 version 4.0\n")
        f.write("global color=green font=\"helvetica 10 normal\" "
                "select=1 highlite=1 edit=1 "
                "move=1 delete=1 include=1 "
                "fixed=0 source=1\n")
        f.write("fk5\n")
        f.write(
            "point({} {}) # point=x color=blue\n".format(
                self.measurements.iloc[index].ra,
                self.measurements.iloc[index].dec,
            ))
        if crossmatch_overlay:
            try:
                f.write("circle({} {} {}) # color=blue\n".format(
                    self.measurements.iloc[index].ra,
                    self.measurements.iloc[index].dec,
                    self.crossmatch_radius.deg
                ))
            except Exception as e:
                self.logger.warning(
                    "Crossmatch circle overlay failed!"
                    " Has the source been crossmatched?")

        if cutout_data is None:
            selavy_cat_cut = self.cutout_df.iloc[index].selavy_overlay
        else:
            selavy_cat_cut = cutout_data.iloc[index].selavy_overlay

        for i, row in selavy_cat_cut.iterrows():
            if row["island_id"].startswith("n"):
                color = "red"
            else:
                color = "green"
            ra = row["ra_deg_cont"]
            dec = row["dec_deg_cont"]
            f.write(
                "ellipse({} {} {} {} {}) # color={}\n".format(
                    ra,
                    dec,
                    float(
                        row["maj_axis"]) /
                    3600. /
                    2.,
                    float(
                        row["min_axis"]) /
                    3600. /
                    2.,
                    float(
                        row["pos_ang"]) +
                    90.,
                    color))
            f.write(
                "text({} {} \"{}\") # color={}\n".format(
                    ra - (10. / 3600.), dec, self._remove_sbid(
                        row["island_id"]), color))

    self.logger.debug("Wrote region file {}.".format(outfile))

SourcePlottingError (Exception)

A custom exception for plotting errors.

Source code in vasttools/source.py
class SourcePlottingError(Exception):
    """
    A custom exception for plotting errors.
    """
    pass

Last update: September 24, 2023
Created: September 24, 2023