This module defines the command for creating an arrow output file for a previously completed pipeline run.
Command
Bases: BaseCommand
This command creates measurements and measurement_pairs arrow files for a completed pipeline run.
Source code in vast_pipeline/management/commands/createmeasarrow.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 | class Command(BaseCommand):
"""
This command creates measurements and measurement_pairs arrow files for a
completed pipeline run.
"""
help = (
'Create `measurements.arrow` and `measurement_pairs.arrow` files for a'
' completed pipeline run.'
)
def add_arguments(self, parser: ArgumentParser) -> None:
"""
Enables arguments for the command.
Args:
parser (ArgumentParser): The parser object of the command.
Returns:
None
"""
# positional arguments
parser.add_argument(
'piperun',
type=str,
help='Path or name of the pipeline run.'
)
parser.add_argument(
'--overwrite',
action='store_true',
required=False,
default=False,
help="Overwrite previous 'measurements.arrow' file.",
)
def handle(self, *args, **options) -> None:
"""
Handle function of the command.
Args:
*args: Variable length argument list.
**options: Variable length options.
Returns:
None
"""
piperun = options['piperun']
p_run_name, run_folder = get_p_run_name(
piperun,
return_folder=True
)
# configure logging
root_logger = logging.getLogger('')
f_handler = logging.FileHandler(
os.path.join(run_folder, timeStamped('gen_arrow_log.txt')),
mode='w'
)
f_handler.setFormatter(root_logger.handlers[0].formatter)
root_logger.addHandler(f_handler)
if options['verbosity'] > 1:
# set root logger to use the DEBUG level
root_logger.setLevel(logging.DEBUG)
# set the traceback on
options['traceback'] = True
try:
p_run: Run = Run.objects.get(name=p_run_name)
except Run.DoesNotExist:
raise CommandError(f'Pipeline run {p_run_name} does not exist')
if p_run.status != 'END':
raise CommandError(f'Pipeline run {p_run_name} has not completed.')
measurements_arrow = os.path.join(run_folder, 'measurements.arrow')
measurement_pairs_arrow = os.path.join(
run_folder, 'measurement_pairs.arrow'
)
if os.path.isfile(measurements_arrow):
if options['overwrite']:
logger.info("Removing previous 'measurements.arrow' file.")
os.remove(measurements_arrow)
else:
logger.error(
f'Measurements arrow file already exists for {p_run_name}'
' and `--overwrite` has not been selected.'
)
raise CommandError(
f'Measurements arrow file already exists for {p_run_name}'
' and `--overwrite` has not been selected.'
)
if os.path.isfile(measurement_pairs_arrow):
if options['overwrite']:
logger.info(
"Removing previous 'measurement_pairs.arrow' file."
)
os.remove(measurement_pairs_arrow)
else:
logger.error(
'Measurement pairs arrow file already exists for'
f' {p_run_name} and `--overwrite` has not been selected.'
)
raise CommandError(
'Measurement pairs arrow file already exists for'
f' {p_run_name} and `--overwrite` has not been selected.'
)
logger.info("Creating measurements arrow file for '%s'.", p_run_name)
create_measurements_arrow_file(p_run)
if p_run.get_config(validate_inputs=False, prev=True)["variability"]["pair_metrics"]:
logger.info(
"Creating measurement pairs arrow file for '%s'.", p_run_name
)
create_measurement_pairs_arrow_file(p_run)
logger.info(
"Arrow files created successfully for '%s'!", p_run_name
)
|
add_arguments(parser)
Enables arguments for the command.
Parameters:
Name | Type | Description | Default |
parser | ArgumentParser | The parser object of the command. | required |
Returns:
Source code in vast_pipeline/management/commands/createmeasarrow.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 | def add_arguments(self, parser: ArgumentParser) -> None:
"""
Enables arguments for the command.
Args:
parser (ArgumentParser): The parser object of the command.
Returns:
None
"""
# positional arguments
parser.add_argument(
'piperun',
type=str,
help='Path or name of the pipeline run.'
)
parser.add_argument(
'--overwrite',
action='store_true',
required=False,
default=False,
help="Overwrite previous 'measurements.arrow' file.",
)
|
handle(*args, **options)
Handle function of the command.
Parameters:
Name | Type | Description | Default |
*args | | Variable length argument list. | () |
**options | | | {} |
Returns:
Source code in vast_pipeline/management/commands/createmeasarrow.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 | def handle(self, *args, **options) -> None:
"""
Handle function of the command.
Args:
*args: Variable length argument list.
**options: Variable length options.
Returns:
None
"""
piperun = options['piperun']
p_run_name, run_folder = get_p_run_name(
piperun,
return_folder=True
)
# configure logging
root_logger = logging.getLogger('')
f_handler = logging.FileHandler(
os.path.join(run_folder, timeStamped('gen_arrow_log.txt')),
mode='w'
)
f_handler.setFormatter(root_logger.handlers[0].formatter)
root_logger.addHandler(f_handler)
if options['verbosity'] > 1:
# set root logger to use the DEBUG level
root_logger.setLevel(logging.DEBUG)
# set the traceback on
options['traceback'] = True
try:
p_run: Run = Run.objects.get(name=p_run_name)
except Run.DoesNotExist:
raise CommandError(f'Pipeline run {p_run_name} does not exist')
if p_run.status != 'END':
raise CommandError(f'Pipeline run {p_run_name} has not completed.')
measurements_arrow = os.path.join(run_folder, 'measurements.arrow')
measurement_pairs_arrow = os.path.join(
run_folder, 'measurement_pairs.arrow'
)
if os.path.isfile(measurements_arrow):
if options['overwrite']:
logger.info("Removing previous 'measurements.arrow' file.")
os.remove(measurements_arrow)
else:
logger.error(
f'Measurements arrow file already exists for {p_run_name}'
' and `--overwrite` has not been selected.'
)
raise CommandError(
f'Measurements arrow file already exists for {p_run_name}'
' and `--overwrite` has not been selected.'
)
if os.path.isfile(measurement_pairs_arrow):
if options['overwrite']:
logger.info(
"Removing previous 'measurement_pairs.arrow' file."
)
os.remove(measurement_pairs_arrow)
else:
logger.error(
'Measurement pairs arrow file already exists for'
f' {p_run_name} and `--overwrite` has not been selected.'
)
raise CommandError(
'Measurement pairs arrow file already exists for'
f' {p_run_name} and `--overwrite` has not been selected.'
)
logger.info("Creating measurements arrow file for '%s'.", p_run_name)
create_measurements_arrow_file(p_run)
if p_run.get_config(validate_inputs=False, prev=True)["variability"]["pair_metrics"]:
logger.info(
"Creating measurement pairs arrow file for '%s'.", p_run_name
)
create_measurement_pairs_arrow_file(p_run)
logger.info(
"Arrow files created successfully for '%s'!", p_run_name
)
|