class Command(BaseCommand):
    """
    This script is used to clean the data for pipeline run(s).
    Use --help for usage.
    """

    help = (
        'Delete a pipeline run and all related images, sources, etc.'
        ' Will not delete objects if they are also related to another '
        'pipeline run.'
    )

    def add_arguments(self, parser: ArgumentParser) -> None:
        """
        Enables arguments for the command.

        Args:
            parser (ArgumentParser): The parser object of the command.

        Returns:
            None
        """
        # positional arguments (required)
        parser.add_argument(
            'piperuns',
            nargs='+',
            type=str,
            default=None,
            help=(
                'Name or path of pipeline run(s) to delete. Pass "clearall" to'
                ' delete all the runs.'
            ),
        )
        # keyword arguments (optional)
        parser.add_argument(
            '--keep-parquet',
            required=False,
            default=False,
            action='store_true',
            help=(
                'Flag to keep the pipeline run(s) parquet files. '
                'Will also apply to arrow files if present.'
            ),
        )
        parser.add_argument(
            '--remove-all',
            required=False,
            default=False,
            action='store_true',
            help='Flag to remove all the content of the pipeline run(s) folder.',
        )

    def handle(self, *args, **options) -> None:
        """
        Handle function of the command.

        For every requested pipeline run the database record is deleted,
        `remove_forced_meas` is called on the run path and then, depending
        on the flags, either the '*.parquet'/'*.arrow' files in the run
        folder or the whole run folder are removed from disk.

        Args:
            *args: Variable length argument list.
            **options: Variable length options.

        Returns:
            None

        Raises:
            CommandError: If both '--keep-parquet' and '--remove-all' are
                passed, or if a requested pipeline run does not exist.
        """
        # configure logging
        if options['verbosity'] > 1:
            # set root logger to use the DEBUG level
            root_logger = logging.getLogger('')
            root_logger.setLevel(logging.DEBUG)
            # set the traceback on
            options['traceback'] = True

        keep_parquet = options['keep_parquet']
        remove_all = options['remove_all']
        if keep_parquet and remove_all:
            # the message must quote the flag exactly as it is declared
            raise CommandError(
                '"--keep-parquet" flag is incompatible with "--remove-all" flag'
            )

        piperuns = options['piperuns']
        if 'clearall' in piperuns:
            logger.info('clearing all pipeline run in the database')
            piperuns = list(Run.objects.values_list('name', flat=True))

        for piperun in piperuns:
            p_run_name = get_p_run_name(piperun)
            try:
                p_run = Run.objects.get(name=p_run_name)
            except Run.DoesNotExist as exc:
                raise CommandError(
                    f'Pipeline run {p_run_name} does not exist'
                ) from exc

            logger.info("Deleting pipeline '%s' from database", p_run_name)
            p_run.delete()
            # NOTE: `p_run.path` is still read below; per the Django docs the
            # in-memory instance keeps its field values after `delete()`.

            # remove forced measurements in db if present
            remove_forced_meas(p_run.path)

            # Delete parquet or folder eventually
            if remove_all:
                logger.info('Deleting pipeline folder')
                try:
                    shutil.rmtree(p_run.path)
                except Exception as e:
                    # best effort: report the problem and go on with next run
                    self.stdout.write(self.style.WARNING(
                        f'Issues in removing run folder: {e}'
                    ))
            elif not keep_parquet:
                logger.info('Deleting pipeline "%s" parquets', p_run_name)
                parquets = (
                    glob(os.path.join(p_run.path, '*.parquet'))
                    + glob(os.path.join(p_run.path, '*.arrow'))
                )
                for parquet in parquets:
                    parquet_name = os.path.basename(parquet)
                    try:
                        os.remove(parquet)
                    except FileNotFoundError:
                        self.stdout.write(self.style.WARNING(
                            f'Parquet file "{parquet_name}" not existent'
                        ))
                    except OSError as e:
                        # e.g. permission problems: do not claim it is missing
                        self.stdout.write(self.style.WARNING(
                            f'Parquet file "{parquet_name}" could not be '
                            f'removed: {e}'
                        ))
