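# NOTE: this excerpt starts at the `run_pipe` definition; the original
# module header was lost. The imports below are reconstructed from usage
# in this function. The standard-library, Django and Astropy paths are
# standard; the vast_pipeline module paths are assumptions and may differ
# from the actual source layout.
import glob
import logging
import os
import shutil
import traceback
import warnings
from typing import Optional

from astropy.utils.exceptions import AstropyWarning
from django.contrib.auth.models import User
from django.core.management.base import CommandError
from django.db import transaction

# assumed project-internal imports (names taken from usage below)
from vast_pipeline import __version__ as pipeline_version
from vast_pipeline.models import Run
from vast_pipeline.pipeline.errors import PipelineConfigError
from vast_pipeline.pipeline.forced_extraction import remove_forced_meas
from vast_pipeline.pipeline.main import Pipeline
from vast_pipeline.pipeline.utils import (
    backup_parquets,
    create_measurements_arrow_file,
    create_measurement_pairs_arrow_file,
    get_create_p_run,
)
from vast_pipeline.utils.utils import StopWatch

logger = logging.getLogger(__name__)
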
def run_pipe(
name: str, path_name: Optional[str] = None,
run_dj_obj: Optional[Run] = None, cli: bool = True,
debug: bool = False, user: Optional[User] = None, full_rerun: bool = False,
prev_ui_status: str = 'END'
) -> bool:
'''
Main function to run the pipeline.
Args:
name:
The name of the pipeline run (p_run.name).
        path_name:
            The path of the pipeline run directory (p_run.path). Defaults
            to None.
        run_dj_obj:
            The Run object of the pipeline run. Defaults to None.
        cli:
            Whether the pipeline run was launched from the command line
            (True) or from the UI (False). Defaults to True.
        debug:
            Whether to enable debug-level verbosity in the logging output.
            Defaults to False.
        user:
            The User of the request if made through the UI. Defaults to
            None.
        full_rerun:
            If the run already exists, perform a complete re-run, removing
            and replacing all previous results. Defaults to False.
        prev_ui_status:
            The previous status of the run as known by the UI. Defaults to
            'END'.
    Returns:
        True on successful completion. On failure a CommandError (or a
        PipelineConfigError for config errors raised from the UI) is
        raised instead.
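    Example:
        An illustrative direct call (the run name and path are
        placeholders; the run directory is assumed to already contain a
        `config.yaml`):
        `run_pipe('my-run', path_name='pipeline-runs/my-run', debug=True)`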
'''
path = run_dj_obj.path if run_dj_obj else path_name
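    # NOTE: at least one of run_dj_obj or path_name must be provided,
    # otherwise path is None and the os.path.join calls below will fail.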
# set up logging for running pipeline from UI
if not cli:
# set up the logger for the UI job
root_logger = logging.getLogger('')
if debug:
root_logger.setLevel(logging.DEBUG)
f_handler = logging.FileHandler(
os.path.join(path, 'log.txt'),
mode='w'
)
f_handler.setFormatter(root_logger.handlers[0].formatter)
root_logger.addHandler(f_handler)
pipeline = Pipeline(
name=run_dj_obj.name if run_dj_obj else name,
config_path=os.path.join(path, 'config.yaml'),
validate_config=False, # delay validation
)
# Create the pipeline run in DB
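    # get_create_p_run returns the Run row plus a flag indicating whether
    # a run with this name already existed in the database.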
p_run, flag_exist = get_create_p_run(
pipeline.name,
pipeline.config["run"]["path"],
)
# copy across config file at the start
logger.debug("Copying temp config file.")
shutil.copyfile(
os.path.join(p_run.path, 'config.yaml'),
os.path.join(p_run.path, 'config_temp.yaml')
)
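    # config_temp.yaml is a working copy: it is removed again on early
    # exits and only promoted to config_prev.yaml after a successful run.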
# validate run configuration
try:
pipeline.config.validate(user=user)
except PipelineConfigError as e:
if debug:
traceback.print_exc()
logger.exception('Config error:\n%s', e)
msg = f'Config error:\n{e}'
# If the run is already created (e.g. through UI) then set status to
# error
pipeline.set_status(p_run, 'ERR')
        # raise the appropriate exception type for the entry point
        if cli:
            raise CommandError(msg)
        raise PipelineConfigError(msg)
    # Clean up pipeline images and forced measurements for re-runs.
    # Scenarios:
    # A. Complete re-run: if the job is marked as successful, back up the
    #    old parquets, then remove them along with the forced extractions
    #    from the database.
    # B. Additional run on a successful run: back up the parquets, remove
    #    the current parquets and proceed.
    # C. Additional run on an errored run: do not back up the parquets,
    #    just delete the current ones.
    # The flag below records whether addition mode is on or off.
pipeline.add_mode = False
pipeline.previous_parquets = {}
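    # previous_parquets maps parquet base names to their .bak backups; it
    # is only populated below when addition mode is switched on.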
if not flag_exist:
        # check for and remove any leftover .parquet, .arrow and .bak files
parquets = (
glob.glob(os.path.join(p_run.path, "*.parquet"))
# TODO Remove arrow when vaex support is dropped.
+ glob.glob(os.path.join(p_run.path, "*.arrow"))
+ glob.glob(os.path.join(p_run.path, "*.bak"))
)
for parquet in parquets:
os.remove(parquet)
else:
        # Exit if the run status is already 'running' or 'restoring'.
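        # Status codes seen in this function: 'INI' initialised, 'RUN'
        # running, 'RES' restoring, 'END' completed, 'ERR' errored
        # (inferred from the log messages and status checks here).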
if p_run.status in ['RUN', 'RES']:
            logger.error(
                "The requested pipeline run already has a running or"
                " restoring status! Performing no actions. Exiting."
            )
return True
        # Check for an error status and whether a previous config file
        # exists - if it does not exist, the run failed during its first
        # attempt. In that case, everything from before should be cleared,
        # so full re-run mode is activated.
if p_run.status == "ERR" and not os.path.isfile(
os.path.join(p_run.path, "config_prev.yaml")
):
full_rerun = True
# Backup the previous run config
if os.path.isfile(
os.path.join(p_run.path, 'config_prev.yaml')
):
shutil.copy(
os.path.join(p_run.path, 'config_prev.yaml'),
os.path.join(p_run.path, 'config.yaml.bak')
)
# Check if the run has only been initialised, if so we don't want to do
# any previous run checks or cleaning.
if p_run.status == 'INI':
initial_run = True
# check if coming from UI
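        # (for UI launches the DB status may already have been advanced,
        # hence the separately supplied prev_ui_status is consulted - an
        # inference from how the parameter is used here)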
        elif not cli and prev_ui_status == 'INI':
initial_run = True
else:
initial_run = False
        if not initial_run:
parquets = (
glob.glob(os.path.join(p_run.path, "*.parquet"))
                # TODO Remove arrow when vaex support is dropped.
+ glob.glob(os.path.join(p_run.path, "*.arrow"))
)
if full_rerun:
if p_run.status == 'END':
backup_parquets(p_run.path)
                logger.info(
                    'Cleaning up pipeline run before re-processing data'
                )
                p_run.image_set.clear()
                logger.info(
                    'Cleaning up forced measurements before re-processing'
                    ' data'
                )
remove_forced_meas(p_run.path)
for parquet in parquets:
os.remove(parquet)
                # remove any .bak files
                for bf in glob.glob(os.path.join(p_run.path, "*.bak")):
                    os.remove(bf)
# remove previous config if it exists
if os.path.isfile(os.path.join(p_run.path, 'config_prev.yaml')):
os.remove(os.path.join(p_run.path, 'config_prev.yaml'))
# reset epoch_based flag
with transaction.atomic():
p_run.epoch_based = False
p_run.save()
else:
                # Before backing up and copying any parquets, check
                # whether the config has actually changed since the
                # previous run.
config_diff = pipeline.config.check_prev_config_diff()
if config_diff:
logger.info(
"The config file has either not changed since the"
" previous run or other settings have changed such"
" that a new or complete re-run should be performed"
" instead. Performing no actions. Exiting."
)
os.remove(os.path.join(p_run.path, 'config_temp.yaml'))
pipeline.set_status(p_run, 'END')
return True
if pipeline.config.epoch_based != p_run.epoch_based:
logger.info(
"The 'epoch based' setting has changed since the"
" previous run. A complete re-run is required if"
" changing to epoch based mode or vice versa."
)
os.remove(os.path.join(p_run.path, 'config_temp.yaml'))
pipeline.set_status(p_run, 'END')
return True
                # Back up the parquets only when the previous run completed
                # successfully (for runs launched from the UI the
                # UI-supplied previous status is checked instead).
                if (cli and p_run.status == 'END') or (
                    not cli and prev_ui_status == 'END'
                ):
                    backup_parquets(p_run.path)
pipeline.add_mode = True
for i in [
'images', 'associations', 'sources', 'relations',
'measurement_pairs'
]:
pipeline.previous_parquets[i] = os.path.join(
p_run.path, f'{i}.parquet.bak')
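                # e.g. pipeline.previous_parquets['images'] ends up as
                # '<run path>/images.parquet.bak'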
if pipeline.config["run"]["suppress_astropy_warnings"]:
warnings.simplefilter("ignore", category=AstropyWarning)
logger.info("VAST Pipeline version: %s", pipeline_version)
logger.info("Source finder: %s", pipeline.config["measurements"]["source_finder"])
logger.info("Using pipeline run '%s'", pipeline.name)
logger.info("Source monitoring: %s", pipeline.config["source_monitoring"]["monitor"])
# log the list of input data files for posterity
input_image_list = [
image
for image_list in pipeline.config["inputs"]["image"].values()
for image in image_list
]
input_selavy_list = [
selavy
for selavy_list in pipeline.config["inputs"]["selavy"].values()
for selavy in selavy_list
]
input_noise_list = [
noise
for noise_list in pipeline.config["inputs"]["noise"].values()
for noise in noise_list
]
if "background" in pipeline.config["inputs"].keys():
input_background_list = [
background
for background_list in pipeline.config["inputs"]["background"].values()
for background in background_list
]
else:
input_background_list = ["N/A", ] * len(input_image_list)
for image, selavy, noise, background in zip(
input_image_list, input_selavy_list, input_noise_list, input_background_list
):
logger.info(
"Matched inputs - image: %s, selavy: %s, noise: %s, background: %s",
image,
selavy,
noise,
background,
)
stopwatch = StopWatch()
# run the pipeline operations
try:
        # check whether the maximum number of concurrent runs is reached
pipeline.check_current_runs()
# run the pipeline
pipeline.set_status(p_run, 'RUN')
pipeline.process_pipeline(p_run)
        # Create arrow files after success if selected.
if pipeline.config["measurements"]["write_arrow_files"]:
create_measurements_arrow_file(p_run)
create_measurement_pairs_arrow_file(p_run)
except Exception as e:
# set the pipeline status as error
pipeline.set_status(p_run, 'ERR')
logger.exception('Processing error:\n%s', e)
raise CommandError(f'Processing error:\n{e}')
    # the run succeeded: promote the temp config file to config_prev.yaml
logger.debug("Copying and cleaning temp config file.")
shutil.copyfile(
os.path.join(p_run.path, 'config_temp.yaml'),
os.path.join(p_run.path, 'config_prev.yaml'))
os.remove(os.path.join(p_run.path, 'config_temp.yaml'))
# set the pipeline status as completed
pipeline.set_status(p_run, 'END')
logger.info(
'Total pipeline processing time %.2f sec',
stopwatch.reset()
)
return True