1
2
3
4 """
5 Releases non-survey data up to the given last night. First prepares the
6 non-survey data by applying quality control, before automatically
7 determining what kind of survey the programme is, which allows the curation
8 tables to be updated and survey tables defined and created. Finally,
9 performs all necessary release curation tasks on the complete set of
10 non-survey data in the load database and creates a release database on the
11 public catalogue server.
12
13 @author: R.S. Collins
14 @org: WFAU, IfA, University of Edinburgh
15
16 @todo: Have an option to prepare the release of just shallow or deep surveys
17 (i.e. first stage run). However, this requires the script to do
18 the ProgrammeBuilder setup first to determine whether it is shallow
19 or deep, then the ProgrammeTable needs to test and skip if not the
20 right type (without failure).
21 """
22
23 from __future__ import division, print_function
24 from future_builtins import map
25
26 from operator import itemgetter
27 import os
28 import shutil
29 import time
30
31 from invocations.cu19.cu19 import Cu19
32
33 from wsatools.Automator import autoCurate
34 from wsatools.CLI import CLI
35 import wsatools.CSV as csv
36 from wsatools.DataFactory import ProgrammeTable
37 from wsatools.DbConnect.CuSession import CuSession
38 from wsatools.DbConnect.DbSession import DbSession, Join, SelectSQL
39 from wsatools.FitsToDb import ProvenanceFiller
40 import wsatools.FitsUtils as fits
41 from wsatools.Logger import Logger
42 from wsatools.ProgrammeBuilder import ProgrammeBuilder, commitSchema
43 from wsatools.QualityControl import NonSurveyDeprecator
44 from wsatools.SystemConstants import SystemConstants
45 import wsatools.Utilities as utils
46
47
"""
Performs post-data-ingest curation on non-survey data and automatically
prepares release databases.

@todo: This class borrows heavily from the L{CuSession} design, so both
       L{CuSession} and this class need to be derived from a class that
       represents the common super set.
"""

# Database connection used for all queries; the methods here also read its
# database and isTrialRun members and call its sharePath() method.
archive = None

# Command-line interface object; options are read through getOpt()/getArg().
cli = None

# Range of observation dates to curate. Programmes whose first night lies
# after dateRange.end are not released. The script entry point overrides this
# default with a range ending at the end_date argument.
dateRange = SystemConstants().obsCal.dateRange()

# (progID, stage) pairs for programmes whose curation failed.
# NOTE(review): this list is created at class level, so it is shared by all
# instances unless rebound per instance -- confirm against the initialiser.
failures = []

# Mapping whose values are paths of product file lists to be ingested.
# NOTE(review): the meaning of the keys is not visible here -- presumably
# product types; confirm where this is assigned.
fileListForType = None

# NOTE(review): presumably set from the -d/--dry_run option; not read by the
# code visible here.
isDryRun = False

# If True, only programmes that have an entry in table Survey are selected.
isReleaseRun = False

# If True, programmes for which progTable.isDeep() holds are skipped.
isShallowOnly = False

# Programmes to curate, in processing order.
# NOTE(review): class-level mutable default, see the note on failures.
progIDs = []

# Programme table view used to translate between programme IDs, acronyms and
# dfsIdStrings (presumably a ProgrammeTable -- confirm in the initialiser).
progTable = None

# Description of the curation stage currently being executed; recorded
# alongside the programme whenever a failure is logged.
stage = "No curation task performed"

# Programmes for which the most recently applied curation method succeeded.
# NOTE(review): class-level mutable default, see the note on failures.
successes = []

# System constants (public server list, non-survey registration path).
sysc = SystemConstants()

# NOTE(review): presumably the path of a working directory; not read by the
# code visible here.
workSpace = ''
86
87
130
131
132
""" Executes the _onRun() method of the session within a try block, so
    that all the standard exceptions may be dealt with correctly.

    Any exception raised by _onRun() is logged and converted into a
    SystemExit carrying the logged message. _onCompletion() is always
    called afterwards, whether or not _onRun() succeeded.

    @raise SystemExit: If _onRun() raises any exception.

    @todo: This should become part of abstracted class at a level above
           CuSession.
"""
# Log file name is unique to the database and the minute of invocation
logFileName = '%s_NonSurveyRelease_%s.log' % \
    (self.archive.database, time.strftime('%Y%m%d_%H%M'))