def add_arguments(self, parser: ArgumentParser) -> None:
    """
    Register the command line arguments understood by this command.

    Args:
        parser (ArgumentParser): The parser object the arguments are
            attached to.

    Returns:
        None
    """
    # required positional: one or more run names/paths
    piperuns_help = (
        'Name or path of pipeline run(s) to delete. Pass "clearall" to'
        ' delete all the runs.'
    )
    parser.add_argument(
        'piperuns', nargs='+', type=str, default=None, help=piperuns_help
    )

    # optional boolean switches, registered in order as (flag, help text)
    switches = (
        (
            '--keep-parquet',
            'Flag to keep the pipeline run(s) parquet files. '
            'Will also apply to arrow files if present.',
        ),
        (
            '--remove-all',
            'Flag to remove all the content of the pipeline run(s) folder.',
        ),
    )
    for flag, help_text in switches:
        parser.add_argument(
            flag,
            required=False,
            default=False,
            action='store_true',
            help=help_text,
        )
def handle(self, *args, **options) -> None:
    """
    Handle function of the command.

    For every requested pipeline run the database record is deleted,
    `remove_forced_meas` is called on the run path and then, depending
    on the flags, either the '*.parquet'/'*.arrow' files in the run
    folder or the whole run folder are removed from disk.

    Args:
        *args: Variable length argument list.
        **options: Variable length options.

    Returns:
        None

    Raises:
        CommandError: If both '--keep-parquet' and '--remove-all' are
            passed, or if a requested pipeline run does not exist.
    """
    # configure logging
    if options['verbosity'] > 1:
        # set root logger to use the DEBUG level
        root_logger = logging.getLogger('')
        root_logger.setLevel(logging.DEBUG)
        # set the traceback on
        options['traceback'] = True

    keep_parquet = options['keep_parquet']
    remove_all = options['remove_all']
    if keep_parquet and remove_all:
        # the message must quote the flag exactly as it is declared
        raise CommandError(
            '"--keep-parquet" flag is incompatible with "--remove-all" flag'
        )

    piperuns = options['piperuns']
    if 'clearall' in piperuns:
        logger.info('clearing all pipeline run in the database')
        piperuns = list(Run.objects.values_list('name', flat=True))

    for piperun in piperuns:
        p_run_name = get_p_run_name(piperun)
        try:
            p_run = Run.objects.get(name=p_run_name)
        except Run.DoesNotExist as exc:
            raise CommandError(
                f'Pipeline run {p_run_name} does not exist'
            ) from exc

        logger.info("Deleting pipeline '%s' from database", p_run_name)
        p_run.delete()
        # NOTE: `p_run.path` is still read below; per the Django docs the
        # in-memory instance keeps its field values after `delete()`.

        # remove forced measurements in db if present
        remove_forced_meas(p_run.path)

        # Delete parquet or folder eventually
        if remove_all:
            logger.info('Deleting pipeline folder')
            try:
                shutil.rmtree(p_run.path)
            except Exception as e:
                # best effort: report the problem and go on with next run
                self.stdout.write(self.style.WARNING(
                    f'Issues in removing run folder: {e}'
                ))
        elif not keep_parquet:
            logger.info('Deleting pipeline "%s" parquets', p_run_name)
            parquets = (
                glob(os.path.join(p_run.path, '*.parquet'))
                + glob(os.path.join(p_run.path, '*.arrow'))
            )
            for parquet in parquets:
                parquet_name = os.path.basename(parquet)
                try:
                    os.remove(parquet)
                except FileNotFoundError:
                    self.stdout.write(self.style.WARNING(
                        f'Parquet file "{parquet_name}" not existent'
                    ))
                except OSError as e:
                    # e.g. permission problems: do not claim it is missing
                    self.stdout.write(self.style.WARNING(
                        f'Parquet file "{parquet_name}" could not be '
                        f'removed: {e}'
                    ))