Bases: BaseCommand
This script is used to clean the data for pipeline run(s). Use --help for usage.
Source code in vast_pipeline/management/commands/clearpiperun.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class Command(BaseCommand):
    """
    Management command that deletes pipeline run(s) and their files.

    Use --help for usage.
    """

    help = (
        'Delete a pipeline run and all related images, sources, etc.'
        ' Will not delete objects if they are also related to another '
        'pipeline run.'
    )

    def add_arguments(self, parser: ArgumentParser) -> None:
        """
        Enables arguments for the command.

        Args:
            parser (ArgumentParser): The parser object of the command.

        Returns:
            None
        """
        # positional arguments (required)
        parser.add_argument(
            'piperuns',
            nargs='+',
            type=str,
            default=None,
            help=(
                'Name or path of pipeline run(s) to delete. Pass "clearall" to'
                ' delete all the runs.'
            )
        )
        # keyword arguments (optional)
        parser.add_argument(
            '--keep-parquet',
            required=False,
            default=False,
            action='store_true',
            help=(
                'Flag to keep the pipeline run(s) parquet files. '
                'Will also apply to arrow files if present.'
            )
        )
        parser.add_argument(
            '--remove-all',
            required=False,
            default=False,
            action='store_true',
            help='Flag to remove all the content of the pipeline run(s) folder.'
        )

    def handle(self, *args, **options) -> None:
        """
        Delete the requested pipeline run(s) and clean up their files.

        For every requested run the database record is deleted, then
        `remove_forced_meas` is called on the run path, and finally either
        the run's parquet/arrow files or the whole run folder are removed,
        depending on the flags.

        Args:
            *args: Variable length argument list.
            **options: Variable length options. The keys read here are
                'verbosity', 'keep_parquet', 'remove_all' and 'piperuns'.

        Returns:
            None

        Raises:
            CommandError: If both '--keep-parquet' and '--remove-all' are
                set, or if a requested pipeline run does not exist.
        """
        # configure logging
        if options['verbosity'] > 1:
            # set root logger to use the DEBUG level
            root_logger = logging.getLogger('')
            root_logger.setLevel(logging.DEBUG)
            # set the traceback on
            options['traceback'] = True

        keep_parquet = options['keep_parquet']
        remove_all = options['remove_all']
        if keep_parquet and remove_all:
            raise CommandError(
                '"--keep-parquet" flag is incompatible with "--remove-all" flag'
            )

        piperuns = options['piperuns']
        if 'clearall' in piperuns:
            logger.info('clearing all pipeline run in the database')
            piperuns = list(Run.objects.values_list('name', flat=True))

        for piperun in piperuns:
            p_run_name = get_p_run_name(piperun)
            try:
                p_run = Run.objects.get(name=p_run_name)
            except Run.DoesNotExist as err:
                raise CommandError(
                    f'Pipeline run {p_run_name} does not exist'
                ) from err

            logger.info("Deleting pipeline '%s' from database", p_run_name)
            # NOTE(review): indentation was reconstructed; the delete is
            # assumed to run after (outside) the atomic block that stores
            # the 'DEL' status -- confirm against the repository.
            with transaction.atomic():
                p_run.status = 'DEL'
                p_run.save()
            p_run.delete()

            # remove forced measurements in db if presents
            remove_forced_meas(p_run.path)

            # Delete parquet or folder eventually
            if not keep_parquet and not remove_all:
                logger.info('Deleting pipeline "%s" parquets', p_run_name)
                parquets = (
                    glob(os.path.join(p_run.path, '*.parquet'))
                    + glob(os.path.join(p_run.path, '*.arrow'))
                )
                for parquet in parquets:
                    parquet_name = os.path.basename(parquet)
                    try:
                        os.remove(parquet)
                    except FileNotFoundError:
                        self.stdout.write(self.style.WARNING(
                            f'Parquet file "{parquet_name}" not existent'
                        ))
                    except OSError as err:
                        # e.g. permission problems: report the real cause
                        # instead of claiming the file is missing
                        self.stdout.write(self.style.WARNING(
                            f'Issues in removing parquet file '
                            f'"{parquet_name}": {err}'
                        ))

            if remove_all:
                logger.info('Deleting pipeline folder')
                try:
                    shutil.rmtree(p_run.path)
                except Exception as err:
                    # deliberately best-effort: warn and carry on with the
                    # next run rather than aborting the whole command
                    self.stdout.write(self.style.WARNING(
                        f'Issues in removing run folder: {err}'
                    ))
