new_sources.py

check_primary_image(row)

Checks if the primary image is in the image list.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `row` | `Series` | Row of the missing_sources_df; requires the keys 'primary' and 'img_list'. | required |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the primary image is in the image list. |

Source code in vast_pipeline/pipeline/new_sources.py
def check_primary_image(row: pd.Series) -> bool:
    """
    Checks if the primary image is in the image list.

    Args:
        row:
            Row of the missing_sources_df, need the keys 'primary' and
            'img_list'.

    Returns:
        True if the primary image is in the image list.
    """
    return row['primary'] in row['img_list']
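
A minimal usage sketch (the row values below are hypothetical; in the pipeline the rows come from missing_sources_df):

import pandas as pd

row = pd.Series({
    'primary': 'VAST_0127-73A.EPOCH01.I.fits',
    'img_list': ['VAST_0127-73A.EPOCH01.I.fits',
                 'VAST_0127-73A.EPOCH08.I.fits'],
})
check_primary_image(row)  # True
# Applied row-wise, e.g. to build an 'in_primary' flag:
# missing_sources_df.apply(check_primary_image, axis=1)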

gen_array_coords_from_wcs(coords, wcs)

Converts SkyCoord coordinates to array coordinates given a wcs.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `coords` | `SkyCoord` | The coordinates to convert. | required |
| `wcs` | `WCS` | The WCS to use for the conversion. | required |

Returns:

| Type | Description |
|------|-------------|
| `ndarray` | Array containing the x and y array coordinates of the input sky coordinates, e.g. `np.array([[x1, x2, x3], [y1, y2, y3]])`. |

Source code in vast_pipeline/pipeline/new_sources.py
def gen_array_coords_from_wcs(coords: SkyCoord, wcs: WCS) -> np.ndarray:
    """
    Converts SkyCoord coordinates to array coordinates given a wcs.

    Args:
        coords:
            The coordinates to convert.
        wcs:
            The WCS to use for the conversion.

    Returns:
        Array containing the x and y array coordinates of the input sky
            coordinates, e.g.:
            np.array([[x1, x2, x3], [y1, y2, y3]])
    """
    array_coords = wcs.world_to_array_index(coords)
    array_coords = np.array([
        np.array(array_coords[0]),
        np.array(array_coords[1]),
    ])

    return array_coords
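
A minimal sketch, assuming a hand-built celestial WCS (all header values below are hypothetical, not from a real VAST image):

import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.wcs import WCS

wcs = WCS(naxis=2)
wcs.wcs.ctype = ['RA---SIN', 'DEC--SIN']
wcs.wcs.crval = [22.0, -72.0]      # reference sky position (deg)
wcs.wcs.crpix = [50.0, 50.0]       # reference pixel (1-indexed)
wcs.wcs.cdelt = [-0.0007, 0.0007]  # pixel scale (deg/pixel)

coords = SkyCoord([22.0, 22.01], [-72.0, -71.99], unit=(u.deg, u.deg))
array_coords = gen_array_coords_from_wcs(coords, wcs)
# array_coords[0] holds the row indices and array_coords[1] the column
# indices, following the numpy array-index convention used by
# wcs.world_to_array_index.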

get_image_rms_measurements(group, nbeam=3, edge_buffer=1.0)

Take the coordinates provided from the group and measure the array cell value in the provided image.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `group` | `DataFrame` | The group of sources to measure in the image, requiring the columns: 'source', 'wavg_ra', 'wavg_dec' and 'img_diff_rms_path'. | required |
| `nbeam` | `int` | The minimum acceptable distance from the image edge, or from a NaN value, in units of half beamwidths (BMAJ). | `3` |
| `edge_buffer` | `float` | Multiplicative factor applied to nbeam to act as a buffer. | `1.0` |

Returns:

| Type | Description |
|------|-------------|
| `DataFrame` | The group dataframe with the 'img_diff_true_rms' column added. The column will contain NaN entries for sources that fail. |

Source code in vast_pipeline/pipeline/new_sources.py
def get_image_rms_measurements(
    group: pd.DataFrame, nbeam: int = 3, edge_buffer: float = 1.0
) -> pd.DataFrame:
    """
    Take the coordinates provided from the group
    and measure the array cell value in the provided image.

    Args:
        group:
            The group of sources to measure in the image, requiring the
            columns: 'source', 'wavg_ra', 'wavg_dec' and 'img_diff_rms_path'.
        nbeam:
            The minimum acceptable distance from the image edge, or from a
            NaN value, in units of half beamwidths (BMAJ).
        edge_buffer:
            Multiplicative factor applied to nbeam to act as a buffer.

    Returns:
        The group dataframe with the 'img_diff_true_rms' column added. The
            column will contain NaN entries for sources that fail.
    """
    if len(group) == 0:
        # input dataframe is empty, nothing to do
        logger.debug(f"No image RMS measurements to get, returning")
        return group
    image = group.iloc[0]['img_diff_rms_path']

    logger.debug(f"{image} - num. meas. to get: {len(group)}")
    partition_mem = get_df_memory_usage(group)
    logger.debug(f"{image} - partition memory usage: {partition_mem}MB")

    get_rms_timer = StopWatch()

    with open_fits(image) as hdul:
        header = hdul[0].header
        wcs = WCS(header, naxis=2)
        data = hdul[0].data.squeeze()

    logger.debug(f"{image} - Time to load fits: {get_rms_timer.reset()}s")

    # Here we mimic the forced fits behaviour,
    # sources within 3 half BMAJ widths of the image
    # edges are ignored. The user buffer is also
    # applied for consistency.
    pixelscale = (
        proj_plane_pixel_scales(wcs)[1] * u.deg
    ).to(u.arcsec)

    bmaj = header["BMAJ"] * u.deg

    npix = round(
        (nbeam / 2. * bmaj.to('arcsec') /
         pixelscale).value
    )

    npix = int(round(npix * edge_buffer))

    coords = SkyCoord(
        group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg)
    )

    array_coords = gen_array_coords_from_wcs(coords, wcs)

    # check for pixel wrapping
    x_valid = np.logical_or(
        array_coords[0] >= (data.shape[0] - npix),
        array_coords[0] < npix
    )

    y_valid = np.logical_or(
        array_coords[1] >= (data.shape[1] - npix),
        array_coords[1] < npix
    )

    valid = ~np.logical_or(
        x_valid, y_valid
    )

    valid_indexes = group[valid].index.values

    group = group.loc[valid_indexes]

    if group.empty:
        # early return if all sources failed range check
        logger.debug(
            'All sources out of range in new source rms measurement'
            f' for image {image}.'
        )
        group['img_diff_true_rms'] = np.nan
        return group

    # Now we also need to check proximity to NaN values
    # as forced fits may also drop these values
    coords = SkyCoord(
        group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg)
    )

    array_coords = gen_array_coords_from_wcs(coords, wcs)

    acceptable_no_nan_dist = int(
        round(bmaj.to('arcsec').value / 2. / pixelscale.value)
    )

    nan_valid = []

    # Get slices of each source and check NaN is not included.
    for i, j in zip(array_coords[0], array_coords[1]):
        sl = tuple((
            slice(i - acceptable_no_nan_dist, i + acceptable_no_nan_dist),
            slice(j - acceptable_no_nan_dist, j + acceptable_no_nan_dist)
        ))
        if np.any(np.isnan(data[sl])):
            nan_valid.append(False)
        else:
            nan_valid.append(True)

    valid_indexes = group[nan_valid].index.values

    if np.any(nan_valid):
        # only run if there are actual values to measure
        rms_values = data[
            array_coords[0][nan_valid],
            array_coords[1][nan_valid]
        ]

        # not matched ones will be NaN.
        group.loc[
            valid_indexes, 'img_diff_true_rms'
        ] = rms_values.astype(np.float64) * 1.e3

    else:
        group['img_diff_true_rms'] = np.nan

    return group
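
A worked sketch of the edge-margin calculation used above, with hypothetical header values (a 45 arcsec beam on 2.5 arcsec pixels):

import astropy.units as u

bmaj = (45 * u.arcsec).to(u.deg)
pixelscale = 2.5 * u.arcsec
nbeam = 3
edge_buffer = 1.2

npix = round((nbeam / 2. * bmaj.to('arcsec') / pixelscale).value)  # 27
npix = int(round(npix * edge_buffer))                              # 32
# Sources whose array coordinates fall within 32 pixels of any image
# edge are dropped before the rms values are read out.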

new_sources(sources_df, missing_sources_df, min_sigma, edge_buffer, p_run, n_cpu=5, max_partition_mb=15)

Processes the new sources detected to check that they are valid new sources. This involves checking that the source should be seen at all in the images where it is not detected. For valid new sources, the SNR value the source would have in non-detected images is also calculated.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `sources_df` | `DataFrame` | The sources found from the association step. | required |
| `missing_sources_df` | `DataFrame` | The dataframe containing the 'missing detections' for each source. See the source code comments for the layout of this dataframe. | required |
| `min_sigma` | `float` | The minimum sigma value acceptable when compared to the minimum rms of the respective image. | required |
| `edge_buffer` | `float` | Multiplicative factor to be passed to the 'get_image_rms_measurements' function. | required |
| `p_run` | `Run` | The pipeline run. | required |
| `n_cpu` | `int` | The desired number of workers for Dask. | `5` |
| `max_partition_mb` | `int` | The desired maximum size (in MB) of the partitions for Dask. | `15` |

Returns:

| Type | Description |
|------|-------------|
| `DataFrame` | The original input dataframe with the 'img_diff_true_rms' column added. The column will contain NaN entries for sources that fail. Columns are listed below. |

Columns:

- `source` - source id, int.
- `img_list` - list of images, List.
- `wavg_ra` - weighted average RA, float.
- `wavg_dec` - weighted average Dec, float.
- `skyreg_img_list` - list of sky regions of images in img_list, List.
- `img_diff` - the images missing from coverage, List.
- `primary` - what should be the first image, str.
- `detection` - the first detection image, str.
- `detection_time` - datetime of detection, datetime.datetime.
- `img_diff_time` - datetime of img_diff list, datetime.datetime.
- `img_diff_rms_min` - minimum rms of diff images, float.
- `img_diff_rms_median` - median rms of diff images, float.
- `img_diff_rms_path` - rms path of diff images, str.
- `flux_peak` - flux peak of source (detection), float.
- `diff_sigma` - SNR in difference images (compared to minimum), float.
- `img_diff_true_rms` - the true rms value from the diff images, float.
- `new_high_sigma` - peak flux / true rms value, float.
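
A hypothetical worked example of the min_sigma cut described above (all numbers invented for illustration):

flux_peak = 5.0            # mJy, detection peak flux
img_diff_rms_min = 0.25    # mJy, minimum rms of a prior image
diff_sigma = flux_peak / img_diff_rms_min  # 20.0
min_sigma = 5.0
keep = diff_sigma >= min_sigma  # True: the source should have been seen
                                # before, so it proceeds to the rms check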

Source code in vast_pipeline/pipeline/new_sources.py
def new_sources(
    sources_df: pd.DataFrame, missing_sources_df: pd.DataFrame,
    min_sigma: float, edge_buffer: float, p_run: Run, n_cpu: int = 5,
    max_partition_mb: int = 15
) -> pd.DataFrame:
    """
    Processes the new sources detected to check that they are valid new
    sources. This involves checking to see that the source *should* be seen at
    all in the images where it is not detected. For valid new sources the SNR
    value the source would have in non-detected images is also calculated.

    Args:
        sources_df:
            The sources found from the association step.
        missing_sources_df:
            The dataframe containing the 'missing detections' for each source.
            See the source code comments for the layout of this dataframe.
        min_sigma:
            The minimum sigma value acceptable when compared to the minimum
            rms of the respective image.
        edge_buffer:
            Multiplicative factor to be passed to the
            'get_image_rms_measurements' function.
        p_run:
            The pipeline run.
        n_cpu:
            The desired number of workers for Dask
        max_partition_mb:
            The desired maximum size (in MB) of the partitions for Dask.

    Returns:
        The original input dataframe with the 'img_diff_true_rms' column
            added. The column will contain NaN entries for sources that fail.
            Columns:
                source - source id, int.
                img_list - list of images, List.
                wavg_ra - weighted average RA, float.
                wavg_dec - weighted average Dec, float.
                skyreg_img_list - list of sky regions of images in img_list,
                    List.
                img_diff - The images missing from coverage, List.
                primary - What should be the first image, str.
                detection - The first detection image, str.
                detection_time - Datetime of detection, datetime.datetime.
                img_diff_time - Datetime of img_diff list, datetime.datetime.
                img_diff_rms_min - Minimum rms of diff images, float.
                img_diff_rms_median - Median rms of diff images, float.
                img_diff_rms_path - rms path of diff images, str.
                flux_peak - Flux peak of source (detection), float.
                diff_sigma - SNR in difference images (compared to minimum),
                    float.
                img_diff_true_rms - The true rms value from the diff images,
                    float.
                new_high_sigma - peak flux / true rms value, float.
    """
    # Missing sources df layout
    # +----------+----------------------------------+-----------+------------+
    # |   source | img_list                         |   wavg_ra |   wavg_dec |
    # |----------+----------------------------------+-----------+------------+
    # |      278 | ['VAST_0127-73A.EPOCH01.I.fits'] |  22.2929  |   -71.8717 |
    # |      702 | ['VAST_0127-73A.EPOCH01.I.fits'] |  28.8125  |   -69.3547 |
    # |      776 | ['VAST_0127-73A.EPOCH01.I.fits'] |  31.8223  |   -70.4674 |
    # |      844 | ['VAST_0127-73A.EPOCH01.I.fits'] |  17.3152  |   -72.346  |
    # |      934 | ['VAST_0127-73A.EPOCH01.I.fits'] |   9.75754 |   -72.9629 |
    # +----------+----------------------------------+-----------+------------+
    # ------------------------------------------------------------------+
    #  skyreg_img_list                                                  |
    # ------------------------------------------------------------------+
    #  ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
    #  ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
    #  ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
    #  ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
    #  ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
    # ------------------------------------------------------------------+
    # ----------------------------------+
    #  img_diff                         |
    # ----------------------------------|
    #  ['VAST_0127-73A.EPOCH08.I.fits'] |
    #  ['VAST_0127-73A.EPOCH08.I.fits'] |
    #  ['VAST_0127-73A.EPOCH08.I.fits'] |
    #  ['VAST_0127-73A.EPOCH08.I.fits'] |
    #  ['VAST_0127-73A.EPOCH08.I.fits'] |
    # ----------------------------------+
    timer = StopWatch()
    debug_timer = StopWatch()

    logger.info("Starting new source analysis.")

    cols = [
        'id', 'name', 'noise_path', 'datetime',
        'rms_median', 'rms_min', 'rms_max',
    ]

    images_df = pd.DataFrame(list(
        Image.objects.filter(
            run=p_run
        ).values(*tuple(cols))
    )).set_index('name')

    # Get rid of sources that are not 'new', i.e. sources which the
    # first sky region image is not in the image list
    new_sources_df = missing_sources_df[
        missing_sources_df['in_primary'] == False
    ].drop(
        columns=['in_primary']
    )

    # Check if the previous sources would have actually been seen
    # i.e. are the previous images sensitive enough

    # save the index before exploding
    new_sources_df = new_sources_df.reset_index()

    # Explode now to avoid two loops below
    new_sources_df = new_sources_df.explode('img_diff')

    # Merge the respective image information to the df
    new_sources_df = new_sources_df.merge(
        images_df[['datetime']],
        left_on='detection',
        right_on='name',
        how='left'
    ).rename(columns={'datetime': 'detection_time'})

    new_sources_df = new_sources_df.merge(
        images_df[[
            'datetime', 'rms_min', 'rms_median',
            'noise_path'
        ]],
        left_on='img_diff',
        right_on='name',
        how='left'
    ).rename(columns={
        'datetime': 'img_diff_time',
        'rms_min': 'img_diff_rms_min',
        'rms_median': 'img_diff_rms_median',
        'noise_path': 'img_diff_rms_path'
    })

    logger.debug(f"Time to reset & merge image info into new_sources_df: "
                 f"{debug_timer.reset()}s"
                 )

    # Select only those images that come before the detection image
    # in time.
    new_sources_df = new_sources_df[
        new_sources_df.img_diff_time < new_sources_df.detection_time
    ]

    # merge the detection fluxes in
    new_sources_df = pd.merge(
        new_sources_df, sources_df[['source', 'image', 'flux_peak']],
        left_on=['source', 'detection'], right_on=['source', 'image'],
        how='left'
    ).drop(columns=['image'])

    logger.debug(f"Time to merge detection fluxes into new_sources_df: "
                 f"{debug_timer.reset()}s"
                 )

    # calculate the sigma of the source if it was placed in the
    # minimum rms region of the previous images
    new_sources_df['diff_sigma'] = (
        new_sources_df['flux_peak'].values
        / new_sources_df['img_diff_rms_min'].values
    )

    # keep those that are above the user specified threshold
    new_sources_df = new_sources_df.loc[
        new_sources_df['diff_sigma'] >= min_sigma
    ]

    logger.debug(f"Time to do new_sources_df threshold calcs: "
                 f"{debug_timer.reset()}s"
                 )

    # Now have list of sources that should have been seen before given
    # previous images minimum rms values.

    # Current inaccurate sky regions may mean that the source
    # was in a previous 'NaN' area of the image. This needs to be
    # checked. Currently the check is done by filtering out of range
    # pixels once the values have been obtained (below).
    # This could be done using MOCpy however this is reasonably
    # fast and the io of a MOC fits may take more time.

    # So these sources will be flagged as new sources, but we can also
    # make a guess of how significant they are. For this the next step is
    # to measure the true rms at the source location.

    # measure the actual rms in the previous images at
    # the source location.

    # PR#713: This part of the code should be rewritten to reflect the new
    # behaviour of parallel_get_rms_measurements. That function should be
    # renamed to something like parallel_get_new_high_sigma and all of the
    # subsequent code in this function moved into it.

    logger.debug("Getting rms measurements...")

    new_sources_df = parallel_get_rms_measurements(
        new_sources_df, edge_buffer=edge_buffer,
        n_cpu=n_cpu, max_partition_mb=max_partition_mb
    )
    logger.debug(f"Time to get rms measurements: {debug_timer.reset()}s")

    # this removes those that are out of range
    new_sources_df['img_diff_true_rms'] = (
        new_sources_df['img_diff_true_rms'].fillna(0.)
    )
    new_sources_df = new_sources_df[
        new_sources_df['img_diff_true_rms'] != 0
    ]

    # calculate the true sigma
    new_sources_df['true_sigma'] = (
        new_sources_df['flux_peak'].values
        / new_sources_df['img_diff_true_rms'].values
    )

    # We only care about the highest true sigma
    # new_sources_df = new_sources_df.sort_values(
    #    by=['source', 'true_sigma']
    # )

    # keep only the highest for each source, rename for the database
    new_sources_df = (
        new_sources_df
        # .drop_duplicates('source')
        .set_index('source')
        .rename(columns={'true_sigma': 'new_high_sigma'})
    )

    # moving forward only the new_high_sigma columns is needed, drop all
    # others.
    new_sources_df = new_sources_df[['new_high_sigma']]

    logger.debug(f"Time to to do final cleanup steps {debug_timer.reset()}s")

    logger.info(
        'Total new source analysis time: %.2f seconds', timer.reset_init()
    )

    return new_sources_df
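
A hedged call sketch, assuming a configured Django environment in which p_run is an existing pipeline Run and the two dataframes come from the association step:

new_high_sigma_df = new_sources(
    sources_df,
    missing_sources_df,
    min_sigma=5.0,
    edge_buffer=1.2,
    p_run=p_run,
    n_cpu=4,
    max_partition_mb=15,
)
# new_high_sigma_df is indexed by source id and contains the single
# 'new_high_sigma' column (peak flux divided by the true rms measured
# in the prior images).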

parallel_get_rms_measurements(df, edge_buffer=1.0, n_cpu=0, max_partition_mb=15)

Wrapper function to use 'get_image_rms_measurements' in parallel with Dask. nbeam is not an option here as that parameter is fixed in forced extraction and so is kept fixed here too. This may change in the future.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `df` | `DataFrame` | The group of sources to measure in the images. | required |
| `edge_buffer` | `float` | Multiplicative factor to be passed to the 'get_image_rms_measurements' function. | `1.0` |
| `n_cpu` | `int` | The desired number of workers for Dask. | `0` |
| `max_partition_mb` | `int` | The desired maximum size (in MB) of the partitions for Dask. | `15` |

Returns:

| Type | Description |
|------|-------------|
| `DataFrame` | The original input dataframe with the 'img_diff_true_rms' column added. The column will contain NaN entries for sources that fail. |

Source code in vast_pipeline/pipeline/new_sources.py
def parallel_get_rms_measurements(
    df: pd.DataFrame, edge_buffer: float = 1.0,
    n_cpu: int = 0, max_partition_mb: int = 15
) -> pd.DataFrame:
    """
    Wrapper function to use 'get_image_rms_measurements'
    in parallel with Dask. nbeam is not an option here as that parameter
    is fixed in forced extraction and so is kept fixed here too. This
    may change in the future.

    Args:
        df:
            The group of sources to measure in the images.
        edge_buffer:
            Multiplicative factor to be passed to the
            'get_image_rms_measurements' function.
        n_cpu:
            The desired number of workers for Dask
        max_partition_mb:
            The desired maximum size (in MB) of the partitions for Dask.


    Returns:
        The original input dataframe with the 'img_diff_true_rms' column
            added. The column will contain NaN entries for sources that fail.
    """

    out = df[[
        'source', 'wavg_ra', 'wavg_dec',
        'img_diff_rms_path'
    ]]

    col_dtype = {
        'source': 'i',
        'wavg_ra': 'f',
        'wavg_dec': 'f',
        'img_diff_rms_path': 'U',
        'img_diff_true_rms': 'f',
    }

    n_workers, n_partitions = calculate_workers_and_partitions(
        df,
        n_cpu=n_cpu,
        max_partition_mb=max_partition_mb
    )
    logger.debug(f"Running get_image_rms_measurements with {n_workers} CPUs")

    out = (
        dd.from_pandas(out, npartitions=n_partitions)
        .groupby('img_diff_rms_path')
        .apply(
            get_image_rms_measurements,
            edge_buffer=edge_buffer,
            meta=col_dtype
        ).compute(num_workers=n_workers, scheduler='processes')
    )

    # We don't need all of the RMS measurements, just the lowest. Keeping all
    # of them results in huge memory usage when merging. However, there is an
    # existing bug: https://github.com/askap-vast/vast-pipeline/issues/713
    # that means that we actually want the _highest_ in order to reproduce the
    # current behaviour. Fixing the bug is beyond the scope of this PR because
    # it means rewriting tests and test data.

    df_to_merge = (df.drop_duplicates('source')
                   .drop(['img_diff_rms_path'], axis=1)
                   )

    out_to_merge = (out.sort_values(
        by=['source', 'img_diff_true_rms'], ascending=False
    )
        .drop_duplicates('source')
    )

    df = df_to_merge.merge(
        out_to_merge[['source', 'img_diff_true_rms']],
        left_on='source', right_on='source',
        how='left'
    )

    return df
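
A standalone pandas sketch of the per-source selection performed above, with toy values (note the deliberate ascending=False, which keeps the highest rms per source to preserve the behaviour described in issue #713):

import pandas as pd

out = pd.DataFrame({
    'source': [278, 278, 702],
    'img_diff_true_rms': [0.31, 0.54, 0.27],
})
kept = (
    out.sort_values(by=['source', 'img_diff_true_rms'], ascending=False)
    .drop_duplicates('source')
)
# source 278 keeps 0.54 (the higher rms), source 702 keeps 0.27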