try:
    self._onRun()
except BaseException as error:
    # BaseException is caught so that KeyboardInterrupt and SystemExit are
    # logged too; the logging itself is shielded from further interrupts.
    with utils.noInterrupt():
        stdErrMsg = CuSession.logException(error)
        if error.args:
            # Only point the user at the log when the exception has details
            stdErrMsg += ": see log " + logFileName
        raise SystemExit(stdErrMsg)
finally:
    try:
        self._onCompletion()
    finally:
        # Runs even if _onCompletion() itself fails; skipped for trial runs.
        # NOTE(review): presumably this writes the accumulated log to
        # logFileName -- confirm against the Logger class.
        if not self.archive.isTrialRun:
            Logger(logFileName)
157
158
159
161 """
162 Queries database and non-survey registration files to determine
163 selection of programmes to release according to command-line options.
164
165 @return: List of acronyms of programmes to release.
166 @rtype: list(str)
167
168 """
169 if self.cli.getOpt("progID") != "NONSURVEYS":
170 progIDs = set(self.progTable.setProgID(progID) for progID
171 in csv.values(self.cli.getOpt("progID")))
172 else:
173
174 allProgs = \
175 [self.progTable.getAttr("dfsIdString", programmeID=progID)
176 for progID in self.progTable.getProgIDList(onlyNonSurvey=True)]
177
178
179 progNames = (set(allProgs) if self.cli.getOpt("non_registered") else
180 self.getRegisteredProgs().intersection(allProgs))
181
182 if not self.cli.getOpt("all"):
183 progNames -= set(self.archive.query("dfsIdString",
184 fromStr="Programme, Release",
185 whereStr="Programme.programmeID=Release.surveyID"))
186
187 progIDs = set(self.progTable.setProgID(progName)
188 for progName in progNames)
189
190 if self.cli.getOpt("exclude"):
191 progIDs -= set(self.progTable.setProgID(prog)
192 for prog in csv.values(self.cli.getOpt("exclude")))
193
194
195 if self.isReleaseRun:
196 progIDs.intersection_update(self.archive.query("programmeID",
197 "Programme, Survey", "Programme.programmeID=Survey.surveyID"))
198
199
200 if self.isShallowOnly:
201 progIDs = set(progID for progID in progIDs
202 if not self.progTable.isDeep(programmeID=progID))
203
204
205 for progID, firstNight in self.getFirstNights(progIDs):
206 if utils.makeDateTime(firstNight) > self.dateRange.end:
207 progIDs.discard(progID)
208
209
210 xSemProgs = set()
211 for progID in progIDs:
212 if self.archive.queryNumRows("ProgrammeFrame",
213 whereStr="multiframeID IN (%s)"
214 % SelectSQL("multiframeID", "ProgrammeFrame",
215 "programmeID=%s" % progID),
216 distinctAttr="programmeID") is not 1:
217
218 if self.cli.getOpt("progID") != "NONSURVEYS":
219 msg = ("<WARNING> %s (%s) is a cross-programme "
220 "non-survey. If its detections are not shared amongst "
221 "all programmeIDs then this script will fail to do the "
222 "right thing.")
223 else:
224 xSemProgs.add(progID)
225 msg = ("<Info> %s (%s) is a, presently "
226 "unsupported, cross-programme non-survey and so is "
227 "excluded from release.")
228
229 Logger.addMessage(msg
230 % (self.progTable.getAcronym(progID).upper(), progID))
231
232 progIDs -= xSemProgs
233
234
235 if not self.isShallowOnly and self.isReleaseRun:
236 self.isShallowOnly = \
237 all(not self.progTable.isDeep(programmeID=progID)
238 for progID in progIDs)
239
240 return \
241 [self.progTable.getAcronym(progID) for progID in sorted(progIDs)]
242
243
244
"""
Relocates each existing product file list named in self.fileListForType
into the "ingestLists" directory on the share path of the server that
hosts the database where the products will be stored. A file of the same
name that is already present there is first renamed with a ".old" suffix.

"""
destDir = self.archive.sharePath("ingestLists")
utils.ensureDirExist(destDir)
for srcPath in self.fileListForType.values():
    # Nothing to move if this file list was never written
    if not os.path.exists(srcPath):
        continue

    destPath = os.path.join(destDir, os.path.basename(srcPath))
    if os.path.exists(destPath):
        # Preserve the previous list rather than overwriting it
        backupPath = destPath + ".old"
        Logger.addMessage("<WARNING> %s already exists! Renaming "
                          "to %s." % (destPath, backupPath))

        shutil.move(destPath, backupPath)

    shutil.move(srcPath, destPath)
    Logger.addMessage("<IMPORTANT> Please invoke parallel CUs 2-4 "
                      "on this file list: " + destPath)
