1
2
3
4 """
5 Invoke CU3. Ingest details of image products.
6
7 @author: E. Sutorius
8 @org: WFAU, IfA, University of Edinburgh
9
10 @newfield contributors: Contributors, Contributors (Alphabetical Order)
11 @contributors: I. Bond, J. Bryant, R.S. Collins, N.C. Hambly
12 """
13
14 from collections import defaultdict
15 import inspect
16 import os
17 import math
18 import pyfits
19 import time
20 import warnings
21
22 from wsatools.CLI import CLI
23 import wsatools.DataFactory as df
24 import wsatools.DbConnect.DbConstants as dbc
25 from wsatools.DbConnect.DbSession import DbSession
26 from wsatools.Logger import ForLoopMonitor
27 from wsatools.DbConnect.IngCuSession import IngCuSession
28 from wsatools.DbConnect.IngIngester import IngestLogger as Logger, \
29 IngestLogFile
30 import wsatools.DbConnect.Schema as schema
31 from wsatools.File import File
32 import wsatools.FitsUtils as fits
33 import wsatools.Utilities as utils
34 import wsatools.WfcamsrcInterface as wfcamsrc
38 """Class for updating MfIDs and doing pre-ingest checks.
39 """
40 corruptFileDict = defaultdict(list)
41 maxHDUsDef = {"WSA":5, "VSA":17, "OSA":33}
42 maxHDUs = None
43
44
45
46 @staticmethod
48 """Check if the fits file has the correct number of extensions.
49 @param hduList: The list of all HDUs.
50 @type hduList: PyFITS list object.
51 """
52 if len(hduList) < ModifyMfIDs.maxHDUs:
53 return -1
54 elif len(hduList) > ModifyMfIDs.maxHDUs:
55 return -2
56 else:
57 return 1
58
59
60
    @staticmethod
    def writeNewMfIDs(fileMfIDDict, fileList, redo, forceComp=False):
        """
        Write new multiframeIDs and wsaTimestamps into the files.
        @param fileMfIDDict: Look-up of pixel file name to its assigned
                             multiframeID.
        @type fileMfIDDict: dict(str: int)
        @param fileList: List of tuples (pixelFile, catalogueFile)
        @type fileList: list((str,str))
        @param forceComp: Compress uncompressed FITS files during MfID
                          writing.
        @type forceComp: bool
        @param redo: If True, overwrite existing MfID.
        @type redo: bool

        """
        sysc = IngCuSession.sysc
        progress = ForLoopMonitor(fileList)
        for fileNames in fileList:
            # Timestamp = 6 digits taken from the parent directory name
            # plus the pixel file's modification time.
            dirName = os.path.split(os.path.dirname(fileNames[0]))[1][2:8]
            modtime = time.strptime(time.ctime(os.stat(fileNames[0]).st_mtime))
            wsaTimestamp = dirName + time.strftime("%y%m%d%H%M%S", modtime)

            # Every pixel file must already have an MfID assigned.
            if fileNames[0] in fileMfIDDict:
                fileMfID = fileMfIDDict[fileNames[0]]
            else:
                Logger.addMessage("<ERROR> No multiframeID available.")
                raise SystemExit

            # Flags: [timestamp written, MfID written, CAMNUM written]
            modif = [0, 0, 0]

            # Tiles and mosaics carry a single image extension; other files
            # are expected to have the archive-specific HDU count.
            if sysc.mosaicSuffix in fileNames[0]:
                ModifyMfIDs.maxHDUs = 2
            elif sysc.tileSuffix in fileNames[0]:
                ModifyMfIDs.maxHDUs = 2
            else:
                ModifyMfIDs.maxHDUs = ModifyMfIDs.maxHDUsDef[sysc.loadDatabase]

            # Tiles carry the keywords in extension 1, others in the
            # primary HDU.
            extNum = 1 if sysc.tileSuffix in fileNames[0] else 0

            catUpdate = False
            try:
                # Record PyFITS warnings so truncated files can be
                # detected (PyFITS warns rather than raises for these).
                with warnings.catch_warnings(record=True) as w:
                    warnings.simplefilter("always")
                    fimgptr = fits.open(fileNames[0], "update")

                    if len(w) and "truncated" in str(w[-1].message):
                        Logger.addMessage(
                            "<Warning> %s: %s" % (str(w[-1].message),
                                                  fileNames[0]))
                        ModifyMfIDs.corruptFileDict["truncFits"].append(
                            fileNames)

                    # A tile with image data in the primary HDU is an
                    # uncompressed FITS file (-3).
                    if sysc.tileSuffix in fileNames[0] \
                            and fimgptr[0].header["NAXIS"] > 0:
                        isExtOK = -3
                    else:
                        isExtOK = ModifyMfIDs.checkExtensions(fimgptr)

                    # Optionally compress the file and re-check it.
                    if isExtOK == -3 and forceComp:
                        fimgptr.close()
                        fits.compressFits([fileNames[0]])
                        fimgptr = fits.open(fileNames[0], "update")
                        isExtOK = ModifyMfIDs.checkExtensions(fimgptr)

                    # NOTE(review): -1 ("missing extension") also satisfies
                    # isExtOK > -2, so the elif isExtOK == -1 branch below
                    # is unreachable -- confirm whether that is intended.
                    if isExtOK > -2:
                        # Write the archive timestamp into the pixel file.
                        modif[0] = fits.writeToFitsHdu(fimgptr, (extNum,),
                            sysc.loadDatabase + "_TIME", wsaTimestamp,
                            sysc.loadDatabase + " time stamp", redo)

                        # OSA only: derive the detector number from the
                        # ESO chip ID keyword of each extension.
                        if sysc.isOSA():
                            for hdunum in range(1, ModifyMfIDs.maxHDUs):
                                camNum = sysc.hduNumDict[int(
                                    fimgptr[hdunum].header[
                                        "HIERARCH ESO DET CHIP ID"
                                    ].partition('#')[2])]
                                modif[2] = fits.writeToFitsHdu(
                                    fimgptr, (hdunum,), "CAMNUM", camNum,
                                    "Number of OMEGACAM detector (1-32)",
                                    True)

                        # Stamp the catalogue file as well (unless it is a
                        # placeholder "empty" catalogue) and reset its
                        # mtime to the stamped time.
                        if "empty" not in fileNames[1]:
                            cattime = time.strptime(time.ctime(
                                os.stat(fileNames[1]).st_mtime))
                            catTimestamp = dirName + time.strftime(
                                "%y%m%d%H%M%S", cattime)
                            fcatptr = fits.open(fileNames[1], "update")
                            fits.writeToFitsHdu(fcatptr, (0,),
                                sysc.loadDatabase + "_TIME", catTimestamp,
                                sysc.loadDatabase + " time stamp", redo,
                                verify='silentfix')
                            fcatptr.close()
                            timetpl = time.strptime(
                                catTimestamp[len(dirName):],
                                '%y%m%d%H%M%S')
                            modtime = time.mktime(timetpl)
                            os.utime(fileNames[1], (time.time(), modtime))

                        # Write the multiframe ID into the pixel file.
                        modif[1] = fits.writeToFitsHdu(fimgptr, (extNum,),
                            sysc.loadDatabase + "_MFID", fileMfID,
                            sysc.loadDatabase + " Multiframe ID", redo)

                        # "two"-pass (non-confidence) frames: repoint the
                        # confidence-map keyword at the "two" confidence
                        # file and flag the catalogue for the same update.
                        if "two" in fileNames[0] and not "conf" in fileNames[0]:
                            for hdunum in range(1, ModifyMfIDs.maxHDUs):
                                circpm = fimgptr[hdunum].header[
                                    "CIR_CPM"].partition('_st')
                                circpmnew = ''.join(circpm[:2]) \
                                    + "_two_conf.fit" \
                                    + '[%d]' % hdunum

                                fits.writeToFitsHdu(fimgptr, (hdunum,),
                                    "CIR_CPM", circpmnew,
                                    "Confidence Map", True)

                            catUpdate = True
                            modif[0] = 1

                    elif isExtOK == -1:
                        ModifyMfIDs.corruptFileDict["missingExt"].append(
                            fileNames)
                        Logger.addMessage(
                            "<Warning> missing extension in " + fileNames[0])

                    elif isExtOK == -2:
                        ModifyMfIDs.corruptFileDict["tooManyExt"].append(
                            fileNames)
                        Logger.addMessage(
                            "<Warning> too many extensions in " + fileNames[0])

                    elif isExtOK == -3:
                        ModifyMfIDs.corruptFileDict["uncompFits"].append(
                            fileNames)
                        Logger.addMessage(
                            "<Warning> uncompressed FITS file: " + fileNames[0])

                    fimgptr.close()
            except AttributeError as error:
                # SystemConstants-related attribute errors are recorded and
                # skipped; anything else is a programming error.
                # NOTE(review): error[0] is Python 2 exception indexing; it
                # raises TypeError under Python 3.
                if "SystemConstants" in repr(error):
                    ModifyMfIDs.corruptFileDict["sysConError"].append(fileNames)
                    Logger.addMessage("<ERROR>:%s in %s"
                                      % (error[0], fileNames[0]))
                else:
                    raise
            except pyfits.VerifyError as error:
                Logger.addMessage("<ERROR>:%s in %s" % (error[0], fileNames[0]))
                # Non-standard keywords are fixed on close where possible.
                if "not FITS standard" in repr(error):
                    fimgptr.close(output_verify='fix')
                else:
                    fimgptr.close()
            except Exception as error:
                print(error)
                Logger.addMessage("<ERROR>:%s in %s" % (error[0], fileNames[0]))

            # Drop the file reference -- presumably to release the handle
            # even after errors; fails harmlessly if open never succeeded.
            try:
                del fimgptr
            except Exception as error:
                print(error)
                Logger.addMessage("<ERROR>:%s in %s" % (error[0], fileNames[0]))

            # Apply the deferred CIR_CPM update to the catalogue of a
            # "two"-pass frame, then reset the catalogue's mtime.
            if catUpdate:
                modtime = time.strptime(time.ctime(os.stat(fileNames[1]).st_mtime))
                wsaTimestamp = dirName + time.strftime("%y%m%d%H%M%S", modtime)

                fimgptr2 = fits.open(fileNames[1], "update")
                for hdunum in range(1, ModifyMfIDs.maxHDUs):
                    circpm = fimgptr2[hdunum].header[
                        "CIR_CPM"].partition('_st')
                    circpmnew = ''.join(circpm[:2]) \
                        + "_two_conf.fit" \
                        + '[%d]' % hdunum
                    fits.fixExponentBug(fimgptr2[hdunum].header)
                    fits.writeToFitsHdu(fimgptr2, (hdunum,), "CIR_CPM",
                        circpmnew, "Confidence Map", True, 'ignore')

                fimgptr2.close()
                timetpl = time.strptime(wsaTimestamp[len(dirName):],
                                        '%y%m%d%H%M%S')
                modtime = time.mktime(timetpl)
                os.utime(fileNames[1], (time.time(), modtime))

            # If anything was written, reset the pixel file's mtime to the
            # stamped time.
            if sum(modif) > 0:
                timetpl = time.strptime(wsaTimestamp[len(dirName):],
                                        '%y%m%d%H%M%S')
                modtime = time.mktime(timetpl)
                os.utime(fileNames[0], (time.time(), modtime))

            progress.testForOutput()

        Logger.addMessage("finished.")