|
add_arguments(parser)
Enables arguments for the command.
Parameters:
Name | Type | Description | Default |
parser | ArgumentParser | The parser object of the command. | required |
Returns: None
Source code in vast_pipeline/management/commands/clearpiperun.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def add_arguments(self, parser: ArgumentParser) -> None:
    """
    Register the command line arguments of the command.

    One required positional argument ('piperuns', one or more values) and
    two optional boolean flags ('--keep-parquet', '--remove-all') are added.

    Args:
        parser (ArgumentParser): The parser object of the command.

    Returns:
        None
    """
    piperuns_help = (
        'Name or path of pipeline run(s) to delete. Pass "clearall" to'
        ' delete all the runs.'
    )
    keep_parquet_help = (
        'Flag to keep the pipeline run(s) parquet files. '
        'Will also apply to arrow files if present.'
    )
    remove_all_help = (
        'Flag to remove all the content of the pipeline run(s) folder.'
    )

    # positional arguments (required)
    parser.add_argument(
        'piperuns', nargs='+', type=str, default=None, help=piperuns_help
    )

    # keyword arguments (optional): both are plain on/off switches that
    # share the same configuration apart from name and help text
    switches = (
        ('--keep-parquet', keep_parquet_help),
        ('--remove-all', remove_all_help),
    )
    for flag_name, flag_help in switches:
        parser.add_argument(
            flag_name,
            required=False,
            default=False,
            action='store_true',
            help=flag_help,
        )
|
handle(*args, **options)
Handle function of the command.
Parameters:
Name | Type | Description | Default |
*args | | Variable length argument list. | () |
**options | | Variable length options. | {} |
Returns: None
Source code in vast_pipeline/management/commands/clearpiperun.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def handle(self, *args, **options) -> None:
    """
    Delete the requested pipeline run(s) and clean up their files.

    For every requested run the database record is deleted, then
    `remove_forced_meas` is called on the run path, and finally either
    the run's parquet/arrow files or the whole run folder are removed,
    depending on the flags.

    Args:
        *args: Variable length argument list.
        **options: Variable length options. The keys read here are
            'verbosity', 'keep_parquet', 'remove_all' and 'piperuns'.

    Returns:
        None

    Raises:
        CommandError: If both '--keep-parquet' and '--remove-all' are
            set, or if a requested pipeline run does not exist.
    """
    # configure logging
    if options['verbosity'] > 1:
        # set root logger to use the DEBUG level
        root_logger = logging.getLogger('')
        root_logger.setLevel(logging.DEBUG)
        # set the traceback on
        options['traceback'] = True

    keep_parquet = options['keep_parquet']
    remove_all = options['remove_all']
    if keep_parquet and remove_all:
        raise CommandError(
            '"--keep-parquet" flag is incompatible with "--remove-all" flag'
        )

    piperuns = options['piperuns']
    if 'clearall' in piperuns:
        logger.info('clearing all pipeline run in the database')
        piperuns = list(Run.objects.values_list('name', flat=True))

    for piperun in piperuns:
        p_run_name = get_p_run_name(piperun)
        try:
            p_run = Run.objects.get(name=p_run_name)
        except Run.DoesNotExist as err:
            raise CommandError(
                f'Pipeline run {p_run_name} does not exist'
            ) from err

        logger.info("Deleting pipeline '%s' from database", p_run_name)
        # NOTE(review): indentation was reconstructed; the delete is
        # assumed to run after (outside) the atomic block that stores
        # the 'DEL' status -- confirm against the repository.
        with transaction.atomic():
            p_run.status = 'DEL'
            p_run.save()
        p_run.delete()

        # remove forced measurements in db if presents
        remove_forced_meas(p_run.path)

        # Delete parquet or folder eventually
        if not keep_parquet and not remove_all:
            logger.info('Deleting pipeline "%s" parquets', p_run_name)
            parquets = (
                glob(os.path.join(p_run.path, '*.parquet'))
                + glob(os.path.join(p_run.path, '*.arrow'))
            )
            for parquet in parquets:
                parquet_name = os.path.basename(parquet)
                try:
                    os.remove(parquet)
                except FileNotFoundError:
                    self.stdout.write(self.style.WARNING(
                        f'Parquet file "{parquet_name}" not existent'
                    ))
                except OSError as err:
                    # e.g. permission problems: report the real cause
                    # instead of claiming the file is missing
                    self.stdout.write(self.style.WARNING(
                        f'Issues in removing parquet file '
                        f'"{parquet_name}": {err}'
                    ))

        if remove_all:
            logger.info('Deleting pipeline folder')
            try:
                shutil.rmtree(p_run.path)
            except Exception as err:
                # deliberately best-effort: warn and carry on with the
                # next run rather than aborting the whole command
                self.stdout.write(self.style.WARNING(
                    f'Issues in removing run folder: {err}'
                ))
|