1
2
3 """
4 Framework for parallel-running curation sessions. Performs the set of
5 actions that CUs 1 to 4 are required to do. These curation tasks should
6 inherit from this class.
7
8 @author: E. Sutorius
9 @org: WFAU, IfA, University of Edinburgh
10
11 @newfield contributors: Contributors, Contributors (Alphabetical Order)
12 @contributors: R.S. Collins
13 """
14
15 from future_builtins import zip
16
17 import csv
18 from collections import defaultdict
19 import inspect
20 import mx.DateTime as mxTime
21 import os
22 import re
23 import shutil
24 import signal
25 import socket
26 from subprocess import Popen, PIPE
27 import time
28
29 from wsatools.CLI import CLI
30 import wsatools.DbConnect.DbConstants as dbc
31 from wsatools.DbConnect.DbSession import DbSession, odbc
32 from wsatools.ExternalProcess import Error as ExtError
33 from wsatools.File import File
34 import wsatools.FitsUtils as fits
35 from wsatools.Logger import Logger
36 import wsatools.DbConnect.Schema as schema
37 from wsatools.SystemConstants import SystemConstants
38 import wsatools.Utilities as utils
42 """ A parallel curation session for transfer/ingest CUs 1 to 4.
43 """
45 """ A curation error instead of a programming error. """
46 pass
47
48 beginDateDef = 10000101
49 endDateDef = 99991231
50
51
52 reqWorkDir = None
53 keepWorkDir = False
54
55
56 _defComment = 'A useful comment'
57 _fileList = None
58 _isDbLocked = False
59 _numSessions = 0
60
61
62 sysc = SystemConstants()
63
64
65 CLI.progArgs += [CLI.Argument('comment', _defComment, isOptional=True)]
66 CLI.progOpts += [CLI.Option('c', 'curator', 'name of curator', 'NAME',
67 os.getenv('USER'))]
68
69
70
80 """
81 Initialises data members for all database state flags and details for
82 this curation session. Writes a database lock and opens a connection.
83 Also initiates a Programme table object.
84
85 @param cuNum: Curation task number.
86 @type cuNum: int
87 @param curator: Name of curator.
88 @type curator: str
89 @param comment: Descriptive comment as to why curation task is
90 being performed.
91 @type comment: str
92 @param reqWorkDir: If True, a working directory will be created.
93 @type reqWorkDir: bool
94 @param keepWorkDir: Don't remove working directory.
95 @type keepWorkDir: bool
96 @param database: Name of the database to connect to.
97 @type database: str
98 @param userName: Optionally override default database username.
99 @type userName: str
100 @param autoCommit: If True, do not auto-commit transactions. This
101 speeds up transactions through minimal logging.
102 @type autoCommit: bool
103 @param isTrialRun: If True, do not perform database modifications,
104 just print the SQL statement to the terminal.
105 @type isTrialRun: bool
106
107 """
108 self.attributesFromArguments(
109 inspect.getargspec(IngCuSession.__init__)[0], locals())
110
111
112
113 IngCuSession.sysc = SystemConstants(database.rpartition('.')[2])
114
115
116 self._cuEventID = None
117 self._cuTable = None
118 self.comment = ("Running CU" + str(cuNum)
119 if comment is IngCuSession._defComment else comment)
120
121 self._lockPathName = \
122 os.path.join(os.getenv('HOME'), "dblock-" + self.database)
123 self._logFile = None
124 self._rolledBack = None
125 self._shareFileID = ''
126
127
128 if self.reqWorkDir:
129 self._workPath = self.createWorkingDir()
130 else:
131 self._workPath = os.curdir
132 self._allFileName = os.path.join(self._workPath, 'filelist.dat')
133
134 self.dbMntPath = IngCuSession.sysc.dbMntPath()
135 self._hostName = socket.gethostbyaddr(socket.gethostname())[1][0]
136 self._success = dbc.yes()
137
138
139
149
150
151
166
167
168
170 """
171 Create the translation dictionary of programme identification strings
172 to programme ID numbers for all programmes. It contains associations
173 between data flow system (DFS) programme identification strings (ie. as
174 supplied in ingest FITS files) and the WSA programme unique
175 identification numbers.
176
177 @todo: This method wouldn't be needed if a ProgrammeTable object always
178 existed and had an equivalent method to peform this translation.
179 Hence, the ProgrammeTable object would replace the dictionary
180 here and there wouldn't be a need to query the database again.
181 """
182
183 queryResult = \
184 self.archive.query("dfsIDString, programmeID", "Programme")
185
186 self._progIDofName = \
187 dict((name.upper(), progID) for name, progID in queryResult)
188
189 if self.sysc.isWSA():
190 self._progIDofName["PTS"] = self._progIDofName["U/06A/52"]
191 self._progIDofName["COMM"] = self._progIDofName["U/EC/"]
192 self._progIDofName["UHS"] = self._progIDofName["U/UHS/"]
193 if self.sysc.isVSA():
194 self._progIDofName["TECH"] = self._progIDofName["TECHNICAL"]
195 self._progIDofName["MAINT"] = self._progIDofName["MAINTENANCE"]
196 if self.sysc.isOSA():
197 self._progIDofName["TECH"] = self._progIDofName["TECHNICAL"]
198
199 self._progNameOfID = dict(zip(self._progIDofName.itervalues(),
200 self._progIDofName.iterkeys()))
201 if self.sysc.isWSA():
202 self._progNameOfID[self._progIDofName["U/06A/52"]] = "PTS"
203 self._progNameOfID[self._progIDofName["U/EC/"]] = "COMM"
204 self._progNameOfID[self._progIDofName["U/UHS/"]] = "UHS"
205 if self.sysc.isVSA():
206 self._progNameOfID[self._progIDofName["TECHNICAL"]] = "TECH"
207 self._progNameOfID[self._progIDofName["MAINTENANCE"]] = "MAINT"
208 if self.sysc.isOSA():
209 self._progNameOfID[self._progIDofName["TECHNICAL"]] = "TECH"
210
211 Logger.addMessage("Created the translation dictionary")
212
213
214
239
240
241
243 """
244 Obtain the next available cuEventID. Whilst putting a placeholder row
245 into the ArchiveCurationHistory table to reserve this cuEventID,
246 preventing it from being hijacked by a parallel process.
247
248 """
249 cuEventID = self._getNextID(dbc.curationEventUIDAttributeName(),
250 dbc.archiveCurationHistoryTableName())
251
252 if self.sysc.isVSA():
253 dbID = 2 if self.archive.database == "VSAVVV" else 1
254 cuEventID = 10 * (cuEventID // 10 + 1) + dbID
255
256
257 try:
258 self.archive.insertData(dbc.archiveCurationHistoryTableName(),
259 [cuEventID, self.cuNum, dbc.charDefault(), dbc.charDefault(),
260 utils.makeMssqlTimeStamp(), self.curator,
261 "in progress: " + self.comment, dbc.yes()],
262 enforcePKC=True)
263 self.archive.commitTransaction()
264 except odbc.ProgrammingError as error:
265 if "permission was denied" in str(error):
266 raise SystemExit("<ERROR> You do not have write "
267 "permission on database " + self.database)
268 else:
269 raise
270
271 return cuEventID
272
273
274
275 - def _getNextID(self, attrName, tableName, where=''):
276 """
277 Obtain next available ID number in the db for a given ID attribute.
278
279 @param attrName: Name of ID attribute.
280 @type attrName: str
281 @param tableName: Name of the table where attribute resides.
282 @type tableName: str
283 @param where: Optional SQL WHERE clause.
284 @type where: str
285
286 @return: Next available ID number.
287 @rtype: int
288
289 """
290 maxID = self.archive.queryAttrMax(attrName, tableName, where)
291
292 if maxID and maxID > 0:
293 return maxID + 1
294 else:
295 return 1
296
297
298
300 """
301 Prefix for the comment in the ArchiveCurationHistory table.
302
303 @param success: The success status of the CU.
304 @type success: int
305 @param rolledBack: Flag to indicate ingest roll back.
306 @type rolledBack: int
307
308 @return: Prefix for the comment in the ArchiveCurationHistory table.
309 @rtype: str
310
311 """
312 if not success and rolledBack:
313 return "failed: "
314 elif success and rolledBack:
315 return "processed: "
316 elif success and not rolledBack:
317 return "ingested: "
318
319
320
322 """ Tidy up run tasks specific to this CU following an exception.
323 """
324
325 signal.signal(signal.SIGINT, signal.SIG_IGN)
326 self._success = dbc.no()
327 try:
328 self.archive.rollbackTransaction()
329 except AttributeError:
330
331 Logger.addMessage(
332 "No database connection, so nothing to rollback.")
333 else:
334 self._rolledBack = dbc.yes()
335
336 signal.signal(signal.SIGINT, signal.SIG_DFL)
337
338
339
341 """ Performs all the tasks that define this curation session. """
342 pass
343
344
345
346 @staticmethod
348 """
349 Parse the given list of CU strings into a list of cuNum, cuEventID
350 and serverName.
351
352 @return: List of tuples characterising the CU events to be ingested.
353 @rtype: list
354
355 """
356 cnList = []
357 for entry in cus:
358 try:
359 cuNum, idservdate = entry.partition('-')[::2]
360 except AttributeError:
361 cuNum = entry
362 idservdate = '--'
363 searchTerms = (idservdate.split('-')+ ['', '', ''])[:3]
364 evtId = ''
365 serv = ''
366 date = ''
367 for x in searchTerms:
368 if x.isalpha():
369 serv = x
370 elif x.isdigit() and x.startswith("20") and len(x) == 6 \
371 and int(x[4:]) in range(1,13):
372 date = x
373 elif x.isdigit():
374 evtId = x
375 cnList.append(("cu%02d" % int(cuNum), "id" + evtId, serv, date))
376 return cnList
377
378
379
381 """
382 Writes database lock. If lock already present then pause and wait for
383 database to unlock.
384
385 """
386 numTries = 0
387 midTries = 100
388 delay = 2
389 taskIdStr = ("[%d] " % self._cuEventID if self._cuEventID else '')
390 while os.path.exists(self._lockPathName):
391 time.sleep(delay)
392 numTries += 1
393 if numTries == 1:
394 print("%sA lock file exists: %s" % (taskIdStr,
395 self._lockPathName))
396 print("End the task that owns the lock and/or manually "
397 "delete the lock file and this curation session will "
398 "automatically start.")
399 elif maxTries < 1000 and numTries > maxTries:
400 raise Exception("%sTimed out in accessing the"
401 " database" % taskIdStr)
402 elif maxTries >= 1000 and numTries >= midTries:
403 delay = min(120, 10*int(1 + (numTries - midTries)/10))
404 Logger.addMessage("%sWaiting %d s..." % (taskIdStr, delay))
405 elif numTries > 15000:
406 raise Exception("%sTimed out after %d s in accessing the"
407 " database" % (taskIdStr, numTries))
408
409 file(self._lockPathName, 'w').write("DB locked by CU%s. PID = %s.%s" %
410 (self.cuNum, os.getenv('HOST'), os.getpid()))
411
412 IngCuSession._isDbLocked = True
413
414
415
417 """
418 Adds an entry for this curation session to the ArchiveCurationHistory
419 database table.
420
421 @param row: List of entries for the ArchiveCurationHistory table.
422 @type row: list
423
424 """
425 Logger.addMessage("Updating archive curation history with this row:")
426 Logger.addMessage(repr(row))
427
428
429
430 self.archive.delete(dbc.archiveCurationHistoryTableName(),
431 whereStr="cuEventID=%s" % row[0])
432 self.archive.insertData(dbc.archiveCurationHistoryTableName(), row)
433
434
435
459
460
461
463 """
464 Adds an entry for this curation session to the ProgrammeCurationHistory
465 database table.
466
467 @param row: List of entries for the ProgrammeCurationHistory table.
468 @type row: list
469
470 """
471 Logger.addMessage("Updating programme curation history with this row:")
472 Logger.addMessage(repr(row))
473
474
475
476 self.archive.delete(dbc.programmeCurationHistoryTableName(),
477 whereStr="programmeID=%s AND cuEventID=%s" % (row[0], row[1]))
478 self.archive.insertData(dbc.programmeCurationHistoryTableName(), row)
479
480
481
484 """Initialize automatically instance variables from __init__ arguments.
485 If types dictionary is given, translate commandline str arguments into
486 correct types.
487
488 @param argumentNames: List of arguments of the class to be initialized.
489 @type argumentNames: list[str]
490 @param argDict: Dictionary of arguments and their values.
491 @type argDict: dict{arg:value}
492 @param prefix: Prefix to be given to the argument names, eg. '_'
493 @type prefix: str
494 @param types: Dictionary of arguments and their types.
495 @type types: dict{arg:type}
496 """
497 if types:
498 argDict.pop("typeTranslation")
499 for arg in argDict:
500 if arg in types:
501 if type(argDict[arg]) is type(None):
502 argDict[arg] = ''
503 if types[arg] is not type(argDict[arg]):
504 if types[arg] is int:
505 argDict[arg] = int(argDict[arg])
506 if types[arg] in (list, set, tuple):
507 for row in csv.reader([argDict[arg]]):
508 try:
509 argDict[arg] = types[arg](int(x)
510 for x in row if isinstance(x, str))
511 except ValueError:
512 argDict[arg] = types[arg](row)
513 if types[arg] is dict:
514 tmpDict = defaultdict()
515 for row in csv.reader([argDict[arg]]):
516 for x in row:
517 tmpDict.update(types[arg](
518 [re.split('[:=]', x)]))
519 argDict[arg] = tmpDict.copy()
520
521 self = argDict.pop('self')
522 for arg in argumentNames[1:]:
523 setattr(self, prefix + arg, argDict[arg])
524
525
526
528 """
529 Create a list of FITS files to be ingest into the database from the
530 list supplied in the file at the given path. The list is stored in the
531 L{_fileList} member variable as a pair of image and catalogue file
532 paths and also written to a file by L{writeIngestList()} for the
533 C++ code.
534
535 @param fileListPath: Full path to the file containing the list of FITS
536 files to ingest, as an optional alternative to the
537 command-line supplied option, xferLog.
538 @type fileListPath: str
539
540 """
541 img = {}
542 cat = {}
543 fixcat = {}
544 for filePath in utils.ParsedFile(fileListPath or self.xferlog):
545 fileDir, fileName = filePath.split(os.sep)[-2:]
546 fileSubPath = os.path.join(fileDir, fileName)
547
548 if fileName.startswith((self.sysc.casuPrefix,
549 self.sysc.wfauPrefix)):
550
551 catType = self.sysc.catSuffix + self.sysc.catType
552 fixcatType = self.sysc.fixcatSuffix + self.sysc.catType
553 if fileName.endswith(fixcatType):
554 run = fileSubPath.replace(fixcatType, '')
555 fixcat[run] = filePath
556 elif fileName.endswith(catType):
557 run = fileSubPath.replace(catType, '')
558 cat[run] = filePath
559
560 elif fileName.endswith(self.sysc.mefType) \
561 and not fileName.endswith("_squish.fit"):
562 run = fileSubPath.replace(self.sysc.mefType, '')
563 img[run] = filePath
564 else:
565 run = os.path.splitext(fileSubPath)[0]
566 img[run] = filePath
567
568 orphanedCats = set(cat) - set(img)
569 if self.sysc.isOSA():
570 orphanedFixCats = set(fixcat) - set(img)
571 if orphanedFixCats:
572 Logger.addMessage(
573 "<Warning> There are orphaned fixed catalogues in this list"
574 " without corresponding image files for that run: "
575 + ', '.join(orphanedFixCats))
576
577 else:
578 if orphanedCats:
579 Logger.addMessage(
580 "<Warning> There are orphaned catalogues in this list "
581 "without corresponding image files for that run: "
582 + ', '.join(orphanedCats))
583
584
585 self._dateList = set(os.path.dirname(img[run]) for run in img)
586 catDef = self.sysc.emptyFitsCataloguePathName()
587 if self.sysc.isOSA():
588 self._fileList = [(img[run], fixcat.get(run, cat.get(run, catDef)))
589 for run in img]
590 else:
591 self._fileList = [(img[run], cat.get(run, catDef)) for run in img]
592 self.writeIngestList()
593
594
595
596 @staticmethod
598 """Create a file prefix used for the ingest log and the data files.
599
600 @param cuNum: The number of the CU.
601 @type cuNum: int
602 @param cuEventID: The cuEventID of this CU run.
603 @type cuEventID: int
604 @param hostName: The host name where the CU is running.
605 @type hostName: str
606 @param database: The database where the data is ingested to.
607 @type database: str
608 """
609 if not cuEventID:
610 cuEventID = -9999
611 return "cu%02did%d_%s_%s" % (cuNum, cuEventID, hostName, database)
612
613
614
616 """
617 Create the standard log file name for the current cuEventID or the
618 given list of CU event IDs.
619
620 @param cuEventIDs: Optional list of CU eventIDs to record.
621 @type cuEventIDs: list(int)
622
623 @return: A filename for the log.
624 @rtype: str
625
626 """
627 if cuEventIDs and not None in cuEventIDs:
628 eventIDStr = utils.numberRange(cuEventIDs, sep="_",
629 useTens=self.sysc.isVSA())
630 elif cuEventIDs is not None:
631 eventIDStr = 'NONE'
632
633 return '%s_cu%sid%s.log' % (self.database.rpartition('.')[2],
634 self.cuNum, self._cuEventID
635 if cuEventIDs is None else eventIDStr)
636
637
638
665
666
667
669 words = line.split()
670 for word in words:
671 for detTable in detTableList:
672 if detTable in word:
673 return detTable
674 return ''
675
676
677
679 """
680 Create a temporary working directory. Directories will have a 4 digit
681 extension number for multiple tmpdirs created by one top level script
682 calling different CUs.
683
684 @return: Path to the working directory.
685 @rtype: str
686
687 """
688 workSpacePath = self.sysc.tempWorkPath()
689 workSpaceDir, _workSpaceBase = os.path.split(workSpacePath)
690 if os.path.exists(workSpacePath):
691 os.rename(workSpacePath, workSpacePath + '.old')
692 Logger.addMessage(
693 "<Warning> curationClientWorkPath %s exists, moved to %s" %
694 (workSpacePath, workSpacePath + '.old'))
695
696 tmpList = sorted(d for d in os.listdir(workSpaceDir)
697 if os.path.isdir(os.path.join(workSpaceDir, d))
698 and str(os.getpid()) in d and '.' in d and not d.endswith(".old"))
699
700 dirNum = 0
701 for tmpDirName in tmpList:
702 tmpDirNumMax = int(os.path.splitext(tmpList[-1])[1][1:])
703 if tmpDirNumMax != len(tmpList) - 1:
704 dirNum = 0
705 for tmpDirName in tmpList:
706 newTmpDirName = tmpDirName + ".old"
707 tmpPath = os.path.join(workSpaceDir, tmpDirName)
708 if os.path.exists(tmpPath):
709 newPath = os.path.join(workSpaceDir, newTmpDirName)
710 os.rename(tmpPath, newPath)
711 else:
712 dirNum = tmpDirNumMax + 1
713
714 workSpace = workSpacePath + ".%04u" % dirNum
715
716 os.mkdir(workSpace)
717 return workSpace
718
719
720
722 """
723 Determines the full set of programmes for a given list of FITS files.
724
725 @param fileListPath: Full path to the file listing all FITS files
726 for which their programmes should be
727 determined.
728 @type fileListPath: str
729
730 @return: A dictionary of programme IDs referenced by FITS file name,
731 and a dictionary of new project DFS ID strings referenced by
732 FITS file name.
733 @rtype: dict(str:int), dict(str:str)
734
735 """
736 progOfFile = {}
737 newProgOfFile = {}
738
739
740 for fileName in utils.extractColumn(fileListPath, colNum=0):
741
742 if fileName.endswith(self.sysc.mefType):
743
744 try:
745 fitsFile = fits.open(fileName)
746 except:
747 Logger.addMessage("Broken file:" + fileName)
748 raise
749 fitsFile.close()
750
751
752 try:
753 hduNum = (1 if self.sysc.tileSuffix in fileName else 0)
754 project = [fitsFile[hduNum].header.get(key,
755 "Unknown").strip()
756 for key in self.sysc.projectKeys]
757
758 if project[0] == "Example WFCAM survey":
759 project[0] = "TEST"
760
761 progName, self.knownSurveyTranslations = \
762 self.translateProgID(project, self._progIDofName,
763 self.sysc.surveyTranslations)
764
765 except TypeError, details:
766 print details
767 Logger.addMessage(fileName + ". Probably not FITS")
768 except KeyError:
769 Logger.addMessage(fileName + ". Can't get program ID")
770 except IndexError:
771 Logger.addMessage(fileName + ". Probably not compressed")
772 else:
773 progID = self._progIDofName.get(progName.upper(),
774 dbc.intDefault())
775
776 if progID != dbc.intDefault():
777 progOfFile[fileName] = (progID, progName)
778 else:
779 if progName[0] == 'n':
780 newProgOfFile[fileName] = \
781 project[0].partition('(')[0].upper()
782 else:
783 newProgOfFile[fileName] = project[0].upper()
784
785 if newProgOfFile:
786 nplPath = os.path.join(self.sysc.corruptFilesPath(),
787 "nonPIDFiles_%s.dat"
788 % time.strftime("%Y%m%d_%H%M%S", time.gmtime()))
789
790 file(nplPath, 'w').writelines(
791 utils.joinDict(newProgOfFile, ',', '\n'))
792
793 Logger.addMessage("Files that don't have a valid programme ID are "
794 "listed in " + nplPath)
795
796 return progOfFile, newProgOfFile
797
798
799
801 """
802 Creates a error file name from the given name, including the details of
803 this CU run, e.g. database, cuEventID.
804
805 @param name: Unique name for this type of error file.
806 @type name: str
807
808 @return: Full file name for error file.
809 @rtype: str
810
811 """
812 return "%s_%s_cu%sid%s_%s.dat" % (self.database.rpartition('.')[2],
813 name, self.cuNum, self._cuEventID,
814 mxTime.utc().strftime("%Y%m%d_%H%M%S"))
815
816
817
819 """ Get the multiframeIDs from FlatFileLookUp for a given dateVersStr.
820 """
821 dataDict = {}
822
823 for mfID, fileName in self.archive.query("multiframeID, fileName",
824 "FlatFileLookUp", whereStr="DateVersStr=%r" % dateVersStr):
825 if "PixelFileNoLongerAvailable" not in fileName:
826 dataDict[fileName.rpartition(':')[2]] = mfID
827 return dataDict
828
829
830
832 """
833 Process programmes in given order, append/include all others from the
834 list of all programme IDs.
835
836 @param progIDs: List of all (used) programme IDs.
837 @type progIDs: list(int)
838 @param include: List of project names to be processed.
839 @type include: list(str)
840 @param progOrder: Ordered list of programme names, eg. [las,gps] or
841 [las,others,gps].
842 @type progOrder: list(str)
843
844 @return: Rating enhanced dictionary.
845 @rtype: Utilities.Ratings
846
847 @todo: Will become part of DataFactory.ProgrammeTable when programme
848 translation object moved there.
849 """
850 procProgs = []
851 exclProgs = []
852 for entry in include:
853
854 if entry.startswith('x-'):
855 exclProgs.append(entry.lower().replace('x-', '', 1))
856 else:
857 procProgs.append(entry.lower())
858 if not procProgs:
859 procProgs = ['all']
860
861 progOrder = [x.lower() for x in progOrder]
862 try:
863 otherName = \
864 list(set(["other", "others"]).intersection(progOrder))[0]
865 except IndexError:
866 otherName = ''
867
868 progIDofName = dict(
869 (name.lower().replace("u/ukidss/", ''), progID)
870 for name, progID in self._progIDofName.items())
871
872
873 if 'all' in procProgs:
874 procProgs = progIDs
875 elif 'ns' in procProgs:
876 procProgs = [pid for pid in progIDs if pid > 10000]
877 elif self.sysc.isWSA():
878 if 'ukidss' in procProgs:
879 procProgs = [pid for pid in progIDs if 10 < pid <= 105]
880 else:
881 procProgs = [progIDofName[entry] for entry in procProgs
882 if entry in progIDofName \
883 and progIDofName[entry] in progIDs]
884 elif self.sysc.isVSA() or self.sysc.isOSA():
885 procProgs = [progIDofName[entry] for entry in procProgs
886 if entry in progIDofName \
887 and progIDofName[entry] in progIDs]
888
889 if exclProgs:
890 exclProgs = [progIDofName[entry] for entry in exclProgs
891 if entry in progIDofName]
892 procProgs = list(set(procProgs).difference(exclProgs))
893
894 pidOrder = []
895 for progName in progOrder:
896 if progName != otherName:
897 for trprName in progIDofName:
898 if progName in trprName.lower() \
899 and progIDofName[trprName] in procProgs:
900 pidOrder.append(progIDofName[trprName])
901
902 progOrderRatedDict = utils.Ratings(zip(pidOrder, range(len(pidOrder))))
903 othersIndex = (progOrder.index(otherName) if otherName else
904 len(progIDofName) + 1)
905
906 nonOrderedProgs = \
907 set(progIDofName.values()).intersection(procProgs) - set(pidOrder)
908
909 nonOrderDict = {}
910 for progID in nonOrderedProgs:
911 nonOrderDict[progID] = othersIndex - 0.5
912
913 progOrderRatedDict.update(nonOrderDict)
914 Logger.addMessage("Programme IDs to be processed: "
915 + ','.join(repr(n) for n in progOrderRatedDict))
916
917 return progOrderRatedDict
918
919
920
922 if serverName != os.getenv('HOST'):
923 return self.sysc.privNetCmd(serverName, cmnd)
924 else:
925 return cmnd
926
927
928
930 """
931 Remove the temporary working directory.
932 """
933 try:
934 shutil.rmtree(self._workPath)
935 except OSError as error:
936 Logger.addMessage(
937 "<Warning> Unable to remove temporary working directory "
938 "%s: %s" % (self._workPath, error))
939
940
941
979
980
981
def runSysCmd(self, cmd, showErrors=True, isVerbose=False):
    """
    Run a system command.

    @param cmd:        The command to run (passed to a shell).
    @type  cmd:        str
    @param showErrors: If True, print any stderr output of the command.
    @type  showErrors: bool
    @param isVerbose:  If True, print time-stamped stdout lines.
    @type  isVerbose:  bool

    @return: List of stdout lines and list of stderr lines.
    @rtype:  (list(str), list(str))

    """
    proc = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)

    # Drain both pipes concurrently with communicate(); reading stdout
    # to EOF before touching stderr (as a sequential read would) can
    # deadlock if the child fills the stderr pipe buffer first.
    out, err = proc.communicate()

    stdOut = [x.rstrip() for x in out.splitlines()]
    if isVerbose:
        for x in stdOut:
            print("[%s]> %s" % (str(mxTime.now()), x))

    # Keep line endings so the joined error report reads as emitted.
    errors = err.splitlines(True)
    if showErrors and errors:
        print(''.join(errors).rstrip())

    return stdOut, errors
