loading.py

SQL_update(df, model, index=None, columns=None)

Generate the SQL code required to update the database.

Parameters:

df (DataFrame, required):
    DataFrame containing the new data to be uploaded to the database. The
    columns to be updated need to have the same headers between the df and
    the table in the database.
model (Model, required):
    The model that is being updated.
index (Optional[str], default None):
    Header of the column to join on; determines which rows in the different
    tables match. If None, the primary key column is used.
columns (Optional[List[str]], default None):
    The column headers of the columns to be updated. If None, updates all
    columns except the index column.

Returns:

str:
    The SQL command to update the database.

Source code in vast_pipeline/pipeline/loading.py
def SQL_update(
    df: pd.DataFrame, model: models.Model, index: Optional[str] = None,
    columns: Optional[List[str]] = None
) -> str:
    '''
    Generate the SQL code required to update the database.

    Args:
        df:
            DataFrame containing the new data to be uploaded to the database.
            The columns to be updated need to have the same headers between
            the df and the table in the database.
        model:
            The model that is being updated.
        index:
            Header of the column to join on, determines which rows in the
            different tables match. If None, then use the primary key column.
        columns:
            The column headers of the columns to be updated. If None, updates
            all columns except the index column.

    Returns:
        The SQL command to update the database.
    '''
    # set index and columns if None
    if index is None:
        index = model._meta.pk.name
    if columns is None:
        columns = df.columns.tolist()
        columns.remove(index)

    # get names
    table = model._meta.db_table
    new_columns = ', '.join('new_' + c for c in columns)
    set_columns = ', '.join(c + '=new_' + c for c in columns)

    # get index values and new values
    column_headers = [index]
    column_headers.extend(columns)
    data_arr = df[column_headers].to_numpy()
    values = []
    for row in data_arr:
        val_row = '(' + ', '.join(f'{val}' for val in row) + ')'
        values.append(val_row)
    values = ', '.join(values)

    # update database
    SQL_comm = f"""
        UPDATE {table}
        SET {set_columns}
        FROM (VALUES {values})
        AS new_values (index_col, {new_columns})
        WHERE {index}=index_col;
    """

    return SQL_comm
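
A minimal usage sketch (the Source model and the column names 'n_meas' and
'n_sibl' are illustrative assumptions, not requirements of this function):

    import pandas as pd

    # Hypothetical update of two integer columns on the Source table.
    df = pd.DataFrame({'id': [1, 2], 'n_meas': [5, 7], 'n_sibl': [0, 1]})
    sql = SQL_update(df, Source, index='id', columns=['n_meas', 'n_sibl'])

    # `sql` would look roughly like the following (the table name shown is
    # Django's default for a Source model in the vast_pipeline app):
    #
    #     UPDATE vast_pipeline_source
    #     SET n_meas=new_n_meas, n_sibl=new_n_sibl
    #     FROM (VALUES (1, 5, 0), (2, 7, 1))
    #     AS new_values (index_col, new_n_meas, new_n_sibl)
    #     WHERE id=index_col;

Note that row values are interpolated directly into the statement, so this
helper is intended for numeric columns filled from pipeline-generated data.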

bulk_upload_model(djmodel, generator, batch_size=10000, return_ids=False)

Bulk upload a list of generator objects of django models to db.

Parameters:

djmodel (Model, required):
    The Django pipeline model to be uploaded.
generator (Iterable[Generator[Model, None, None]], required):
    The generator objects of the model to upload.
batch_size (int, default 10000):
    How many records to upload at once.
return_ids (bool, default False):
    When set to True, the database IDs of the uploaded objects are returned.

Returns:

List[int]:
    None or a list of the database IDs of the uploaded objects.

Source code in vast_pipeline/pipeline/loading.py
@transaction.atomic
def bulk_upload_model(
    djmodel: models.Model,
    generator: Iterable[Generator[models.Model, None, None]],
    batch_size: int = 10_000,
    return_ids: bool = False,
) -> List[int]:
    '''
    Bulk upload a list of generator objects of django models to db.

    Args:
        djmodel:
            The Django pipeline model to be uploaded.
        generator:
            The generator objects of the model to upload.
        batch_size:
            How many records to upload at once.
        return_ids:
            When set to True, the database IDs of the uploaded objects are
            returned.

    Returns:
        None or a list of the database IDs of the uploaded objects.

    '''
    reset_queries()

    bulk_ids = []
    while True:
        items = list(islice(generator, batch_size))
        if not items:
            break
        out_bulk = djmodel.objects.bulk_create(items)
        logger.info('Bulk created #%i %s', len(out_bulk), djmodel.__name__)
        # save the DB ids to return
        if return_ids:
            bulk_ids.extend(list(map(lambda i: i.id, out_bulk)))

    if return_ids:
        return bulk_ids
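
A minimal sketch, assuming 'rows' is an iterable of dicts whose keys match the
fields of the chosen model (both 'rows' and the field names are hypothetical):

    # Build model instances lazily so that only batch_size objects are held
    # in memory at any one time.
    assoc_gen = (Association(**row) for row in rows)
    ids = bulk_upload_model(
        Association, assoc_gen, batch_size=5_000, return_ids=True
    )
    # With return_ids=False (the default) the function returns None.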

make_upload_associations(associations_df)

Uploads the associations from the supplied associations DataFrame.

Parameters:

associations_df (DataFrame, required):
    DataFrame containing the associations information from the pipeline.

Returns:

None.

Source code in vast_pipeline/pipeline/loading.py
def make_upload_associations(associations_df: pd.DataFrame) -> None:
    """
    Uploads the associations from the supplied associations DataFrame.

    Args:
        associations_df:
            DataFrame containing the associations information from the
            pipeline.

    Returns:
        None.
    """
    logger.info('Upload associations...')

    mem_usage = get_df_memory_usage(associations_df)
    logger.debug(f"associations_df memory usage: {mem_usage}MB")
    log_total_memory_usage()

    assoc_chunk_size = 100000
    for i in range(0, len(associations_df), assoc_chunk_size):
        bulk_upload_model(
            Association,
            association_models_generator(
                associations_df[i:i + assoc_chunk_size])
        )

make_upload_images(paths, image_config)

Carry out the first part of the pipeline by uploading all the images to the image table and populating band and skyregion objects.

Parameters:

paths (Dict[str, Dict[str, str]], required):
    Dictionary containing the image, noise and background paths of all the
    images in the pipeline run. The primary keys are 'selavy', 'noise' and
    'background', with the secondary key being the image name.
image_config (Dict, required):
    Dictionary of configuration options for the image ingestion.

Returns:

List[Image]:
    A list of Image objects that have been uploaded.
List[SkyRegion]:
    A list of SkyRegion objects that have been uploaded.
List[Band]:
    A list of Band objects that have been uploaded.

Source code in vast_pipeline/pipeline/loading.py
def make_upload_images(
    paths: Dict[str, Dict[str, str]], image_config: Dict
) -> Tuple[List[Image], List[SkyRegion], List[Band]]:
    '''
    Carry out the first part of the pipeline by uploading all the images
    to the image table and populating band and skyregion objects.

    Args:
        paths:
            Dictionary containing the image, noise and background paths of all
            the images in the pipeline run. The primary keys are `selavy`,
            'noise' and 'background' with the secondary key being the image
            name.
        image_config:
            Dictionary of configuration options for the image ingestion.

    Returns:
        A list of Image objects that have been uploaded.
        A list of SkyRegion objects that have been uploaded.
        A list of Band objects that have been uploaded.
    '''
    timer = StopWatch()
    images = []
    skyregions = []
    bands = []

    for path in paths['selavy']:
        # STEP #1: Load image and measurements
        image = SelavyImage(
            path,
            paths,
            image_config
        )
        logger.info('Reading image %s ...', image.name)

        # 1.1 get/create the frequency band
        with transaction.atomic():
            band = get_create_img_band(image)
        if band not in bands:
            bands.append(band)

        # 1.2 create image and skyregion entry in DB
        with transaction.atomic():
            img, exists_f = get_create_img(band.id, image)
            skyreg = img.skyreg

            # add image and skyregion to respective lists
            images.append(img)
            if skyreg not in skyregions:
                skyregions.append(skyreg)

            if exists_f:
                logger.info("Image %s already processed", img.name)
                continue

            # 1.3 get the image measurements and save them in DB
            measurements = image.read_selavy(img)
            logger.info(
                "Processed measurements dataframe of shape: (%i, %i)",
                measurements.shape[0],
                measurements.shape[1],
            )

            # upload measurements, a column with the db is added to the df
            measurements = make_upload_measurements(measurements)

            # save measurements to parquet file in pipeline run folder
            base_folder = os.path.dirname(img.measurements_path)
            if not os.path.exists(base_folder):
                os.makedirs(base_folder)

            measurements.to_parquet(img.measurements_path, index=False)
            del measurements, image, band, img

    logger.info(
        'Total images upload/loading time: %.2f seconds',
        timer.reset_init()
    )

    return images, skyregions, bands
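
A sketch of the expected 'paths' layout, inferred from the docstring above
(file names and locations are purely illustrative):

    paths = {
        'selavy': {'image1.fits': '/data/image1.selavy.components.txt'},
        'noise': {'image1.fits': '/data/image1.noiseMap.fits'},
        'background': {'image1.fits': '/data/image1.meanMap.fits'},
    }
    images, skyregions, bands = make_upload_images(paths, image_config)

Images that already exist in the database are appended to the returned lists
but skipped for ingestion, so re-running a pipeline does not upload their
measurements again.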

make_upload_measurements(measurements_df)

Uploads the measurements from the supplied measurements DataFrame.

Parameters:

measurements_df (DataFrame, required):
    DataFrame containing the measurements information from the pipeline.

Returns:

DataFrame:
    Original DataFrame with the database ID attached to each row.

Source code in vast_pipeline/pipeline/loading.py
def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame:
    """
    Uploads the measurements from the supplied measurements DataFrame.

    Args:
        measurements_df:
            DataFrame containing the measurements information from the
            pipeline.

    Returns:
        Original DataFrame with the database ID attached to each row.
    """

    logger.info("Upload measurements...")
    mem_usage = get_df_memory_usage(measurements_df)
    logger.debug(f"measurements_df memory usage: {mem_usage}MB")
    log_total_memory_usage()

    meas_dj_ids = bulk_upload_model(
        Measurement,
        measurement_models_generator(measurements_df),
        return_ids=True
    )

    measurements_df['id'] = meas_dj_ids

    return measurements_df
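
Within the image ingestion step the call is simply (no particular column
layout is assumed beyond what the pipeline produces):

    measurements = make_upload_measurements(measurements)
    # Each row now carries its database primary key in the 'id' column,
    # and this DataFrame is written to the per-image measurements parquet.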

make_upload_related_sources(related_df)

Uploads the related sources from the supplied related sources DataFrame.

Parameters:

related_df (DataFrame, required):
    DataFrame containing the related sources information from the pipeline.

Returns:

None.

Source code in vast_pipeline/pipeline/loading.py
def make_upload_related_sources(related_df: pd.DataFrame) -> None:
    """
    Uploads the related sources from the supplied related sources DataFrame.

    Args:
        related_df:
            DataFrame containing the related sources information from the
            pipeline.

    Returns:
        None.
    """
    logger.info('Populate "related" field of sources...')
    mem_usage = get_df_memory_usage(related_df)
    logger.debug(f"related_df memory usage: {mem_usage}MB")
    log_total_memory_usage()
    bulk_upload_model(RelatedSource, related_models_generator(related_df))

make_upload_sources(sources_df, pipeline_run, add_mode=False)

Delete previous sources for the given pipeline run and bulk upload the newly found sources as well as related sources.

Parameters:

sources_df (DataFrame, required):
    Holds the measurements associated into sources. The output of the
    association step.
pipeline_run (Run, required):
    The pipeline Run object.
add_mode (bool, default False):
    Whether the pipeline is running in add image mode.

Returns:

DataFrame:
    The input dataframe with the 'id' column added.

Source code in vast_pipeline/pipeline/loading.py
def make_upload_sources(
    sources_df: pd.DataFrame, pipeline_run: Run, add_mode: bool = False
) -> pd.DataFrame:
    '''
    Delete previous sources for given pipeline run and bulk upload
    new found sources as well as related sources.

    Args:
        sources_df:
            Holds the measurements associated into sources. The output of
            the association step.
        pipeline_run:
            The pipeline Run object.
        add_mode:
            Whether the pipeline is running in add image mode.

    Returns:
        The input dataframe with the 'id' column added.
    '''

    logger.debug("Uploading sources...")
    mem_usage = get_df_memory_usage(sources_df)
    logger.debug(f"sources_df memory usage: {mem_usage}MB")
    log_total_memory_usage()

    # create sources in DB
    with transaction.atomic():
        if (add_mode is False and
                Source.objects.filter(run=pipeline_run).exists()):
            logger.info('Removing objects from previous pipeline run')
            n_del, detail_del = (
                Source.objects.filter(run=pipeline_run).delete()
            )
            logger.info(
                ('Deleting all sources and related objects for this run. '
                 'Total objects deleted: %i'),
                n_del,
            )
            logger.debug('(type, #deleted): %s', detail_del)

    src_dj_ids = bulk_upload_model(
        Source,
        source_models_generator(sources_df, pipeline_run=pipeline_run),
        return_ids=True
    )

    sources_df['id'] = src_dj_ids

    return sources_df
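
A minimal sketch of the call made after association (sources_df and
pipeline_run come from earlier pipeline steps; add_mode=True keeps the
sources created by previous runs):

    sources_df = make_upload_sources(sources_df, pipeline_run, add_mode=False)
    # sources_df now contains an 'id' column holding the Source primary keys.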

update_sources(sources_df, batch_size=10000)

Update database using SQL code. This function opens one connection to the database, and closes it after the update is done.

Parameters:

sources_df (DataFrame, required):
    DataFrame containing the new data to be uploaded to the database. The
    columns to be updated need to have the same headers between the df and
    the table in the database.
batch_size (int, default 10000):
    The df rows are broken into chunks, and each chunk is executed in a
    separate SQL command; batch_size determines the maximum size of each
    chunk.

Returns:

DataFrame:
    DataFrame containing the new data to be uploaded to the database.

Source code in vast_pipeline/pipeline/loading.py
def update_sources(
    sources_df: pd.DataFrame, batch_size: int = 10_000
) -> pd.DataFrame:
    '''
    Update database using SQL code. This function opens one connection to the
    database, and closes it after the update is done.

    Args:
        sources_df:
            DataFrame containing the new data to be uploaded to the database.
            The columns to be updated need to have the same headers between
            the df and the table in the database.
        batch_size:
            The df rows are broken into chunks, each chunk is executed in a
            separate SQL command, batch_size determines the maximum size of the
            chunk.

    Returns:
        DataFrame containing the new data to be uploaded to the database.
    '''
    # Get all possible columns from the model
    all_source_table_cols = [
        fld.attname for fld in Source._meta.get_fields()
        if getattr(fld, 'attname', None) is not None
    ]

    # Filter to those present in sources_df
    columns = [
        col for col in all_source_table_cols if col in sources_df.columns
    ]

    sources_df['id'] = sources_df.index.values

    batches = np.ceil(len(sources_df) / batch_size)
    dfs = np.array_split(sources_df, batches)
    with connection.cursor() as cursor:
        for df_batch in dfs:
            SQL_comm = SQL_update(
                df_batch, Source, index='id', columns=columns
            )
            cursor.execute(SQL_comm)

    return sources_df
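
A minimal usage sketch: only the columns that sources_df shares with the
Source table are updated, and the DataFrame is expected to be indexed by the
Source primary key because the 'id' join column is taken from the index.

    # Rewrites the 'id' column from the index, then issues one
    # UPDATE ... FROM (VALUES ...) statement per batch, all on a single
    # database connection.
    sources_df = update_sources(sources_df, batch_size=10_000)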