pairs.py

calculate_m_metric(flux_a, flux_b)

Calculate the m variability metric, which is the modulation index between two fluxes. This is proportional to the fractional variability. See Section 5 of Mooley et al. (2016) for details, DOI: 10.3847/0004-637X/818/2/105.
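
Concretely, as implemented in the source below:

    m = 2 * (flux_a - flux_b) / (flux_a + flux_b)

For example, fluxes of 1.0 and 3.0 mJy give m = 2 * (1.0 - 3.0) / (1.0 + 3.0) = -1.0.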

Parameters:

| Name   | Type  | Description     | Default  |
| ------ | ----- | --------------- | -------- |
| flux_a | float | flux value "A". | required |
| flux_b | float | flux value "B". | required |

Returns:

| Type  | Description                               |
| ----- | ----------------------------------------- |
| float | the m metric for flux values "A" and "B". |

Source code in vast_pipeline/pipeline/pairs.py
def calculate_m_metric(flux_a: float, flux_b: float) -> float:
    """Calculate the m variability metric which is the modulation index between two fluxes.
    This is proportional to the fractional variability.
    See Section 5 of Mooley et al. (2016) for details, DOI: 10.3847/0004-637X/818/2/105.

    Args:
        flux_a (float): flux value "A".
        flux_b (float): flux value "B".

    Returns:
        float: the m metric for flux values "A" and "B".
    """
    return 2 * ((flux_a - flux_b) / (flux_a + flux_b))
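
Because the metric uses only arithmetic operators, it accepts scalars or array-likes such as pandas Series; this is how calculate_measurement_pair_metrics calls it below. A minimal usage sketch (the flux values are invented for illustration):

import pandas as pd

from vast_pipeline.pipeline.pairs import calculate_m_metric

# scalar inputs
m = calculate_m_metric(1.0, 3.0)  # -1.0

# vectorised over pandas Series, as done in calculate_measurement_pair_metrics
flux_a = pd.Series([1.0, 2.0, 5.0])
flux_b = pd.Series([3.0, 2.0, 4.0])
m_series = calculate_m_metric(flux_a, flux_b)  # [-1.0, 0.0, 0.222...]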

calculate_measurement_pair_metrics(df, n_cpu=0, max_partition_mb=15)

Generate a DataFrame of measurement pairs and their 2-epoch variability metrics from a DataFrame of measurements. For more information on the variability metrics, see Section 5 of Mooley et al. (2016), DOI: 10.3847/0004-637X/818/2/105.
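
Pairs are formed with itertools.combinations over each source's measurements, so a source with N measurements contributes N(N - 1)/2 pairs; for example, a source observed five times yields ten pairs.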

Parameters:

| Name             | Type      | Description | Default  |
| ---------------- | --------- | ----------- | -------- |
| df               | DataFrame | Input measurements. Must contain columns: id, source, flux_int, flux_int_err, flux_peak, flux_peak_err, has_siblings, datetime, image. | required |
| n_cpu            | int       | The desired number of workers for Dask. | 0 |
| max_partition_mb | int       | The desired maximum size (in MB) of the partitions for Dask. | 15 |

Returns:

| Type      | Description                                                       |
| --------- | ----------------------------------------------------------------- |
| DataFrame | Measurement pairs and 2-epoch metrics; columns are listed below.  |

The returned DataFrame contains the columns:

- source - the source ID
- id_a, id_b - the measurement IDs
- flux_int_a, flux_int_b - measurement integrated fluxes in mJy
- flux_int_err_a, flux_int_err_b - measurement integrated flux errors in mJy
- flux_peak_a, flux_peak_b - measurement peak fluxes in mJy/beam
- flux_peak_err_a, flux_peak_err_b - measurement peak flux errors in mJy/beam
- image_name_a, image_name_b - names of the images the measurements belong to
- vs_peak, vs_int - variability t-statistic
- m_peak, m_int - variability modulation index

Source code in vast_pipeline/pipeline/pairs.py
def calculate_measurement_pair_metrics(
        df: pd.DataFrame, n_cpu: int = 0, max_partition_mb: int = 15) -> pd.DataFrame:
    """Generate a DataFrame of measurement pairs and their 2-epoch variability metrics
    from a DataFrame of measurements. For more information on the variability metrics, see
    Section 5 of Mooley et al. (2016), DOI: 10.3847/0004-637X/818/2/105.

    Args:
        df (pd.DataFrame): Input measurements. Must contain columns: id, source, flux_int,
            flux_int_err, flux_peak, flux_peak_err, has_siblings, datetime, image.
        n_cpu (int): The desired number of workers for Dask.
        max_partition_mb (int): The desired maximum size (in MB) of the partitions for Dask.

    Returns:
        Measurement pairs and 2-epoch metrics. Will contain columns:
            source - the source ID
            id_a, id_b - the measurement IDs
            flux_int_a, flux_int_b - measurement integrated fluxes in mJy
            flux_int_err_a, flux_int_err_b - measurement integrated flux errors in mJy
            flux_peak_a, flux_peak_b - measurement peak fluxes in mJy/beam
            flux_peak_err_a, flux_peak_err_b - measurement peak flux errors in mJy/beam
            image_name_a, image_name_b - names of the images the measurements belong to
            vs_peak, vs_int - variability t-statistic
            m_peak, m_int - variability modulation index
    """

    n_workers, n_partitions = calculate_workers_and_partitions(
        df,
        n_cpu=n_cpu,
        max_partition_mb=max_partition_mb
    )
    logger.debug(f"Running pair metric calculation with {n_workers} workers")

    """Create a DataFrame containing all measurement ID combinations per source.
    Resultant DataFrame will have a MultiIndex(["source", RangeIndex]) where "source" is
    the source ID and RangeIndex is an unnamed temporary ID for each measurement pair,
    unique only together with source.
    DataFrame will have columns [0, 1], each containing a measurement ID. e.g.
                       0      1
        source
        1       0      1   9284
                1      1  17597
                2      1  26984
                3   9284  17597
                4   9284  26984
        ...          ...    ...
        11105   2  11845  19961
        11124   0   3573  12929
                1   3573  21994
                2  12929  21994
        11128   0   6216  23534
    """
    measurement_combinations = (
        dd.from_pandas(df, npartitions=n_partitions)
        .groupby("source")["id"]
        .apply(
            lambda x: pd.DataFrame(list(combinations(x, 2))), meta={0: "i", 1: "i"},)
        .compute(num_workers=n_workers, scheduler="processes")
    )

    """Drop the RangeIndex from the MultiIndex as it isn't required and rename the columns.
    Example resultant DataFrame:
               source   id_a   id_b
        0           1      1   9284
        1           1      1  17597
        2           1      1  26984
        3           1   9284  17597
        4           1   9284  26984
        ...       ...    ...    ...
        33640   11105  11845  19961
        33641   11124   3573  12929
        33642   11124   3573  21994
        33643   11124  12929  21994
        33644   11128   6216  23534
    Where source is the source ID, id_a and id_b are measurement IDs.
    """
    measurement_combinations = measurement_combinations.reset_index(
        level=1, drop=True
    ).rename(columns={0: "id_a", 1: "id_b"}).astype(int).reset_index()

    # Dask has a tendency to swap which order the measurement pairs are
    # defined in, even if the dataframe is pre-sorted. We want the pairs to be
    # in date order (a < b) so the code below corrects any that are not.
    measurement_combinations = measurement_combinations.join(
        df[['source', 'id', 'datetime']].set_index(['source', 'id']),
        on=['source', 'id_a'],
    )

    measurement_combinations = measurement_combinations.join(
        df[['source', 'id', 'datetime']].set_index(['source', 'id']),
        on=['source', 'id_b'], lsuffix='_a', rsuffix='_b'
    )

    to_correct_mask = (
        measurement_combinations['datetime_a']
        > measurement_combinations['datetime_b']
    )

    if np.any(to_correct_mask):
        logger.debug('Correcting measurement pairs order')
        (
            measurement_combinations.loc[to_correct_mask, 'id_a'],
            measurement_combinations.loc[to_correct_mask, 'id_b']
        ) = np.array([
            measurement_combinations.loc[to_correct_mask, 'id_b'].values,
            measurement_combinations.loc[to_correct_mask, 'id_a'].values
        ])

    measurement_combinations = measurement_combinations.drop(
        ['datetime_a', 'datetime_b'], axis=1
    )

    # add the measurement fluxes and errors
    association_fluxes = df.set_index(["source", "id"])[
        ["flux_int", "flux_int_err", "flux_peak", "flux_peak_err", "image"]
    ].rename(columns={"image": "image_name"})
    measurement_combinations = measurement_combinations.join(
        association_fluxes,
        on=["source", "id_a"],
    ).join(
        association_fluxes,
        on=["source", "id_b"],
        lsuffix="_a",
        rsuffix="_b",
    )

    # calculate 2-epoch metrics
    measurement_combinations["vs_peak"] = calculate_vs_metric(
        measurement_combinations.flux_peak_a,
        measurement_combinations.flux_peak_b,
        measurement_combinations.flux_peak_err_a,
        measurement_combinations.flux_peak_err_b,
    )
    measurement_combinations["vs_int"] = calculate_vs_metric(
        measurement_combinations.flux_int_a,
        measurement_combinations.flux_int_b,
        measurement_combinations.flux_int_err_a,
        measurement_combinations.flux_int_err_b,
    )
    measurement_combinations["m_peak"] = calculate_m_metric(
        measurement_combinations.flux_peak_a,
        measurement_combinations.flux_peak_b,
    )
    measurement_combinations["m_int"] = calculate_m_metric(
        measurement_combinations.flux_int_a,
        measurement_combinations.flux_int_b,
    )

    return measurement_combinations
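
A minimal, hypothetical call sketch. The column values below are invented for illustration; in the pipeline the measurements come from the run's database. Leaving n_cpu at its default of 0 delegates worker selection to the calculate_workers_and_partitions helper (not shown here):

import pandas as pd

from vast_pipeline.pipeline.pairs import calculate_measurement_pair_metrics

# one hypothetical source with three measurements -> 3 pairs
measurements = pd.DataFrame({
    "id": [1, 2, 3],
    "source": [1, 1, 1],
    "flux_int": [1.0, 2.0, 4.0],
    "flux_int_err": [0.1, 0.1, 0.2],
    "flux_peak": [1.1, 2.1, 3.9],
    "flux_peak_err": [0.1, 0.1, 0.2],
    "has_siblings": [False, False, False],
    "datetime": pd.to_datetime(["2020-01-01", "2020-02-01", "2020-03-01"]),
    "image": ["epoch01.fits", "epoch02.fits", "epoch03.fits"],
})

pairs = calculate_measurement_pair_metrics(measurements)
# pairs has one row per measurement pair, with vs_peak, vs_int, m_peak, m_int columns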

calculate_vs_metric(flux_a, flux_b, flux_err_a, flux_err_b)

Calculate the Vs variability metric, the t-statistic for whether the provided fluxes are significantly variable. See Section 5 of Mooley et al. (2016) for details, DOI: 10.3847/0004-637X/818/2/105.
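
Concretely, as implemented in the source below:

    Vs = (flux_a - flux_b) / sqrt(flux_err_a**2 + flux_err_b**2)

so |Vs| is the flux difference in units of its combined 1-sigma uncertainty.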

Parameters:

| Name       | Type  | Description      | Default  |
| ---------- | ----- | ---------------- | -------- |
| flux_a     | float | flux value "A".  | required |
| flux_b     | float | flux value "B".  | required |
| flux_err_a | float | error of flux_a. | required |
| flux_err_b | float | error of flux_b. | required |

Returns:

| Type  | Description                                |
| ----- | ------------------------------------------ |
| float | the Vs metric for flux values "A" and "B". |

Source code in vast_pipeline/pipeline/pairs.py
def calculate_vs_metric(
    flux_a: float, flux_b: float, flux_err_a: float, flux_err_b: float
) -> float:
    """Calculate the Vs variability metric which is the t-statistic that the provided
    fluxes are variable. See Section 5 of Mooley et al. (2016) for details,
    DOI: 10.3847/0004-637X/818/2/105.

    Args:
        flux_a (float): flux value "A".
        flux_b (float): flux value "B".
        flux_err_a (float): error of `flux_a`.
        flux_err_b (float): error of `flux_b`.

    Returns:
        float: the Vs metric for flux values "A" and "B".
    """
    return (flux_a - flux_b) / np.hypot(flux_err_a, flux_err_b)
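
Like calculate_m_metric, this function accepts scalars or array-likes. A minimal usage sketch with invented values:

from vast_pipeline.pipeline.pairs import calculate_vs_metric

# (1.0 - 3.0) / hypot(0.1, 0.1) ~= -14.1: the flux difference is ~14 sigma
vs = calculate_vs_metric(1.0, 3.0, 0.1, 0.1)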