1005
1006
1007
1009 """
1010 """
1011 schemaFileName = "%s_%sSchema.sql" % (self.sysc.loadDatabase,
1012 surveyName.lower())
1013 detTableList = ["%sDetection%s" % (surveyName.lower(), splitTable)
1014 for splitTable in ['Raw', 'Astrometry', 'Photometry']]
1015
1016 for month in sorted(monthSet):
1017 monthlySchemaFileName = self.createMonthlyDetSchema(
1018 schemaFileName, detTableList, month, ReDo)
1019
1020
1021 try:
1022 self._connectToDb(maxTries=4000)
1023 for detTableName in detTableList:
1024 for month in sorted(monthSet):
1025 theDetectionMonth = schema.parseTables(
1026 monthlySchemaFileName, [detTableName + month])[0]
1027 if not self.archive.existsTable(theDetectionMonth.name):
1028
1029 Logger.addMessage(
1030 "[%d] Creating monthly detection table "
1031 "on %s for %s" % (
1032 self._cuEventID, self.database, month))
1033 self.archive.copyTable(
1034 detTableName + "Default", theDetectionMonth,
1035 where="multiframeid=0",
1036 fileGroup="%sDetection_FG" % surveyName.lower(),
1037 attachForeignKeys=False)
1038 self.archive.commitTransaction()
1039 except odbc.ProgrammingError as error:
1040 if "permission was denied" in str(error):
1041 raise SystemExit(
1042 "<ERROR> You do not have write "
1043 "permission on database " + self.database)
1044 finally:
1045 self._disconnectFromDb()
1046
1047
1048
1050 """ Test if programme is one of the main VISTA surveys.
1051 """
1052 for prog in self.sysc.scienceProgs:
1053 if prog.upper() in obsName.upper():
1054 return prog
1055
1056 return None
1057
1058
1059
def translateProgID(self, pIDkeys, progTranslation, knownTranslations=None):
    """
    Translates a programme ID string into a programme ID number.

    @param pIDkeys:           List of Programme ID strings. For VISTA the
                              order is: OBS PROG ID, OBS NAME, DPR CATG
    @type  pIDkeys:           list
    @param progTranslation:   A dictionary of upper-case programme
                              identification string keys and programme IDs
                              values.
    @type  progTranslation:   dict(str:int)
    @param knownTranslations: A dictionary of programme identification keys
                              and dfsIDStrings. Updated in place and also
                              returned; defaults to a fresh empty dict.
    @type  knownTranslations: dict(str:str)

    @return: Programme ID string and the (possibly updated) dictionary of
             known translations.
    @rtype:  str, dict(str:str)

    @todo: Tie this up with L{DataFactory.ProgrammeTable} somehow, so that
           progTranslation isn't required.

    """
    # Fix for mutable default argument: the previous default of {} was
    # shared (and mutated) across every call that omitted the argument.
    if knownTranslations is None:
        knownTranslations = {}

    if self.sysc.isWSA():
        reqProgName = pIDkeys[0].upper()

        # Normalise UKIDSS programme names to the canonical dfsIDString.
        if reqProgName.startswith("U/UKIDSS"):
            # Science verification programmes keep their "_SV" suffix.
            if "_SV" in reqProgName:
                reqProgName = reqProgName[:reqProgName.rfind("_SV") + 3]
            else:
                for progName in progTranslation:
                    if (progName.startswith("U/UKIDSS")
                        and progName in reqProgName):
                        reqProgName = progName
        elif reqProgName.startswith("U/EC/"):
            for progName in progTranslation:
                if (progName.startswith("U/EC/")
                    and progName in reqProgName):
                    reqProgName = progName

        elif reqProgName.startswith("U/UHS/"):
            reqProgName = "U/UHS/"

        elif reqProgName == "UKIRTCAL":
            reqProgName = "CAL"
        else:
            reqProgName = ''

    progInObsName = self.testForProgs(pIDkeys[1])

    if pIDkeys[0].upper().startswith("60.A-9"):
        # ESO technical/calibration programme IDs.
        if pIDkeys[0].upper() in knownTranslations:
            reqProgName = knownTranslations[pIDkeys[0].upper()]
        elif progInObsName:
            reqProgName = progInObsName
        else:
            if pIDkeys[2].upper() in ["CALIB"]:
                reqProgName = "CAL"
            else:
                reqProgName = "TECHNICAL"
            knownTranslations[pIDkeys[0].upper()] = reqProgName
    else:
        # NOTE(review): IDs starting with '1' appear to be ESO run IDs
        # of the form <period>.<letter>-<number>(<sub>) -- confirm.
        if pIDkeys[0].upper().startswith('1'):
            if pIDkeys[0].partition('(')[0].upper() \
                    in knownTranslations:
                reqProgName = knownTranslations[
                    pIDkeys[0].partition('(')[0].upper()]
            elif ''.join(pIDkeys[0].partition('.')[:2]).upper() \
                    in knownTranslations:
                reqProgName = knownTranslations[
                    ''.join(pIDkeys[0].partition('.')[:2]).upper()]
                knownTranslations[pIDkeys[0].upper()] = reqProgName
            elif progInObsName:
                reqProgName = progInObsName
            else:
                reqProgName = pIDkeys[0].upper()
        elif pIDkeys[0].partition('(')[0].upper() in progTranslation:
            reqProgName = pIDkeys[0].partition('(')[0].upper()
            knownTranslations[pIDkeys[0].partition('(')[0].upper()] = \
                pIDkeys[0].partition('(')[0].upper()
        elif pIDkeys[0].upper() in progTranslation:
            reqProgName = pIDkeys[0].upper()
            knownTranslations[pIDkeys[0].upper()] = pIDkeys[0].upper()
        elif not self.sysc.isOSA() and progInObsName:
            reqProgName = progInObsName

        elif pIDkeys[0].upper()[:3].isdigit():
            # Prefix 'n' marks a numeric, as-yet-unknown programme ID.
            reqProgName = 'n' + pIDkeys[0].partition('(')[0].upper()
        else:
            if pIDkeys[2].upper() in ["CALIB"]:
                reqProgName = "CAL"
            else:
                reqProgName = pIDkeys[0].upper()

    return reqProgName, knownTranslations