268
269
270
291
292
293
338
339
340
366
367
368
"""
Runs the part B automated curation for one programme and, once that is
complete, invokes CU19 to create its release database on the first
public server.

@param progID:  Programme to curate, can be acronym or unique ID.
@type  progID:  str
@param relDate: Date for release database name YYYYMMDD.
@type  relDate: str

@raise CuSession.CuError: If the automated curation reports that the
                          programme is not complete.

"""
self.stage = "Automated Curation (Part B)"
if not autoCurate(self.cli, progID, dateRange=self.dateRange, doCu6=True):
    raise CuSession.CuError("Programme %s is not ready for release."
                            % progID)

self.stage = "CU19"
Logger.addMessage(self.cli.getProgDetails("CU19"))

# The release database is named after the programme acronym, so a numeric
# programme ID has to be translated first.
if progID.isdigit():
    progName = self.progTable.getAcronym(int(progID)).upper()
else:
    progName = progID.upper()

server = self.sysc.publicServers[0]
dbName = "%sv%s" % (progName, relDate)
releaser = Cu19(progID,
                server + '.' + dbName,
                self.cli.getOpt("curator"),
                'Release of ' + dbName,
                self.cli.getArg("database"),
                self.cli.getOpt("test"),
                self.cli.getOpt("user"))
releaser.run()
401
402
403
432
433
434
450
451
452
"""
Wrapper method to apply the given method to every programme in
self.progIDs, whilst handling exceptions, so that if one programme
fails its failure is logged and execution is continued with the next
programme.

Programmes that succeed are appended to self.successes; programmes that
fail are appended to self.failures together with the stage reached and
are removed from self.progIDs afterwards.

@param method: Method to apply to each progID in self.progIDs; it is
               called as method(progID, **kwds).
@type  method: instancemethod or function
@param kwds:   Optional keyword argument list for the method.
@type  kwds:   dict

@raise CuSession.CuError: If no programmes remain in self.progIDs once
                          the failed ones have been removed.

"""
for progID in self.progIDs:
    try:
        method(progID, **kwds)
    except BaseException as error:
        # BaseException so that interrupts and exits are recorded too; the
        # book-keeping is shielded from further interrupts.
        with utils.noInterrupt():
            CuSession.logException(error)
            self.failures.append((progID, self.stage))
            Logger.addMessage(
              "<ERROR> Curation for programme %s failed at stage %s." %
              self.failures[-1])

        # A keyboard interrupt (raised directly, or re-raised by name in
        # another exception's message) abandons all remaining programmes,
        # which are recorded as failures that never started.
        if isinstance(error, KeyboardInterrupt) \
          or str(error) == KeyboardInterrupt.__name__:
            stage = "(curation not started)"
            remains = self.progIDs[self.progIDs.index(progID) + 1:]
            self.failures.extend((pID, stage) for pID in remains)
            break
    else:
        # Only reached when method() raised nothing
        self.successes.append(progID)

if self.failures:
    # Later stages should only see programmes with no recorded failure
    self.progIDs = [progID for progID in self.progIDs
                    if progID not in map(itemgetter(0), self.failures)]

    if not self.progIDs:
        raise CuSession.CuError("All non-surveys failed!")
