pipeline.py

Class to interface with results from the VAST Pipeline.

Attributes:

    HOST_NCPU (int): The number of CPUs found on the host using
        'cpu_count()'.

MeasPairsDoNotExistError (Exception)

An error to indicate that the measurement pairs do not exist for a run.

Source code in vasttools/pipeline.py
class MeasPairsDoNotExistError(Exception):
    """
    An error to indicate that the measurement pairs do not exist for a run.
    """
    pass

PipeAnalysis (PipeRun)

Class that represents an Analysis instance of a Pipeline run. Inherits from class PipeRun.

Attributes:

    associations (pandas.core.frame.DataFrame): Associations dataframe
        from the pipeline run loaded from 'associations.parquet'.
    bands (pandas.core.frame.DataFrame): The bands dataframe from the
        pipeline run loaded from 'bands.parquet'.
    images (pandas.core.frame.DataFrame): Dataframe containing all the
        information on the images of the pipeline run.
    measurements (pandas.core.frame.DataFrame): Dataframe containing all
        the information on the measurements of the pipeline run.
    measurement_pairs_file (List[str]): List containing the locations of
        the measurement_pairs.parquet (or .arrow) file(s).
    name (str): The pipeline run name.
    n_workers (int): Number of workers (CPUs) available.
    relations (pandas.core.frame.DataFrame): Dataframe containing all the
        information on the relations of the pipeline run.
    skyregions (pandas.core.frame.DataFrame): Dataframe containing all the
        information on the skyregions of the pipeline run.
    sources (pandas.core.frame.DataFrame): Dataframe containing all the
        information on the sources of the pipeline run.
    sources_skycoord (astropy.coordinates.sky_coordinate.SkyCoord): A
        SkyCoord object of the default sources attribute.
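
A PipeAnalysis instance is normally obtained by loading a completed run
rather than by calling the constructor directly. A minimal sketch, assuming
the module's Pipeline loader class and an illustrative run name:

    from vasttools.pipeline import Pipeline

    # Load a completed pipeline run ('my_run_name' is illustrative).
    pipe = Pipeline()
    my_run = pipe.load_run('my_run_name')

    # 'my_run' exposes the attributes listed above.
    print(my_run.sources.shape)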

Source code in vasttools/pipeline.py
class PipeAnalysis(PipeRun):
    """
    Class that represents an Analysis instance of a Pipeline run.
    Inherits from class `PipeRun`.

    Attributes:
        associations (pandas.core.frame.DataFrame): Associations dataframe
            from the pipeline run loaded from 'associations.parquet'.
        bands (pandas.core.frame.DataFrame): The bands dataframe from the
            pipeline run loaded from 'bands.parquet'.
        images (pandas.core.frame.DataFrame):
            Dataframe containing all the information on the images
            of the pipeline run.
        measurements (pandas.core.frame.DataFrame):
            Dataframe containing all the information on the measurements
            of the pipeline run.
        measurement_pairs_file (List[str]):
            List containing the locations of the measurement_pairs.parquet (or
            .arrow) file(s).
        name (str):
            The pipeline run name.
        n_workers (int):
            Number of workers (cpus) available.
        relations (pandas.core.frame.DataFrame):
            Dataframe containing all the information on the relations
            of the pipeline run.
        skyregions (pandas.core.frame.DataFrame):
            Dataframe containing all the information on the skyregions
            of the pipeline run.
        sources (pandas.core.frame.DataFrame):
            Dataframe containing all the information on the sources
            of the pipeline run.
        sources_skycoord (astropy.coordinates.sky_coordinate.SkyCoord):
            A SkyCoord object of the default sources attribute.
    """

    def __init__(
        self,
        name: str,
        images: pd.DataFrame,
        skyregions: pd.DataFrame,
        relations: pd.DataFrame,
        sources: pd.DataFrame,
        associations: pd.DataFrame,
        bands: pd.DataFrame,
        measurements: Union[pd.DataFrame, vaex.dataframe.DataFrame],
        measurement_pairs_file: str,
        vaex_meas: bool = False,
        n_workers: int = HOST_NCPU - 1,
        scheduler: str = 'processes',
    ) -> None:
        """
        Constructor method.

        Args:
            name: The name of the pipeline run.
            images: Images dataframe from the pipeline run
                loaded from images.parquet. A `pandas.core.frame.DataFrame`
                instance.
            skyregions: Sky regions dataframe from the pipeline run
                loaded from skyregions.parquet. A `pandas.core.frame.DataFrame`
                instance.
            relations: Relations dataframe from the pipeline run
                loaded from relations.parquet. A `pandas.core.frame.DataFrame`
                instance.
            sources: Sources dataframe from the pipeline run
                loaded from sources.parquet. A `pandas.core.frame.DataFrame`
                instance.
            associations: Associations dataframe from the pipeline run loaded
                from 'associations.parquet'. A `pandas.core.frame.DataFrame`
                instance.
            bands: The bands dataframe from the pipeline run loaded from
                'bands.parquet'.
            measurements: Measurements dataframe from the pipeline run
                loaded from measurements.parquet and the forced measurements
                parquet files.  A `pandas.core.frame.DataFrame` or
                `vaex.dataframe.DataFrame` instance.
            measurement_pairs_file: The location of the two epoch pairs file
                from the pipeline. It is a list of locations because two
                pipeline runs can be combined.
            vaex_meas: 'True' if the measurements have been loaded using
                vaex from an arrow file. `False` means the measurements are
                loaded into a pandas DataFrame.
            n_workers: Number of workers (cpus) available.
            scheduler: Dask scheduling option to use. Options are "processes"
                (parallel processing) or "single-threaded". Defaults to
                "processes".

        Returns:
            None
        """
        super().__init__(
            name, images, skyregions, relations, sources, associations,
            bands, measurements, measurement_pairs_file, vaex_meas, n_workers,
            scheduler
        )

    def _filter_meas_pairs_df(
        self,
        measurements_df: Union[pd.DataFrame, vaex.dataframe.DataFrame]
    ) -> Union[pd.DataFrame, vaex.dataframe.DataFrame]:
        """
        A utility method to filter the measurement pairs dataframe to remove
        pairs that are no longer in the measurements dataframe.

        Args:
            measurements_df: The altered measurements dataframe in the same
                format as the standard pipeline dataframe.

        Returns:
            The filtered measurement pairs dataframe.
        """

        if not self._loaded_two_epoch_metrics:
            self.load_two_epoch_metrics()

        if self._vaex_meas_pairs:
            new_measurement_pairs = self.measurement_pairs_df.copy()
        else:
            new_measurement_pairs = vaex.from_pandas(
                self.measurement_pairs_df
            )

        mask_a = new_measurement_pairs['meas_id_a'].isin(
            measurements_df['id'].values
        ).values

        mask_b = new_measurement_pairs['meas_id_b'].isin(
            measurements_df['id'].values
        ).values

        new_measurement_pairs['mask_a'] = mask_a
        new_measurement_pairs['mask_b'] = mask_b

        mask = np.logical_and(mask_a, mask_b)
        new_measurement_pairs['mask'] = mask
        new_measurement_pairs = new_measurement_pairs[
            new_measurement_pairs['mask'] == True
        ]

        new_measurement_pairs = new_measurement_pairs.extract()
        new_measurement_pairs = new_measurement_pairs.drop(
            ['mask_a', 'mask_b', 'mask']
        )

        if not self._vaex_meas_pairs:
            new_measurement_pairs = new_measurement_pairs.to_pandas_df()

        return new_measurement_pairs

    def recalc_measurement_pairs_df(
        self,
        measurements_df: Union[pd.DataFrame, vaex.dataframe.DataFrame]
    ) -> Union[pd.DataFrame, vaex.dataframe.DataFrame]:
        """
        A method to recalculate the two epoch pair metrics based upon a
        provided altered measurements dataframe.

        Designed for use when the measurement fluxes have been changed.

        Args:
            measurements_df: The altered measurements dataframe in the same
                format as the standard pipeline dataframe.

        Returns:
            The recalculated measurement pairs dataframe.
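
        Example:
            A minimal usage sketch, assuming 'my_run' is a loaded
            PipeAnalysis instance and the flux scaling applied here is
            purely illustrative:

                meas = my_run.measurements.copy()
                meas['flux_peak'] = meas['flux_peak'] * 1.1
                new_pairs = my_run.recalc_measurement_pairs_df(meas)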
        """
        if not self._loaded_two_epoch_metrics:
            self.load_two_epoch_metrics()

        new_measurement_pairs = self._filter_meas_pairs_df(
            measurements_df[['id']]
        )

        # an attempt to conserve memory
        if isinstance(new_measurement_pairs, vaex.dataframe.DataFrame):
            new_measurement_pairs = new_measurement_pairs.drop(
                ['vs_peak', 'vs_int', 'm_peak', 'm_int']
            )
        else:
            new_measurement_pairs = new_measurement_pairs.drop(
                ['vs_peak', 'vs_int', 'm_peak', 'm_int'],
                axis=1
            )

        flux_cols = [
            'flux_int',
            'flux_int_err',
            'flux_peak',
            'flux_peak_err',
            'id'
        ]

        # convert a vaex measurements df to pandas so an index can be set
        if isinstance(measurements_df, vaex.dataframe.DataFrame):
            measurements_df = measurements_df[flux_cols].to_pandas_df()
        else:
            measurements_df = measurements_df.loc[:, flux_cols].copy()

        measurements_df = (
            measurements_df
            .drop_duplicates('id')
            .set_index('id')
        )

        for i in flux_cols:
            if i == 'id':
                continue
            for j in ['a', 'b']:
                pairs_i = i + f'_{j}'
                id_values = new_measurement_pairs[f'meas_id_{j}'].to_numpy()
                new_flux_values = measurements_df.loc[id_values][i].to_numpy()
                new_measurement_pairs[pairs_i] = new_flux_values

        del measurements_df

        # calculate 2-epoch metrics
        new_measurement_pairs["vs_peak"] = calculate_vs_metric(
            new_measurement_pairs['flux_peak_a'].to_numpy(),
            new_measurement_pairs['flux_peak_b'].to_numpy(),
            new_measurement_pairs['flux_peak_err_a'].to_numpy(),
            new_measurement_pairs['flux_peak_err_b'].to_numpy()
        )
        new_measurement_pairs["vs_int"] = calculate_vs_metric(
            new_measurement_pairs['flux_int_a'].to_numpy(),
            new_measurement_pairs['flux_int_b'].to_numpy(),
            new_measurement_pairs['flux_int_err_a'].to_numpy(),
            new_measurement_pairs['flux_int_err_b'].to_numpy()
        )
        new_measurement_pairs["m_peak"] = calculate_m_metric(
            new_measurement_pairs['flux_peak_a'].to_numpy(),
            new_measurement_pairs['flux_peak_b'].to_numpy()
        )
        new_measurement_pairs["m_int"] = calculate_m_metric(
            new_measurement_pairs['flux_int_a'].to_numpy(),
            new_measurement_pairs['flux_int_b'].to_numpy()
        )

        return new_measurement_pairs

    def recalc_sources_df(
        self,
        measurements_df: Union[pd.DataFrame, vaex.dataframe.DataFrame],
        min_vs: float = 4.3,
        measurement_pairs_df: Optional[pd.DataFrame] = None
    ) -> pd.DataFrame:
        """
        Regenerates a sources dataframe using a user provided measurements
        dataframe.

        Args:
            measurements_df: Dataframe of measurements with default pipeline
                columns. A `pandas.core.frame.DataFrame` or
                `vaex.dataframe.DataFrame` instance.
            min_vs: Minimum value of the Vs two epoch parameter to use
                when appending the two epoch metrics maximum.
            measurement_pairs_df: The recalculated measurement pairs dataframe
                if applicable. If not provided then the process will assume
                the fluxes have not changed and will purely filter the
                measurement pairs dataframe.

        Returns:
            The regenerated sources_df.  A `pandas.core.frame.DataFrame`
            instance.

        Raises:
            MeasPairsDoNotExistError: The measurement pairs file(s) do not
                exist for this run.
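
        Example:
            A minimal usage sketch, assuming 'my_run' is a loaded
            PipeAnalysis instance and 'new_meas' is an illustrative
            altered measurements dataframe (e.g. with rescaled fluxes):

                new_pairs = my_run.recalc_measurement_pairs_df(new_meas)
                new_sources = my_run.recalc_sources_df(
                    new_meas, measurement_pairs_df=new_pairs
                )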
        """

        self._raise_if_no_pairs()

        # Two epoch metrics
        if not self._loaded_two_epoch_metrics:
            self.load_two_epoch_metrics()

        if not self._vaex_meas:
            measurements_df = vaex.from_pandas(measurements_df)

        # account for RA wrapping
        ra_wrap_mask = measurements_df.ra <= 0.1
        measurements_df['ra_wrap'] = measurements_df.func.where(
            ra_wrap_mask, measurements_df[ra_wrap_mask].ra + 360.,
            measurements_df['ra']
        )

        measurements_df['interim_ew'] = (
            measurements_df['ra_wrap'] * measurements_df['weight_ew']
        )

        measurements_df['interim_ns'] = (
            measurements_df['dec'] * measurements_df['weight_ns']
        )

        for col in ['flux_int', 'flux_peak']:
            measurements_df[f'{col}_sq'] = (measurements_df[col] ** 2.)

        # most of the aggregate calculations done in vaex
        sources_df = measurements_df.groupby(
            by='source',
            agg={
                'interim_ew_sum': vaex.agg.sum(
                    'interim_ew', selection='forced==False'
                ),
                'interim_ns_sum': vaex.agg.sum(
                    'interim_ns', selection='forced==False'
                ),
                'weight_ew_sum': vaex.agg.sum(
                    'weight_ew', selection='forced==False'
                ),
                'weight_ns_sum': vaex.agg.sum(
                    'weight_ns', selection='forced==False'
                ),
                'avg_compactness': vaex.agg.mean(
                    'compactness', selection='forced==False'
                ),
                'min_snr': vaex.agg.min(
                    'snr', selection='forced==False'
                ),
                'max_snr': vaex.agg.max(
                    'snr', selection='forced==False'
                ),
                'avg_flux_int': vaex.agg.mean('flux_int'),
                'avg_flux_peak': vaex.agg.mean('flux_peak'),
                'max_flux_peak': vaex.agg.max('flux_peak'),
                'max_flux_int': vaex.agg.max('flux_int'),
                'min_flux_peak': vaex.agg.min('flux_peak'),
                'min_flux_int': vaex.agg.min('flux_int'),
                'min_flux_peak_isl_ratio': vaex.agg.min('flux_peak_isl_ratio'),
                'min_flux_int_isl_ratio': vaex.agg.min('flux_int_isl_ratio'),
                'n_measurements': vaex.agg.count('id'),
                'n_selavy': vaex.agg.count('id', selection='forced==False'),
                'n_forced': vaex.agg.count('id', selection='forced==True'),
                'n_siblings': vaex.agg.sum('has_siblings')
            }
        )

        # Drop sources which no longer have any selavy measurements
        sources_df = sources_df[sources_df.n_selavy > 0].extract()

        # Calculate average position
        sources_df['wavg_ra'] = (
            sources_df['interim_ew_sum'] / sources_df['weight_ew_sum']
        )
        sources_df['wavg_dec'] = (
            sources_df['interim_ns_sum'] / sources_df['weight_ns_sum']
        )

        sources_df['wavg_uncertainty_ew'] = (
            1. / np.sqrt(sources_df['weight_ew_sum'])
        )
        sources_df['wavg_uncertainty_ns'] = (
            1. / np.sqrt(sources_df['weight_ns_sum'])
        )

        # the RA wrapping is reverted at the end of the function when the
        # df is in pandas format.

        # TraP variability metrics, using Dask.
        measurements_df_temp = measurements_df[[
            'flux_int', 'flux_int_err', 'flux_peak', 'flux_peak_err', 'source'
        ]].extract().to_pandas_df()

        col_dtype = {
            'v_int': 'f',
            'v_peak': 'f',
            'eta_int': 'f',
            'eta_peak': 'f',
        }

        sources_df_fluxes = (
            dd.from_pandas(measurements_df_temp, self.n_workers)
            .groupby('source')
            .apply(
                pipeline_get_variable_metrics,
                meta=col_dtype
            )
            .compute(num_workers=self.n_workers, scheduler=self.scheduler)
        )

        # Switch to pandas at this point to perform join
        sources_df = sources_df.to_pandas_df().set_index('source')

        sources_df = sources_df.join(sources_df_fluxes)

        sources_df = sources_df.join(
            self.sources[['new', 'new_high_sigma']],
        )

        if measurement_pairs_df is None:
            measurement_pairs_df = self._filter_meas_pairs_df(
                measurements_df[['id']]
            )

        if isinstance(measurement_pairs_df, vaex.dataframe.DataFrame):
            # use the bitwise '|' operator; Python's 'or' does not combine
            # vaex expressions element-wise
            new_measurement_pairs = (
                measurement_pairs_df[
                    (measurement_pairs_df['vs_int'].abs() >= min_vs)
                    | (measurement_pairs_df['vs_peak'].abs() >= min_vs)
                ]
            )
        else:
            min_vs_mask = np.logical_or(
                (measurement_pairs_df['vs_int'].abs() >= min_vs).to_numpy(),
                (measurement_pairs_df['vs_peak'].abs() >= min_vs).to_numpy()
            )
            new_measurement_pairs = measurement_pairs_df.loc[min_vs_mask]
            new_measurement_pairs = vaex.from_pandas(new_measurement_pairs)

        new_measurement_pairs['vs_int_abs'] = (
            new_measurement_pairs['vs_int'].abs()
        )

        new_measurement_pairs['vs_peak_abs'] = (
            new_measurement_pairs['vs_peak'].abs()
        )

        new_measurement_pairs['m_int_abs'] = (
            new_measurement_pairs['m_int'].abs()
        )

        new_measurement_pairs['m_peak_abs'] = (
            new_measurement_pairs['m_peak'].abs()
        )

        sources_df_two_epochs = new_measurement_pairs.groupby(
            'source_id',
            agg={
                'vs_significant_max_int': vaex.agg.max('vs_int_abs'),
                'vs_significant_max_peak': vaex.agg.max('vs_peak_abs'),
                'm_abs_significant_max_int': vaex.agg.max('m_int_abs'),
                'm_abs_significant_max_peak': vaex.agg.max('m_peak_abs'),
            }
        )

        sources_df_two_epochs = (
            sources_df_two_epochs.to_pandas_df().set_index('source_id')
        )

        sources_df = sources_df.join(sources_df_two_epochs)

        del sources_df_two_epochs

        # new relation numbers
        relation_mask = np.logical_and(
            (self.relations.from_source_id.isin(sources_df.index.values)),
            (self.relations.to_source_id.isin(sources_df.index.values))
        )

        new_relations = self.relations.loc[relation_mask]

        sources_df_relations = (
            new_relations.groupby('from_source_id').agg('count')
        ).rename(columns={'to_source_id': 'n_relations'})

        sources_df = sources_df.join(sources_df_relations)

        # nearest neighbour
        sources_sky_coord = gen_skycoord_from_df(
            sources_df, ra_col='wavg_ra', dec_col='wavg_dec'
        )

        idx, d2d, _ = sources_sky_coord.match_to_catalog_sky(
            sources_sky_coord, nthneighbor=2
        )

        sources_df['n_neighbour_dist'] = d2d.degree

        # Fill the NaN values.
        sources_df = sources_df.fillna(value={
            "vs_significant_max_peak": 0.0,
            "m_abs_significant_max_peak": 0.0,
            "vs_significant_max_int": 0.0,
            "m_abs_significant_max_int": 0.0,
            'n_relations': 0,
            'v_int': 0.,
            'v_peak': 0.
        }).drop([
            'interim_ew_sum', 'interim_ns_sum',
            'weight_ew_sum', 'weight_ns_sum'
        ], axis=1)

        # correct the RA wrapping
        ra_wrap_mask = (sources_df['wavg_ra'] >= 360.).to_numpy()
        sources_df.loc[
            ra_wrap_mask, 'wavg_ra'
        ] = sources_df.loc[ra_wrap_mask]["wavg_ra"].to_numpy() - 360.

        # Switch relations column to int
        sources_df['n_relations'] = sources_df['n_relations'].astype(int)

        return sources_df

    def _get_epoch_pair_plotting_df(
        self,
        df_filter: pd.DataFrame,
        epoch_pair_id: int,
        vs_label: str,
        m_label: str,
        vs_min: float,
        m_min: float
    ) -> Tuple[pd.DataFrame, int, int, float]:
        """
        Generates some standard parameters used by both two epoch plotting
        routines (bokeh and matplotlib).

        Args:
            df_filter: Dataframe of measurement pairs with metric
                information (pre-filtered). A `pandas.core.frame.DataFrame`
                instance.
            epoch_pair_id: The epoch pair to plot.
            vs_label: The name of the vs column to use (vs_int or vs_peak).
            m_label: The name of the m column to use (m_int or m_peak).
            vs_min: The minimum Vs metric value to be considered a candidate.
            m_min: The minimum m metric absolute value to be considered a
                candidate.

        Returns:
            Tuple of (df_filter, num_pairs, num_candidates, td_days).
        """

        td_days = (
            self.pairs_df.loc[epoch_pair_id]['td'].total_seconds()
            / (3600. * 24.)
        )

        num_pairs = df_filter.shape[0]

        # convert Vs to absolute for plotting purposes.
        df_filter[vs_label] = df_filter[vs_label].abs()

        num_candidates = df_filter[
            (df_filter[vs_label] > vs_min) & (df_filter[m_label].abs() > m_min)
        ].shape[0]

        unique_meas_ids = (
            pd.unique(df_filter[['meas_id_a', 'meas_id_b']].values.ravel('K'))
        )

        temp_meas = self.measurements[
            self.measurements['id'].isin(unique_meas_ids)
        ][['id', 'forced']]

        if self._vaex_meas:
            temp_meas = temp_meas.extract().to_pandas_df()

        temp_meas = temp_meas.drop_duplicates('id').set_index('id')

        df_filter = df_filter.merge(
            temp_meas, left_on='meas_id_a', right_index=True,
            suffixes=('_a', '_b')
        )

        df_filter = df_filter.merge(
            temp_meas, left_on='meas_id_b', right_index=True,
            suffixes=('_a', '_b')
        ).rename(columns={'forced': 'forced_a'})

        df_filter['forced_sum'] = (
            df_filter[['forced_a', 'forced_b']].agg('sum', axis=1)
        ).astype(str)

        return df_filter, num_pairs, num_candidates, td_days

    def _plot_epoch_pair_bokeh(
        self,
        epoch_pair_id: int,
        df: pd.DataFrame,
        vs_min: float = 4.3,
        m_min: float = 0.26,
        use_int_flux: bool = False,
        remove_two_forced: bool = False,
        plot_style: str = 'a'
    ) -> Model:
        """
        Adapted from code written by Andrew O'Brien.
        Plot the results of the two epoch analysis using bokeh. Currently this
        can only plot one epoch pair at a time.

        Args:
            epoch_pair_id: The epoch pair to plot.
            df: Dataframe of measurement pairs with metric information. A
                `pandas.core.frame.DataFrame` instance.
            vs_min: The minimum Vs metric value to be considered a candidate,
                defaults to 4.3.
            m_min: The minimum m metric absolute value to be considered a
                candidate, defaults to 0.26.
            use_int_flux: Whether to use the integrated fluxes instead of
                the peak fluxes.
            remove_two_forced: Will exclude any pairs that are both forced
                extractions if True, defaults to False.
            plot_style: Select whether to plot with style 'a' (Mooley) or
                'b' (Radcliffe). Defaults to 'a'.

        Returns:
            Bokeh figure.
        """
        vs_label = 'vs_int' if use_int_flux else 'vs_peak'
        m_label = 'm_int' if use_int_flux else 'm_peak'

        df_filter, num_pairs, num_candidates, td_days = (
            self._get_epoch_pair_plotting_df(
                df, epoch_pair_id, vs_label, m_label, vs_min, m_min
            )
        )

        candidate_perc = num_candidates / num_pairs * 100.

        cmap = factor_cmap(
            'forced_sum', palette=Category10_3, factors=['0', '1', '2']
        )

        if plot_style == 'a':
            df_filter[m_label] = df_filter[m_label].abs()

            fig = figure(
                x_axis_label="m",
                y_axis_label="Vs",
                y_axis_type='log',
                title=(
                    f"{epoch_pair_id}: {td_days:.2f} days"
                    f" {num_candidates}/{num_pairs} candidates "
                    f"({candidate_perc:.2f}%)"
                ),
                tools="pan,box_select,lasso_select,box_zoom,wheel_zoom,reset",
                tooltips=[("source", "@source_id")],
            )

            range_len = 2 if remove_two_forced else 3

            for i in range(range_len):
                source = df_filter[df_filter['forced_sum'] == str(i)]
                if not source.empty:
                    fig.scatter(
                        f"{m_label}",
                        f"{vs_label}",
                        source=source,
                        color=cmap,
                        marker="circle",
                        legend_label=f"{i} forced",
                        size=2,
                        nonselection_fill_alpha=0.1,
                        nonselection_fill_color="grey",
                        nonselection_line_color=None,
                    )
            # Vertical line
            vline = Span(
                location=m_min, dimension='height', line_color='black',
                line_dash='dashed'
            )
            fig.add_layout(vline)
            # Horizontal line
            hline = Span(
                location=vs_min, dimension='width', line_color='black',
                line_dash='dashed'
            )
            fig.add_layout(hline)

            variable_region = BoxAnnotation(
                left=m_min,
                bottom=vs_min,
                fill_color="orange",
                fill_alpha=0.3,
                level="underlay",
            )
            fig.add_layout(variable_region)
            fig.legend.location = "bottom_right"

        else:

            fig = figure(
                x_axis_label="Vs",
                y_axis_label="m",
                title=(
                    f"{epoch_pair_id}: {td_days:.2f} days"
                    f" {num_candidates}/{num_pairs} candidates "
                    f"({candidate_perc:.2f}%)"
                ),
                tools="pan,box_select,lasso_select,box_zoom,wheel_zoom,reset",
                tooltips=[("source", "@source_id")],
            )

            range_len = 2 if remove_two_forced else 3

            for i in range(range_len):
                source = df_filter[df_filter['forced_sum'] == str(i)]
                if not source.empty:
                    fig.scatter(
                        f"{vs_label}",
                        f"{m_label}",
                        source=source,
                        color=cmap,
                        marker="circle",
                        legend_label=f"{i} forced",
                        size=2,
                        nonselection_fill_alpha=0.1,
                        nonselection_fill_color="grey",
                        nonselection_line_color=None,
                    )

            variable_region_1 = BoxAnnotation(
                left=vs_min, bottom=m_min,
                fill_color="orange", level="underlay"
            )
            variable_region_2 = BoxAnnotation(
                left=vs_min, top=-m_min, fill_color="orange", level="underlay"
            )
            fig.add_layout(variable_region_1)
            fig.add_layout(variable_region_2)

            fig.legend.location = "top_right"

        fig.legend.click_policy = "hide"

        return fig

    def _plot_epoch_pair_matplotlib(
        self,
        epoch_pair_id: int,
        df: pd.DataFrame,
        vs_min: float = 4.3,
        m_min: float = 0.26,
        use_int_flux: bool = False,
        remove_two_forced: bool = False,
        plot_style: str = 'a'
    ) -> plt.figure:
        """
        Plot the results of the two epoch analysis using matplotlib. Currently
        this can only plot one epoch pair at a time.

        Args:
            epoch_pair_id: The epoch pair to plot.
            df: Dataframe of measurement pairs with metric information. A
                `pandas.core.frame.DataFrame` instance.
            vs_min: The minimum Vs metric value to be considered a candidate,
                defaults to 4.3.
            m_min: The minimum m metric absolute value to be considered a
                candidate, defaults to 0.26.
            use_int_flux: Whether to use the integrated fluxes instead of the
                peak fluxes.
            remove_two_forced: Will exclude any pairs that are both forced
                extractions if True, defaults to False.
            plot_style: Select whether to plot with style 'a' (Mooley) or
                'b' (Radcliffe). Defaults to 'a'.

        Returns:
            Matplotlib pyplot figure containing the plot.
        """
        plt.close()  # close any previous ones

        vs_label = 'vs_int' if use_int_flux else 'vs_peak'
        m_label = 'm_int' if use_int_flux else 'm_peak'

        df_filter, num_pairs, num_candidates, td_days = (
            self._get_epoch_pair_plotting_df(
                df, epoch_pair_id, vs_label, m_label, vs_min, m_min
            )
        )

        candidate_perc = num_candidates / num_pairs * 100.

        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(111)

        colors = ["C0", "C1", "C2"]
        labels = ["0 forced", "1 forced", "2 forced"]

        range_len = 2 if remove_two_forced else 3

        if plot_style == 'a':
            for i in range(range_len):
                mask = df_filter['forced_sum'] == str(i)
                if np.any(mask):
                    ax.scatter(
                        df_filter[mask][m_label].abs(),
                        df_filter[mask][vs_label],
                        c=colors[i], label=labels[i],
                        zorder=2
                    )

            ax.axhline(vs_min, ls="--", c='k', zorder=5)
            ax.axvline(m_min, ls="--", c='k', zorder=5)
            ax.set_yscale('log')

            y_limits = ax.get_ylim()
            x_limits = ax.get_xlim()

            ax.fill_between(
                [m_min, 1e5], vs_min, 1e5,
                color='navajowhite', alpha=0.5, zorder=1
            )

            ax.set_xlim(x_limits)
            ax.set_ylim(y_limits)

            ax.set_xlabel(r"|$m$|")
            ax.set_ylabel(r"$V_{s}$")

        else:
            ax.fill_between([vs_min, 100], m_min, 4.2, color="gold", alpha=0.3)
            ax.fill_between(
                [vs_min, 100], -4.2, m_min * -1, color="gold", alpha=0.3
            )

            for i in range(range_len):
                mask = df_filter['forced_sum'] == str(i)
                if np.any(mask):
                    ax.scatter(
                        df_filter[mask][vs_label], df_filter[mask][m_label],
                        c=colors[i], label=labels[i]
                    )
            ax.set_xlim(0.5, 50)
            ax.set_ylim(-4.0, 4.0)

            ax.set_ylabel(r"$m$")
            ax.set_xlabel(r"$V_{s}$")

        date_string = "Epoch {} (Time {:.2f} days)".format(
            epoch_pair_id, td_days
        )
        number_string = "Candidates: {}/{} ({:.2f} %)".format(
            num_candidates, num_pairs, (100.*num_candidates/num_pairs)
        )
        ax.text(
            0.6, 0.05, date_string + '\n' + number_string,
            transform=ax.transAxes
        )
        ax.legend()

        return fig

    def plot_two_epoch_pairs(
        self,
        epoch_pair_id: int,
        query: Optional[str] = None,
        df: Optional[pd.DataFrame] = None,
        vs_min: float = 4.3,
        m_min: float = 0.26,
        use_int_flux: bool = False,
        remove_two_forced: bool = False,
        plot_type: str = 'bokeh',
        plot_style: str = 'a'
    ) -> Union[Model, plt.figure]:
        """
        Adapted from code written by Andrew O'Brien.
        Plot the results of the two epoch analysis. Currently this can only
        plot one epoch pair at a time.

        Args:
            epoch_pair_id: The epoch pair to plot.
            query: String query to apply to the dataframe before the analysis
                is run, defaults to None.
            df: Dataframe of sources from the pipeline run, defaults to None.
                If None then the sources from the PipeAnalysis object are used.
            vs_min: The minimum Vs metric value to be considered a candidate,
                defaults to 4.3.
            m_min: The minimum m metric absolute value to be considered a
                candidate, defaults to 0.26.
            use_int_flux: Whether to use the integrated fluxes instead of the
                peak fluxes.
            remove_two_forced: Will exclude any pairs that are both forced
                extractions if True, defaults to False.
            plot_type: Selects whether the returned plot is 'bokeh' or
                'matplotlib', defaults to 'bokeh'.
            plot_style: Select whether to plot with style 'a' (Mooley)
                or 'b' (Radcliffe). Defaults to 'a'.

        Returns:
            Bokeh or matplotlib figure.

        Raises:
            Exception: The two epoch metrics must be loaded before using this
                function.
            Exception: 'plot_type' is not recognised.
            Exception: `plot_style` is not recognised.
            Exception: Pair with entered ID does not exist.
            MeasPairsDoNotExistError: The measurement pairs file(s) do not
                exist for this run.
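
        Example:
            A minimal usage sketch, assuming 'my_run' is a loaded
            PipeAnalysis instance and that an epoch pair with ID 1
            exists in 'my_run.pairs_df':

                my_run.load_two_epoch_metrics()
                fig = my_run.plot_two_epoch_pairs(1)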
        """

        self._raise_if_no_pairs()

        if not self._loaded_two_epoch_metrics:
            raise Exception(
                "The two epoch metrics must first be loaded to use the"
                " plotting function. Please do so with the command:\n"
                "'mypiperun.load_two_epoch_metrics()'\n"
                "and try again."
            )

        if plot_type not in ['bokeh', 'matplotlib']:
            raise Exception(
                "'plot_type' value is not recognised!"
                " Must be either 'bokeh' or 'matplotlib'."
            )

        if plot_style not in ['a', 'b']:
            raise Exception(
                "'plot_style' value is not recognised!"
                " Must be either 'a' for Mooley or 'b' for Radcliffe."
            )

        if epoch_pair_id not in self.pairs_df.index.values:
            raise Exception(f"Pair with ID '{epoch_pair_id}' does not exist!")

        if df is None:
            df = self.sources

        if query is not None:
            df = df.query(query)

        pair_epoch_key = self.pairs_df.loc[epoch_pair_id]['pair_epoch_key']

        pairs_df = (
            self.measurement_pairs_df.loc[
                self.measurement_pairs_df.pair_epoch_key == pair_epoch_key
            ]
        )

        if self._vaex_meas_pairs:
            pairs_df = pairs_df.extract().to_pandas_df()

        pairs_df = pairs_df[pairs_df['source_id'].isin(df.index.values)]

        if plot_type == 'bokeh':
            fig = self._plot_epoch_pair_bokeh(
                epoch_pair_id,
                pairs_df,
                vs_min,
                m_min,
                use_int_flux,
                remove_two_forced,
                plot_style
            )
        else:
            fig = self._plot_epoch_pair_matplotlib(
                epoch_pair_id,
                pairs_df,
                vs_min,
                m_min,
                use_int_flux,
                remove_two_forced,
                plot_style
            )

        return fig

    def run_two_epoch_analysis(
        self, vs: float, m: float, query: Optional[str] = None,
        df: Optional[pd.DataFrame] = None, use_int_flux: bool = False
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Run the two epoch analysis on the pipeline run, with optional
        inputs to use a query or filtered dataframe.

        Args:
            vs: The minimum Vs metric value to be considered a candidate.
            m: The minimum m metric absolute value to be considered a
                candidate.
            query: String query to apply to the dataframe before the analysis
                is run, defaults to None.
            df: Dataframe of sources from the pipeline run, defaults to None.
                If None then the sources from the PipeAnalysis object are used.
            use_int_flux: Use integrated fluxes for the analysis instead of
                peak fluxes, defaults to 'False'.

        Returns:
            Tuple containing two dataframes of the candidate sources and pairs.

        Raises:
            Exception: The two epoch metrics must be loaded before using this
                function.
            MeasPairsDoNotExistError: The measurement pairs file(s) do not
                exist for this run.
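
        Example:
            A minimal usage sketch, assuming 'my_run' is a loaded
            PipeAnalysis instance (4.3 and 0.26 match the default
            thresholds used by the plotting methods in this class):

                my_run.load_two_epoch_metrics()
                candidate_sources, candidate_pairs = (
                    my_run.run_two_epoch_analysis(4.3, 0.26)
                )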
        """

        self._raise_if_no_pairs()

        if not self._loaded_two_epoch_metrics:
            raise Exception(
                "The two epoch metrics must first be loaded to use the"
                " plotting function. Please do so with the command:\n"
                "'mypiperun.load_two_epoch_metrics()'\n"
                "and try again."
            )

        if df is None:
            df = self.sources

        if query is not None:
            df = df.query(query)

        allowed_sources = df.index.values

        pairs_df = self.measurement_pairs_df.copy()

        if len(allowed_sources) != self.sources.shape[0]:
            if self._vaex_meas_pairs:
                pairs_df = pairs_df[
                    pairs_df['source_id'].isin(allowed_sources)
                ]
            else:
                pairs_df = pairs_df.loc[
                    pairs_df['source_id'].isin(allowed_sources)
                ]

        vs_label = 'vs_int' if use_int_flux else 'vs_peak'
        m_abs_label = 'm_int' if use_int_flux else 'm_peak'

        pairs_df[vs_label] = pairs_df[vs_label].abs()
        pairs_df[m_abs_label] = pairs_df[m_abs_label].abs()

        # If vaex convert these to pandas
        if self._vaex_meas_pairs:
            candidate_pairs = pairs_df[
                (pairs_df[vs_label] > vs) & (pairs_df[m_abs_label] > m)
            ]

            candidate_pairs = candidate_pairs.to_pandas_df()

        else:
            candidate_pairs = pairs_df.loc[
                (pairs_df[vs_label] > vs) & (pairs_df[m_abs_label] > m)
            ]

        unique_sources = candidate_pairs['source_id'].unique()

        candidate_sources = self.sources.loc[unique_sources]

        return candidate_sources, candidate_pairs

    def _fit_eta_v(
        self, df: pd.DataFrame, use_int_flux: bool = False
    ) -> Tuple[float, float, float, float]:
        """
        Fits the eta and v distributions with Gaussians. Used from
        within the 'run_eta_v_analysis' method.

        Args:
            df: DataFrame containing the sources from the pipeline run. A
                `pandas.core.frame.DataFrame` instance.
            use_int_flux: Use integrated fluxes for the analysis instead of
                peak fluxes, defaults to 'False'.

        Returns:
            Tuple containing the eta_fit_mean, eta_fit_sigma, v_fit_mean
            and the v_fit_sigma.

        if use_int_flux:
            eta_label = 'eta_int'
            v_label = 'v_int'
        else:
            eta_label = 'eta_peak'
            v_label = 'v_peak'

        eta_log = np.log10(df[eta_label])
        v_log = np.log10(df[v_label])

        eta_log_clipped = sigma_clip(
            eta_log, masked=False, stdfunc=mad_std, sigma=3
        )
        v_log_clipped = sigma_clip(
            v_log, masked=False, stdfunc=mad_std, sigma=3
        )

        eta_fit_mean, eta_fit_sigma = norm.fit(eta_log_clipped)
        v_fit_mean, v_fit_sigma = norm.fit(v_log_clipped)

        return (eta_fit_mean, eta_fit_sigma, v_fit_mean, v_fit_sigma)

    def _gaussian_fit(
        self, data: pd.Series, param_mean: float, param_sigma: float
    ) -> Tuple[np.ndarray, norm]:
        """
        Returns the Gaussian to add to the matplotlib plot.

        Args:
            data: Series object containing the log10 values of the
                distribution to plot.
            param_mean: The calculated mean of the Gaussian to fit.
            param_sigma: The calculated sigma of the Gaussian to fit.

        Returns:
            Tuple containing the range of the returned data and the
            Gaussian fit.
        """
        range_data = np.linspace(min(data), max(data), 1000)
        fit = norm.pdf(range_data, loc=param_mean, scale=param_sigma)

        return range_data, fit

    def _make_bins(self, x: pd.Series) -> List[float]:
        """
        Calculates the bins that should be used for the v, eta distribution
        using bayesian blocks.

        Args:
            x: Series object containing the log10 values of the
                distribution to plot.

        Returns:
            Bins to apply.
        """
        new_bins = bayesian_blocks(x)
        binsx = [
            new_bins[a] for a in range(
                len(new_bins) - 1
            ) if abs((new_bins[a + 1] - new_bins[a]) / new_bins[a]) > 0.05
        ]
        binsx = binsx + [new_bins[-1]]

        return binsx

    def eta_v_diagnostic_plot(
        self, eta_cutoff: float, v_cutoff: float,
        df: Optional[pd.DataFrame] = None,
        use_int_flux: bool = False
    ) -> plt.figure:
        """
        Adapted from code written by Antonia Rowlinson.
        Produces the eta, V 'diagnostic plot'
        (see Rowlinson et al., 2018,
        https://ui.adsabs.harvard.edu/abs/2019A%26C....27..111R/abstract).

        Args:
            eta_cutoff: The eta cutoff from the analysis (the log10 is
                applied internally).
            v_cutoff: The v cutoff from the analysis (the log10 is
                applied internally).
            df: Dataframe containing the sources from the Pipeline run. If
                not provided then the `self.sources` dataframe will be used.
                A `pandas.core.frame.DataFrame` instance.
            use_int_flux: Use integrated fluxes for the analysis instead of
                peak fluxes, defaults to 'False'.

        Returns:
            matplotlib figure containing the plot.
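
        Example:
            A minimal usage sketch, assuming 'my_run' is a loaded
            PipeAnalysis instance and the cutoff values come from a
            previous 'run_eta_v_analysis' call:

                fig = my_run.eta_v_diagnostic_plot(eta_cutoff, v_cutoff)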
        """
        plt.close()  # close any previous ones

        if df is None:
            df = self.sources

        if use_int_flux:
            eta_label = 'eta_int'
            v_label = 'v_int'
        else:
            eta_label = 'eta_peak'
            v_label = 'v_peak'

        eta_cutoff = np.log10(eta_cutoff)
        v_cutoff = np.log10(v_cutoff)

        nullfmt = NullFormatter()  # no labels

        fig = plt.figure(figsize=(12, 12))
        ax1 = fig.add_subplot(221)
        ax2 = fig.add_subplot(222)
        ax3 = fig.add_subplot(223)
        ax4 = fig.add_subplot(224)
        fontP = FontProperties()
        fontP.set_size('large')
        fig.subplots_adjust(hspace=.001, wspace=0.001)
        ax1.set_ylabel(r'$\eta_\nu$', fontsize=28)
        ax3.set_ylabel(r'$V_\nu$', fontsize=28)
        ax3.set_xlabel('Max Flux (Jy)', fontsize=24)
        ax4.set_xlabel('Max Flux / Median Flux', fontsize=24)

        xdata_ax3 = df['max_flux_peak']
        xdata_ax4 = df['max_flux_peak'] / df['avg_flux_peak']
        ydata_ax1 = df[eta_label]
        ydata_ax3 = df[v_label]
        ax1.scatter(xdata_ax3, ydata_ax1, s=10., zorder=5)
        ax2.scatter(xdata_ax4, ydata_ax1, s=10., zorder=6)
        ax3.scatter(xdata_ax3, ydata_ax3, s=10., zorder=7)
        ax4.scatter(xdata_ax4, ydata_ax3, s=10., zorder=8)

        Xax3 = df['max_flux_peak']
        Xax4 = df['max_flux_peak'] / df['avg_flux_peak']
        Yax1 = df[eta_label]
        Yax3 = df[v_label]

        if eta_cutoff != 0 or v_cutoff != 0:
            ax1.axhline(
                y=10.**eta_cutoff, linewidth=2, color='k', linestyle='--'
            )
            ax2.axhline(
                y=10.**eta_cutoff, linewidth=2, color='k', linestyle='--'
            )
            ax3.axhline(
                y=10.**v_cutoff, linewidth=2, color='k', linestyle='--'
            )
            ax4.axhline(
                y=10.**v_cutoff, linewidth=2, color='k', linestyle='--'
            )

        ax1.set_yscale('log')
        ax1.set_xscale('log')
        ax2.set_yscale('log')
        ax3.set_yscale('log')
        ax3.set_xscale('log')
        ax4.set_yscale('log')
        xmin_ax3 = 10.**(int(np.log10(min(Xax3)) - 1.1))
        xmax_ax3 = 10.**(int(np.log10(max(Xax3)) + 1.2))
        xmin_ax4 = 0.8
        xmax_ax4 = int(max(xdata_ax4) + 0.5)
        ymin_ax1 = 10.**(int(np.log10(min(Yax1)) - 1.1))
        ymax_ax1 = 10.**(int(np.log10(max(Yax1)) + 1.2))
        ymin_ax3 = 10.**(int(np.log10(min(Yax3)) - 1.1))
        ymax_ax3 = 10.**(int(np.log10(max(Yax3)) + 1.2))
        ax1.set_ylim(ymin_ax1, ymax_ax1)
        ax3.set_ylim(ymin_ax3, ymax_ax3)
        ax3.set_xlim(xmin_ax3, xmax_ax3)
        ax4.set_xlim(xmin_ax4, xmax_ax4)
        ax1.set_xlim(ax3.get_xlim())
        ax4.set_ylim(ax3.get_ylim())
        ax2.set_xlim(ax4.get_xlim())
        ax2.set_ylim(ax1.get_ylim())
        ax1.xaxis.set_major_formatter(nullfmt)
        ax4.yaxis.set_major_formatter(nullfmt)
        ax2.xaxis.set_major_formatter(nullfmt)
        ax2.yaxis.set_major_formatter(nullfmt)

        return fig

    def _plot_eta_v_matplotlib(
        self,
        df: pd.DataFrame,
        eta_fit_mean: float,
        eta_fit_sigma: float,
        v_fit_mean: float,
        v_fit_sigma: float,
        eta_cutoff: float,
        v_cutoff: float,
        use_int_flux: bool = False
    ) -> plt.figure:
        """
        Adapted from code written by Antonia Rowlinson.
        Produces the eta, V candidates plot
        (see Rowlinson et al., 2018,
        https://ui.adsabs.harvard.edu/abs/2019A%26C....27..111R/abstract).
        Returns a matplotlib version.

        Args:
            df: Dataframe containing the sources from the pipeline run.
                A `pandas.core.frame.DataFrame` instance.
            eta_fit_mean: The mean of the eta fitted Gaussian.
            eta_fit_sigma: The sigma of the eta fitted Gaussian.
            v_fit_mean: The mean of the v fitted Gaussian.
            v_fit_sigma: The sigma of the v fitted Gaussian.
            eta_cutoff: The eta cutoff from the analysis (the log10 is
                applied internally).
            v_cutoff: The v cutoff from the analysis (the log10 is
                applied internally).
            use_int_flux: Use integrated fluxes for the analysis instead of
                peak fluxes, defaults to 'False'.

        Returns:
            Matplotlib figure containing the plot.
        """
        plt.close()  # close any previous ones
        if use_int_flux:
            x_label = 'eta_int'
            y_label = 'v_int'
            title = "Int. Flux"
        else:
            x_label = 'eta_peak'
            y_label = 'v_peak'
            title = 'Peak Flux'

        eta_cutoff = np.log10(eta_cutoff)
        v_cutoff = np.log10(v_cutoff)

        nullfmt = NullFormatter()  # no labels
        fontP = FontProperties()
        fontP.set_size('large')
        left, width = 0.1, 0.65
        bottom, height = 0.1, 0.65
        bottom_h = left_h = left+width+0.02
        rect_scatter = [left, bottom, width, height]
        rect_histx = [left, bottom_h, width, 0.2]
        rect_histy = [left_h, bottom, 0.2, height]
        fig = plt.figure(figsize=(12, 12))
        axScatter = fig.add_subplot(223)
        plt.xlabel(r'$\eta_{\nu}$', fontsize=28)
        plt.ylabel(r'$V_{\nu}$', fontsize=28)
        axHistx = fig.add_subplot(221)
        axHisty = fig.add_subplot(224)
        axHistx.xaxis.set_major_formatter(nullfmt)
        axHisty.yaxis.set_major_formatter(nullfmt)
        axHistx.axes.yaxis.set_ticklabels([])
        axHisty.axes.xaxis.set_ticklabels([])

        xdata_var = np.log10(df[x_label])
        ydata_var = np.log10(df[y_label])
        axScatter.scatter(xdata_var, ydata_var, s=10., zorder=5, color='C0')
        axScatter.fill_between(
            [eta_cutoff, 1e4], v_cutoff, 1e4,
            color='navajowhite', alpha=0.5
        )

        x = np.log10(df[x_label])
        y = np.log10(df[y_label])

        axHistx.hist(
            x, bins=self._make_bins(x), density=1,
            histtype='stepfilled', color='C0'
        )
        axHisty.hist(
            y, bins=self._make_bins(y), density=1,
            histtype='stepfilled', orientation='horizontal', color='C0'
        )

        xmin = int(min(x) - 1.1)
        xmax = int(max(x) + 1.1)
        ymin = int(min(y) - 1.1)
        ymax = int(max(y) + 1.1)
        xvals = range(xmin, xmax)
        xtxts = [r'$10^{'+str(a)+'}$' for a in xvals]
        yvals = range(ymin, ymax)
        ytxts = [r'$10^{' + str(a) + '}$' for a in yvals]
        axScatter.set_xlim([xmin, xmax])
        axScatter.set_ylim([ymin, ymax])
        axScatter.set_xticks(xvals)
        axScatter.set_xticklabels(xtxts, fontsize=20)
        axScatter.set_yticks(yvals)
        axScatter.set_yticklabels(ytxts, fontsize=20)
        axHistx.set_xlim(axScatter.get_xlim())
        axHisty.set_ylim(axScatter.get_ylim())

        if eta_cutoff != 0 or v_cutoff != 0:
            axHistx.axvline(
                x=eta_cutoff, linewidth=2, color='k', linestyle='--'
            )
            axHisty.axhline(
                y=v_cutoff, linewidth=2, color='k', linestyle='--'
            )
            axScatter.axhline(
                y=v_cutoff, linewidth=2, color='k', linestyle='--'
            )
            axScatter.axvline(
                x=eta_cutoff, linewidth=2, color='k', linestyle='--'
            )

        range_x, fitx = self._gaussian_fit(x, eta_fit_mean, eta_fit_sigma)
        axHistx.plot(range_x, fitx, 'k:', linewidth=2)
        range_y, fity = self._gaussian_fit(y, v_fit_mean, v_fit_sigma)
        axHisty.plot(fity, range_y, 'k:', linewidth=2)

        axHistx.set_position(rect_histx)
        axHisty.set_position(rect_histy)
        axScatter.set_position(rect_scatter)

        return fig

    def _plot_eta_v_bokeh(
        self,
        df: pd.DataFrame,
        eta_fit_mean: float,
        eta_fit_sigma: float,
        v_fit_mean: float,
        v_fit_sigma: float,
        eta_cutoff: float,
        v_cutoff: float,
        use_int_flux: bool = False
    ) -> gridplot:
        """
        Adapted from code written by Andrew O'Brien.
        Produces the eta, V candidates plot
        (see Rowlinson et al., 2018,
        https://ui.adsabs.harvard.edu/abs/2019A%26C....27..111R/abstract).
        Returns a bokeh version.

        Args:
            df: Dataframe containing the sources from the pipeline run. A
                `pandas.core.frame.DataFrame` instance.
            eta_fit_mean: The mean of the eta fitted Gaussian.
            eta_fit_sigma: The sigma of the eta fitted Gaussian.
            v_fit_mean: The mean of the v fitted Gaussian.
            v_fit_sigma: The sigma of the v fitted Gaussian.
            eta_cutoff: The eta cutoff from the analysis.
            v_cutoff: The v cutoff from the analysis.
            use_int_flux: Use integrated fluxes for the analysis instead of
                peak fluxes, defaults to 'False'.

        Returns:
            Bokeh grid object containing figure.
        """

        if use_int_flux:
            x_label = 'eta_int'
            y_label = 'v_int'
            title = "Int. Flux"
        else:
            x_label = 'eta_peak'
            y_label = 'v_peak'
            title = 'Peak Flux'

        bokeh_df = df
        negative_v = bokeh_df[y_label] <= 0
        if negative_v.any():
            indices = bokeh_df[negative_v].index
            self.logger.warning("Negative V encountered. Removing...")
            self.logger.debug(f"Negative V indices: {indices.values}")

            bokeh_df = bokeh_df.drop(indices)

        # generate fitted curve data for plotting
        eta_x = np.linspace(
            norm.ppf(0.001, loc=eta_fit_mean, scale=eta_fit_sigma),
            norm.ppf(0.999, loc=eta_fit_mean, scale=eta_fit_sigma),
        )
        eta_y = norm.pdf(eta_x, loc=eta_fit_mean, scale=eta_fit_sigma)

        v_x = np.linspace(
            norm.ppf(0.001, loc=v_fit_mean, scale=v_fit_sigma),
            norm.ppf(0.999, loc=v_fit_mean, scale=v_fit_sigma),
        )
        v_y = norm.pdf(v_x, loc=v_fit_mean, scale=v_fit_sigma)

        PLOT_WIDTH = 700
        PLOT_HEIGHT = PLOT_WIDTH
        fig = figure(
            plot_width=PLOT_WIDTH,
            plot_height=PLOT_HEIGHT,
            aspect_scale=1,
            x_axis_type="log",
            y_axis_type="log",
            x_axis_label="eta",
            y_axis_label="V",
            tooltips=[("source", "@id")],
        )
        cmap = linear_cmap(
            "n_selavy",
            cc.kb,
            df["n_selavy"].min(),
            df["n_selavy"].max(),
        )

        fig.scatter(
            x=x_label, y=y_label, color=cmap,
            marker="circle", size=5, source=df
        )

        # axis histograms
        # filter out any forced-phot points for these
        x_hist = figure(
            plot_width=PLOT_WIDTH,
            plot_height=100,
            x_range=fig.x_range,
            y_axis_type=None,
            x_axis_type="log",
            x_axis_location="above",
            title="VAST eta-V {}".format(title),
            tools="",
        )
        x_hist_data, x_hist_edges = np.histogram(
            np.log10(bokeh_df[x_label]), density=True, bins=50,
        )
        x_hist.quad(
            top=x_hist_data,
            bottom=0,
            left=10 ** x_hist_edges[:-1],
            right=10 ** x_hist_edges[1:],
        )
        x_hist.line(10 ** eta_x, eta_y, color="black")
        x_hist_sigma_span = Span(
            location=eta_cutoff,
            dimension="height",
            line_color="black",
            line_dash="dashed",
        )
        x_hist.add_layout(x_hist_sigma_span)
        fig.add_layout(x_hist_sigma_span)

        y_hist = figure(
            plot_height=PLOT_HEIGHT,
            plot_width=100,
            y_range=fig.y_range,
            x_axis_type=None,
            y_axis_type="log",
            y_axis_location="right",
            tools="",
        )
        y_hist_data, y_hist_edges = np.histogram(
            np.log10(bokeh_df[y_label]), density=True, bins=50,
        )
        y_hist.quad(
            right=y_hist_data,
            left=0,
            top=10 ** y_hist_edges[:-1],
            bottom=10 ** y_hist_edges[1:],
        )
        y_hist.line(v_y, 10 ** v_x, color="black")
        y_hist_sigma_span = Span(
            location=v_cutoff,
            dimension="width",
            line_color="black",
            line_dash="dashed",
        )
        y_hist.add_layout(y_hist_sigma_span)
        fig.add_layout(y_hist_sigma_span)

        variable_region = BoxAnnotation(
            left=eta_cutoff,
            bottom=v_cutoff,
            fill_color="orange",
            fill_alpha=0.3,
            level="underlay",
        )
        fig.add_layout(variable_region)
        grid = gridplot(
            [[x_hist, Spacer(width=100, height=100)], [fig, y_hist]]
        )
        grid.css_classes.append("mx-auto")

        return grid

    def run_eta_v_analysis(
        self, eta_sigma: float, v_sigma: float,
        query: Optional[str] = None, df: Optional[pd.DataFrame] = None,
        use_int_flux: bool = False, plot_type: str = 'bokeh',
        diagnostic: bool = False
    ) -> Union[
        Tuple[float, float, pd.DataFrame, plt.figure, plt.figure],
        Tuple[float, float, pd.DataFrame, gridplot, plt.figure]
    ]:
        """
        Run the eta, v analysis on the pipeline run, with optional
        inputs to use a query or filtered dataframe (see Rowlinson
        et al., 2018,
        https://ui.adsabs.harvard.edu/abs/2019A%26C....27..111R/abstract).

        Args:
            eta_sigma: The minimum sigma value of the eta distribution
                to be used as a threshold.
            v_sigma: The minimum sigma value of the v distribution
                to be used as a threshold.
            query: String query to apply to the dataframe before
                the analysis is run, defaults to None.
            df: Dataframe of sources from the pipeline run, defaults
                to None. If None then the sources from the PipeAnalysis object
                are used.
            use_int_flux: Use integrated fluxes for the analysis instead of
                peak fluxes, defaults to 'False'.
            plot_type: Select which format the candidates plot should be
                returned in. Either 'bokeh' or 'matplotlib', defaults
                to 'bokeh'.
            diagnostic: When 'True' the diagnostic plot is also returned,
                defaults to 'False'.

        Returns:
            Tuple containing the eta cutoff value, the v cutoff value,
            dataframe of candidates, candidates plot and, if selected, the
            diagnostic plot.

        Raises:
            Exception: Entered `plot_type` is not a valid plot type.
        """
        plot_types = ['bokeh', 'matplotlib']

        if plot_type not in plot_types:
            raise Exception(
                "Not a valid plot type!"
                " Must be 'bokeh' or 'matplotlib'."
            )

        if df is None:
            df = self.sources

        if query is not None:
            df = df.query(query)

        (
            eta_fit_mean, eta_fit_sigma,
            v_fit_mean, v_fit_sigma
        ) = self._fit_eta_v(df, use_int_flux=use_int_flux)

        v_cutoff = 10 ** (v_fit_mean + v_sigma * v_fit_sigma)
        eta_cutoff = 10 ** (eta_fit_mean + eta_sigma * eta_fit_sigma)

        if plot_type == 'bokeh':
            plot = self._plot_eta_v_bokeh(
                df, eta_fit_mean, eta_fit_sigma,
                v_fit_mean, v_fit_sigma, eta_cutoff, v_cutoff,
                use_int_flux=use_int_flux
            )
        else:
            plot = self._plot_eta_v_matplotlib(
                df, eta_fit_mean, eta_fit_sigma,
                v_fit_mean, v_fit_sigma, eta_cutoff, v_cutoff,
                use_int_flux=use_int_flux
            )

        if use_int_flux:
            label = 'int'
        else:
            label = 'peak'

        candidates = df.query(
            "v_{0} > {1} "
            "& eta_{0} > {2}".format(
                label,
                v_cutoff,
                eta_cutoff
            )
        )

        if diagnostic:
            diag = self.eta_v_diagnostic_plot(
                eta_cutoff, v_cutoff, df, use_int_flux=use_int_flux
            )
            return eta_cutoff, v_cutoff, candidates, plot, diag
        else:
            return eta_cutoff, v_cutoff, candidates, plot

    def add_credible_levels(self, filename: str) -> None:
        """
        Calculate the minimum credible region of a given skymap
        containing each source.

        Args:
            filename: The path to the skymap in healpix format

        Returns:
            None
        """
        add_credible_levels(filename, self.sources)

__init__(self, name, images, skyregions, relations, sources, associations, bands, measurements, measurement_pairs_file, vaex_meas=False, n_workers=3, scheduler='processes') special

Constructor method.

Parameters:

Name Type Description Default
name str

The name of the pipeline run.

required
images DataFrame

Images dataframe from the pipeline run loaded from images.parquet. A pandas.core.frame.DataFrame instance.

required
skyregions DataFrame

Sky regions dataframe from the pipeline run loaded from skyregions.parquet. A pandas.core.frame.DataFrame instance.

required
relations DataFrame

Relations dataframe from the pipeline run loaded from relations.parquet. A pandas.core.frame.DataFrame instance.

required
sources DataFrame

Sources dataframe from the pipeline run loaded from sources.parquet. A pandas.core.frame.DataFrame instance.

required
associations DataFrame

Associations dataframe from the pipeline run loaded from 'associations.parquet'. A pandas.core.frame.DataFrame instance.

required
bands DataFrame

The bands dataframe from the pipeline run loaded from 'bands.parquet'.

required
measurements Union[pandas.core.frame.DataFrame, vaex.dataframe.DataFrame]

Measurements dataframe from the pipeline run loaded from measurements.parquet and the forced measurements parquet files. A pandas.core.frame.DataFrame or vaex.dataframe.DataFrame instance.

required
measurement_pairs_file List[str]

The locations of the two epoch pairs file(s) from the pipeline. A list is used because two pipeline runs may be combined.

required
vaex_meas bool

'True' if the measurements have been loaded using vaex from an arrow file. False means the measurements are loaded into a pandas DataFrame.

False
n_workers int

Number of workers (cpus) available.

3
scheduler str

Dask scheduling option to use. Options are "processes" (parallel processing) or "single-threaded". Defaults to "processes".

'processes'

Returns:

Type Description
None

None

Source code in vasttools/pipeline.py
def __init__(
    self,
    name: str,
    images: pd.DataFrame,
    skyregions: pd.DataFrame,
    relations: pd.DataFrame,
    sources: pd.DataFrame,
    associations: pd.DataFrame,
    bands: pd.DataFrame,
    measurements: Union[pd.DataFrame, vaex.dataframe.DataFrame],
    measurement_pairs_file: List[str],
    vaex_meas: bool = False,
    n_workers: int = HOST_NCPU - 1,
    scheduler: str = 'processes',
) -> None:
    """
    Constructor method.

    Args:
        name: The name of the pipeline run.
        images: Images dataframe from the pipeline run
            loaded from images.parquet. A `pandas.core.frame.DataFrame`
            instance.
        skyregions: Sky regions dataframe from the pipeline run
            loaded from skyregions.parquet. A `pandas.core.frame.DataFrame`
            instance.
        relations: Relations dataframe from the pipeline run
            loaded from relations.parquet. A `pandas.core.frame.DataFrame`
            instance.
        sources: Sources dataframe from the pipeline run
            loaded from sources.parquet. A `pandas.core.frame.DataFrame`
            instance.
        associations: Associations dataframe from the pipeline run loaded
            from 'associations.parquet'. A `pandas.core.frame.DataFrame`
            instance.
        bands: The bands dataframe from the pipeline run loaded from
            'bands.parquet'.
        measurements: Measurements dataframe from the pipeline run
            loaded from measurements.parquet and the forced measurements
            parquet files.  A `pandas.core.frame.DataFrame` or
            `vaex.dataframe.DataFrame` instance.
        measurement_pairs_file: The locations of the two epoch pairs
            file(s) from the pipeline. A list is used because two pipeline
            runs may be combined.
        vaex_meas: 'True' if the measurements have been loaded using
            vaex from an arrow file. `False` means the measurements are
            loaded into a pandas DataFrame.
        n_workers: Number of workers (cpus) available.
        scheduler: Dask scheduling option to use. Options are "processes"
            (parallel processing) or "single-threaded". Defaults to
            "single-threaded".

    Returns:
        None
    """
    super().__init__(
        name, images, skyregions, relations, sources, associations,
        bands, measurements, measurement_pairs_file, vaex_meas, n_workers,
        scheduler
    )
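
In typical use a PipeAnalysis object is not constructed by hand. A minimal sketch of the usual route, loading a run through the vast-tools Pipeline class (the project directory and run name below are placeholders):

from vasttools.pipeline import Pipeline

# point vast-tools at the pipeline run output directory (placeholder path)
pipe = Pipeline(project_dir='/path/to/pipeline-runs/')

# load_run returns a PipeAnalysis instance for the named run (placeholder name)
my_run = pipe.load_run('my_run_name')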

add_credible_levels(self, filename)

Calculate the minimum credible region of a given skymap containing each source.

Parameters:

Name Type Description Default
filename str

The path to the skymap in healpix format

required

Returns:

Type Description
None

None

Source code in vasttools/pipeline.py
def add_credible_levels(self, filename: str) -> None:
    """
    Calculate the minimum credible region of a given skymap
    containing each source.

    Args:
        filename: The path to the skymap in healpix format

    Returns:
        None
    """
    add_credible_levels(filename, self.sources)
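
A minimal usage sketch, assuming 'my_run' is a loaded PipeAnalysis instance; the skymap filename is a placeholder and the name of the column added to the sources dataframe ('credible_level') is an assumption:

# attach the minimum credible region containing each source (placeholder file)
my_run.add_credible_levels('bayestar.fits.gz')

# the sources dataframe is updated in place (assumed column name)
my_run.sources['credible_level'].describe()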

eta_v_diagnostic_plot(self, eta_cutoff, v_cutoff, df=None, use_int_flux=False)

Adapted from code written by Antonia Rowlinson. Produces the eta, V 'diagnostic plot' (see Rowlinson et al., 2019, https://ui.adsabs.harvard.edu/abs/2019A%26C....27..111R/abstract).

Parameters:

Name Type Description Default
eta_cutoff float

The eta cutoff value from the analysis.

required
v_cutoff float

The v cutoff value from the analysis.

required
df Optional[pandas.core.frame.DataFrame]

Dataframe containing the sources from the Pipeline run. If not provided then the self.sources dataframe will be used. A pandas.core.frame.DataFrame instance.

None
use_int_flux bool

Use integrated fluxes for the analysis instead of peak fluxes, defaults to 'False'.

False

Returns:

Type Description
matplotlib.pyplot.figure

matplotlib figure containing the plot.

Source code in vasttools/pipeline.py
def eta_v_diagnostic_plot(
    self, eta_cutoff: float, v_cutoff: float,
    df: Optional[pd.DataFrame] = None,
    use_int_flux: bool = False
) -> plt.figure:
    """
    Adapted from code written by Antonia Rowlinson.
    Produces the eta, V 'diagnostic plot'
    (see Rowlinson et al., 2019,
    https://ui.adsabs.harvard.edu/abs/2019A%26C....27..111R/abstract).

    Args:
        eta_cutoff: The eta cutoff value from the analysis.
        v_cutoff: The v cutoff value from the analysis.
        df: Dataframe containing the sources from the Pipeline run. If
            not provided then the `self.sources` dataframe will be used.
            A `pandas.core.frame.DataFrame` instance.
        use_int_flux: Use integrated fluxes for the analysis instead of
            peak fluxes, defaults to 'False'.

    Returns:
        matplotlib figure containing the plot.
    """
    plt.close()  # close any previous ones

    if df is None:
        df = self.sources

    if use_int_flux:
        eta_label = 'eta_int'
        v_label = 'v_int'
    else:
        eta_label = 'eta_peak'
        v_label = 'v_peak'

    eta_cutoff = np.log10(eta_cutoff)
    v_cutoff = np.log10(v_cutoff)

    nullfmt = NullFormatter()  # no labels

    fig = plt.figure(figsize=(12, 12))
    ax1 = fig.add_subplot(221)
    ax2 = fig.add_subplot(222)
    ax3 = fig.add_subplot(223)
    ax4 = fig.add_subplot(224)
    fontP = FontProperties()
    fontP.set_size('large')
    fig.subplots_adjust(hspace=.001, wspace=0.001)
    ax1.set_ylabel(r'$\eta_\nu$', fontsize=28)
    ax3.set_ylabel(r'$V_\nu$', fontsize=28)
    ax3.set_xlabel('Max Flux (Jy)', fontsize=24)
    ax4.set_xlabel('Max Flux / Median Flux', fontsize=24)

    xdata_ax3 = df['max_flux_peak']
    xdata_ax4 = df['max_flux_peak'] / df['avg_flux_peak']
    ydata_ax1 = df[eta_label]
    ydata_ax3 = df[v_label]
    ax1.scatter(xdata_ax3, ydata_ax1, s=10., zorder=5)
    ax2.scatter(xdata_ax4, ydata_ax1, s=10., zorder=6)
    ax3.scatter(xdata_ax3, ydata_ax3, s=10., zorder=7)
    ax4.scatter(xdata_ax4, ydata_ax3, s=10., zorder=8)

    Xax3 = df['max_flux_peak']
    Xax4 = df['max_flux_peak'] / df['avg_flux_peak']
    Yax1 = df[eta_label]
    Yax3 = df[v_label]

    if eta_cutoff != 0 or v_cutoff != 0:
        ax1.axhline(
            y=10.**eta_cutoff, linewidth=2, color='k', linestyle='--'
        )
        ax2.axhline(
            y=10.**eta_cutoff, linewidth=2, color='k', linestyle='--'
        )
        ax3.axhline(
            y=10.**v_cutoff, linewidth=2, color='k', linestyle='--'
        )
        ax4.axhline(
            y=10.**v_cutoff, linewidth=2, color='k', linestyle='--'
        )

    ax1.set_yscale('log')
    ax1.set_xscale('log')
    ax2.set_yscale('log')
    ax3.set_yscale('log')
    ax3.set_xscale('log')
    ax4.set_yscale('log')
    xmin_ax3 = 10.**(int(np.log10(min(Xax3)) - 1.1))
    xmax_ax3 = 10.**(int(np.log10(max(Xax3)) + 1.2))
    xmin_ax4 = 0.8
    xmax_ax4 = int(max(xdata_ax4) + 0.5)
    ymin_ax1 = 10.**(int(np.log10(min(Yax1)) - 1.1))
    ymax_ax1 = 10.**(int(np.log10(max(Yax1)) + 1.2))
    ymin_ax3 = 10.**(int(np.log10(min(Yax3)) - 1.1))
    ymax_ax3 = 10.**(int(np.log10(max(Yax3)) + 1.2))
    ax1.set_ylim(ymin_ax1, ymax_ax1)
    ax3.set_ylim(ymin_ax3, ymax_ax3)
    ax3.set_xlim(xmin_ax3, xmax_ax3)
    ax4.set_xlim(xmin_ax4, xmax_ax4)
    ax1.set_xlim(ax3.get_xlim())
    ax4.set_ylim(ax3.get_ylim())
    ax2.set_xlim(ax4.get_xlim())
    ax2.set_ylim(ax1.get_ylim())
    ax1.xaxis.set_major_formatter(nullfmt)
    ax4.yaxis.set_major_formatter(nullfmt)
    ax2.xaxis.set_major_formatter(nullfmt)
    ax2.yaxis.set_major_formatter(nullfmt)

    return fig
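
A short sketch of producing the diagnostic plot from the cutoff values returned by run_eta_v_analysis, assuming 'my_run' is a loaded PipeAnalysis instance:

# 3-sigma thresholds on both the eta and v distributions
eta_cutoff, v_cutoff, candidates, plot = my_run.run_eta_v_analysis(3.0, 3.0)

# the returned object is a standard matplotlib figure
diag = my_run.eta_v_diagnostic_plot(eta_cutoff, v_cutoff)
diag.savefig('eta_v_diagnostic.png')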

plot_two_epoch_pairs(self, epoch_pair_id, query=None, df=None, vs_min=4.3, m_min=0.26, use_int_flux=False, remove_two_forced=False, plot_type='bokeh', plot_style='a')

Adapted from code written by Andrew O'Brien. Plot the results of the two epoch analysis. Currently this can only plot one epoch pair at a time.

Parameters:

Name Type Description Default
epoch_pair_id int

The epoch pair to plot.

required
query Optional[str]

String query to apply to the dataframe before the analysis is run, defaults to None.

None
df Optional[pandas.core.frame.DataFrame]

Dataframe of sources from the pipeline run, defaults to None. If None then the sources from the PipeAnalysis object are used.

None
vs_min float

The minimum Vs metric value to be considered a candidate, defaults to 4.3.

4.3
m_min float

The minimum m metric absolute value to be considered a candidate, defaults to 0.26.

0.26
use_int_flux bool

Whether to use the integrated fluxes instead of the peak fluxes.

False
remove_two_forced bool

Will exclude any pairs that are both forced extractions if True, defaults to False.

False
plot_type str

Selects whether the returned plot is 'bokeh' or 'matplotlib', defaults to 'bokeh'.

'bokeh'
plot_style str

Select whether to plot with style 'a' (Mooley) or 'b' (Radcliffe). Defaults to 'a'.

'a'

Returns:

Type Description
Union[bokeh.model.model.Model, figure]

Bokeh or matplotlib figure.

Exceptions:

Type Description
Exception

The two epoch metrics must be loaded before using this function.

Exception

'plot_type' is not recognised.

Exception

plot_style is not recognised.

Exception

Pair with entered ID does not exist.

MeasPairsDoNotExistError

The measurement pairs file(s) do not exist for this run.

Source code in vasttools/pipeline.py
def plot_two_epoch_pairs(
    self,
    epoch_pair_id: int,
    query: Optional[str] = None,
    df: Optional[pd.DataFrame] = None,
    vs_min: float = 4.3,
    m_min: float = 0.26,
    use_int_flux: bool = False,
    remove_two_forced: bool = False,
    plot_type: str = 'bokeh',
    plot_style: str = 'a'
) -> Union[Model, plt.figure]:
    """
    Adapted from code written by Andrew O'Brien.
    Plot the results of the two epoch analysis. Currently this can only
    plot one epoch pair at a time.

    Args:
        epoch_pair_id: The epoch pair to plot.
        query: String query to apply to the dataframe before the analysis
            is run, defaults to None.
        df: Dataframe of sources from the pipeline run, defaults to None.
            If None then the sources from the PipeAnalysis object are used.
        vs_min: The minimum Vs metric value to be considered a candidate,
            defaults to 4.3.
        m_min: The minimum m metric absolute value to be considered a
            candidate, defaults to 0.26.
        use_int_flux: Whether to use the integrated fluxes instead of the
            peak fluxes.
        remove_two_forced: Will exclude any pairs that are both forced
            extractions if True, defaults to False.
        plot_type: Selects whether the returned plot is 'bokeh' or
            'matplotlib', defaults to 'bokeh'.
        plot_style: Select whether to plot with style 'a' (Mooley)
            or 'b' (Radcliffe). Defaults to 'a'.

    Returns:
        Bokeh or matplotlib figure.

    Raises:
        Exception: The two epoch metrics must be loaded before using this
            function.
        Exception: 'plot_type' is not recognised.
        Exception: `plot_style` is not recognised.
        Exception: Pair with entered ID does not exist.
        MeasPairsDoNotExistError: The measurement pairs file(s) do not
            exist for this run.
    """

    self._raise_if_no_pairs()

    if not self._loaded_two_epoch_metrics:
        raise Exception(
            "The two epoch metrics must first be loaded to use the"
            " plotting function. Please do so with the command:\n"
            "'mypiperun.load_two_epoch_metrics()'\n"
            "and try again."
        )

    if plot_type not in ['bokeh', 'matplotlib']:
        raise Exception(
            "'plot_type' value is not recognised!"
            " Must be either 'bokeh' or 'matplotlib'."
        )

    if plot_style not in ['a', 'b']:
        raise Exception(
            "'plot_style' value is not recognised!"
            " Must be either 'a' for Mooley or 'b' for Radcliffe."
        )

    if epoch_pair_id not in self.pairs_df.index.values:
        raise Exception(f"Pair with ID '{epoch_pair_id}' does not exist!")

    if df is None:
        df = self.sources

    if query is not None:
        df = df.query(query)

    pair_epoch_key = self.pairs_df.loc[epoch_pair_id]['pair_epoch_key']

    pairs_df = (
        self.measurement_pairs_df.loc[
            self.measurement_pairs_df.pair_epoch_key == pair_epoch_key
        ]
    )

    if self._vaex_meas_pairs:
        pairs_df = pairs_df.extract().to_pandas_df()

    pairs_df = pairs_df[pairs_df['source_id'].isin(df.index.values)]

    if plot_type == 'bokeh':
        fig = self._plot_epoch_pair_bokeh(
            epoch_pair_id,
            pairs_df,
            vs_min,
            m_min,
            use_int_flux,
            remove_two_forced,
            plot_style
        )
    else:
        fig = self._plot_epoch_pair_matplotlib(
            epoch_pair_id,
            pairs_df,
            vs_min,
            m_min,
            use_int_flux,
            remove_two_forced,
            plot_style
        )

    return fig
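
For example, to plot the epoch pair with the longest time baseline, assuming 'my_run' is a loaded PipeAnalysis instance:

from bokeh.io import show

my_run.load_two_epoch_metrics()

# pairs_df is sorted by time difference, so the final row has the
# longest baseline
longest_pair_id = my_run.pairs_df.index[-1]

# default bokeh version of the plot
fig = my_run.plot_two_epoch_pairs(longest_pair_id)
show(fig)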

recalc_measurement_pairs_df(self, measurements_df)

A method to recalculate the two epoch pair metrics based upon a provided altered measurements dataframe.

Designed for use when the measurement fluxes have been changed.

Parameters:

Name Type Description Default
measurements_df Union[pandas.core.frame.DataFrame, vaex.dataframe.DataFrame]

The altered measurements dataframe in the same format as the standard pipeline dataframe.

required

Returns:

Type Description
Union[pandas.core.frame.DataFrame, vaex.dataframe.DataFrame]

The recalculated measurement pairs dataframe.

Source code in vasttools/pipeline.py
def recalc_measurement_pairs_df(
    self,
    measurements_df: Union[pd.DataFrame, vaex.dataframe.DataFrame]
) -> Union[pd.DataFrame, vaex.dataframe.DataFrame]:
    """
    A method to recalculate the two epoch pair metrics based upon a
    provided altered measurements dataframe.

    Designed for use when the measurement fluxes have been changed.

    Args:
        measurements_df: The altered measurements dataframe in the same
            format as the standard pipeline dataframe.

    Returns:
        The recalculated measurement pairs dataframe.
    """
    if not self._loaded_two_epoch_metrics:
        self.load_two_epoch_metrics()

    new_measurement_pairs = self._filter_meas_pairs_df(
        measurements_df[['id']]
    )

    # an attempt to conserve memory
    if isinstance(new_measurement_pairs, vaex.dataframe.DataFrame):
        new_measurement_pairs = new_measurement_pairs.drop(
            ['vs_peak', 'vs_int', 'm_peak', 'm_int']
        )
    else:
        new_measurement_pairs = new_measurement_pairs.drop(
            ['vs_peak', 'vs_int', 'm_peak', 'm_int'],
            axis=1
        )

    flux_cols = [
        'flux_int',
        'flux_int_err',
        'flux_peak',
        'flux_peak_err',
        'id'
    ]

    # convert a vaex measurements df to pandas so an index can be set
    if isinstance(measurements_df, vaex.dataframe.DataFrame):
        measurements_df = measurements_df[flux_cols].to_pandas_df()
    else:
        measurements_df = measurements_df.loc[:, flux_cols].copy()

    measurements_df = (
        measurements_df
        .drop_duplicates('id')
        .set_index('id')
    )

    for i in flux_cols:
        if i == 'id':
            continue
        for j in ['a', 'b']:
            pairs_i = i + f'_{j}'
            id_values = new_measurement_pairs[f'meas_id_{j}'].to_numpy()
            new_flux_values = measurements_df.loc[id_values][i].to_numpy()
            new_measurement_pairs[pairs_i] = new_flux_values

    del measurements_df

    # calculate 2-epoch metrics
    new_measurement_pairs["vs_peak"] = calculate_vs_metric(
        new_measurement_pairs['flux_peak_a'].to_numpy(),
        new_measurement_pairs['flux_peak_b'].to_numpy(),
        new_measurement_pairs['flux_peak_err_a'].to_numpy(),
        new_measurement_pairs['flux_peak_err_b'].to_numpy()
    )
    new_measurement_pairs["vs_int"] = calculate_vs_metric(
        new_measurement_pairs['flux_int_a'].to_numpy(),
        new_measurement_pairs['flux_int_b'].to_numpy(),
        new_measurement_pairs['flux_int_err_a'].to_numpy(),
        new_measurement_pairs['flux_int_err_b'].to_numpy()
    )
    new_measurement_pairs["m_peak"] = calculate_m_metric(
        new_measurement_pairs['flux_peak_a'].to_numpy(),
        new_measurement_pairs['flux_peak_b'].to_numpy()
    )
    new_measurement_pairs["m_int"] = calculate_m_metric(
        new_measurement_pairs['flux_int_a'].to_numpy(),
        new_measurement_pairs['flux_int_b'].to_numpy()
    )

    return new_measurement_pairs
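
A minimal sketch, assuming the run measurements are held in a pandas dataframe and that a purely illustrative ten per cent peak-flux scale correction is being tested:

corrected = my_run.measurements.copy()
corrected['flux_peak'] = corrected['flux_peak'] * 1.1          # illustrative
corrected['flux_peak_err'] = corrected['flux_peak_err'] * 1.1  # illustrative

# recompute the vs and m metrics for every pair using the corrected fluxes
new_pairs = my_run.recalc_measurement_pairs_df(corrected)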

recalc_sources_df(self, measurements_df, min_vs=4.3, measurement_pairs_df=None)

Regenerates a sources dataframe using a user provided measurements dataframe.

Parameters:

Name Type Description Default
measurements_df Union[pandas.core.frame.DataFrame, vaex.dataframe.DataFrame]

Dataframe of measurements with default pipeline columns. A pandas.core.frame.DataFrame or vaex.dataframe.DataFrame instance.

required
min_vs float

Minimum value of the Vs two epoch parameter to use when appending the two epoch metrics maximum.

4.3
measurement_pairs_df Optional[pandas.core.frame.DataFrame]

The recalculated measurement pairs dataframe if applicable. If not provided then the process will assume the fluxes have not changed and will purely filter the measurement pairs dataframe.

None

Returns:

Type Description
DataFrame

The regenerated sources_df. A pandas.core.frame.DataFrame instance.

Exceptions:

Type Description
MeasPairsDoNotExistError

The measurement pairs file(s) do not exist for this run.

Source code in vasttools/pipeline.py
def recalc_sources_df(
    self,
    measurements_df: Union[pd.DataFrame, vaex.dataframe.DataFrame],
    min_vs: float = 4.3,
    measurement_pairs_df: Optional[pd.DataFrame] = None
) -> pd.DataFrame:
    """
    Regenerates a sources dataframe using a user provided measurements
    dataframe.

    Args:
        measurements_df: Dataframe of measurements with default pipeline
            columns. A `pandas.core.frame.DataFrame` or
            `vaex.dataframe.DataFrame` instance.
        min_vs: Minimum value of the Vs two epoch parameter to use
            when appending the two epoch metrics maximum.
        measurement_pairs_df: The recalculated measurement pairs dataframe
            if applicable. If not provided then the process will assume
            the fluxes have not changed and will purely filter the
            measurement pairs dataframe.

    Returns:
        The regenerated sources_df.  A `pandas.core.frame.DataFrame`
        instance.

    Raises:
        MeasPairsDoNotExistError: The measurement pairs file(s) do not
            exist for this run.
    """

    self._raise_if_no_pairs()

    # Two epoch metrics
    if not self._loaded_two_epoch_metrics:
        self.load_two_epoch_metrics()

    if not self._vaex_meas:
        measurements_df = vaex.from_pandas(measurements_df)

    # account for RA wrapping
    ra_wrap_mask = measurements_df.ra <= 0.1
    measurements_df['ra_wrap'] = measurements_df.func.where(
        ra_wrap_mask, measurements_df[ra_wrap_mask].ra + 360.,
        measurements_df['ra']
    )

    measurements_df['interim_ew'] = (
        measurements_df['ra_wrap'] * measurements_df['weight_ew']
    )

    measurements_df['interim_ns'] = (
        measurements_df['dec'] * measurements_df['weight_ns']
    )

    for col in ['flux_int', 'flux_peak']:
        measurements_df[f'{col}_sq'] = (measurements_df[col] ** 2.)

    # most of the aggregate calculations done in vaex
    sources_df = measurements_df.groupby(
        by='source',
        agg={
            'interim_ew_sum': vaex.agg.sum(
                'interim_ew', selection='forced==False'
            ),
            'interim_ns_sum': vaex.agg.sum(
                'interim_ns', selection='forced==False'
            ),
            'weight_ew_sum': vaex.agg.sum(
                'weight_ew', selection='forced==False'
            ),
            'weight_ns_sum': vaex.agg.sum(
                'weight_ns', selection='forced==False'
            ),
            'avg_compactness': vaex.agg.mean(
                'compactness', selection='forced==False'
            ),
            'min_snr': vaex.agg.min(
                'snr', selection='forced==False'
            ),
            'max_snr': vaex.agg.max(
                'snr', selection='forced==False'
            ),
            'avg_flux_int': vaex.agg.mean('flux_int'),
            'avg_flux_peak': vaex.agg.mean('flux_peak'),
            'max_flux_peak': vaex.agg.max('flux_peak'),
            'max_flux_int': vaex.agg.max('flux_int'),
            'min_flux_peak': vaex.agg.min('flux_peak'),
            'min_flux_int': vaex.agg.min('flux_int'),
            'min_flux_peak_isl_ratio': vaex.agg.min('flux_peak_isl_ratio'),
            'min_flux_int_isl_ratio': vaex.agg.min('flux_int_isl_ratio'),
            'n_measurements': vaex.agg.count('id'),
            'n_selavy': vaex.agg.count('id', selection='forced==False'),
            'n_forced': vaex.agg.count('id', selection='forced==True'),
            'n_siblings': vaex.agg.sum('has_siblings')
        }
    )

    # Drop sources which no longer have any selavy measurements
    sources_df = sources_df[sources_df.n_selavy > 0].extract()

    # Calculate average position
    sources_df['wavg_ra'] = (
        sources_df['interim_ew_sum'] / sources_df['weight_ew_sum']
    )
    sources_df['wavg_dec'] = (
        sources_df['interim_ns_sum'] / sources_df['weight_ns_sum']
    )

    sources_df['wavg_uncertainty_ew'] = (
        1. / np.sqrt(sources_df['weight_ew_sum'])
    )
    sources_df['wavg_uncertainty_ns'] = (
        1. / np.sqrt(sources_df['weight_ns_sum'])
    )

    # the RA wrapping is reverted at the end of the function when the
    # df is in pandas format.

    # TraP variability metrics, using Dask.
    measurements_df_temp = measurements_df[[
        'flux_int', 'flux_int_err', 'flux_peak', 'flux_peak_err', 'source'
    ]].extract().to_pandas_df()

    col_dtype = {
        'v_int': 'f',
        'v_peak': 'f',
        'eta_int': 'f',
        'eta_peak': 'f',
    }

    sources_df_fluxes = (
        dd.from_pandas(measurements_df_temp, self.n_workers)
        .groupby('source')
        .apply(
            pipeline_get_variable_metrics,
            meta=col_dtype
        )
        .compute(num_workers=self.n_workers, scheduler=self.scheduler)
    )

    # Switch to pandas at this point to perform join
    sources_df = sources_df.to_pandas_df().set_index('source')

    sources_df = sources_df.join(sources_df_fluxes)

    sources_df = sources_df.join(
        self.sources[['new', 'new_high_sigma']],
    )

    if measurement_pairs_df is None:
        measurement_pairs_df = self._filter_meas_pairs_df(
            measurements_df[['id']]
        )

    if isinstance(measurement_pairs_df, vaex.dataframe.DataFrame):
        new_measurement_pairs = (
            measurement_pairs_df[
                # bitwise | is needed to combine vaex expressions;
                # the Python 'or' keyword does not operate element-wise
                (measurement_pairs_df['vs_int'].abs() >= min_vs)
                | (measurement_pairs_df['vs_peak'].abs() >= min_vs)
            ]
        )
    else:
        min_vs_mask = np.logical_or(
            (measurement_pairs_df['vs_int'].abs() >= min_vs).to_numpy(),
            (measurement_pairs_df['vs_peak'].abs() >= min_vs).to_numpy()
        )
        new_measurement_pairs = measurement_pairs_df.loc[min_vs_mask]
        new_measurement_pairs = vaex.from_pandas(new_measurement_pairs)

    new_measurement_pairs['vs_int_abs'] = (
        new_measurement_pairs['vs_int'].abs()
    )

    new_measurement_pairs['vs_peak_abs'] = (
        new_measurement_pairs['vs_peak'].abs()
    )

    new_measurement_pairs['m_int_abs'] = (
        new_measurement_pairs['m_int'].abs()
    )

    new_measurement_pairs['m_peak_abs'] = (
        new_measurement_pairs['m_peak'].abs()
    )

    sources_df_two_epochs = new_measurement_pairs.groupby(
        'source_id',
        agg={
            'vs_significant_max_int': vaex.agg.max('vs_int_abs'),
            'vs_significant_max_peak': vaex.agg.max('vs_peak_abs'),
            'm_abs_significant_max_int': vaex.agg.max('m_int_abs'),
            'm_abs_significant_max_peak': vaex.agg.max('m_peak_abs'),
        }
    )

    sources_df_two_epochs = (
        sources_df_two_epochs.to_pandas_df().set_index('source_id')
    )

    sources_df = sources_df.join(sources_df_two_epochs)

    del sources_df_two_epochs

    # new relation numbers
    relation_mask = np.logical_and(
        (self.relations.from_source_id.isin(sources_df.index.values)),
        (self.relations.to_source_id.isin(sources_df.index.values))
    )

    new_relations = self.relations.loc[relation_mask]

    sources_df_relations = (
        new_relations.groupby('from_source_id').agg('count')
    ).rename(columns={'to_source_id': 'n_relations'})

    sources_df = sources_df.join(sources_df_relations)

    # nearest neighbour
    sources_sky_coord = gen_skycoord_from_df(
        sources_df, ra_col='wavg_ra', dec_col='wavg_dec'
    )

    idx, d2d, _ = sources_sky_coord.match_to_catalog_sky(
        sources_sky_coord, nthneighbor=2
    )

    sources_df['n_neighbour_dist'] = d2d.degree

    # Fill the NaN values.
    sources_df = sources_df.fillna(value={
        "vs_significant_max_peak": 0.0,
        "m_abs_significant_max_peak": 0.0,
        "vs_significant_max_int": 0.0,
        "m_abs_significant_max_int": 0.0,
        'n_relations': 0,
        'v_int': 0.,
        'v_peak': 0.
    }).drop([
        'interim_ew_sum', 'interim_ns_sum',
        'weight_ew_sum', 'weight_ns_sum'
    ], axis=1)

    # correct the RA wrapping
    ra_wrap_mask = (sources_df['wavg_ra'] >= 360.).to_numpy()
    sources_df.loc[
        ra_wrap_mask, 'wavg_ra'
    ] = sources_df.loc[ra_wrap_mask]["wavg_ra"].to_numpy() - 360.

    # Switch relations column to int
    sources_df['n_relations'] = sources_df['n_relations'].astype(int)

    return sources_df
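
Continuing the recalc_measurement_pairs_df sketch above, the corrected measurements and recalculated pairs can be fed back in to regenerate the sources dataframe:

new_sources = my_run.recalc_sources_df(
    corrected, measurement_pairs_df=new_pairs
)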

run_eta_v_analysis(self, eta_sigma, v_sigma, query=None, df=None, use_int_flux=False, plot_type='bokeh', diagnostic=False)

Run the eta, v analysis on the pipeline run, with optional inputs to use a query or filtered dataframe (see Rowlinson et al., 2019, https://ui.adsabs.harvard.edu/abs/2019A%26C....27..111R/abstract).

Parameters:

Name Type Description Default
eta_sigma float

The minimum sigma value of the eta distribution to be used as a threshold.

required
v_sigma float

The minimum sigma value of the v distribution to be used as a threshold.

required
query Optional[str]

String query to apply to the dataframe before the analysis is run, defaults to None.

None
df Optional[pandas.core.frame.DataFrame]

Dataframe of sources from the pipeline run, defaults to None. If None then the sources from the PipeAnalysis object are used.

None
use_int_flux bool

Use integrated fluxes for the analysis instead of peak fluxes, defaults to 'False'.

False
plot_type str

Select which format the candidates plot should be returned in. Either 'bokeh' or 'matplotlib', defaults to 'bokeh'.

'bokeh'
diagnostic bool

When 'True' the diagnostic plot is also returned, defaults to 'False'.

False

Returns:

Type Description
Union[Tuple[float, float, pandas.core.frame.DataFrame, figure, figure], Tuple[float, float, pandas.core.frame.DataFrame, gridplot, figure]]

Tuple containing the eta cutoff value, the v cutoff value, dataframe of candidates, candidates plot and, if selected, the diagnostic plot.

Exceptions:

Type Description
Exception

Entered plot_type is not a valid plot type.

Source code in vasttools/pipeline.py
def run_eta_v_analysis(
    self, eta_sigma: float, v_sigma: float,
    query: Optional[str] = None, df: Optional[pd.DataFrame] = None,
    use_int_flux: bool = False, plot_type: str = 'bokeh',
    diagnostic: bool = False
) -> Union[
    Tuple[float, float, pd.DataFrame, plt.figure, plt.figure],
    Tuple[float, float, pd.DataFrame, gridplot, plt.figure]
]:
    """
    Run the eta, v analysis on the pipeline run, with optional
    inputs to use a query or filtered dataframe (see Rowlinson
    et al., 2019,
    https://ui.adsabs.harvard.edu/abs/2019A%26C....27..111R/abstract).

    Args:
        eta_sigma: The minimum sigma value of the eta distribution
            to be used as a threshold.
        v_sigma: The minimum sigma value of the v distribution
            to be used as a threshold.
        query: String query to apply to the dataframe before
            the analysis is run, defaults to None.
        df: Dataframe of sources from the pipeline run, defaults
            to None. If None then the sources from the PipeAnalysis object
            are used.
        use_int_flux: Use integrated fluxes for the analysis instead of
            peak fluxes, defaults to 'False'.
        plot_type: Select which format the candidates plot should be
            returned in. Either 'bokeh' or 'matplotlib', defaults
            to 'bokeh'.
        diagnostic: When 'True' the diagnostic plot is also returned,
            defaults to 'False'.

    Returns:
        Tuple containing the eta cutoff value, the v cutoff value,
        dataframe of candidates, candidates plot and, if selected, the
        diagnostic plot.

    Raises:
        Exception: Entered `plot_type` is not a valid plot type.
    """
    plot_types = ['bokeh', 'matplotlib']

    if plot_type not in plot_types:
        raise Exception(
            "Not a valid plot type!"
            " Must be 'bokeh' or 'matplotlib'."
        )

    if df is None:
        df = self.sources

    if query is not None:
        df = df.query(query)

    (
        eta_fit_mean, eta_fit_sigma,
        v_fit_mean, v_fit_sigma
    ) = self._fit_eta_v(df, use_int_flux=use_int_flux)

    v_cutoff = 10 ** (v_fit_mean + v_sigma * v_fit_sigma)
    eta_cutoff = 10 ** (eta_fit_mean + eta_sigma * eta_fit_sigma)

    if plot_type == 'bokeh':
        plot = self._plot_eta_v_bokeh(
            df, eta_fit_mean, eta_fit_sigma,
            v_fit_mean, v_fit_sigma, eta_cutoff, v_cutoff,
            use_int_flux=use_int_flux
        )
    else:
        plot = self._plot_eta_v_matplotlib(
            df, eta_fit_mean, eta_fit_sigma,
            v_fit_mean, v_fit_sigma, eta_cutoff, v_cutoff,
            use_int_flux=use_int_flux
        )

    if use_int_flux:
        label = 'int'
    else:
        label = 'peak'

    candidates = df.query(
        "v_{0} > {1} "
        "& eta_{0} > {2}".format(
            label,
            v_cutoff,
            eta_cutoff
        )
    )

    if diagnostic:
        diag = self.eta_v_diagnostic_plot(
            eta_cutoff, v_cutoff, df, use_int_flux=use_int_flux
        )
        return eta_cutoff, v_cutoff, candidates, plot, diag
    else:
        return eta_cutoff, v_cutoff, candidates, plot
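
A typical call using 3-sigma thresholds on both distributions, restricted to sources with more than one selavy detection (the query string is illustrative), assuming 'my_run' is a loaded PipeAnalysis instance:

from bokeh.io import show

eta_cutoff, v_cutoff, candidates, plot = my_run.run_eta_v_analysis(
    3.0, 3.0, query='n_selavy > 1'
)

# display the default bokeh candidates plot
show(plot)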

run_two_epoch_analysis(self, vs, m, query=None, df=None, use_int_flux=False)

Run the two epoch analysis on the pipeline run, with optional inputs to use a query or filtered dataframe.

Parameters:

Name Type Description Default
vs float

The minimum Vs metric value to be considered a candidate.

required
m float

The minimum m metric absolute value to be considered a candidate.

required
query Optional[str]

String query to apply to the dataframe before the analysis is run, defaults to None.

None
df Optional[pandas.core.frame.DataFrame]

Dataframe of sources from the pipeline run, defaults to None. If None then the sources from the PipeAnalysis object are used.

None
use_int_flux bool

Use integrated fluxes for the analysis instead of peak fluxes, defaults to 'False'.

False

Returns:

Type Description
Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]

Tuple containing two dataframes of the candidate sources and pairs.

Exceptions:

Type Description
Exception

The two epoch metrics must be loaded before using this function.

MeasPairsDoNotExistError

The measurement pairs file(s) do not exist for this run.

Source code in vasttools/pipeline.py
def run_two_epoch_analysis(
    self, vs: float, m: float, query: Optional[str] = None,
    df: Optional[pd.DataFrame] = None, use_int_flux: bool = False
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Run the two epoch analysis on the pipeline run, with optional
    inputs to use a query or filtered dataframe.

    Args:
        vs: The minimum Vs metric value to be considered a candidate.
        m: The minimum m metric absolute value to be considered a
            candidate.
        query: String query to apply to the dataframe before the analysis
            is run, defaults to None.
        df: Dataframe of sources from the pipeline run, defaults to None.
            If None then the sources from the PipeAnalysis object are used.
        use_int_flux: Use integrated fluxes for the analysis instead of
            peak fluxes, defaults to 'False'.

    Returns:
        Tuple containing two dataframes of the candidate sources and pairs.

    Raises:
        Exception: The two epoch metrics must be loaded before using this
            function.
        MeasPairsDoNotExistError: The measurement pairs file(s) do not
            exist for this run.
    """

    self._raise_if_no_pairs()

    if not self._loaded_two_epoch_metrics:
        raise Exception(
            "The two epoch metrics must first be loaded to use the"
            " plotting function. Please do so with the command:\n"
            "'mypiperun.load_two_epoch_metrics()'\n"
            "and try again."
        )

    if df is None:
        df = self.sources

    if query is not None:
        df = df.query(query)

    allowed_sources = df.index.values

    pairs_df = self.measurement_pairs_df.copy()

    if len(allowed_sources) != self.sources.shape[0]:
        if self._vaex_meas_pairs:
            pairs_df = pairs_df[
                pairs_df['source_id'].isin(allowed_sources)
            ]
        else:
            pairs_df = pairs_df.loc[
                pairs_df['source_id'].isin(allowed_sources)
            ]

    vs_label = 'vs_int' if use_int_flux else 'vs_peak'
    m_abs_label = 'm_int' if use_int_flux else 'm_peak'

    pairs_df[vs_label] = pairs_df[vs_label].abs()
    pairs_df[m_abs_label] = pairs_df[m_abs_label].abs()

    # If vaex convert these to pandas
    if self._vaex_meas_pairs:
        candidate_pairs = pairs_df[
            (pairs_df[vs_label] > vs) & (pairs_df[m_abs_label] > m)
        ]

        candidate_pairs = candidate_pairs.to_pandas_df()

    else:
        candidate_pairs = pairs_df.loc[
            (pairs_df[vs_label] > vs) & (pairs_df[m_abs_label] > m)
        ]

    unique_sources = candidate_pairs['source_id'].unique()

    candidate_sources = self.sources.loc[unique_sources]

    return candidate_sources, candidate_pairs
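
For example, applying the thresholds used as defaults elsewhere on this page (Vs > 4.3 and |m| > 0.26), after first loading the two epoch metrics:

my_run.load_two_epoch_metrics()

candidate_sources, candidate_pairs = my_run.run_two_epoch_analysis(4.3, 0.26)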

PipeRun

Class that represents a Pipeline run.

Attributes:

Name Type Description
associations pandas.core.frame.DataFrame

Associations dataframe from the pipeline run loaded from 'associations.parquet'.

bands pandas.core.frame.DataFrame

The bands dataframe from the pipeline run loaded from 'bands.parquet'.

images pandas.core.frame.DataFrame

Dataframe containing all the information on the images of the pipeline run.

measurements Union[pd.DataFrame, vaex.dataframe.DataFrame]

Dataframe containing all the information on the measurements of the pipeline run.

measurement_pairs_file List[str]

List containing the locations of the measurement_pairs.parquet (or .arrow) file(s).

name str

The pipeline run name.

n_workers int

Number of workers (cpus) available.

relations pandas.core.frame.DataFrame

Dataframe containing all the information on the relations of the pipeline run.

skyregions pandas.core.frame.DataFrame

Dataframe containing all the information on the skyregions of the pipeline run.

sources pandas.core.frame.DataFrame

Dataframe containing all the information on the sources of the pipeline run.

sources_skycoord astropy.coordinates.SkyCoord

A SkyCoord object of the default sources attribute.

Source code in vasttools/pipeline.py
class PipeRun(object):
    """
    Class that represents a Pipeline run.

    Attributes:
        associations (pandas.core.frame.DataFrame): Associations dataframe
            from the pipeline run loaded from 'associations.parquet'.
        bands (pandas.core.frame.DataFrame): The bands dataframe from the
            pipeline run loaded from 'bands.parquet'.
        images (pandas.core.frame.DataFrame): Dataframe containing all the
            information on the images of the pipeline run.
        measurements (Union[pd.DataFrame, vaex.dataframe.DataFrame]):
            Dataframe containing all the information on the measurements of
            the pipeline run.
        measurement_pairs_file (List[str]): List containing the locations of
            the measurement_pairs.parquet (or .arrow) file(s).
        name (str): The pipeline run name.
        n_workers (int): Number of workers (cpus) available.
        relations (pandas.core.frame.DataFrame): Dataframe containing all the
            information on the relations of the pipeline run.
        skyregions (pandas.core.frame.DataFrame): Dataframe containing all the
            information on the skyregions of the pipeline run.
        sources (pandas.core.frame.DataFrame): Dataframe containing all the
            information on the sources of the pipeline run.
        sources_skycoord (astropy.coordinates.SkyCoord): A SkyCoord object of
            the default sources attribute.
    """

    def __init__(
        self,
        name: str,
        images: pd.DataFrame,
        skyregions: pd.DataFrame,
        relations: pd.DataFrame,
        sources: pd.DataFrame,
        associations: pd.DataFrame,
        bands: pd.DataFrame,
        measurements: Union[pd.DataFrame, vaex.dataframe.DataFrame],
        measurement_pairs_file: List[str],
        vaex_meas: bool = False,
        n_workers: int = HOST_NCPU - 1,
        scheduler: str = 'processes'
    ) -> None:
        """
        Constructor method.

        Args:
            name: The name of the pipeline run.
            images: Images dataframe from the pipeline run loaded from
                'images.parquet'. A `pandas.core.frame.DataFrame` instance.
            skyregions: Skyregions dataframe from the pipeline run loaded from
                skyregions.parquet. A `pandas.core.frame.DataFrame` instance.
            relations: Relations dataframe from the pipeline run loaded from
                relations.parquet. A `pandas.core.frame.DataFrame` instance.
            sources: Sources dataframe from the pipeline run loaded from
                sources.parquet. A `pandas.core.frame.DataFrame` instance.
            associations: Associations dataframe from the pipeline run loaded
                from 'associations.parquet'. A `pandas.core.frame.DataFrame`
                instance.
            bands: The bands dataframe from the pipeline run loaded from
                'bands.parquet'.
            measurements: Measurements dataframe from the pipeline run
                loaded from measurements.parquet and the forced measurements
                parquet files.  A `pandas.core.frame.DataFrame` or
                `vaex.dataframe.DataFrame` instance.
            measurement_pairs_file: The locations of the two epoch pairs
                file(s) from the pipeline. A list is used because two
                pipeline runs may be combined.
            vaex_meas: 'True' if the measurements have been loaded using
                vaex from an arrow file. `False` means the measurements are
                loaded into a pandas DataFrame.
            n_workers: Number of workers (cpus) available. Defaults to one
                less than the host CPU count (`HOST_NCPU - 1`).
            scheduler: Dask scheduling option to use. Options are "processes"
                (parallel processing) or "single-threaded". Defaults to
                "single-threaded".

        Returns:
            None
        """
        super(PipeRun, self).__init__()
        self.name = name
        self.images = images
        self.skyregions = skyregions
        self.sources = sources
        self.sources_skycoord = self.get_sources_skycoord()
        self.associations = associations
        self.bands = bands
        self.measurements = measurements
        self.measurement_pairs_file = measurement_pairs_file
        self.relations = relations
        self.n_workers = n_workers
        self._vaex_meas = vaex_meas
        self._loaded_two_epoch_metrics = False
        self.scheduler = scheduler

        self.logger = logging.getLogger('vasttools.pipeline.PipeRun')
        self.logger.debug('Created PipeRun instance')

        self._measurement_pairs_exists = self._check_measurement_pairs_file()

    def _check_measurement_pairs_file(self):
        measurement_pairs_exists = True

        for filepath in self.measurement_pairs_file:
            if not os.path.isfile(filepath):
                self.logger.warning(f"Measurement pairs file ({filepath}) does"
                                    f" not exist. You will be unable to access"
                                    f" measurement pairs or two-epoch metrics."
                                    )
                measurement_pairs_exists = False

        return measurement_pairs_exists

    def combine_with_run(
        self, other_PipeRun, new_name: Optional[str] = None
    ):
        """
        Combines the output of another PipeRun object with the PipeRun
        from which this method is being called from.

        !!! warning
            It is assumed you are loading runs from the same Pipeline
            instance. If this is not the case then erroneous results may be
            returned.

        Args:
            other_PipeRun (PipeRun): The other pipeline run to merge.
            new_name: If not None then the PipeRun attribute 'name'
                is changed to the given value.

        Returns:
            PipeRun: The self object with the other pipeline run added.
        """

        self.images = pd.concat(
            [self.images, other_PipeRun.images]
        ).drop_duplicates('path')

        self.skyregions = pd.concat(
            [self.skyregions, other_PipeRun.skyregions],
            ignore_index=True
        ).drop_duplicates('id')

        if self._vaex_meas and other_PipeRun._vaex_meas:
            self.measurements = self.measurements.concat(
                other_PipeRun.measurements
            )

        elif self._vaex_meas and not other_PipeRun._vaex_meas:
            self.measurements = self.measurements.concat(
                vaex.from_pandas(other_PipeRun.measurements)
            )

        elif not self._vaex_meas and other_PipeRun._vaex_meas:
            self.measurements = vaex.from_pandas(self.measurements).concat(
                other_PipeRun.measurements
            )
            self._vaex_meas = True

        else:
            self.measurements = pd.concat(
                [self.measurements, other_PipeRun.measurements],
                ignore_index=True
            ).drop_duplicates(['id', 'source'])

        sources_to_add = other_PipeRun.sources.loc[
            ~(other_PipeRun.sources.index.isin(
                self.sources.index
            ))
        ]

        self.sources = pd.concat([self.sources, sources_to_add])

        # need to keep access to all the different pairs files
        # for two epoch metrics.
        orig_run_pairs_exist = self._measurement_pairs_exists
        other_run_pairs_exist = other_PipeRun._measurement_pairs_exists

        if orig_run_pairs_exist and other_run_pairs_exist:
            for i in other_PipeRun.measurement_pairs_file:
                self.measurement_pairs_file.append(i)

        elif orig_run_pairs_exist:
            self.logger.warning("Not combining measurement pairs because they "
                                " do not exist for the new run."
                                )
            self._measurement_pairs_exists = False

        elif other_run_pairs_exist:
            self.logger.warning("Not combining measurement pairs because they "
                                " do not exist for the original run."
                                )

        del sources_to_add

        if new_name is not None:
            self.name = new_name

        return self

    def get_sources_skycoord(
        self,
        user_sources: Optional[pd.DataFrame] = None,
        ra_col: str = 'wavg_ra',
        dec_col: str = 'wavg_dec',
        ra_unit: u.Unit = u.degree,
        dec_unit: u.Unit = u.degree
    ) -> astropy.coordinates.sky_coordinate.SkyCoord:
        """
        A convenience function to generate a SkyCoord object from the
        sources dataframe. Also has support for custom source lists.

        Args:
            user_sources: Provide a user generated source dataframe
                in place of using the default run sources dataframe.
            ra_col: The column to use for the Right Ascension.
            dec_col: The column to use for the Declination.
            ra_unit: The unit of the RA column, defaults to degrees.
                Must be an astropy.unit value.
            dec_unit: The unit of the Dec column, defaults to degrees.
                Must be an astropy.unit value.

        Returns:
            SkyCoord object of the sources. A
            `astropy.coordinates.sky_coordinate.SkyCoord` instance.
        """
        if user_sources is None:
            the_sources = self.sources
        else:
            the_sources = user_sources

        sources_skycoord = gen_skycoord_from_df(
            the_sources, ra_col=ra_col, dec_col=dec_col, ra_unit=ra_unit,
            dec_unit=dec_unit
        )

        return sources_skycoord

    def get_source(
        self,
        id: int,
        field: Optional[str] = None,
        stokes: str = 'I',
        outdir: str = '.',
        user_measurements: Optional[Union[
            pd.DataFrame, vaex.dataframe.DataFrame]
        ] = None,
        user_sources: Optional[pd.DataFrame] = None
    ) -> Source:
        """
        Fetches an individual source and returns a
        vasttools.source.Source object.

        Users do not need
        to change the field, stokes and outdir parameters.

        Args:
            id: The id of the source to load.
            field: The field of the source being loaded, defaults
                to None. If None then the run name is used as the field.
            stokes: Stokes parameter of the source, defaults to 'I'.
            outdir: The output directory where generated plots will
                be saved, defaults to '.' (the current working directory).
            user_measurements: A user generated measurements dataframe to
                use instead of the default pipeline result. The type must match
                the default type of the pipeline (vaex or pandas). Defaults to
                None, in which case the default pipeline measurements are used.
            user_sources: A user generated sources dataframe to use
                instead of the default pipeline result. Format is always a
                pandas dataframe. Defaults to None, in which case the default
                pipeline measurements are used.

        Returns:
            vast tools Source object.
        """

        if user_measurements is None:
            the_measurements = self.measurements
        else:
            the_measurements = user_measurements

        if user_sources is None:
            the_sources = self.sources
        else:
            the_sources = user_sources

        if self._vaex_meas:
            measurements = the_measurements[
                the_measurements['source'] == id
            ].to_pandas_df()

        else:
            measurements = the_measurements.loc[
                the_measurements['source'] == id
            ]

        measurements = measurements.merge(
            self.images[[
                'path',
                'noise_path',
                'measurements_path',
                'frequency'
            ]], how='left',
            left_on='image_id',
            right_index=True
        ).rename(
            columns={
                'path': 'image',
                'noise_path': 'rms',
                'measurements_path': 'selavy'
            }
        )

        measurements = measurements.rename(
            columns={
                'time': 'dateobs',
            }
        ).sort_values(
            by='dateobs'
        ).reset_index(drop=True)

        s = the_sources.loc[id]

        num_measurements = s['n_measurements']

        source_coord = SkyCoord(
            s['wavg_ra'],
            s['wavg_dec'],
            unit=(u.deg, u.deg)
        )

        source_name = "VAST {}".format(
            source_coord.to_string(
                "hmsdms", sep='', precision=1
            ).replace(
                " ", ""
            )[:15]
        )
        source_epochs = [str(i) for i in range(1, num_measurements + 1)]
        if field is None:
            field = self.name
        measurements['field'] = field
        measurements['epoch'] = source_epochs
        measurements['stokes'] = stokes
        measurements['skycoord'] = [
            source_coord for i in range(num_measurements)
        ]
        measurements['detection'] = measurements['forced'] == False
        source_fields = [field for i in range(num_measurements)]
        source_stokes = stokes
        source_base_folder = None
        source_crossmatch_radius = None
        source_outdir = outdir
        source_image_type = None

        thesource = Source(
            source_coord,
            source_name,
            source_epochs,
            source_fields,
            source_stokes,
            None,
            source_crossmatch_radius,
            measurements,
            source_base_folder,
            source_image_type,
            islands=False,
            outdir=source_outdir,
            pipeline=True
        )

        return thesource

    def _raise_if_no_pairs(self):
        if not self._measurement_pairs_exists:
            raise MeasPairsDoNotExistError(
                "This method cannot be used as the measurement pairs are "
                "not available for this pipeline run."
            )

    def load_two_epoch_metrics(self) -> None:
        """
        Loads the two epoch metrics dataframe, usually stored as either
        'measurement_pairs.parquet' or 'measurement_pairs.arrow'.

        The two epoch metrics dataframe is stored as an attribute to the
        PipeRun object as self.measurement_pairs_df. An epoch 'key' is also
        added to the dataframe.

        Also creates a 'pairs_df' that lists all the possible epoch pairs.
        This is stored as the attribute self.pairs_df.

        Returns:
            None

        Raises:
            MeasPairsDoNotExistError: The measurement pairs file(s) do not
                exist for this run
        """

        self._raise_if_no_pairs()

        image_ids = self.images.sort_values(by='datetime').index.tolist()

        pairs_df = pd.DataFrame.from_dict(
            {'pair': combinations(image_ids, 2)}
        )

        pairs_df = (
            pd.DataFrame(pairs_df['pair'].tolist())
            .rename(columns={0: 'image_id_a', 1: 'image_id_b'})
            .merge(
                self.images[['datetime', 'name']],
                left_on='image_id_a', right_index=True,
                suffixes=('_a', '_b')
            )
            .merge(
                self.images[['datetime', 'name']],
                left_on='image_id_b', right_index=True,
                suffixes=('_a', '_b')
            )
        ).reset_index().rename(
            columns={
                'index': 'id',
                'name_a': 'image_name_a',
                'name_b': 'image_name_b'
            }
        )

        pairs_df['td'] = pairs_df['datetime_b'] - pairs_df['datetime_a']

        pairs_df = pairs_df.drop(['datetime_a', 'datetime_b'], axis=1)

        pairs_df['pair_epoch_key'] = (
            pairs_df[['image_name_a', 'image_name_b']]
            .apply(
                lambda x: f"{x['image_name_a']}_{x['image_name_b']}", axis=1
            )
        )

        self._vaex_meas_pairs = False
        if len(self.measurement_pairs_file) > 1:
            arrow_files = np.array(
                [i.endswith(".arrow") for i in self.measurement_pairs_file]
            )
            if np.any(arrow_files):
                # boolean masks require array indexing, not list indexing
                mp_files = np.array(self.measurement_pairs_file)
                measurement_pairs_df = vaex.open_many(
                    mp_files[arrow_files].tolist()
                )
                for i in mp_files[~arrow_files]:
                    temp = pd.read_parquet(i)
                    temp = vaex.from_pandas(temp)
                    measurement_pairs_df = measurement_pairs_df.concat(temp)
                self._vaex_meas_pairs = True
                warnings.warn("Measurement pairs have been loaded with vaex.")
            else:
                measurement_pairs_df = (
                    dd.read_parquet(self.measurement_pairs_file).compute()
                )
        else:
            if self.measurement_pairs_file[0].endswith('.arrow'):
                measurement_pairs_df = (
                    vaex.open(self.measurement_pairs_file[0])
                )
                self._vaex_meas_pairs = True
                warnings.warn("Measurement pairs have been loaded with vaex.")
            else:
                measurement_pairs_df = (
                    pd.read_parquet(self.measurement_pairs_file[0])
                )

        if self._vaex_meas_pairs:
            measurement_pairs_df['pair_epoch_key'] = (
                measurement_pairs_df['image_name_a'] + "_"
                + measurement_pairs_df['image_name_b']
            )

            pair_counts = measurement_pairs_df.groupby(
                measurement_pairs_df.pair_epoch_key, agg='count'
            )

            pair_counts = pair_counts.to_pandas_df().rename(
                columns={'count': 'total_pairs'}
            ).set_index('pair_epoch_key')
        else:
            measurement_pairs_df['pair_epoch_key'] = (
                measurement_pairs_df[['image_name_a', 'image_name_b']]
                .apply(
                    lambda x: f"{x['image_name_a']}_{x['image_name_b']}",
                    axis=1
                )
            )

            pair_counts = measurement_pairs_df[
                ['pair_epoch_key', 'image_name_a']
            ].groupby('pair_epoch_key').count().rename(
                columns={'image_name_a': 'total_pairs'}
            )

        pairs_df = pairs_df.merge(
            pair_counts, left_on='pair_epoch_key', right_index=True
        )

        del pair_counts

        pairs_df = pairs_df.dropna(subset=['total_pairs']).set_index('id')

        self.measurement_pairs_df = measurement_pairs_df
        self.pairs_df = pairs_df.sort_values(by='td')

        self._loaded_two_epoch_metrics = True

    def _add_times(
        self, row: pd.Series, every_hour: bool = False
    ) -> List[pd.Series]:
        """
        Adds the times required for planet searching.

        By default it adds the beginning and end of the observation.
        The every_hour option adds the time every hour during the observation,
        which is required for the Sun and Moon.

        Args:
            row: The series row containing the information.
            every_hour: Add times to the dataframe every hour during the
                observation, defaults to 'False'.

        Returns:
            The times to be searched for planets, or a single time for a
            zero-duration observation.
        """
        if row['duration'] == 0.:
            return row['DATEOBS']

        if every_hour:
            hours = int(row['duration'] / 3600.)
            times = [
                row['DATEOBS'] + timedelta(
                    seconds=3600. * h
                )
                for h in range(hours + 1)
            ]
            return times

        else:
            return [
                row['DATEOBS'],
                row['DATEOBS'] + timedelta(
                    seconds=row['duration']
                )
            ]

    def check_for_planets(self) -> pd.DataFrame:
        """
        Checks the pipeline run for any planets in the field.

        All planets are checked: Mercury, Venus, Mars, Jupiter,
        Saturn, Uranus, Neptune, Pluto in addition to the Sun and Moon.

        Returns:
            DataFrame with list of planet positions. It will be empty if no
            planets are found. A `pandas.core.frame.DataFrame` instance.
        """
        from vasttools import ALLOWED_PLANETS
        ap = ALLOWED_PLANETS.copy()

        planets_df = (
            self.images.loc[:, [
                'datetime',
                'duration',
                'centre_ra',
                'centre_dec',
            ]]
            .reset_index()
            .rename(
                columns={
                    'id': 'image_id',
                    'datetime': 'DATEOBS',
                    'centre_ra': 'centre-ra',
                    'centre_dec': 'centre-dec'
                }
            )
        )

        # Split off a sun and moon df so we can check more times
        sun_moon_df = planets_df.copy()
        ap.remove('sun')
        ap.remove('moon')

        # check planets at start and end of observation
        planets_df['DATEOBS'] = planets_df[['DATEOBS', 'duration']].apply(
            self._add_times,
            axis=1
        )
        planets_df['planet'] = [ap for i in range(planets_df.shape[0])]

        # check sun and moon every hour
        sun_moon_df['DATEOBS'] = sun_moon_df[['DATEOBS', 'duration']].apply(
            self._add_times,
            args=(True,),
            axis=1
        )

        sun_moon_df['planet'] = [
            ['sun', 'moon'] for i in range(sun_moon_df.shape[0])
        ]

        planets_df = pd.concat([planets_df, sun_moon_df], ignore_index=True)

        del sun_moon_df

        planets_df = planets_df.explode('planet').explode('DATEOBS').drop(
            'duration', axis=1
        )
        planets_df['planet'] = planets_df['planet'].str.capitalize()

        # reset index as there might be doubles but keep the id column as this
        # signifies the image id.
        planets_df = planets_df.reset_index(drop=True)

        meta = {
            'image_id': 'i',
            'DATEOBS': 'datetime64[ns]',
            'centre-ra': 'f',
            'centre-dec': 'f',
            'planet': 'U',
            'ra': 'f',
            'dec': 'f',
            'sep': 'f'
        }

        result = (
            dd.from_pandas(planets_df, self.n_workers)
            .groupby('planet')
            .apply(
                match_planet_to_field,
                meta=meta
            ).compute(
                scheduler=self.scheduler,
                n_workers=self.n_workers
            )
        )

        if result.empty:
            warnings.warn("No planets found.")

        return result

    def filter_by_moc(self, moc: mocpy.MOC):
        """
        Filters the PipeRun object to only contain the sources that are
        located within the provided moc area.

        Args:
            moc: MOC instance for which to filter the run by.

        Returns:
            PipeAnalysis: new_PipeRun
        """
        source_mask = moc.contains(
            self.sources_skycoord.ra, self.sources_skycoord.dec)

        new_sources = self.sources.loc[source_mask].copy()

        if self._vaex_meas:
            new_meas = self.measurements[
                self.measurements['source'].isin(new_sources.index.values)]
            new_meas = new_meas.extract()
        else:
            new_meas = self.measurements.loc[
                self.measurements['source'].isin(
                    new_sources.index.values
                )].copy()

        new_images = self.images.loc[
            self.images.index.isin(new_meas['image_id'].tolist())].copy()

        new_skyregions = self.skyregions[
            self.skyregions['id'].isin(new_images['skyreg_id'].values)].copy()

        new_associations = self.associations[
            self.associations['source_id'].isin(
                new_sources.index.values
            )
        ].copy()

        new_bands = self.bands[
            self.bands['id'].isin(new_images['band_id'])
        ]

        new_relations = self.relations[
            self.relations['from_source_id'].isin(
                new_sources.index.values
            )
        ].copy()

        new_PipeRun = PipeAnalysis(
            name=self.name,
            images=new_images,
            skyregions=new_skyregions,
            relations=new_relations,
            sources=new_sources,
            associations=new_associations,
            bands=new_bands,
            measurements=new_meas,
            measurement_pairs_file=self.measurement_pairs_file,
            vaex_meas=self._vaex_meas
        )

        return new_PipeRun

    def create_moc(
        self, max_depth: int = 9, ignore_large_run_warning: bool = False
    ) -> mocpy.MOC:
        """
        Create a MOC file that represents the area covered by
        the pipeline run.

        !!!warning
            This will take a very long time for large runs.

        Args:
            max_depth: Max depth parameter passed to the
                MOC.from_polygon_skycoord() function, defaults to 9.
            ignore_large_run_warning: Ignores the warning of creating a MOC on
                a large run.

        Returns:
            MOC object.
        """
        images_to_use = self.images.drop_duplicates(
            'skyreg_id'
        )['path'].values

        if not ignore_large_run_warning and images_to_use.shape[0] > 10:
            warnings.warn(
                "Creating a MOC for a large run will take a long time!"
                " Run again with 'ignore_large_run_warning=True` if you"
                " are sure you want to run this. A smaller `max_depth` is"
                " highly recommended."
            )
            return

        moc = create_moc_from_fits(
            images_to_use[0],
            max_depth=max_depth
        )

        if images_to_use.shape[0] > 1:
            for img in images_to_use[1:]:
                img_moc = create_moc_from_fits(
                    img,
                    max_depth
                )
                moc = moc.union(img_moc)

        return moc

__init__(self, name, images, skyregions, relations, sources, associations, bands, measurements, measurement_pairs_file, vaex_meas=False, n_workers=3, scheduler='processes') special

Constructor method.

Parameters:

Name Type Description Default
name str

The name of the pipeline run.

required
images DataFrame

Images dataframe from the pipeline run loaded from 'images.parquet'. A pandas.core.frame.DataFrame instance.

required
skyregions DataFrame

Skyregions dataframe from the pipeline run loaded from skyregions.parquet. A pandas.core.frame.DataFrame instance.

required
relations DataFrame

Relations dataframe from the pipeline run loaded from relations.parquet. A pandas.core.frame.DataFrame instance.

required
sources DataFrame

Sources dataframe from the pipeline run loaded from sources.parquet. A pandas.core.frame.DataFrame instance.

required
associations DataFrame

Associations dataframe from the pipeline run loaded from 'associations.parquet'. A pandas.core.frame.DataFrame instance.

required
bands DataFrame

The bands dataframe from the pipeline run loaded from 'bands.parquet'.

required
measurements Union[pandas.core.frame.DataFrame, vaex.dataframe.DataFrame]

Measurements dataframe from the pipeline run loaded from measurements.parquet and the forced measurements parquet files. A pandas.core.frame.DataFrame or vaex.dataframe.DataFrame instance.

required
measurement_pairs_file List[str]

The location of the two epoch pairs file from the pipeline. It is a list of locations due to the fact that two pipeline runs could be combined.

required
vaex_meas bool

'True' if the measurements have been loaded using vaex from an arrow file. False means the measurements are loaded into a pandas DataFrame.

False
n_workers int

Number of workers (cpus) available. Default is determined by running cpu_count().

3
scheduler str

Dask scheduling option to use. Options are "processes" (parallel processing) or "single-threaded". Defaults to "processes".

'processes'

Returns:

Type Description
None

None

Source code in vasttools/pipeline.py
def __init__(
    self,
    name: str,
    images: pd.DataFrame,
    skyregions: pd.DataFrame,
    relations: pd.DataFrame,
    sources: pd.DataFrame,
    associations: pd.DataFrame,
    bands: pd.DataFrame,
    measurements: Union[pd.DataFrame, vaex.dataframe.DataFrame],
    measurement_pairs_file: List[str],
    vaex_meas: bool = False,
    n_workers: int = HOST_NCPU - 1,
    scheduler: str = 'processes'
) -> None:
    """
    Constructor method.

    Args:
        name: The name of the pipeline run.
        images: Images dataframe from the pipeline run loaded from
            'images.parquet'. A `pandas.core.frame.DataFrame` instance.
        skyregions: Skyregions dataframe from the pipeline run loaded from
            skyregions.parquet. A `pandas.core.frame.DataFrame` instance.
        relations: Relations dataframe from the pipeline run loaded from
            relations.parquet. A `pandas.core.frame.DataFrame` instance.
        sources: Sources dataframe from the pipeline run loaded from
            sources.parquet. A `pandas.core.frame.DataFrame` instance.
        associations: Associations dataframe from the pipeline run loaded
            from 'associations.parquet'. A `pandas.core.frame.DataFrame`
            instance.
        bands: The bands dataframe from the pipeline run loaded from
            'bands.parquet'.
        measurements: Measurements dataframe from the pipeline run
            loaded from measurements.parquet and the forced measurements
            parquet files.  A `pandas.core.frame.DataFrame` or
            `vaex.dataframe.DataFrame` instance.
        measurement_pairs_file: The location of the two epoch pairs file
            from the pipeline. It is a list of locations due to the fact
            that two pipeline runs could be combined.
        vaex_meas: 'True' if the measurements have been loaded using
            vaex from an arrow file. `False` means the measurements are
            loaded into a pandas DataFrame.
        n_workers: Number of workers (cpus) available. Default is
            determined by running `cpu_count()`.
        scheduler: Dask scheduling option to use. Options are "processes"
            (parallel processing) or "single-threaded". Defaults to
            "processes".

    Returns:
        None
    """
    super(PipeRun, self).__init__()
    self.name = name
    self.images = images
    self.skyregions = skyregions
    self.sources = sources
    self.sources_skycoord = self.get_sources_skycoord()
    self.associations = associations
    self.bands = bands
    self.measurements = measurements
    self.measurement_pairs_file = measurement_pairs_file
    self.relations = relations
    self.n_workers = n_workers
    self._vaex_meas = vaex_meas
    self._loaded_two_epoch_metrics = False
    self.scheduler = scheduler

    self.logger = logging.getLogger('vasttools.pipeline.PipeRun')
    self.logger.debug('Created PipeRun instance')

    self._measurement_pairs_exists = self._check_measurement_pairs_file()

check_for_planets(self)

Checks the pipeline run for any planets in the field.

All planets are checked: Mercury, Venus, Mars, Jupiter, Saturn, Uranus, Neptune, Pluto in addition to the Sun and Moon.

Returns:

Type Description
DataFrame

DataFrame with list of planet positions. It will be empty if no planets are found. A pandas.core.frame.DataFrame instance.

Source code in vasttools/pipeline.py
def check_for_planets(self) -> pd.DataFrame:
    """
    Checks the pipeline run for any planets in the field.

    All planets are checked: Mercury, Venus, Mars, Jupiter,
    Saturn, Uranus, Neptune, Pluto in addition to the Sun and Moon.

    Returns:
        DataFrame with list of planet positions. It will be empty if no
        planets are found. A `pandas.core.frame.DataFrame` instance.
    """
    from vasttools import ALLOWED_PLANETS
    ap = ALLOWED_PLANETS.copy()

    planets_df = (
        self.images.loc[:, [
            'datetime',
            'duration',
            'centre_ra',
            'centre_dec',
        ]]
        .reset_index()
        .rename(
            columns={
                'id': 'image_id',
                'datetime': 'DATEOBS',
                'centre_ra': 'centre-ra',
                'centre_dec': 'centre-dec'
            }
        )
    )

    # Split off a sun and moon df so we can check more times
    sun_moon_df = planets_df.copy()
    ap.remove('sun')
    ap.remove('moon')

    # check planets at start and end of observation
    planets_df['DATEOBS'] = planets_df[['DATEOBS', 'duration']].apply(
        self._add_times,
        axis=1
    )
    planets_df['planet'] = [ap for i in range(planets_df.shape[0])]

    # check sun and moon every hour
    sun_moon_df['DATEOBS'] = sun_moon_df[['DATEOBS', 'duration']].apply(
        self._add_times,
        args=(True,),
        axis=1
    )

    sun_moon_df['planet'] = [
        ['sun', 'moon'] for i in range(sun_moon_df.shape[0])
    ]

    planets_df = pd.concat([planets_df, sun_moon_df], ignore_index=True)

    del sun_moon_df

    planets_df = planets_df.explode('planet').explode('DATEOBS').drop(
        'duration', axis=1
    )
    planets_df['planet'] = planets_df['planet'].str.capitalize()

    # reset index as there might be doubles but keep the id column as this
    # signifies the image id.
    planets_df = planets_df.reset_index(drop=True)

    meta = {
        'image_id': 'i',
        'DATEOBS': 'datetime64[ns]',
        'centre-ra': 'f',
        'centre-dec': 'f',
        'planet': 'U',
        'ra': 'f',
        'dec': 'f',
        'sep': 'f'
    }

    result = (
        dd.from_pandas(planets_df, self.n_workers)
        .groupby('planet')
        .apply(
            match_planet_to_field,
            meta=meta
        ).compute(
            scheduler=self.scheduler,
            n_workers=self.n_workers
        )
    )

    if result.empty:
        warnings.warn("No planets found.")

    return result
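
Example usage (a minimal sketch; 'my_run' stands for a PipeAnalysis instance returned by Pipeline.load_run):

planets_df = my_run.check_for_planets()
if not planets_df.empty:
    # 'planet' names the matched body; 'ra', 'dec' and 'sep' describe the
    # match (column names taken from the meta definition shown above)
    print(planets_df[['image_id', 'DATEOBS', 'planet', 'sep']])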

combine_with_run(self, other_PipeRun, new_name=None)

Combines the output of another PipeRun object with the PipeRun from which this method is called.

Warning

It is assumed you are loading runs from the same Pipeline instance. If this is not the case then erroneous results may be returned.

Parameters:

Name Type Description Default
other_PipeRun PipeRun

The other pipeline run to merge.

required
new_name Optional[str]

If not None then the PipeRun attribute 'name' is changed to the given value.

None

Returns:

Type Description
PipeRun

The self object with the other pipeline run added.

Source code in vasttools/pipeline.py
def combine_with_run(
    self, other_PipeRun, new_name: Optional[str] = None
):
    """
    Combines the output of another PipeRun object with the PipeRun
    from which this method is called.

    !!!warning
        It is assumed you are loading runs from the same Pipeline
        instance. If this is not the case then erroneous results may be
        returned.

    Args:
        other_PipeRun (PipeRun): The other pipeline run to merge.
        new_name: If not None then the PipeRun attribute 'name'
            is changed to the given value.

    Returns:
        PipeRun: The self object with the other pipeline run added.
    """

    self.images = pd.concat(
        [self.images, other_PipeRun.images]
    ).drop_duplicates('path')

    self.skyregions = pd.concat(
        [self.skyregions, other_PipeRun.skyregions],
        ignore_index=True
    ).drop_duplicates('id')

    if self._vaex_meas and other_PipeRun._vaex_meas:
        self.measurements = self.measurements.concat(
            other_PipeRun.measurements
        )

    elif self._vaex_meas and not other_PipeRun._vaex_meas:
        self.measurements = self.measurements.concat(
            vaex.from_pandas(other_PipeRun.measurements)
        )

    elif not self._vaex_meas and other_PipeRun._vaex_meas:
        self.measurements = vaex.from_pandas(self.measurements).concat(
            other_PipeRun.measurements
        )
        self._vaex_meas = True

    else:
        self.measurements = pd.concat(
            [self.measurements, other_PipeRun.measurements],
            ignore_index=True
        ).drop_duplicates(['id', 'source'])

    sources_to_add = other_PipeRun.sources.loc[
        ~(other_PipeRun.sources.index.isin(
            self.sources.index
        ))
    ]

    self.sources = pd.concat([self.sources, sources_to_add])

    # need to keep access to all the different pairs files
    # for two epoch metrics.
    orig_run_pairs_exist = self._measurement_pairs_exists
    other_run_pairs_exist = other_PipeRun._measurement_pairs_exists

    if orig_run_pairs_exist and other_run_pairs_exist:
        for i in other_PipeRun.measurement_pairs_file:
            self.measurement_pairs_file.append(i)

    elif orig_run_pairs_exist:
        self.logger.warning("Not combining measurement pairs because they "
                            " do not exist for the new run."
                            )
        self._measurement_pairs_exists = False

    elif other_run_pairs_exist:
        self.logger.warning("Not combining measurement pairs because they "
                            " do not exist for the original run."
                            )

    del sources_to_add

    if new_name is not None:
        self.name = new_name

    return self
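
Example usage (a sketch assuming 'run_a' and 'run_b' are PipeAnalysis objects loaded from the same Pipeline instance):

combined_run = run_a.combine_with_run(run_b, new_name='combined_run')
print(combined_run.name, combined_run.sources.shape[0])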

create_moc(self, max_depth=9, ignore_large_run_warning=False)

Create a MOC file that represents the area covered by the pipeline run.

Warning

This will take a very long time for large runs.

Parameters:

Name Type Description Default
max_depth int

Max depth parameter passed to the MOC.from_polygon_skycoord() function, defaults to 9.

9
ignore_large_run_warning bool

Ignores the warning of creating a MOC on a large run.

False

Returns:

Type Description
MOC

MOC object.

Source code in vasttools/pipeline.py
def create_moc(
    self, max_depth: int = 9, ignore_large_run_warning: bool = False
) -> mocpy.MOC:
    """
    Create a MOC file that represents the area covered by
    the pipeline run.

    !!!warning
        This will take a very long time for large runs.

    Args:
        max_depth: Max depth parameter passed to the
            MOC.from_polygon_skycoord() function, defaults to 9.
        ignore_large_run_warning: Ignores the warning of creating a MOC on
            a large run.

    Returns:
        MOC object.
    """
    images_to_use = self.images.drop_duplicates(
        'skyreg_id'
    )['path'].values

    if not ignore_large_run_warning and images_to_use.shape[0] > 10:
        warnings.warn(
            "Creating a MOC for a large run will take a long time!"
            " Run again with 'ignore_large_run_warning=True` if you"
            " are sure you want to run this. A smaller `max_depth` is"
            " highly recommended."
        )
        return

    moc = create_moc_from_fits(
        images_to_use[0],
        max_depth=max_depth
    )

    if images_to_use.shape[0] > 1:
        for img in images_to_use[1:]:
            img_moc = create_moc_from_fits(
                img,
                max_depth
            )
            moc = moc.union(img_moc)

    return moc
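
Example usage (a sketch; saving to FITS assumes a recent mocpy release that provides MOC.save):

run_moc = my_run.create_moc(max_depth=8)
if run_moc is not None:
    # persist the MOC so it does not need to be regenerated
    run_moc.save('my_run_moc.fits', format='fits')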

filter_by_moc(self, moc)

Filters the PipeRun object to only contain the sources that are located within the provided MOC area.

Parameters:

Name Type Description Default
moc MOC

The MOC instance to filter the run by.

required

Returns:

Type Description
PipeAnalysis

A new PipeAnalysis object containing only the sources within the MOC.

Source code in vasttools/pipeline.py
def filter_by_moc(self, moc: mocpy.MOC):
    """
    Filters the PipeRun object to only contain the sources that are
    located within the provided moc area.

    Args:
        moc: MOC instance for which to filter the run by.

    Returns:
        PipeAnalysis: new_PipeRun
    """
    source_mask = moc.contains(
        self.sources_skycoord.ra, self.sources_skycoord.dec)

    new_sources = self.sources.loc[source_mask].copy()

    if self._vaex_meas:
        new_meas = self.measurements[
            self.measurements['source'].isin(new_sources.index.values)]
        new_meas = new_meas.extract()
    else:
        new_meas = self.measurements.loc[
            self.measurements['source'].isin(
                new_sources.index.values
            )].copy()

    new_images = self.images.loc[
        self.images.index.isin(new_meas['image_id'].tolist())].copy()

    new_skyregions = self.skyregions[
        self.skyregions['id'].isin(new_images['skyreg_id'].values)].copy()

    new_associations = self.associations[
        self.associations['source_id'].isin(
            new_sources.index.values
        )
    ].copy()

    new_bands = self.bands[
        self.bands['id'].isin(new_images['band_id'])
    ]

    new_relations = self.relations[
        self.relations['from_source_id'].isin(
            new_sources.index.values
        )
    ].copy()

    new_PipeRun = PipeAnalysis(
        name=self.name,
        images=new_images,
        skyregions=new_skyregions,
        relations=new_relations,
        sources=new_sources,
        associations=new_associations,
        bands=new_bands,
        measurements=new_meas,
        measurement_pairs_file=self.measurement_pairs_file,
        vaex_meas=self._vaex_meas
    )

    return new_PipeRun
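
Example usage (a sketch; 'region_moc' is any mocpy.MOC covering the region of interest, e.g. one returned by create_moc):

sub_run = my_run.filter_by_moc(region_moc)
print(sub_run.sources.shape[0], 'sources fall within the MOC')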

get_source(self, id, field=None, stokes='I', outdir='.', user_measurements=None, user_sources=None)

Fetches an individual source and returns a vasttools.source.Source object.

Users do not need to change the field, stokes and outdir parameters.

Parameters:

Name Type Description Default
id int

The id of the source to load.

required
field Optional[str]

The field of the source being loaded, defaults to None. If None then the run name is used as the field.

None
stokes str

Stokes parameter of the source, defaults to 'I'.

'I'
outdir str

The output directory where generated plots will be saved, defaults to '.' (the current working directory).

'.'
user_measurements Union[pandas.core.frame.DataFrame, vaex.dataframe.DataFrame]

A user generated measurements dataframe to use instead of the default pipeline result. The type must match the default type of the pipeline (vaex or pandas). Defaults to None, in which case the default pipeline measurements are used.

None
user_sources Optional[pandas.core.frame.DataFrame]

A user generated sources dataframe to use instead of the default pipeline result. Format is always a pandas dataframe. Defaults to None, in which case the default pipeline sources are used.

None

Returns:

Type Description
Source

vasttools Source object.

Source code in vasttools/pipeline.py
def get_source(
    self,
    id: int,
    field: Optional[str] = None,
    stokes: str = 'I',
    outdir: str = '.',
    user_measurements: Optional[Union[
        pd.DataFrame, vaex.dataframe.DataFrame]
    ] = None,
    user_sources: Optional[pd.DataFrame] = None
) -> Source:
    """
    Fetches an individual source and returns a
    vasttools.source.Source object.

    Users do not need
    to change the field, stokes and outdir parameters.

    Args:
        id: The id of the source to load.
        field: The field of the source being loaded, defaults
            to None. If None then the run name is used as the field.
        stokes: Stokes parameter of the source, defaults to 'I'.
        outdir: The output directory where generated plots will
            be saved, defaults to '.' (the current working directory).
        user_measurements: A user generated measurements dataframe to
            use instead of the default pipeline result. The type must match
            the default type of the pipeline (vaex or pandas). Defaults to
            None, in which case the default pipeline measurements are used.
        user_sources: A user generated sources dataframe to use
            instead of the default pipeline result. Format is always a
            pandas dataframe. Defaults to None, in which case the default
            pipeline sources are used.

    Returns:
        vasttools Source object.
    """

    if user_measurements is None:
        the_measurements = self.measurements
    else:
        the_measurements = user_measurements

    if user_sources is None:
        the_sources = self.sources
    else:
        the_sources = user_sources

    if self._vaex_meas:
        measurements = the_measurements[
            the_measurements['source'] == id
        ].to_pandas_df()

    else:
        measurements = the_measurements.loc[
            the_measurements['source'] == id
        ]

    measurements = measurements.merge(
        self.images[[
            'path',
            'noise_path',
            'measurements_path',
            'frequency'
        ]], how='left',
        left_on='image_id',
        right_index=True
    ).rename(
        columns={
            'path': 'image',
            'noise_path': 'rms',
            'measurements_path': 'selavy'
        }
    )

    measurements = measurements.rename(
        columns={
            'time': 'dateobs',
        }
    ).sort_values(
        by='dateobs'
    ).reset_index(drop=True)

    s = the_sources.loc[id]

    num_measurements = s['n_measurements']

    source_coord = SkyCoord(
        s['wavg_ra'],
        s['wavg_dec'],
        unit=(u.deg, u.deg)
    )

    source_name = "VAST {}".format(
        source_coord.to_string(
            "hmsdms", sep='', precision=1
        ).replace(
            " ", ""
        )[:15]
    )
    source_epochs = [str(i) for i in range(1, num_measurements + 1)]
    if field is None:
        field = self.name
    measurements['field'] = field
    measurements['epoch'] = source_epochs
    measurements['stokes'] = stokes
    measurements['skycoord'] = [
        source_coord for i in range(num_measurements)
    ]
    measurements['detection'] = ~measurements['forced']
    source_fields = [field for i in range(num_measurements)]
    source_stokes = stokes
    source_base_folder = None
    source_crossmatch_radius = None
    source_outdir = outdir
    source_image_type = None

    thesource = Source(
        source_coord,
        source_name,
        source_epochs,
        source_fields,
        source_stokes,
        None,
        source_crossmatch_radius,
        measurements,
        source_base_folder,
        source_image_type,
        islands=False,
        outdir=source_outdir,
        pipeline=True
    )

    return thesource
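
Example usage (a minimal sketch fetching the first source of the run):

first_id = my_run.sources.index[0]
my_source = my_run.get_source(first_id)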

get_sources_skycoord(self, user_sources=None, ra_col='wavg_ra', dec_col='wavg_dec', ra_unit=Unit("deg"), dec_unit=Unit("deg"))

A convenience function to generate a SkyCoord object from the sources dataframe. Also has support for custom source lists.

Parameters:

Name Type Description Default
user_sources Optional[pandas.core.frame.DataFrame]

Provide a user generated source dataframe in place of using the default run sources dataframe.

None
ra_col str

The column to use for the Right Ascension.

'wavg_ra'
dec_col str

The column to use for the Declination.

'wavg_dec'
ra_unit Unit

The unit of the RA column, defaults to degrees. Must be an astropy.unit value.

Unit("deg")
dec_unit Unit

The unit of the Dec column, defaults to degrees. Must be an astropy.unit value.

Unit("deg")

Returns:

Type Description
SkyCoord

SkyCoord object of the sources. An astropy.coordinates.sky_coordinate.SkyCoord instance.

Source code in vasttools/pipeline.py
def get_sources_skycoord(
    self,
    user_sources: Optional[pd.DataFrame] = None,
    ra_col: str = 'wavg_ra',
    dec_col: str = 'wavg_dec',
    ra_unit: u.Unit = u.degree,
    dec_unit: u.Unit = u.degree
) -> astropy.coordinates.sky_coordinate.SkyCoord:
    """
    A convenience function to generate a SkyCoord object from the
    sources dataframe. Also has support for custom source lists.

    Args:
        user_sources: Provide a user generated source dataframe
            in place of using the default run sources dataframe.
        ra_col: The column to use for the Right Ascension.
        dec_col: The column to use for the Declination.
        ra_unit: The unit of the RA column, defaults to degrees.
            Must be an astropy.unit value.
        dec_unit: The unit of the Dec column, defaults to degrees.
            Must be an astropy.unit value.

    Returns:
        SkyCoord object of the sources. An
        `astropy.coordinates.sky_coordinate.SkyCoord` instance.
    """
    if user_sources is None:
        the_sources = self.sources
    else:
        the_sources = user_sources

    sources_skycoord = gen_skycoord_from_df(
        the_sources, ra_col=ra_col, dec_col=dec_col, ra_unit=ra_unit,
        dec_unit=dec_unit
    )

    return sources_skycoord
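
Example usage (a sketch crossmatching a hypothetical target position against the run sources):

from astropy.coordinates import SkyCoord
import astropy.units as u

target = SkyCoord('21h30m00s', '-55d30m00s')  # hypothetical position
idx, sep2d, _ = target.match_to_catalog_sky(my_run.sources_skycoord)
if sep2d < 15. * u.arcsec:
    print(my_run.sources.iloc[int(idx)])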

load_two_epoch_metrics(self)

Loads the two epoch metrics dataframe, usually stored as either 'measurement_pairs.parquet' or 'measurement_pairs.arrow'.

The two epoch metrics dataframe is stored as an attribute to the PipeRun object as self.measurement_pairs_df. An epoch 'key' is also added to the dataframe.

Also creates a 'pairs_df' that lists all the possible epoch pairs. This is stored as the attribute self.pairs_df.

Returns:

Type Description
None

None

Exceptions:

Type Description
MeasPairsDoNotExistError

The measurement pairs file(s) do not exist for this run

Source code in vasttools/pipeline.py
def load_two_epoch_metrics(self) -> None:
    """
    Loads the two epoch metrics dataframe, usually stored as either
    'measurement_pairs.parquet' or 'measurement_pairs.arrow'.

    The two epoch metrics dataframe is stored as an attribute to the
    PipeRun object as self.measurement_pairs_df. An epoch 'key' is also
    added to the dataframe.

    Also creates a 'pairs_df' that lists all the possible epoch pairs.
    This is stored as the attribute self.pairs_df.

    Returns:
        None

    Raises:
        MeasPairsDoNotExistError: The measurement pairs file(s) do not
            exist for this run
    """

    self._raise_if_no_pairs()

    image_ids = self.images.sort_values(by='datetime').index.tolist()

    pairs_df = pd.DataFrame.from_dict(
        {'pair': combinations(image_ids, 2)}
    )

    pairs_df = (
        pd.DataFrame(pairs_df['pair'].tolist())
        .rename(columns={0: 'image_id_a', 1: 'image_id_b'})
        .merge(
            self.images[['datetime', 'name']],
            left_on='image_id_a', right_index=True,
            suffixes=('_a', '_b')
        )
        .merge(
            self.images[['datetime', 'name']],
            left_on='image_id_b', right_index=True,
            suffixes=('_a', '_b')
        )
    ).reset_index().rename(
        columns={
            'index': 'id',
            'name_a': 'image_name_a',
            'name_b': 'image_name_b'
        }
    )

    pairs_df['td'] = pairs_df['datetime_b'] - pairs_df['datetime_a']

    pairs_df = pairs_df.drop(['datetime_a', 'datetime_b'], axis=1)

    pairs_df['pair_epoch_key'] = (
        pairs_df[['image_name_a', 'image_name_b']]
        .apply(
            lambda x: f"{x['image_name_a']}_{x['image_name_b']}", axis=1
        )
    )

    self._vaex_meas_pairs = False
    if len(self.measurement_pairs_file) > 1:
        arrow_files = np.array(
            [i.endswith(".arrow") for i in self.measurement_pairs_file]
        )
        if np.any(arrow_files):
            # boolean masks require array indexing, not list indexing
            mp_files = np.array(self.measurement_pairs_file)
            measurement_pairs_df = vaex.open_many(
                mp_files[arrow_files].tolist()
            )
            for i in mp_files[~arrow_files]:
                temp = pd.read_parquet(i)
                temp = vaex.from_pandas(temp)
                measurement_pairs_df = measurement_pairs_df.concat(temp)
            self._vaex_meas_pairs = True
            warnings.warn("Measurement pairs have been loaded with vaex.")
        else:
            measurement_pairs_df = (
                dd.read_parquet(self.measurement_pairs_file).compute()
            )
    else:
        if self.measurement_pairs_file[0].endswith('.arrow'):
            measurement_pairs_df = (
                vaex.open(self.measurement_pairs_file[0])
            )
            self._vaex_meas_pairs = True
            warnings.warn("Measurement pairs have been loaded with vaex.")
        else:
            measurement_pairs_df = (
                pd.read_parquet(self.measurement_pairs_file[0])
            )

    if self._vaex_meas_pairs:
        measurement_pairs_df['pair_epoch_key'] = (
            measurement_pairs_df['image_name_a'] + "_"
            + measurement_pairs_df['image_name_b']
        )

        pair_counts = measurement_pairs_df.groupby(
            measurement_pairs_df.pair_epoch_key, agg='count'
        )

        pair_counts = pair_counts.to_pandas_df().rename(
            columns={'count': 'total_pairs'}
        ).set_index('pair_epoch_key')
    else:
        measurement_pairs_df['pair_epoch_key'] = (
            measurement_pairs_df[['image_name_a', 'image_name_b']]
            .apply(
                lambda x: f"{x['image_name_a']}_{x['image_name_b']}",
                axis=1
            )
        )

        pair_counts = measurement_pairs_df[
            ['pair_epoch_key', 'image_name_a']
        ].groupby('pair_epoch_key').count().rename(
            columns={'image_name_a': 'total_pairs'}
        )

    pairs_df = pairs_df.merge(
        pair_counts, left_on='pair_epoch_key', right_index=True
    )

    del pair_counts

    pairs_df = pairs_df.dropna(subset=['total_pairs']).set_index('id')

    self.measurement_pairs_df = measurement_pairs_df
    self.pairs_df = pairs_df.sort_values(by='td')

    self._loaded_two_epoch_metrics = True
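
Example usage (a sketch; assumes the measurement pairs file exists for the run):

my_run.load_two_epoch_metrics()
# pairs_df lists every unique epoch pair, sorted by the time difference 'td'
print(my_run.pairs_df[['image_name_a', 'image_name_b', 'td', 'total_pairs']])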

Pipeline

Class to interface with VAST Pipeline results.

Attributes:

Name Type Description
project_dir str

The pipeline project directory provided by the user on initialisation.

Source code in vasttools/pipeline.py
class Pipeline(object):
    """
    Class to interface with VAST Pipeline results.

    Attributes:

        project_dir (str): The pipeline project directory provided by the user
            on initialisation.
    """

    def __init__(self, project_dir: Optional[str] = None) -> None:
        """
        Constructor method.

        The system variable `PIPELINE_WORKING_DIR` will be checked
        first, with the project_dir input as the fallback option.

        Args:
            project_dir: The directory of the pipeline results. Only required
                when the system variable is not defined, defaults to 'None'.

        Returns:
            None

        Raises:
            PipelineDirectoryError: The `PIPELINE_WORKING_DIR` could not
                be determined.
            PipelineDirectoryError: Pipeline run directory is not found.
        """
        super(Pipeline, self).__init__()

        if project_dir is None:
            pipeline_run_path = os.getenv('PIPELINE_WORKING_DIR')
            if pipeline_run_path is None:
                raise PipelineDirectoryError(
                    "The pipeline run directory could not be determined!"
                    " Either the system environment 'PIPELINE_WORKING_DIR'"
                    " must be defined or the 'project_dir' argument defined"
                    " when initialising the pipeline class object."
                )
        else:
            pipeline_run_path = os.path.abspath(str(project_dir))

        if not os.path.isdir(pipeline_run_path):
            raise PipelineDirectoryError(
                "Pipeline run directory {} not found!".format(
                    pipeline_run_path
                )
            )

        self.project_dir = pipeline_run_path

    def list_piperuns(self) -> List[str]:
        """
        Lists the runs present in the pipeline directory.

        Note this just lists the directories, i.e. it is not known whether
        the runs have actually been processed.

        Returns:
            List of pipeline run names present in directory.
        """
        jobs = sorted(glob.glob(
            os.path.join(self.project_dir, "*")
        ))

        jobs = [i.split("/")[-1] for i in jobs]
        jobs.remove('images')

        return jobs

    def list_images(self) -> List[str]:
        """
        Lists all images processed in the pipeline directory.

        Returns:
            List of images processed.
        """
        img_list = sorted(glob.glob(
            os.path.join(self.project_dir, "images", "*")
        ))

        img_list = [i.split("/")[-1] for i in img_list]

        return img_list

    def load_runs(
        self, run_names: List[str], name: Optional[str] = None,
        n_workers: int = HOST_NCPU - 1
    ) -> PipeAnalysis:
        """
        Wrapper to load multiple runs in one command.

        Args:
            run_names: List containing the names of the runs to load.
            name: A name for the resulting pipeline run.
            n_workers: The number of workers (cpus) available.

        Returns:
            Combined PipeAnalysis object.
        """
        piperun = self.load_run(
            run_names[0],
            n_workers=n_workers
        )

        if len(run_names) > 1:
            for r in run_names[1:]:
                piperun = piperun.combine_with_run(
                    self.load_run(
                        r,
                        n_workers=n_workers
                    )
                )
        if name is not None:
            piperun.name = name

        return piperun

    def load_run(
        self, run_name: str, n_workers: int = HOST_NCPU - 1
    ) -> PipeAnalysis:
        """
        Process and load a pipeline run.

        Args:
            run_name: The name of the run to load.
            n_workers: The number of workers (cpus) available.

        Returns:
            PipeAnalysis object.

        Raises:
            OSError: The entered pipeline run directory does not exist.
        """

        run_dir = os.path.join(
            self.project_dir,
            run_name
        )

        if not os.path.isdir(run_dir):
            raise OSError(
                "Run '{}' directory does not exist!".format(run_name)
            )

        images = pd.read_parquet(
            os.path.join(
                run_dir,
                'images.parquet'
            )
        )

        skyregions = pd.read_parquet(
            os.path.join(
                run_dir,
                'skyregions.parquet'
            ),
            engine='pyarrow'
        )

        bands = pd.read_parquet(
            os.path.join(
                run_dir,
                'bands.parquet'
            ),
            engine='pyarrow'
        )

        images = images.merge(
            skyregions[[
                'id',
                'centre_ra',
                'centre_dec',
                'xtr_radius'
            ]], how='left',
            left_on='skyreg_id',
            right_on='id'
        ).drop(
            'id_y', axis=1
        ).rename(
            columns={'id_x': 'id'}
        ).merge(  # second merge for band
            bands[['id', 'frequency', 'bandwidth']],
            how='left',
            left_on='band_id',
            right_on='id'
        ).drop(
            'id_y', axis=1
        ).rename(
            columns={'id_x': 'id'}
        )

        relations = pd.read_parquet(
            os.path.join(
                run_dir,
                'relations.parquet'
            ),
            engine='pyarrow'
        )

        sources = pd.read_parquet(
            os.path.join(
                run_dir,
                'sources.parquet'
            ),
            engine='pyarrow'
        )

        to_move = ['n_meas', 'n_meas_sel', 'n_meas_forced', 'n_sibl', 'n_rel']
        sources_len = sources.shape[1]
        for c in to_move:
            col = sources.pop(c)
            sources.insert(sources_len - 1, c, col)

        sources = sources.rename(
            columns={
                'n_meas_forced': 'n_forced',
                'n_meas': 'n_measurements',
                'n_meas_sel': 'n_selavy',
                'n_sibl': 'n_siblings',
                'n_rel': 'n_relations'
            }
        )

        associations = pd.read_parquet(
            os.path.join(
                run_dir,
                'associations.parquet'
            ),
            engine='pyarrow'
        )

        vaex_meas = False

        if os.path.isfile(os.path.join(
            run_dir,
            'measurements.arrow'
        )):
            measurements = vaex.open(
                os.path.join(run_dir, 'measurements.arrow')
            )

            vaex_meas = True

            warnings.warn("Measurements have been loaded with vaex.")

        else:
            m_files = images['measurements_path'].tolist()
            m_files += sorted(glob.glob(os.path.join(
                run_dir,
                "forced_measurements*.parquet"
            )))

            # use dask to open measurement parquets
            # as they are spread over many different files
            measurements = dd.read_parquet(
                m_files,
                engine='pyarrow'
            ).compute()

            measurements = measurements.loc[
                measurements['id'].isin(associations['meas_id'].values)
            ]

            measurements = (
                associations.loc[:, ['meas_id', 'source_id']]
                .set_index('meas_id')
                .merge(
                    measurements,
                    left_index=True,
                    right_on='id'
                )
                .rename(columns={'source_id': 'source'})
            ).reset_index(drop=True)

        images = images.set_index('id')

        if os.path.isfile(os.path.join(
            run_dir,
            "measurement_pairs.arrow"
        )):
            measurement_pairs_file = [os.path.join(
                run_dir,
                "measurement_pairs.arrow"
            )]
        else:
            measurement_pairs_file = [os.path.join(
                run_dir,
                "measurement_pairs.parquet"
            )]

        piperun = PipeAnalysis(
            name=run_name,
            images=images,
            skyregions=skyregions,
            relations=relations,
            sources=sources,
            associations=associations,
            bands=bands,
            measurements=measurements,
            measurement_pairs_file=measurement_pairs_file,
            vaex_meas=vaex_meas
        )

        return piperun

__init__(self, project_dir=None) special

Constructor method.

The system variable PIPELINE_WORKING_DIR will be checked first, with the project_dir input as the fallback option.

Parameters:

Name Type Description Default
project_dir Optional[str]

The directory of the pipeline results. Only required when the system variable is not defined, defaults to 'None'.

None

Returns:

Type Description
None

None

Exceptions:

Type Description
PipelineDirectoryError

The PIPELINE_WORKING_DIR could not be determined.

PipelineDirectoryError

Pipeline run directory is not found.

Source code in vasttools/pipeline.py
def __init__(self, project_dir: Optional[str] = None) -> None:
    """
    Constructor method.

    The system variable `PIPELINE_WORKING_DIR` will be checked
    first, with the project_dir input as the fallback option.

    Args:
        project_dir: The directory of the pipeline results. Only required
            when the system variable is not defined, defaults to 'None'.

    Returns:
        None

    Raises:
        PipelineDirectoryError: The `PIPELINE_WORKING_DIR` could not
            be determined.
        PipelineDirectoryError: Pipeline run directory is not found.
    """
    super(Pipeline, self).__init__()

    if project_dir is None:
        pipeline_run_path = os.getenv('PIPELINE_WORKING_DIR')
        if pipeline_run_path is None:
            raise PipelineDirectoryError(
                "The pipeline run directory could not be determined!"
                " Either the system environment 'PIPELINE_WORKING_DIR'"
                " must be defined or the 'project_dir' argument defined"
                " when initialising the pipeline class object."
            )
    else:
        pipeline_run_path = os.path.abspath(str(project_dir))

    if not os.path.isdir(pipeline_run_path):
        raise PipelineDirectoryError(
            "Pipeline run directory {} not found!".format(
                pipeline_run_path
            )
        )

    self.project_dir = pipeline_run_path
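
Example usage (a sketch; the directory path is hypothetical and only required when PIPELINE_WORKING_DIR is not set):

from vasttools.pipeline import Pipeline

pipe = Pipeline(project_dir='/data/vast-pipeline-runs')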

list_images(self)

Lists all images processed in the pipeline directory.

Returns:

Type Description
List[str]

List of images processed.

Source code in vasttools/pipeline.py
def list_images(self) -> List[str]:
    """
    Lists all images processed in the pipeline directory.

    Returns:
        List of images processed.
    """
    img_list = sorted(glob.glob(
        os.path.join(self.project_dir, "images", "*")
    ))

    img_list = [i.split("/")[-1] for i in img_list]

    return img_list

list_piperuns(self)

Lists the runs present in the pipeline directory.

Note this just lists the directories, i.e. it is not known whether the runs have actually been processed.

Returns:

Type Description
List[str]

List of pipeline run names present in directory.

Source code in vasttools/pipeline.py
def list_piperuns(self) -> List[str]:
    """
    Lists the runs present in the pipeline directory.

    Note this just lists the directories, i.e. it is not known whether
    the runs have actually been processed.

    Returns:
        List of pipeline run names present in directory.
    """
    jobs = sorted(glob.glob(
        os.path.join(self.project_dir, "*")
    ))

    jobs = [i.split("/")[-1] for i in jobs]
    jobs.remove('images')

    return jobs
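
Example usage (a sketch; 'pipe' is a previously initialised Pipeline instance):

print(pipe.list_piperuns())
print(pipe.list_images())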

load_run(self, run_name, n_workers=3)

Process and load a pipeline run.

Parameters:

Name Type Description Default
run_name str

The name of the run to load.

required
n_workers int

The number of workers (cpus) available.

3

Returns:

Type Description
PipeAnalysis

PipeAnalysis object.

Exceptions:

Type Description
OSError

The entered pipeline run directory does not exist.

Source code in vasttools/pipeline.py
def load_run(
    self, run_name: str, n_workers: int = HOST_NCPU - 1
) -> PipeAnalysis:
    """
    Process and load a pipeline run.

    Args:
        run_name: The name of the run to load.
        n_workers: The number of workers (cpus) available.

    Returns:
        PipeAnalysis object.

    Raises:
        OSError: The entered pipeline run directory does not exist.
    """

    run_dir = os.path.join(
        self.project_dir,
        run_name
    )

    if not os.path.isdir(run_dir):
        raise OSError(
            "Run '{}' directory does not exist!".format(run_name)
        )

    images = pd.read_parquet(
        os.path.join(
            run_dir,
            'images.parquet'
        )
    )

    skyregions = pd.read_parquet(
        os.path.join(
            run_dir,
            'skyregions.parquet'
        ),
        engine='pyarrow'
    )

    bands = pd.read_parquet(
        os.path.join(
            run_dir,
            'bands.parquet'
        ),
        engine='pyarrow'
    )

    images = images.merge(
        skyregions[[
            'id',
            'centre_ra',
            'centre_dec',
            'xtr_radius'
        ]], how='left',
        left_on='skyreg_id',
        right_on='id'
    ).drop(
        'id_y', axis=1
    ).rename(
        columns={'id_x': 'id'}
    ).merge(  # second merge for band
        bands[['id', 'frequency', 'bandwidth']],
        how='left',
        left_on='band_id',
        right_on='id'
    ).drop(
        'id_y', axis=1
    ).rename(
        columns={'id_x': 'id'}
    )

    relations = pd.read_parquet(
        os.path.join(
            run_dir,
            'relations.parquet'
        ),
        engine='pyarrow'
    )

    sources = pd.read_parquet(
        os.path.join(
            run_dir,
            'sources.parquet'
        ),
        engine='pyarrow'
    )

    to_move = ['n_meas', 'n_meas_sel', 'n_meas_forced', 'n_sibl', 'n_rel']
    sources_len = sources.shape[1]
    for c in to_move:
        col = sources.pop(c)
        sources.insert(sources_len - 1, c, col)

    sources = sources.rename(
        columns={
            'n_meas_forced': 'n_forced',
            'n_meas': 'n_measurements',
            'n_meas_sel': 'n_selavy',
            'n_sibl': 'n_siblings',
            'n_rel': 'n_relations'
        }
    )

    associations = pd.read_parquet(
        os.path.join(
            run_dir,
            'associations.parquet'
        ),
        engine='pyarrow'
    )

    vaex_meas = False

    if os.path.isfile(os.path.join(
        run_dir,
        'measurements.arrow'
    )):
        measurements = vaex.open(
            os.path.join(run_dir, 'measurements.arrow')
        )

        vaex_meas = True

        warnings.warn("Measurements have been loaded with vaex.")

    else:
        m_files = images['measurements_path'].tolist()
        m_files += sorted(glob.glob(os.path.join(
            run_dir,
            "forced_measurements*.parquet"
        )))

        # use dask to open measurement parquets
        # as they are spread over many different files
        measurements = dd.read_parquet(
            m_files,
            engine='pyarrow'
        ).compute()

        measurements = measurements.loc[
            measurements['id'].isin(associations['meas_id'].values)
        ]

        measurements = (
            associations.loc[:, ['meas_id', 'source_id']]
            .set_index('meas_id')
            .merge(
                measurements,
                left_index=True,
                right_on='id'
            )
            .rename(columns={'source_id': 'source'})
        ).reset_index(drop=True)

    images = images.set_index('id')

    if os.path.isfile(os.path.join(
        run_dir,
        "measurement_pairs.arrow"
    )):
        measurement_pairs_file = [os.path.join(
            run_dir,
            "measurement_pairs.arrow"
        )]
    else:
        measurement_pairs_file = [os.path.join(
            run_dir,
            "measurement_pairs.parquet"
        )]

    piperun = PipeAnalysis(
        name=run_name,
        images=images,
        skyregions=skyregions,
        relations=relations,
        sources=sources,
        associations=associations,
        bands=bands,
        measurements=measurements,
        measurement_pairs_file=measurement_pairs_file,
        vaex_meas=vaex_meas
    )

    return piperun
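
Example usage (a sketch; 'my_run_name' is a hypothetical run directory name):

my_run = pipe.load_run('my_run_name')
print(my_run.sources.head())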

load_runs(self, run_names, name=None, n_workers=3)

Wrapper to load multiple runs in one command.

Parameters:

Name Type Description Default
run_names List[str]

List containing the names of the runs to load.

required
name Optional[str]

A name for the resulting pipeline run.

None
n_workers int

The number of workers (cpus) available.

3

Returns:

Type Description
PipeAnalysis

Combined PipeAnalysis object.

Source code in vasttools/pipeline.py
def load_runs(
    self, run_names: List[str], name: Optional[str] = None,
    n_workers: int = HOST_NCPU - 1
) -> PipeAnalysis:
    """
    Wrapper to load multiple runs in one command.

    Args:
        run_names: List containing the names of the runs to load.
        name: A name for the resulting pipeline run.
        n_workers: The number of workers (cpus) available.

    Returns:
        Combined PipeAnalysis object.
    """
    piperun = self.load_run(
        run_names[0],
        n_workers=n_workers
    )

    if len(run_names) > 1:
        for r in run_names[1:]:
            piperun = piperun.combine_with_run(
                self.load_run(
                    r,
                    n_workers=n_workers
                )
            )
    if name is not None:
        piperun.name = name

    return piperun
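
Example usage (a sketch with hypothetical run names):

combined = pipe.load_runs(['epoch_01', 'epoch_02'], name='combined_epochs')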

PipelineDirectoryError (Exception)

An error to indicate an error with the pipeline directory.

Source code in vasttools/pipeline.py
class PipelineDirectoryError(Exception):
    """
    An error to indicate an error with the pipeline directory.
    """
    pass
