1
2
3
4 """
5 Invoke CU5. Create library H2-K difference image frame products.
6
7 @author: E. Sutorius
8 @org: WFAU, IfA, University of Edinburgh
9
10 @newfield contributors: Contributors, Contributors (Alphabetical Order)
11 @contributors: R.S. Collins
12 """
13
14 from collections import defaultdict
15 import dircache
16 import mx.DateTime as mxdt
17 import os
18 import shutil
19
20 import wsatools.Astrometry as astro
21 from wsatools.CLI import CLI
22 from wsatools.DbConnect.CuSession import CuSession
23 import wsatools.DbConnect.DbConstants as dbc
24 from wsatools.DbConnect.DbSession import Join
25 import wsatools.ExternalProcess as extp
26 from wsatools.File import File
27 import wsatools.FitsUtils as fits
28 from wsatools.Logger import Logger, ForLoopMonitor
29 import wsatools.Utilities as utils
30
31
class Cu5(CuSession):
    """
    Create library H2-K difference image frame products for given database
    release number.

    For every row of the RequiredDiffImage table the H2 and K stacks of the
    programme are paired, C{fitsio_subtract} is run on each pair, and the
    paths of the resulting files are written to an ingest list.
    """
    # CuSession configuration; the exact meaning of the underscore-prefixed
    # flags is defined by CuSession -- NOTE(review): confirm there.
    cuNum = 5
    _autoCommit = True
    _reqCuEventID = True
    _useWorkDir = True

    # Pair of dates substituted into "utDate BETWEEN '%s' AND '%s'".
    dateRange = CuSession.sysc.obsCal.dateRange()

    # Release number; becomes the "_v<releaseNum>" suffix of the output
    # directory name.
    releaseNum = None

    # If true, images are selected regardless of when CU5 was last run.
    ignoreLastRun = False

    # cuEventID of a previous CU5 run to resume; 0 means a fresh run.
    resumeID = 0

    #--------------------------------------------------------------------------

    def _onRun(self):
        """ Do CU5. """
        if self.resumeID > 0:
            Logger.addMessage("Determining provenance info for CU5 run %s..."
                              % self.resumeID)

            provInfoDict, timeStamp, outputList, counterStart = \
                self._getPrevCU5Info()
        else:
            # Always bind provInfoDict so later code never depends on which
            # branch was taken.
            provInfoDict = {}
            timeStamp = mxdt.utc().strftime("%Y%m%d")
            outputList = []
            counterStart = 0

        if self.archive.isRealRun:
            diffDir = utils.getNextDisk(self.sysc, preAllocMem=500)
        else:
            diffDir = self.sysc.testOutputPath()

        diffDir = os.path.join(diffDir, self.sysc.diffDir)
        outFilePath = \
            os.path.join(diffDir, '%s_v%s' % (timeStamp, self.releaseNum))

        # Outputs flagged as NaN-broken are moved to a sibling "nan" directory
        nanFilePath = os.path.join(os.path.dirname(outFilePath), "nan")

        Logger.addMessage(
            "Difference images will be created in: " + outFilePath)

        Logger.addMessage(
            "Broken files with NaN errors will go here: " + nanFilePath)

        lastRun = (dict() if self.ignoreLastRun else self._queryLastRuns())

        for progID, h2ID, kID, wcs, method in self.archive.query(
                "programmeID, filter1, filter2, wcs, method",
                "RequiredDiffImage"):

            self.programmeID.add(progID)
            self.programme.setCurRow(programmeID=progID)
            progName = self.programme.getAcronym(progID).upper()

            Logger.addMessage("Determining list of new images for programme %s"
                              "..." % progName)

            h2Images = self._getImageList(h2ID, progID, lastRun.get(progID))
            kImages = self._getImageList(kID, progID, lastRun.get(progID))
            Logger.addMessage("found %s H2 and %s K images." %
                              (len(h2Images), len(kImages)))

            if not h2Images or not kImages:
                Logger.addMessage("<Info> No difference images require "
                                  "curation for programme "
                                  + self.programme.getAcronym(progID).upper())
                continue

            Logger.addMessage("Pairing images...")
            pairDict, numPairs = self._pairImages(h2Images, kImages)
            Logger.addMessage("found %s pairs in total, using %s "
                              "time-wise nearest pairs."
                              % (numPairs, len(pairDict)))

            Logger.addMessage("Creating difference images...")

            if self.resumeID > 0:
                # Keep only pairs whose provenance is not already recorded in
                # a difference image of the run being resumed.
                donePairs = provInfoDict.values()
                newDict = {}
                for h2Entry in pairDict.values():
                    h2Img, _h2Cat, _h2Conf, kImg, _kCat, _kConf = h2Entry[0]
                    h2ImgFile = File(h2Img)
                    kImgFile = File(kImg)
                    provenance = [
                        os.path.join(h2ImgFile.subdir, h2ImgFile.base),
                        os.path.join(kImgFile.subdir, kImgFile.base)]

                    if provenance not in donePairs:
                        newDict[h2Img] = h2Entry

                pairDict = newDict
                Logger.addMessage("Rerun on %s pairs." % len(pairDict))

            utils.ensureDirExist(diffDir)
            utils.ensureDirExist(outFilePath)
            utils.ensureDirExist(nanFilePath)

            brokenFiles = []
            progress = ForLoopMonitor(pairDict)
            for counter, h2Entry in enumerate(pairDict.values()):
                h2Img, h2Cat, h2Conf, kImg, kCat, kConf = h2Entry[0]
                h2ImgID = os.path.basename(h2Img).split('_')[1]
                kImgID = os.path.basename(kImg).split('_')[1]
                outputPathName = os.path.join(outFilePath,
                    'e%s_%05u_%s_%s_diff.fit' %
                    (timeStamp, counterStart + counter + 1, h2ImgID, kImgID))

                imgFiles = '%s,%s' % (h2Img, kImg)
                catFiles = '%s,%s' % (h2Cat, kCat)
                if self.sysc.emptyFitsCatalogueFileName in catFiles.lower():
                    catFiles = "nocats"

                # Confidence maps are only passed on if both frames have one
                if h2Conf and kConf:
                    confMapFiles = '%s,%s' % (h2Conf, kConf)
                    confPathName = fits.getConfMap(outputPathName)
                else:
                    confMapFiles, confPathName = '', ''

                cmd = "fitsio_subtract %s %s wcs=%s %s %s %s %s" % (imgFiles,
                    catFiles, wcs, outputPathName, method, confMapFiles,
                    confPathName)

                progOutput = extp.run(cmd, raiseOnError=False,
                                      cwd=self.sysc.tempWorkPath())

                Logger.addMessage('\n'.join(progOutput), alwaysLog=False)
                if any(("Simple method statistic" in line and "nan" in line)
                       for line in progOutput):
                    # NaN statistic reported: set the products aside
                    brokenFiles.append(outputPathName)
                    shutil.move(outputPathName, nanFilePath)
                    if confPathName:
                        shutil.move(confPathName, nanFilePath)
                else:
                    components = [fits.FileList(h2Img, h2Conf, cat=None),
                                  fits.FileList(kImg, kConf, cat=None)]

                    fits.prepHeaders(outputPathName, confPathName, components)
                    newPairs = [outputPathName]
                    if confPathName:
                        newPairs.append(confPathName)

                    fits.compressFits(newPairs)
                    outputList += newPairs

                progress.testForOutput()

            if not outputList and not brokenFiles:
                Logger.addMessage("<WARNING> No difference images created for "
                    "programme " + self.programme.getAcronym(progID).upper())
                continue

            if self.archive.isRealRun:
                ingestListPath = self.archive.sharePath('ingestLists')
            else:
                ingestListPath = os.path.join(self.sysc.testOutputPath(),
                                              self.sysc.productLogDir)

            utils.ensureDirExist(ingestListPath)
            prefix = "cu%02did%d_%s_%s" % (self.cuNum, self.cuEventID,
                os.getenv('HOST'), self.archive.database)

            # The prefix does not contain the programme, so this list is
            # rewritten on every pass with the accumulated outputList.
            listPath = os.path.join(ingestListPath, prefix + "_xfer.list")
            with open(listPath, 'w') as listFile:
                listFile.write('\n'.join(outputList) + '\n')

            Logger.addMessage("Wrote ingest list to " + listPath)

            # numPairs is an int (len() of it raised TypeError); the pairs
            # actually processed are the entries of pairDict.
            numExpected = 2 * len(pairDict)
            if len(outputList) < numExpected:
                Logger.addMessage("<WARNING> The number of output files (%d)"
                                  " differs from twice the number of"
                                  " pairs (%d)." % (len(outputList),
                                                    numExpected))
            if brokenFiles:
                brokenPath = os.path.join(os.curdir, prefix + "_broken.list")
                Logger.addMessage("<Info> %s Files are broken. Listed in %s" %
                                  (len(brokenFiles), brokenPath))

                with open(brokenPath, 'w') as brokenFile:
                    brokenFile.write('\n'.join(brokenFiles) + '\n')

    #--------------------------------------------------------------------------

    def _getImageList(self, filterID, progID, lastRun):
        """
        Obtains the list of attributes for new images of a certain filter for
        a certain programme since the last run.

        @param filterID: Select images of this filter ID.
        @type  filterID: int
        @param progID:   Select images from this programme ID.
        @type  progID:   int
        @param lastRun:  Select images created since this cuEventID. A false
                         value (None or 0) disables this constraint.
        @type  lastRun:  int

        @return: List of image attributes defined in query.
        @rtype:  list(tuple)

        """
        attrs = "raBase, decBase, fileName, catName, confID, mjdObs, object"
        dateClause = "utDate BETWEEN '%s' AND '%s'" % self.dateRange

        predicates = ""
        if lastRun:
            predicates = "Multiframe.cuEventID > %s AND " % lastRun

        # Only frames with exactly four rows per attribute group are kept
        # (HAVING COUNT(*)=4).
        predicates += ("frameType='leavstack' AND "
            "(Multiframe.deprecated=0 OR Multiframe.deprecated=81) "
            "AND (MultiframeDetector.deprecated=0"
            " OR MultiframeDetector.deprecated=81) "
            "AND Multiframe.filterID=%s AND programmeID=%s AND %s "
            "GROUP BY %s HAVING COUNT(*)=4 ORDER BY decBase ASC"
            % (filterID, progID, dateClause, attrs))

        tables = Join(["Multiframe", "ProgrammeFrame", "MultiframeDetector"],
                      ["multiframeID"])

        return self.archive.query(attrs, fromStr=tables, whereStr=predicates)

    #--------------------------------------------------------------------------

    def _getConfPathName(self, confID):
        """
        Looks up the file path of a confidence map.

        @param confID: Multiframe ID of the confidence map.
        @type  confID: int

        @return: Text after the last ':' of the fileName stored in Multiframe,
                 or an empty string if no non-default fileName is found.
        @rtype:  str

        """
        confName = self.archive.query("fileName", "Multiframe",
            "multiframeID=%s AND fileName!='%s'" %
            (confID, dbc.charDefault()), firstOnly=True, default='')

        return confName.split(':')[-1]

    #--------------------------------------------------------------------------

    def _pairImages(self, h2Images, kImages):
        """
        Produces a list of paired images.

        @param h2Images: List of H2 filter images.
        @type  h2Images: list(tuple)
        @param kImages:  List of K filter images.
        @type  kImages:  list(tuple)

        @return: Dictionary of paired image file names, keyed by H2 file path,
                 and the total number of H2-K matches found.
        @rtype:  defaultdict, int

        """
        raIndex = 0
        tol = self.programme.getAttr('frameSetTolerance')
        pairedDict = defaultdict(list)
        numPairs = 0

        for h2Ra, h2Dec, h2FileName, h2CatName, h2ConfID, h2MjdObs, h2Object \
                in h2Images:

            h2ConfPathName = self._getConfPathName(h2ConfID)
            h2FilePathName = h2FileName.split(':')[-1]
            h2CatPathName = h2CatName.split(':')[-1]

            # Collect every matched K frame of the same object, rated by its
            # separation in time from the H2 frame.
            kDict = defaultdict(list)
            h2kRatings = utils.Ratings()
            for _kRa, _kDec, kFileName, kCatName, kConfID, kMjdObs, kObject \
                    in astro.matchFrames(kImages, raIndex, (h2Ra, h2Dec), tol):

                kConfPathName = self._getConfPathName(kConfID)
                kFilePathName = kFileName.split(':')[-1]
                kCatPathName = kCatName.split(':')[-1]

                if kObject.strip() == h2Object.strip():
                    kDict[kFilePathName] += [kFilePathName, kCatPathName,
                                             kConfPathName]
                    numPairs += 1
                    h2kRatings[kFilePathName] = abs(h2MjdObs - kMjdObs)

            if kDict:
                # Pair the H2 frame with the K frame the ratings rank first
                h2Entry = [h2FilePathName, h2CatPathName, h2ConfPathName]
                h2Entry += kDict[h2kRatings.getKeyByRating(0)]
                pairedDict[h2FilePathName].append(h2Entry)

        return pairedDict, numPairs

    #--------------------------------------------------------------------------

    def _getPrevCU5Info(self):
        """
        Collects the products and provenance of the CU5 run being resumed.

        @return: Dictionary of [PROV0001, PROV0002] header values referenced
                 by difference image path, the date string of the previous
                 run, the list of all its files ending in the MEF file type
                 and the highest counter found in the difference image file
                 names.
        @rtype:  dict(str:list), str, list(str), int

        """
        timeStamp = self.archive.query(
            selectStr="timeStamp",
            fromStr="ArchiveCurationHistory",
            whereStr="cuID=%s and cuEventID=%s" % (self.cuNum, self.resumeID)
        )[0]

        # Keep the text before the first space with dashes removed; it is
        # matched against the "<date>_v<releaseNum>" output directory names.
        prevCu5TimeStamp = str(timeStamp).partition(' ')[0].replace('-', '')
        prevCu5DateVersStr = "%s_v%s" % (prevCu5TimeStamp, self.releaseNum)
        disks = self.sysc.availableRaidFileSystem()
        fileList = []
        for dirName, dateDirs in fits.getAllPaths(disks, [self.sysc.diffDir]):
            for diffDir in dateDirs:
                if diffDir == prevCu5DateVersStr:
                    dirPath = os.path.join(dirName, diffDir)
                    fileList.extend(os.path.join(dirPath, fileName)
                        for fileName in dircache.listdir(dirPath)
                        if fileName.endswith(self.sysc.mefType))

        diffSuffix = self.sysc.diffSuffix + self.sysc.mefType
        diffFileList = [path for path in fileList if path.endswith(diffSuffix)]

        provInfoDict = {}
        maxCounter = 0
        for fileName in diffFileList:
            hdulist = fits.open(fileName)
            try:
                shdr = hdulist[1].header
                h2ImgProv, kImgProv = shdr["PROV0001"], shdr["PROV0002"]
                provInfoDict.setdefault(fileName, []).extend(
                    [h2ImgProv, kImgProv])
            except IndexError:
                Logger.addMessage("Extension missing in %s" % (fileName))
            finally:
                # Release the file handle even if an unexpected error occurs
                hdulist.close()

            # The counter is the second '_'-separated field of the file name
            maxCounter = max(maxCounter,
                             int(os.path.basename(fileName).split('_', 2)[1]))

        return provInfoDict, prevCu5TimeStamp, fileList, maxCounter

    #--------------------------------------------------------------------------

    def _queryLastRuns(self):
        """
        @return: A dictionary containing the cuEventIDs of the last run of this
                 CU for every programme it has been run on, referenced by
                 programmeID.
        @rtype:  dict(int:int)

        """
        Logger.addMessage("Determining when CU5 was last run for each "
                          "programme...")

        history = Join(["ArchiveCurationHistory", "ProgrammeCurationHistory"],
                       ["cuEventID"])

        return dict(self.archive.query(
            selectStr="programmeID, MAX(ArchiveCurationHistory.cuEventID)",
            fromStr=history,
            whereStr="cuID=%s AND status=1 GROUP BY programmeID" % self.cuNum))