492
493
494
496 """
497 Database query to determine the first night of observation for given
498 progs.
499
500 @param progIDs: List of programme IDs to query.
501 @type progIDs: list(int)
502
503 @return: List of programmes with date of the first night of
504 observation.
505 @rtype: list(tuple(int, str))
506
507 """
508 if not progIDs:
509 return []
510
511 return self.archive.query("programmeID, MIN(utDate)",
512 Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
513 "programmeID IN (%s) " % ','.join(map(str, progIDs)) +
514 "GROUP BY programmeID")
515
516
517
519 """ @return: Set of registered non-survey programme dfsIdStrings.
520 @rtype: set(str)
521 """
522 progNames = set()
523 for fileName in os.listdir(self.sysc.nonSurveyRegPath()):
524 if '.reg' in fileName:
525 path = self.sysc.nonSurveyRegPath(fileName)
526 for line in utils.ParsedFile(path):
527 if line.startswith("programme="):
528 progNames.add(line.replace("programme=", ""))
529 break
530
531 return progNames
532
533
534
535
536
if __name__ == "__main__":
    # Script entry point: defines the command-line interface, validates the
    # option combinations, propagates the requested date range to the
    # curation classes and then runs the non-survey curator.

    # The programme is selected with -p/--progID rather than by argument; an
    # end date argument is taken instead.
    CLI.progArgs.remove('programmeID')
    CLI.progArgs.append(CLI.Argument("end_date", "07A", isValOK=CLI.isDateOK))

    CLI.progOpts += [
      CLI.Option('a', "all",
        "release all registered programmes not just newly registered ones"),
      CLI.Option('d', "dry_run",
        "just print list of programmes that will be curated"),
      CLI.Option('m', "max_stacks",
        "set the maximum number of components of a deep stack",
        "NUMBER", str(ProgrammeBuilder.numberStks),
        isValOK=lambda val: val.isdigit()),
      CLI.Option('n', "non_registered",
        "include non-registered programmes in release"),
      CLI.Option('p', "progID",
        "release just the non-survey data with these programme IDs",
        "LIST", "NONSURVEYS"),
      CLI.Option('q', "qc_only",
        "just update the quality control deprecations for these data"),
      CLI.Option('r', "release",
        "create release databases (only run when quality controlled and setup)"
        ),
      CLI.Option('s', "shallow_only",
        "release only shallow surveys (with option --release)"),
      CLI.Option('x', "exclude",
        "exclude these programme IDs",
        "LIST")]

    # Default curation comment for this script
    CLI.progArgs['comment'] = "Preparing non-survey for release"

    cli = CLI("NonSurveyRelease", "$Revision: 10022 $", __doc__)
    Logger.isVerbose = cli.getOpt('verbose')
    Logger.addMessage(cli.getProgDetails())

    # Reject inconsistent option combinations. SystemExit is raised directly
    # rather than via the site-provided exit(), which is intended for the
    # interactive interpreter and is not guaranteed to be defined.
    if cli.getOpt("dry_run") and not cli.getOpt("test"):
        raise SystemExit(
            "Please specify -t/--test when supplying -d/--dry_run")

    if cli.getOpt("shallow_only") and not cli.getOpt("release"):
        raise SystemExit(
            "Option -s/--shallow_only may only be used in combination with "
            "option -r/--release")

    # The 64-bit server check is skipped for release, shallow-only and
    # quality-control-only runs.
    if not any([cli.getOpt("shallow_only"), cli.getOpt("release"),
                cli.getOpt("qc_only")]):
        CLI.check64bitServer()

    # Observing calendar for the database named after the final '.' of the
    # database argument
    cal = SystemConstants(cli.getArg("database").split('.')[-1]).obsCal
    try:
        NonSurveyCurator.dateRange = cal.dateRange(end=cli.getArg("end_date"))
    except Exception as error:
        raise SystemExit("Illegal Option: " + str(error))

    # All curation classes must work to the same date range
    NonSurveyDeprecator.endDate = NonSurveyCurator.dateRange.end
    Cu19.dateRange = ProgrammeBuilder.dateRange = NonSurveyCurator.dateRange

    NonSurveyCurator(cli).run()
595
596
597
598
599
600