1168
1169
1170
1172 """
1173 Writes the list of not ingestable files into a list in the corrupt
1174 directory for further investigation.
1175
1176 @param notIngList: List of files that cannot be ingested.
1177 @type notIngList: list(str)
1178
1179 """
1180 errorFileName = self.getErrorFileName("notIngFiles")
1181 errorFilePath = os.path.join(self.sysc.corruptFilesPath(),
1182 errorFileName)
1183 file(errorFilePath, 'w').writelines(f + '\n' for f in notIngList)
1184
1185 Logger.addMessage("%s files that are not ingested are listed in %s" %
1186 (len(notIngList), errorFilePath))
1187
1188 pctNotIng = 100 * len(notIngList) / len(self._fileList)
1189 message = \
1190 "There are %s%% files not ingested! Check the log files!" % pctNotIng
1191 if pctNotIng > 90:
1192 Logger.addMessage("<ERROR> " + message)
1193 elif pctNotIng > 50:
1194 Logger.addMessage("<Warning> " + message)
1195
1196
1197
1199 """
1200 Writes the list of FITS files to ingest to file "filelist.dat" in the
1201 work directory in a 'imagefile catfile num' format for the C++ code.
1202
1203 """
1204 lines = ('%s %s %s\n' % (imgFile, catFile, i)
1205 for i, (imgFile, catFile) in enumerate(self._fileList))
1206 file(self._allFileName, 'w').writelines(lines)
1207 Logger.addMessage("Ingest list written to %s" % self._allFileName)
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240