256
257
258
259 @staticmethod
261 """
262 Write a zero (0) multiframeID into the files.
263 @param fileList: List of file names
264 @type fileList: list(str)
265 """
266 sysc = IngCuSession.sysc
267 archive = sysc.loadDatabase
268 for fileName in fileList:
269
270 dirName = os.path.split(os.path.dirname(fileName))[1][2:8]
271 modtime = time.strptime(time.ctime(os.stat(fileName).st_mtime))
272 wsaTimestamp = dirName + time.strftime("%y%m%d%H%M%S", modtime)
273
274 extNum = 1 if sysc.tileSuffix in fileName else 0
275
276
277 fimgptr = fits.open(fileName, "update")
278
279 fits.writeToFitsHdu(fimgptr, (extNum,), "%s_MFID" % archive, 0,
280 "%s Multiframe ID" % archive, True)
281
282 fimgptr.close()
283
284
285 modtime = time.mktime(time.strptime(wsaTimestamp[len(dirName):],
286 '%y%m%d%H%M%S'))
287 os.utime(fileName, (time.time(), modtime))
288
289
290 Logger.addMessage("<Warning> Set MfID to 0 in " + fileName)
291 Logger.addMessage("finished.")
292
293
294
295 -class Cu3(IngCuSession):
296 """
297 Invokes frame metadata ingestion for a given log of files transferred from
298 the data processing centre.
299
300 method: Establish locked DB connectivity; initiate curation objects;
301 cycle though the transfer log, accumulating individual table metadata in
302 CSV files. The database bulk ingest from those CSV files and the update of
303 the curation logs will be done at a later stage.
304
305 """
307 """ A curation error instead of a programming error. """
308 pass
309
310
311 cuedFileExists = False
312
313
314
315 - def __init__(self,
316 curator=CLI.getOptDef("curator"),
317 database=DbSession.database,
318 csvSharePath=None,
319 programmeList=CLI.getOptDef("programmes"),
320 noMfID=CLI.getOptDef("nomfid"),
321 ReDo=CLI.getOptDef("redo"),
322 onlyMfID=CLI.getOptDef("onlymfid"),
323 excludeFiles=CLI.getOptDef("exclude"),
324 isTrialRun=DbSession.isTrialRun,
325 keepWorkDir=CLI.getOptDef("keepwork"),
326 forceComp=CLI.getOptDef("forcecomp"),
327 isWfauProduct=CLI.getOptDef("iswfauprod"),
328 xferlog=CLI.getArgDef("xferlog"),
329 comment=CLI.getArgDef("comment")):
330 """
331 @param curator: Name of curator.
332 @type curator: str
333 @param comment: Descriptive comment as to why curation task
334 is being performed.
335 @type comment: str
336 @param csvSharePath: Directory where data is written to.
337 @type csvSharePath: str
338 @param database: Name of the database to connect to.
339 @type database: str
340 @param excludeFiles: Excluded files from processing.
341 @type excludeFiles: list(str)
342 @param forceComp: Compress uncompressed FITS files during MfID
343 writing.
344 @type forceComp: bool
345 @param isTrialRun: If True, do not perform database modifications.
346 @type isTrialRun: bool
347 @param isWfauProduct: If True, files (e20*) are in a products subdir.
348 @type isWfauProduct: bool
349 @param keepWorkDir: Don't remove working directory.
350 @type keepWorkDir: bool
351 @param noMfID: If True, don't write new MfID into FITS file.
352 @type noMfID: bool
353 @param onlyMfID: If True, only write new MfID into FITS file.
354 @type onlyMfID: bool
355 @param programmeList: Only process data for given programmes (accepts
356 keywords 'all', 'ns' (non-survey),
357 'ukidss' (all 5 main surveys)).
358 @type programmeList: list(str)
359 @param ReDo: If True, overwrite existing MfID.
360 @type ReDo: bool
361 @param xferlog: Logfile containing files to be ingested.
362 @type xferlog: str
363
364 """
365 typeTranslation = {"curator":str,
366 "database":str,
367 "csvSharePath":str,
368 "programmeList":list,
369 "noMfID":bool,
370 "ReDo":bool,
371 "onlyMfID":bool,
372 "excludeFiles":list,
373 "isTrialRun":bool,
374 "xferlog":str,
375 "keepWorkDir":bool,
376 "forceComp":bool,
377 "isWfauProduct":bool,
378 "comment":str}
379
380 super(Cu3, self).attributesFromArguments(
381 inspect.getargspec(Cu3.__init__)[0], locals(),
382 types=typeTranslation)
383
384
385 super(Cu3, self).__init__(cuNum=3,
386 curator=self.curator,
387 comment=self.comment,
388 reqWorkDir=True,
389 keepWorkDir=self.keepWorkDir,
390 database=self.database,
391 autoCommit=False,
392 isTrialRun=self.isTrialRun)
393
394
395 self._tableListPath = \
396 os.path.join(self._workPath, "metadata_schema.dat")
397
398 lines = ("%s %s\n" % (self.sysc.metadataSchema(), table)
399 for table in self.sysc.metadataTables)
400
401 file(self._tableListPath, 'w').writelines(lines)
402
403
404 self.createFileList()
405
406 self.procFileName = os.path.join(self._workPath, 'procfilelist.dat')
407
408
409 tmpList = []
410 if self.excludeFiles:
411 Logger.addMessage("Excluding %s" % ','.join(self.excludeFiles))
412 for excl in self.excludeFiles:
413 for entry in self._fileList:
414 if not (excl in entry[0] or excl in entry[1]):
415 tmpList.append(entry)
416 self._fileList = tmpList[:]
417
418 self._initFromDB()
419
420 self.csvSharePath = self.csvSharePath or self.archive.sharePath()
421 utils.ensureDirExist(self.csvSharePath)
422
423
424
474
475
476
478 """ Do CU3.
479 """
480 Logger.addMessage(
481 "Started CU%s on list: %s" % (self.cuNum, self.xferlog))
482
483
484 log = Logger(self.createLogFileName())
485
486 try:
487
488 if not self._fileList:
489 Logger.addMessage("No new metadata are required to be loaded.")
490 return
491
492
493 self._outPrefix = self.createFilePrefix(
494 self.cuNum, self._cuEventID, self._hostName,
495 self.database.rpartition('.')[2])
496
497
498 ingestLogFile = IngestLogFile(os.path.join(
499 self.dbMntPath, self._outPrefix + ".log"))
500 Logger.addMessage(
501 "CUEvent %s: No. of files containing metadata to be ingested: %s"
502 % (self._cuEventID, len(self._fileList)))
503
504
505
506 rewriteFileList = False
507
508 if not self.onlyMfID:
509 self.setTableVariables()
510
511 for table in self._tableDict:
512 csvFileName = os.path.join(
513 self.csvSharePath, self._tableDict[table]["ingfile"])
514 if os.path.exists(csvFileName):
515 os.remove(csvFileName)
516
517
518
519
520
521
522 prodFileName, prodFileNum = self.splitFileList(rewriteFileList)
523
524
525
526 os.rename(self.csvFileName,
527 ''.join(self.csvFileName.rpartition('.csv')[:2]))
528
529 fitsForIngList = self.selectFilesByProg()
530
531
532 if prodFileNum > 0:
533 Logger.addMessage(
534 "Running exmeta on production frames first...")
535
536
537 notIngList, wcsNaNFlag = wfcamsrc.extractMetadata(
538 self._tableListPath, prodFileName, self._cuEventID,
539 self._workPath, self._outPrefix, log.pathName, self.sysc)
540
541
542 Logger.addMessage("Running exmeta on %d/%d frames..." % (
543 len(fitsForIngList), len(self._fileList)))
544 notIngList, wcsNaNFlag = wfcamsrc.extractMetadata(
545 self._tableListPath, self.procFileName, self._cuEventID,
546 self.csvSharePath, self._outPrefix, log.pathName, self.sysc)
547
548 if notIngList:
549 self.writeErrorFiles(notIngList)
550
551 if self.sysc.isWSA():
552 fitsList = [fileNames[0] for fileNames in fitsForIngList]
553 pathList = [os.path.dirname(fileNames[0])
554 for fileNames in fitsForIngList]
555 satList = self.checkForSatData(
556 os.path.commonprefix(pathList))
557 if satList:
558 satCsv = self.parseSatData(satList, fitsList)
559 self.updateTableDict()
560
561
562 timestamp = utils.makeMssqlTimeStamp()
563 histories = {"DCH": [], "ACH": [], "PCH": []}
564
565 histories["DCH"].extend(self._dateList)
566
567 histories["ACH"].append(
568 [self._cuEventID, self.cuNum, log.pathName, dbc.charDefault(),
569 timestamp, self.curator, self.comment, dbc.yes()])
570
571 isNaN = wcsNaNFlag is None
572 preComment = ("WCSERROR: " if isNaN else
573 self._historyCommentPrefix(self._success,
574 dbc.yes()))
575
576 row = (self._cuEventID, self.cuNum, log.pathName,
577 dbc.charDefault(), timestamp, self.curator,
578 preComment + self.comment, dbc.yes())
579
580 self._connectToDb()
581 self._updateArchiveCurationHistory(row)
582 self.archive.commitTransaction()
583
584
585 for programmeID in self._progOrderRatedDict:
586 histories["PCH"].append(
587 [programmeID, self._cuEventID, timestamp, -1])
588
589
590 for dateDirPath in histories["DCH"]:
591 cuedFileName = "CU%02dED_%s" % (
592 self.cuNum, self.database.rpartition('.')[2])
593 if os.path.exists(os.path.join(dateDirPath, cuedFileName)):
594 self.cuedFileExists = True
595
596
597 if isNaN:
598 utils.ensureDirExist(
599 self.sysc.wcsErrorFilesPath(self.archive.sharePath()))
600
601 if os.path.exists(ingestLogFile.name):
602 newName = os.path.join(
603 self.sysc.wcsErrorFilesPath(self.archive.sharePath()),
604 os.path.basename(ingestLogFile.name))
605 os.rename(ingestLogFile.name, newName + ".wcserr")
606
607 for tableName in self._tableDict:
608 csvFileName = self.archive.sharePath(
609 self._tableDict[tableName]["ingfile"])
610 newName = os.path.join(
611 self.sysc.wcsErrorFilesPath(self.archive.sharePath()),
612 self._tableDict[tableName]["ingfile"])
613 os.rename(csvFileName, newName + ".wcserr")
614
615 Logger.addMessage(
616 "<WCSERROR> All created files have been moved to %s." %
617 self.sysc.wcsErrorFilesPath(self.archive.sharePath()))
618 else:
619 ingestLogFile.pickleWrite(
620 self._ingestOrder, self._tableDict, histories)
621 Logger.addMessage(
622 "Curation History info written to " + ingestLogFile.name)
623 self._disconnectFromDb()
624
625 Logger.addMessage(
626 "Processing of %s files finished." % len(fitsForIngList))
627
628 forceFlag = (" -F" if self.cuedFileExists else '')
629 Logger.addMessage("Please, run ./IngestCUFiles.py -i -r %s%s %s"
630 % (ingestLogFile.name, forceFlag, self.database))
631
632 except Cu3.NoFilesError:
633 Logger.addMessage("No metadata are required to be loaded.")
634 except Exception as error:
635 Logger.addExceptionDetails(error)
636 finally:
637 Logger.addMessage("Log written to " + log.pathName)
638 Logger.dump(file(log.pathName, 'a'))
639 Logger.reset()
640
641
642
644 dirList = [os.path.join(commonPath, x) for x in os.listdir(commonPath)
645 if x.startswith("satellites")]
646 return dirList
647
648
649
651 inclList = []
652 for fitsFileName in fitsList:
653 fitsFile = File(fitsFileName)
654 inclList.append('%s_%s' % (fitsFile.sdate, fitsFile.runno))
655 del fitsFile
656 print inclList
657
658 for satFileName in satList:
659 satFile = File(satFileName)
660 satFile.ropen()
661 colNames = satFile.readline().split()
662 print colNames
663 satellites = satFile.readlines(omitEmptyLines=True,
664 findValues=inclList)
665 satDict = defaultdict(lambda : defaultdict())
666
667 satCount = 1
668 for sat in satellites:
669 satAttr = []
670 satTmp = sat.split()
671 print satTmp
672 satAttr.append(satTmp[0])
673 satAttr.append(str(15 * (int(satTmp[1]) \
674 + (int(satTmp[2]) + float(satTmp[3])/60.)/60.)))
675 satAttr.append(str(self.signum(int(satTmp[4])) * (
676 abs(int(satTmp[4])) + (int(satTmp[5]) \
677 + float(satTmp[6])/60.)/60.)))
678 for i in range(7, len(satTmp)):
679 satAttr.append(satTmp[i])
680 for i, col in enumerate(colNames):
681 if satAttr[0] in satDict:
682 satCount += 1
683 else:
684 satCount = 1
685 if i > 0:
686 satDict[(satAttr[0], satCount)][col] = satAttr[i]
687 satFile.close()
688 del satFile
689
690 mfidDict = defaultdict()
691 for date in self.mfIDofFileOfDate:
692 for fileName, satCount in satDict:
693 for mfidFile in self.mfIDofFileOfDate[date]:
694 if fileName.rpartition('_')[0] in mfidFile:
695 mfidDict[fileName] = self.mfIDofFileOfDate[date][
696 mfidFile]
697
698 outFileName = os.path.join(self.csvSharePath,
699 "%s_u13alm01SatelliteDetection.csv" % self._outPrefix)
700 outFile = File(outFileName)
701 outFile.wopen()
702 counter = 0
703 for entry in satDict:
704 counter += 1
705 ell = (15+float(satDict[entry]["ell'"]))/16
706 convFaktor = math.pi/180
707 delta = self.signum(float(satDict[entry]["PA'"])) * 90
708 pa = math.atan(math.tan(convFaktor*(
709 float(satDict[entry]["PA'"]) + 90)) / 16) / convFaktor + delta
710 line = ",".join([str(mfidDict[entry[0]]), satDict[entry]["ext"],
711 str(self._cuEventID), str(satCount), '3',
712 satDict[entry]["MJD"],
713 satDict[entry]["X"], str(dbc.realDefault()),
714 satDict[entry]["Y"], str(dbc.realDefault()),
715 satDict[entry]["RA"], satDict[entry]["Dec"],
716 satDict[entry]["HA"], satDict[entry]["npix"],
717 satDict[entry]["ell'"], str(ell),
718 satDict[entry]["PA'"], str(pa),
719 str(dbc.realDefault()), str(dbc.realDefault()),
720 satDict[entry]["J"], satDict[entry]["Jerr"],
721 satDict[entry]["PkHt"], str(dbc.realDefault()),
722 '0', '0', str(counter)])
723 outFile.writetheline(line)
724 outFile.close()
725 return outFileName
726
727
728
730 table = "u13alm01SatelliteDetection"
731 schema = "NonSurvey/WSANSu13alm01_u13alm01Schema.sql"
732 self._ingestOrder.append(table)
733 self._tableDict[table] = dict(schema=schema,
734 ingfile="%s_%s.csv" % (self._outPrefix, table))
735
736
737
739 if(integ < 0):
740 return -1
741 elif(integ >= 0):
742 return 1
743
744
745
747 """
748 Insert new programme ID entries into the Programme table.
749
750 @param newProgs: A set of new project DFS ID strings, for which
751 programme IDs must be assigned.
752 @type newProgs: set(str)
753
754 """
755 Logger.addMessage("Identifying new programmes...")
756 maxPIDNS = max(10000,
757 self.archive.queryAttrMax("programmeID", "Programme",
758 "programmeID<11000"))
759 maxPIDSat = max(11000,
760 self.archive.queryAttrMax("programmeID", "Programme"))
761
762 tableSchema = \
763 schema.parseTables(self.sysc.curationSchema(), ["Programme"])[0]
764
765 for dfsIDString in newProgs:
766 if self.sysc.isWSA() and "LM" in dfsIDString.rpartition('/')[2]:
767 maxPID = maxPIDSat + 1
768 else:
769 maxPID = maxPIDNS + 1
770 row = []
771 if self.sysc.isVSA() or self.sysc.isOSA():
772 dfsIDString = dfsIDString.partition('(')[0]
773 for column in tableSchema.columns:
774 if column.name == "programmeID":
775 row.append(maxPID)
776 elif column.name == "dfsIDString":
777 row.append(dfsIDString)
778 elif column.name == "title":
779 if self.sysc.isWSA():
780 if "LM" in dfsIDString.rpartition('/')[2]:
781 row.append("LM satellite %s" % dfsIDString)
782 else:
783 row.append("Non-survey programme %s" % dfsIDString)
784 else:
785 if dfsIDString.startswith('2'):
786 title = "DDT programme %s" % dfsIDString
787 elif dfsIDString.startswith('3'):
788 title = "Short programme %s" % dfsIDString
789 else:
790 title = "ESO programme %s" % dfsIDString
791 row.append(title)
792 elif column.name == "propPeriod":
793 row.append(dbc.nonSurveyProprietaryPeriod())
794 elif column.name == "pairingCriterion":
795 if self.sysc.isWSA() \
796 and "LM" in dfsIDString.rpartition('/')[2]:
797 row.append(column.getDefaultValue())
798 else:
799 row.append(dbc.neighbourJoinCriterion())
800 elif column.name == "adjacentFrameTolerance":
801 if self.sysc.isWSA() \
802 and "LM" in dfsIDString.rpartition('/')[2]:
803 row.append(column.getDefaultValue())
804 else:
805 row.append(dbc.adjacentFrameTolerance())
806 else:
807 row.append(column.getDefaultValue())
808
809 self.archive.insertData(tableSchema.name, row)
810 Logger.addMessage("Updated Programme table for %s: %s" %
811 (dfsIDString, maxPID))
812
813
814
816 """ Query Programme table and write programmes.csv file for C++ code.
817
818 @todo: It might make more sense to write the contents of
819 self._progIDofName to the programmes.csv, which includes the
820 merging of different types of programmes. This would save
821 some code duplication in exmeta.
822 """
823 progs = self.archive.query("dfsIDString, programmeID", "Programme")
824
825 if self.sysc.isWSA():
826 file(self.csvFileName, 'w').write('\n'.join("%s, %s" % prog
827 for prog in progs))
828 else:
829 progDict = dict(progs)
830 csvFile = File(self.csvFileName)
831 csvFile.wopen()
832
833 for fileName in progIDofFile:
834 csvFile.writetheline("%s, %s" % (
835 fileName, progIDofFile[fileName][1]))
836 for progID in self.knownSurveyTranslations:
837 csvFile.writetheline("%s, %s" % (
838 progID, progDict[self.knownSurveyTranslations[progID]]))
839 for prog in progs:
840 csvFile.writetheline("%s, %s" % prog)
841
842 csvFile.close()
843
844
845
876
877
878
880 """
881 Select files by programme list.
882 """
883
884 progOfFile = self.fileProgIdDict.copy()
885 progIDs = self._progIDs
886
887 for entry in self.programmeList:
888 if '==' in entry:
889 inProg, _eql, outProg = entry.rpartition('==')
890 inPID = self._progIDofName[inProg.upper()]
891 if self.sysc.isWSA():
892 outPID = self._progIDofName[
893 outProg.upper() if '/' in outProg
894 else "U/UKIDSS/%s" % outProg.upper()]
895 else:
896 outPID = self._progIDofName[outProg.upper()]
897 for fileName in progOfFile:
898 if progOfFile[fileName] == inPID:
899 progOfFile[fileName] = outPID
900 progIDs.discard(inPID)
901 progIDs.add(outPID)
902
903
904 self._progOrderRatedDict = self.getProcessOrder(
905 progIDs, include=self.programmeList)
906
907
908 fitsForIng = []
909 for imgName, catName in sorted(self._fileList):
910 if progOfFile[imgName][0] in self._progOrderRatedDict:
911 fitsForIng.append([imgName, catName])
912
913 file(self.procFileName, 'w').writelines(
914 ' '.join(line) + ' %s\n' % i
915 for i, line in enumerate(fitsForIng))
916
917 return fitsForIng
918
919
920
922 """ Get Multiframe IDs for processed dates.
923 """
924 Logger.addMessage(
925 "Preparing the file name to multiframe ID look-up table...")
926
927 dateList = sorted(set(os.path.basename(os.path.dirname(x[0]))
928 for x in self._fileList))
929
930 self.mfIDofFileOfDate = {}
931 for date in dateList:
932 self.mfIDofFileOfDate[date] = self.getMultiframeIDs(date)
933
934 for fileNames in self._fileList:
935 if fileNames[0] not in self.mfIDofFileOfDate[date]:
936 mfidName = dbc.multiframeUIDAttributeName()
937 if os.path.basename(fileNames[0]).startswith("e20") and \
938 self.sysc.isVSA() and \
939 self.fileProgIdDict[fileNames[0]][0] == 120:
940 fileMfID = max(self._getNextID(
941 mfidName, "FlatFileLookUp",
942 where="%s>1000000000000" % mfidName), 1000000000001)
943 else:
944 fileMfID = self._getNextID(
945 mfidName, "FlatFileLookUp",
946 where="%s<1000000000000" % mfidName)
947
948 self.archive.insertData("FlatFileLookUp",
949 (fileMfID, self._cuEventID, date, '%s%s' %
950 (self.sysc.pixelServerHostName, fileNames[0])))
951 self.archive.commitTransaction()
952
953
954 self.mfIDofFileOfDate[date] = self.getMultiframeIDs(date)
955
956
957
959 """set the variables dependend of the tables used in this cu
960 """
961
962 tableList = zip(utils.extractColumn(self._tableListPath, 1),
963 utils.extractColumn(self._tableListPath, 0))
964
965 self._ingestOrder = []
966 self._tableDict = {}
967 for table, schema in tableList:
968 self._ingestOrder.append(table)
969 self._tableDict[table] = dict(schema=schema,
970 ingfile="%s_%s.csv" % (self._outPrefix, table))
971
972
973
975 """Split the production frames from the general filelist. If update
976 is True, re-write the general file list as well.
977 """
978 fileNameList = []
979 prodFileName = os.path.join(self._workPath, "prodfilelist.dat")
980 fileNameList.append(prodFileName)
981 prodFile = File(prodFileName)
982 prodFile.wopen()
983 sciFileName = os.path.join(self._workPath, "scifilelist.dat")
984 fileNameList.append(sciFileName)
985 sciFile = File(sciFileName)
986 sciFile.wopen()
987 sfnum = 0
988 pfnum = 0
989 for entry in self._fileList:
990 outline = ' '.join(entry) + ' '
991 filebase = os.path.basename(entry[0])
992 if filebase.startswith(
993 (self.sysc.casuPrefix, self.sysc.wfauPrefix)):
994 sciFile.writetheline(outline + str(sfnum))
995 sfnum += 1
996 else:
997 prodFile.writetheline(outline + str(pfnum))
998 pfnum += 1
999 prodFile.close()
1000 sciFile.close()
1001
1002 if update:
1003 file(self._allFileName, 'w').writelines(
1004 ' '.join(line) + ' %s\n' % i
1005 for i, line in enumerate(self._fileList))
1006
1007 return prodFileName, pfnum
1008
1009
1010
1047
1048
1049
1050
if __name__ == '__main__':
    # Define the additional command-line interface for CU3: the transfer
    # log argument plus CU3-specific options.
    CLI.progArgs.append(CLI.Argument("xferlog", "xferlog.log"))
    CLI.progOpts += [
        CLI.Option('f', "forcecomp",
                   "compress uncompressed FITS files during MfID writing"),

        CLI.Option('m', "onlymfid",
                   "only write MfIDs"),

        CLI.Option('n', "nomfid",
                   "don't write new MfID into FITS file"),

        CLI.Option('o', "outpath",
                   "directory where the csv file is written to",
                   "PATH"),

        CLI.Option('r', "redo",
                   "enable overwriting of existing MfIDs"),

        CLI.Option('s', "iswfauprod",
                   "subdir is in products"),

        CLI.Option('K', "keepwork",
                   "Don't remove working dir."),

        CLI.Option('P', "programmes",
                   "only process data for given programmes (accepts keywords "
                   "'all', 'ns' (non-survey), 'ukidss' (all 5 main surveys); "
                   "one programme suffixed with 'x-' excludes this programme.)",
                   "LIST", "all"),

        CLI.Option('X', "exclude",
                   "exclude given files/file patterns from processing",
                   "LIST", '')]

    cli = CLI(Cu3, "$Revision: 10247 $")
    Logger.addMessage(cli.getProgDetails())

    # Positional arguments must match the order of Cu3.__init__;
    # "outpath" supplies csvSharePath and "test" supplies isTrialRun.
    cu3 = Cu3(cli.getOpt("curator"),
              cli.getArg("database"),
              cli.getOpt("outpath"),
              cli.getOpt("programmes"),
              cli.getOpt("nomfid"),
              cli.getOpt("redo"),
              cli.getOpt("onlymfid"),
              cli.getOpt("exclude"),
              cli.getOpt("test"),
              cli.getOpt("keepwork"),
              cli.getOpt("forcecomp"),
              cli.getOpt("iswfauprod"),
              cli.getArg("xferlog"),
              cli.getArg("comment"))
    cu3.run()
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180