368
369
370
371
372
if __name__ == '__main__':
    # Command-line entry point: declare the CU5-specific arguments and
    # options, then configure a Cu5 session from them and run it.

    def _isDigits(text):
        """ @return: True if the given string consists of digits only. """
        return text.isdigit()

    # The generic "comment" argument is not offered by this script.
    CLI.progArgs.remove("comment")
    CLI.progArgs += [
        CLI.Argument("begin_date", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("end_date", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("releaseNum", '1', isValOK=_isDigits),
    ]

    CLI.progOpts += [
        CLI.Option('r', 'resume', 'resume CU5 run with given cuEventID',
                   "INT", '0', isValOK=_isDigits),
        CLI.Option('i', 'ignorelast', 'ignore check when CU5 was last run.'),
        CLI.Option('v', 'verbose',
                   'display details of every fitsio_subtract run'),
    ]

    cli = CLI(Cu5, "$Revision: 9352 $")
    Logger.isVerbose = cli.getOpt('verbose')
    Logger.addMessage(cli.getProgDetails())

    cu = Cu5(cli=cli)

    # Turn the begin/end arguments into a date range; any failure is logged
    # and reported as an invalid option.
    try:
        beginDate = cli.getArg("begin_date")
        endDate = cli.getArg("end_date")
        cu.dateRange = cu.sysc.obsCal.dateRange(beginDate, endDate)
    except Exception as error:
        eType = "Invalid Option"
        Logger.addExceptionMessage(error, eType)
        raise SystemExit(eType + ": see log " + cu._log.pathName)

    cu.ignoreLastRun = int(cli.getOpt("ignorelast"))
    cu.resumeID = int(cli.getOpt("resume"))
    cu.releaseNum = cli.getArg("releaseNum")
    cu.run()
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424