1
2
3
4 """
5 Prepare database for CU4 ingests following a completed batch of CU3 ingests
6 (any arbitrary set, say, a month or semester of data). It combines the
7 following tasks:
8 - Copy the illumination tables from CASU to WFAU.
9 - helpers/UpdateIllumEntries.py (optionally, normally done by CU4)
10 - helpers/ProvenanceFiller.py
11 - helpers/SetupProgramme.py
12
13 @author: Eckhard Sutorius
14 @org: WFAU, IfA, University of Edinburgh
15 """
16
17 from __future__ import division, print_function
18 from collections import defaultdict, namedtuple
19 import multiprocessing
20 from operator import attrgetter
21 from itertools import groupby
22 import os
23 import time
24
25 from invocations.cu1.cu1Transfer import CASUQueries
26 from wsatools.CLI import CLI
27 import wsatools.DbConnect.CommonQueries as queries
28 from wsatools.DbConnect.CuSession import CuSession
29 from wsatools.DbConnect.DbSession import DbSession, Join, SelectSQL
30 import wsatools.FitsUtils as fits
31 from wsatools.Logger import Logger, ForLoopMonitor
32 from wsatools.ProgrammeBuilder import ProgrammeBuilder
33 from wsatools.FitsToDb import ProvenanceFiller
34 from wsatools.SystemConstants import DepCodes, SystemConstants
35 import wsatools.Utilities as utils
36
37
39 """ Utility class for calculating properties from the Provenance.
40 """
41
42
43
44 FrameInfo = namedtuple("FrameInfo", "mjdObs detNDit detDit frameType")
45
46
47
48
49 isVerbose = False
50
51 archive = None
52
53 reDo = False
54
55 updateCats = False
56
57
58
59 - def run(self, archive, semesterList=None, version=None, updateCats=False,
60 numThreads=1):
61 """ Updates OB pawprints and MetaData tables with data calculated
62 from tracing back the history of the file through all normal obs.
63 """
64 self.archive = archive
65
66 self.updateCats = updateCats
67 if not self.archive.sysc.isVSA():
68 raise FileHistoryDataFiller.CuError(
69 "This should only be run on the VSA")
70 Logger.isVerbose = self.isVerbose
71 startDate, endDate = self.archive.sysc.obsCal.getDatesFromInput(
72 semesterList[0], semesterList[-1])
73 self.updateTotalExpTime(startDate, endDate)
74
75 frameTypeSel = queries.getFrameSelection('stack', noDeeps=True)
76
77 if self.updateCats:
78 Logger.addMessage("Updating FITS files where the DB "
79 "has already been updated.")
80 records = self.selectExistingDbData(version, startDate,
81 endDate)
82 if records:
83 Logger.addMessage("Updating catalogue files...")
84 self.updateFitsP(records, numThreads)
85
86
87 Logger.addMessage("Selecting OB pawprints that haven't already been "
88 "processed...")
89 redoSel = ('' if self.reDo else "mjdMean<0 AND ")
90 extInfo = self.archive.query(
91 selectStr="D.multiframeID, D.extNum, M.fileName, M.frameType, "
92 "M.confID, "
93 "M.mjdObs, E.detNDit, E.detDit, D.deprecated",
94 fromStr="Multiframe AS M, MultiframeDetector AS D, "
95 "FlatFileLookUp AS F, MultiframeEsoKeys AS E",
96 whereStr="D.multiframeID=M.multiframeID AND "
97 "F.multiframeID=M.multiframeID AND "
98 "E.multiframeID=M.multiframeID AND "
99 "dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND "
100 "%s D.%s AND (%s OR "
101 "frameType LIKE 'normal' OR frameType LIKE '%%conf') " % (
102 startDate, version, endDate, version,
103 redoSel, DepCodes.selectNonDeprecated, frameTypeSel))
104 recordDict = defaultdict(list)
105 frameInfoDict = defaultdict(list)
106 depInfoDict = defaultdict(list)
107 for entry in extInfo:
108
109
110 if "stack" in entry.frameType and not 'conf' in entry.frameType:
111 recordDict[(entry.multiframeID, entry.confID, entry.fileName)
112 ].append((entry.extNum, entry.deprecated))
113
114
115 if entry.multiframeID not in frameInfoDict:
116 frameInfoDict[entry.multiframeID] = \
117 FileHistoryDataFiller.FrameInfo(
118 entry.mjdObs, entry.detNDit, entry.detDit, entry.frameType)
119
120
121 depInfoDict[(entry.multiframeID, entry.extNum)] = \
122 entry.deprecated
123 self._mjdDict = {}
124 noNewFound = len(recordDict)
125 if recordDict:
126 Logger.addMessage("Started calculating new MJD values "
127 "for stacks...")
128 else:
129 Logger.addMessage("No new stacks found...")
130 progress = ForLoopMonitor(recordDict)
131
132
133 frameTypeSel = "m.frameType='stack'"
134 fromStr = "Multiframe as m"
135
136
137 whereStr = ("%s and m.multiframeID=vn.combiframeID"
138 % (frameTypeSel))
139
140 mjdObsDict = dict((mfID, (mjdObs, mjdEnd)) for mfID, mjdObs, mjdEnd in self.archive.query(
141 selectStr="m.multiframeID,MIN(mn.mjdObs) as mnMjdObs,"
142 "MAX(mn.mjdObs+(%s/(24.*3600.))) as mxMjdEnd"
143 % (("e.detNdit*e.detDit")
144 if self.sysc.isVSA() else ("mn.expTime")),
145 fromStr=fromStr + ",Provenance as vn,Multiframe as mn%s,FlatFileLookup as f" %
146 (",MultiframeEsoKeys as e" if self.sysc.isVSA() else ""),
147 whereStr=whereStr + " and vn.multiframeID=mn.multiframeID%s "
148 " AND f.dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND m.%s AND "
149 "f.multiframeID=m.multiframeID" %
150 (" and mn.multiframeID=e.multiframeID" if self.sysc.isVSA() else "",
151 startDate, version, endDate, version, DepCodes.selectNonDeprecated),
152 groupBy="m.multiframeID"))
153
154 mjdMeanDict = {}
155
156 for mfID, group in groupby(self.archive.query(
157 selectStr="d.multiframeID,d.extNum,AVG(mn.mjdObs+(0.5*%s/(24.*3600.))) as meanMjd"
158 % (("e.detNdit*e.detDit") if self.sysc.isVSA() else ("mn.expTime")),
159 fromStr=fromStr + ",Provenance as vn,Multiframe as mn%s,"
160 "MultiframeDetector as d,FlatFileLookup as f,MultiframeDetector as dn" %
161 (",MultiframeEsoKeys as e" if self.sysc.isVSA() else ""),
162 whereStr=whereStr + " and vn.multiframeID=mn.multiframeID%s"
163 " AND d.multiframeID=m.multiframeID and dn.multiframeID=mn.multiframeID "
164 " AND dn.extNum=d.extNum AND dn.%s"
165 " AND f.dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND d.%s m.%s AND "
166 "f.multiframeID=m.multiframeID" %
167 (" and mn.multiframeID=e.multiframeID" if self.sysc.isVSA() else "",
168 DepCodes.selectNonDeprecated, startDate, version, endDate, version, redoSel, DepCodes.selectNonDeprecated),
169 groupBy="d.multiframeID,d.extNum", orderBy="d.multiframeID"), key=attrgetter("multiframeID")):
170 mjdMeanDict[mfID] = dict((extNum, mjdMean) for _mID, extNum, mjdMean in group)
171
172 for (mfID, cfID, filePathName) in recordDict:
173
174 Logger.addMessage("Calculating new values for %d from Provenance "
175 "information" % mfID, alwaysLog=False)
176
177
178 mjdObs = mjdObsDict[mfID][0] if mfID in mjdObsDict else None
179 mjdEnd = mjdObsDict[mfID][1] if mfID in mjdObsDict else None
180 extDict = mjdMeanDict[mfID] if mfID in mjdMeanDict else None
181 if mjdObs:
182 self._mjdDict[mfID] = (mjdObs, mjdEnd, None, extDict)
183
184 Logger.addMessage("Updating DB and FITS files for mfID %d, "
185 "fileName %s " % (mfID, filePathName),
186 alwaysLog=False)
187 self.updateDb(mfID, cfID)
188 progress.testForOutput()
189
190
191 if recordDict:
192 Logger.addMessage("Updating catalogue files...")
193 self.updateFitsP(recordDict.keys(), numThreads)
194
195 Logger.addMessage("Selecting OB tiles that haven't already been "
196 "processed...")
197 frameTypeSel = queries.getFrameSelection('tile', noDeeps=True)
198 redoSel = ('' if self.reDo else "totalExpTimeSum<0 AND ")
199 recordList = self.archive.query(
200 selectStr="M.multiframeID, M.confID, M.fileName",
201 fromStr="Multiframe AS M, FlatFileLookUp AS F",
202 whereStr="F.multiframeID=M.multiframeID AND "
203 "dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND "
204 "%s AND %s %s " % (
205 startDate, version, endDate, version,
206 frameTypeSel, redoSel, DepCodes.selectNonDeprecated))
207
208 self._mjdDict = {}
209 noNewFound += len(recordList)
210
211 if recordList:
212 Logger.addMessage("Started calculating new MJD/totalExpTime values"
213 " for tiles...")
214 else:
215 Logger.addMessage("No new tiles found...")
216 frameTypeSel = "m.frameType='tilestack'"
217 fromStr = "Multiframe as m, Provenance as v"
218
219 whereStr = ("%s and m.multiframeID=v.combiframeID and v.multiframeID="
220 % (frameTypeSel))
221
222 mjdObsDict = dict((mfID, (mjdObs, mjdEnd)) for mfID, mjdObs, mjdEnd in self.archive.query(
223 selectStr="m.multiframeID,MIN(mn.mjdObs) as mnMjdObs,"
224 "MAX(mn.mjdObs+(%s/(24.*3600.))) as mxMjdEnd"
225 % (("e.detNdit*e.detDit")
226 if self.sysc.isVSA() else ("mn.expTime")),
227 fromStr=fromStr + ",Provenance as vn,Multiframe as mn%s,FlatFileLookup as f" %
228 (",MultiframeEsoKeys as e" if self.sysc.isVSA() else ""),
229 whereStr=whereStr + "vn.combiframeID and vn.multiframeID=mn.multiframeID%s "
230 " AND f.dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND m.%s AND "
231 "f.multiframeID=m.multiframeID" %
232 (" and mn.multiframeID=e.multiframeID" if self.sysc.isVSA() else "",
233 startDate, version, endDate, version, DepCodes.selectNonDeprecated),
234 groupBy="m.multiframeID"))
235
236
237 totExpTimeSumDict = dict(self.archive.query(
238 "md.multiframeID,SUM(mdo.totalExpTime) as tExpSum",
239 "Multiframe as m,MultiframeDetector as md,Provenance as v, "
240 "Multiframe as mo,MultiframeDetector as mdo, CurrentAstrometry as c",
241 "%s and m.multiframeID=v.combiframeID and md.multiframeID="
242 "c.multiframeID and md.extNum=c.extNum and "
243 "v.multiframeID=mo.multiframeID and mo.multiframeID="
244 "mdo.multiframeID and m.multiframeID=md.multiframeID and "
245 "mdo.%s and m.%s"
246 "group by md.multiframeID"
247 % (frameTypeSel, DepCodes.selectNonDeprecated, DepCodes.selectNonDeprecated)))
248 progress = ForLoopMonitor(recordList)
249 for mfID, cfID, filePathName in recordList:
250
251 mjdObs = mjdObsDict[mfID][0] if mfID in mjdObsDict else None
252 mjdEnd = mjdObsDict[mfID][1] if mfID in mjdObsDict else None
253 totalExpTimeSum = totExpTimeSumDict[mfID] if mfID in totExpTimeSumDict else None
254 extDict = {}
255 if mjdObs:
256 self._mjdDict[mfID] = (mjdObs, mjdEnd, totalExpTimeSum, extDict)
257
258
259 Logger.addMessage("Updating DB and FITS files for mfID %d, "
260 "fileName %s " % (mfID, filePathName),
261 alwaysLog=False)
262 self.updateDb(mfID, cfID)
263 progress.testForOutput()
264 if recordList:
265 Logger.addMessage("Updating catalogue files...")
266 self.updateFitsP(recordList, numThreads)
267 if (noNewFound == 0):
268 self._success = True
269
270
271 - def updateTotalExpTime(self, startDate, endDate):
272 """ Update total exposure time of OB images
273 """
274
275 eqnDict = defaultdict(dict)
276
277 eqnDictVSA = {
278 'stack':"SUM(e.detDit * e.detNdit)",
279 'tile':"SUM(e.detDit*e.detNdit)/3."}
280 eqnDictWSA = {
281 'stack':"SUM(m.expTime)"
282 }
283 eqnDict['WSA'] = eqnDictWSA
284 eqnDict['OSA'] = eqnDictWSA
285 eqnDict['VSA'] = eqnDictVSA
286
287 frameTypeDict = defaultdict(list)
288 frameTypeDict['VSA'] = ['stack', 'tile']
289 frameTypeDict['WSA'] = ['stack']
290 frameTypeDict['OSA'] = ['stack']
291 EPS = 0.0001
292
293 for frameType in frameTypeDict[self.sysc.loadDatabase]:
294 frameTypeSel = queries.getFrameSelection(frameType, noDeeps=True, alias='m', selType='%stack')
295 selectStr = "d.multiframeID, d.extNum, %s as tExp" % eqnDict[self.sysc.loadDatabase][frameType]
296 fromStr = "MultiframeDetector as d, Multiframe as m, Provenance as v,"
297 fromStr += "Multiframe as mn"
298 groupBy = ("d.multiframeID, d.extNum, totalExpTime having (d.totalExpTime-%s)>%s"
299 % (eqnDict[self.sysc.loadDatabase][frameType], EPS))
300 wbcls = "v"
301 if frameType == 'tile':
302 fromStr += ", Provenance as v2"
303 wbcls = "v.multiframeID=v2.combiframeID and v2"
304 ekcls = ""
305 if self.sysc.loadDatabase == 'VSA':
306 fromStr += ", MultiframeEsokeys as e"
307 ekcls = " and mn.multiframeID=e.multiframeID"
308 whereStr = ("%s and m.multiframeID=d.multiframeID and "
309 "m.multiframeID=v.combiframeID and "
310 "%s.multiframeID=mn.multiframeID and "
311 "m.utDate BETWEEN '%s' AND '%s'%s"
312 % (frameTypeSel, wbcls, startDate, endDate, ekcls))
313
314 corrTexpSel = SelectSQL(selectStr, fromStr, whereStr, groupBy=groupBy)
315
316 total = self.archive.update("MultiframeDetector",
317 entryList=[("totalExpTime", "T.tExp")],
318 fromTables="(%s) AS T" % corrTexpSel,
319 where="T.multiframeID=MultiframeDetector.multiframeID "
320 "and T.extNum=MultiframeDetector.extNum")
321 self.archive.commitTransaction()
322 Logger.addMessage("Updated %s %s totalExpTimes in MultiframeDetector"
323 % (total, frameType))
324
325
326
327
328 - def selectExistingDbData(self, version, startDate, endDate):
329 """
330 Select data from database where non-default values already exist.
331
332 @param version:
333 @type version:
334 @param startDate:
335 @type startDate:
336 @param endDate:
337 @type endDate:
338 """
339
340
341 frameTypeSel = queries.getFrameSelection('stack', noDeeps=True)
342 extInfo = self.archive.query(
343 selectStr="D.multiframeID, D.extNum, M.fileName,"
344 "M.confID, M.mjdObs, M.mjdEnd,D.mjdMean",
345 fromStr="Multiframe AS M, MultiframeDetector AS D, "
346 "FlatFileLookUp AS F, MultiframeEsoKeys AS E",
347 whereStr="D.multiframeID=M.multiframeID AND "
348 "F.multiframeID=M.multiframeID AND "
349 "E.multiframeID=M.multiframeID AND "
350 "dateVersStr BETWEEN '%s_v%s' AND '%s_v%s' AND "
351 "mjdMean>0 AND D.%s AND %s ORDER BY "
352 "D.multiframeID, D.extNum" % (
353 startDate, version, endDate, version,
354 DepCodes.selectNonDeprecated, frameTypeSel))
355
356
357 recordDict = defaultdict(list)
358 mjdTimeDict = defaultdict(tuple)
359 for entry in extInfo:
360 recordDict[(entry.multiframeID, entry.confID, entry.fileName)
361 ].append((entry.extNum, entry.mjdMean))
362 mjdTimeDict[entry.multiframeID] = (entry.mjdObs, entry.mjdEnd)
363 self._mjdDict = {}
364 for (mfID, cnfID, fileName) in recordDict:
365 extDict = defaultdict()
366 for extNum, mjdMean in recordDict[(mfID, cnfID, fileName)]:
367 extDict[extNum] = mjdMean
368 mjdObs, mjdEnd = mjdTimeDict[mfID]
369 self._mjdDict[mfID] = (mjdObs, mjdEnd, None, extDict)
370 return recordDict
371
372
373
374 - def updateDb(self, mfID, cfID):
375 """
376 Updates DB with MEANMJD values
377
378 @param mfID: MultiframeID of combiframe for which provenan.
379 @type mfID: int
380 @param cfID: MultiframeID of confidence frame.
381 @type cfID: int
382
383 """
384 if mfID in self._mjdDict:
385 mjdObs, mjdEnd, tExpSum, extDict = self._mjdDict[mfID]
386
387 entryList = [("mjdObs", mjdObs), ("mjdEnd", mjdEnd)]
388 if tExpSum:
389 entryList += [("totalExpTimeSum", tExpSum)]
390 self.archive.update("Multiframe",
391 entryList,
392 where="multiframeID IN (%d,%d)" % (mfID, cfID))
393
394 for extNum in extDict:
395 meanMjd = extDict[extNum]
396
397 self.archive.update("MultiframeDetector",
398 [("mjdMean", meanMjd)],
399 where="multiframeID=%d AND extNum=%d" %
400 (mfID, extNum))
401
402
403
404 - def updateFits(self, mfID, filePathName):
405 """
406 Updates DB and catalogue headers with MEANMJD values
407 @todo: when is this used
408
409 @param mfID: MultiframeID of combiframe for which provenan.
410 @type mfID: int
411 @param filePathName: Full path to the FITS file.
412 @type filePathName: str
413
414 """
415 if mfID in self._mjdDict:
416 _mjdObs, _mjdEnd, _tExpSum, extDict = self._mjdDict[mfID]
417
418 catFileName = fits.getCatalogue(filePathName)
419
420 modtime = time.strptime(time.ctime(os.stat(catFileName).st_mtime))
421 fileTimestamp = time.strftime("%y%m%d%H%M%S", modtime)
422
423
424 catFile = fits.open(catFileName, 'update')
425 for extNum in extDict:
426 meanMjd = extDict[extNum]
427
428
429 catFile[extNum - 1].header.update("MEANMJD", meanMjd,
430 "Mean MJDobs of extension")
431 catFile.close()
432
433
434 timetpl = time.strptime(fileTimestamp, '%y%m%d%H%M%S')
435 modtime = time.mktime(timetpl)
436 os.utime(catFileName, (time.time(), modtime))
437
438
439
440 - def updateFitsP(self, records, numThreads):
441 """
442 Updates DB and catalogue headers with MEANMJD values
443
444 @param records: List of mfID, cfID, filePathName
445 @type records: list
446 @param numThreads: Number of threads used in multiprocessing
447 @type numThreads: int
448
449 """
450 inputs = []
451 for mfID, _cfID, filePathName in records:
452 if mfID in self._mjdDict:
453 _mjdObs, _mjdEnd, _tExpSum, extDict = self._mjdDict[mfID]
454 catFileName = fits.getCatalogue(filePathName)
455 if os.path.exists(catFileName):
456 inputs.append((catFileName, extDict))
457
458 UpdateFits.numprocesses = numThreads
459 c = UpdateFits()
460 c.pooling(inputs)
461
462
463
466
467
468
471
472
473
475
476
477
478
479 numprocesses = int(0.6 * multiprocessing.cpu_count())
480
481
482
514
515
516
518 """
519 Update catalogues.
520 @param args: Arguments for updateCatalogueFile.
521 @type args: args
522
523 """
524 result = self.updateCatalogueFile(*args)
525 return '%s(%s) = %s' % (self.updateCatalogueFile.__name__, args,
526 result)
527
528
529
531 """
532 Update the catalogue file.
533
534 @param catFileName: Full path to the FITS file.
535 @type catFileName: str
536 @param extDict: Dictionary of extension numbers and mean MJDs.
537 @type extDict: dict
538
539 """
540
541
542 modtime = time.strptime(time.ctime(os.stat(catFileName).st_mtime))
543 fileTimestamp = time.strftime("%y%m%d%H%M%S", modtime)
544
545
546 catFile = fits.open(catFileName, 'update')
547 for extNum in extDict:
548 meanMjd = extDict[extNum]
549
550
551 catFile[extNum - 1].header.update("MEANMJD", meanMjd,
552 "Mean MJDobs of extension")
553 catFile.close()
554
555
556 timetpl = time.strptime(fileTimestamp, '%y%m%d%H%M%S')
557 modtime = time.mktime(timetpl)
558 os.utime(catFileName, (time.time(), modtime))
559
560
561
562
568
569
570
572 """
573 Removes incorrect rows from Provenance due to reprocessing deprecations.
574
575 """
576 Logger.addMessage("Deleted %s deprecated rows in Provenance"
577 % archive.delete("Provenance", "multiframeID IN (%s)"
578 % SelectSQL("Old.multiframeID",
579 table="Provenance AS P, Multiframe AS Old, Multiframe AS New",
580 where="P.multiframeID=Old.multiframeID AND "
581 "P.combiframeID=New.multiframeID AND Old.deprecated >= %s"
582 " AND New.%s" % (DepCodes.reprocCASU,
583 DepCodes.selectNonDeprecated))))
584
585
586
628
629
630
632 """
633 Update the database entries for illumFile.
634 @param archive: The DB connection.
635 @type archive: DBSession object
636 @param sysc: Initialised SystemConstants object.
637 @type sysc: SystemConstants
638 @param cal: The observation calendar.
639 @type cal: ObsCalendar object
640 @param dateStr: The month or day for which data is updated.
641 @type dateStr: str
642 @param versStr: The version for which data is updated.
643 @type versStr: str
644
645 """
646 filterDict = {"WSA": ['J', 'H', 'K', 'Y', 'Z'],
647 "VSA": ['J', 'H', 'Ks', 'Y', 'Z'],
648 "OSA": []}
649 illumList = archive.query(
650 selectStr="Multiframe.multiframeID, filterName",
651 fromStr=Join(["Multiframe", "FlatFileLookUp"], ["multiframeID"]),
652 whereStr=''.join(["illumFile like 'NONE'",
653 " AND catName not like '%empty_catalogue.fits'",
654 " AND catName not like 'NONE'",
655 " AND dateVersStr like '%s_v%s'" % (dateStr,
656 versStr)]))
657 counter = 0
658 missingIllumFiles = set()
659 for mfID, filterName in illumList:
660 if filterName in filterDict[sysc.loadDatabase]:
661 illumDate = (cal.checkDate(dateStr)[-3:].lower() if sysc.isWSA()
662 else dateStr[:4])
663 illumFileName = '_'.join([
664 "illum", illumDate, cal.getMonth(dateStr).lower(), filterName])
665 if os.path.exists(os.path.join(sysc.illuminationDir(),
666 "%s.table" % illumFileName)):
667 counter += archive.update(
668 "Multiframe", "illumFile='%s.table'" % illumFileName,
669 where="multiframeID=%s" % mfID)
670 else:
671 missingIllumFiles.add(illumFileName)
672 Logger.addMessage("%s entries updated." % counter)
673 return missingIllumFiles
674
675
676
677
678
679 if __name__ == "__main__":
680 CLI.progOpts += [
681 CLI.Option('S', "semester",
682 "Run illum update on this semester list (or 'all')", "LIST"),
683 CLI.Option('U', "update_illum",
684 "Update illum table entries"),
685 CLI.Option('V', "version", "version number of the data", "STR"),
686 CLI.Option('b', "updateCats", "Update the mean mjd in FITS catalogues"
687 " only"),
688 CLI.Option('f', "set_flag",
689 "Set a flag that this code has been run."),
690 CLI.Option('j', "janet",
691 "Use JANET instead of UKLight"),
692 CLI.Option('m', "updateMeanMjd",
693 "Update the mean MJD of OB pawprints"),
694 CLI.Option('p', "provenance",
695 "Update provenance"),
696 CLI.Option('s', "setup",
697 "Setup the programme requirements"),
698 CLI.Option('x', "transfer_illum",
699 "Transfer illum tables"),
700 CLI.Option('y', "threads",
701 "Number of threads used in mean MJD udate",
702 "INT", int(0.6 * multiprocessing.cpu_count()))
703 ]
704
705 cli = CLI("FinaliseFlatFileIngest", "$Revision: 10129 $", __doc__)
706 database = cli.getArg("database")
707 Logger.isVerbose = False
708 log = Logger("%s_FFLI_%s.log" % (database.split('.')[-1],
709 utils.makeTimeStamp().replace(' ', '_')))
710
711 Logger.addMessage(cli.getProgDetails())
712 sysc = SystemConstants(database.split('.')[-1])
713 obsCal = sysc.obsCal
714
715 if not cli.getOpt("semester"):
716 semesterList = obsCal.getSemList()[-3:]
717 elif cli.getOpt("semester").lower() == "all":
718 semesterList = obsCal.getSemList()
719 else:
720 semesterList = cli.getOpt("semester").split(',')
721
722 version = (cli.getOpt("version") or
723 str(obsCal.maxVersOfDate(obsCal.getDates(semesterList[0],
724 "%Y%m%d")[0])))
725
726 doAll = not any([cli.getOpt("setup"), cli.getOpt("provenance"),
727 cli.getOpt("transfer_illum"), cli.getOpt("update_illum"),
728 cli.getOpt("set_flag"), cli.getOpt("updateMeanMjd")])
729
730
731
732 if (doAll or cli.getOpt("transfer_illum")) and not sysc.isOSA():
733 if os.getenv('HOST') == 'menkaure':
734 transferIllumTables(sysc, isUKLight=not cli.getOpt("janet"),
735 isTrialRun=cli.getOpt("test"))
736 else:
737 Logger.addMessage("<WARNING> CASU connection is only available on"
738 " menkaure!")
739
740 if doAll or cli.getOpt("update_illum") or cli.getOpt("set_flag"):
741 fitsDirs = fits.FitsList(sysc, prefix="cu0_")
742 fitsDirs.createFitsDateDict()
743
744 if (doAll or cli.getOpt("update_illum")) and not sysc.isOSA():
745 missingIllumFiles = set()
746 try:
747
748 archive = DbSession(database, False, cli.getOpt("test"),
749 cli.getOpt("user"))
750
751 for dateVersStr in sorted(fitsDirs.invFitsDateDict):
752 dateStr, versStr = dateVersStr.partition("_v")[::2]
753 usedVersion = cli.getOpt("version") or \
754 str(obsCal.maxVersOfDate(dateStr))
755 if obsCal.getDates(semesterList[0], "%Y%m%d")[0] <= dateStr <= \
756 obsCal.getDates(semesterList[-1], "%Y%m%d")[1] \
757 and usedVersion == versStr:
758 Logger.addMessage(
759 "Updating illumination table entries"
760 " in %s for %s (%s v.%s)" % (
761 database, obsCal.formatDate(
762 dateStr, outFormat="%Y-%m-%d"),
763 obsCal.checkDate(dateStr), versStr))
764 missingIllumFiles.update(updateIllumEntries(
765 archive, sysc, obsCal, dateStr, versStr))
766
767
768
769
770
771 if missingIllumFiles:
772 Logger.addMessage(
773 "<WARNING> The following illumination tables do not "
774 "exist, please upload from CASU: %s" %
775 ", ".join(sorted(missingIllumFiles)))
776 del archive
777 except Exception, details:
778 Logger.addExceptionMessage(details)
779 raise
780
781 if doAll or cli.getOpt("provenance"):
782 removeIncorrectProvenance(DbSession(autoCommit=True, cli=cli))
783 cli.resetOptArg("comment", CuSession.comment)
784 Logger.addMessage(cli.getProgDetails("ProvenanceFiller"))
785 ProvenanceFiller(cli=cli).run()
786
787 if doAll or cli.getOpt("setup"):
788 cli.resetOptArg("comment", CuSession.comment)
789 Logger.addMessage(cli.getProgDetails("ProgrammeBuilder"))
790 ProgrammeBuilder(cli=cli).run()
791
792 if doAll or cli.getOpt("set_flag"):
793 Logger.addMessage("Setting flags that this program has run...")
794 for dateVersStr in sorted(fitsDirs.invFitsDateDict):
795 try:
796 dateDirPath = os.path.join(
797 fitsDirs.invFitsDateDict[dateVersStr], dateVersStr)
798 except AttributeError:
799 Logger.addMessage("<ERROR> for %s in %r" % (
800 dateVersStr, fitsDirs.invFitsDateDict[dateVersStr]))
801 for dateVStr in fitsDirs.invFitsDateDict:
802 if len(fitsDirs.invFitsDateDict[dateVStr]) > 1:
803 Logger.addMessage(
804 "The data for %s is in more than 1 directory %r" % (
805 dateVStr,
806 fitsDirs.invFitsDateDict[dateVStr]))
807 raise SystemExit
808
809
810
811 dbName = database.split('.')[-1]
812 cu03edFlag = os.path.join(dateDirPath, "CU03ED_%s" % dbName)
813 fffiedFlag = os.path.join(dateDirPath, "FFFIED_%s" % dbName)
814 if os.path.exists(cu03edFlag) and not os.path.exists(fffiedFlag):
815 Logger.addMessage("wrote %s" % fffiedFlag)
816 file(fffiedFlag, 'w').write(utils.makeTimeStamp() + '\n')
817
818 if sysc.isVSA() and (doAll or cli.getOpt("updateMeanMjd")):
819 cli.resetOptArg("comment", CuSession.comment)
820
821 Logger.addMessage(cli.getProgDetails("FileHistoryDataFiller"))
822
823
824 archive = DbSession(database, False, cli.getOpt("test"),
825 cli.getOpt("user"))
826 archive.enableDirtyRead()
827 FileHistoryDataFiller(cli=cli).run(archive, semesterList, version,
828 cli.getOpt("updateCats"), int(cli.getOpt("threads")))
829
830
831
832
833
834
835
836