1
2
3
4 """
5 Checks ingests into Multiframe vs FlatFileLookup vs FITS files on disk.
6
7 @author: E. Sutorius
8 @org: WFAU, IfA, University of Edinburgh
9 """
10
11 from collections import defaultdict, namedtuple
12 import inspect
13 import mx.DateTime as mxTime
14 import os
15 import time
16
17 from wsatools.CLI import CLI
18 from wsatools.DbConnect.DbSession import DbSession
19 from wsatools.File import File
20 import wsatools.FitsUtils as fits
21 from wsatools.DbConnect.IngCuSession import IngCuSession
22 from wsatools.Logger import Logger, ForLoopMonitor
23 import wsatools.Utilities as utils
24
25
""" Check if metadata ingests were successful.
"""
29
30
31
    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 beginDate=CLI.getOptDef("begin"),
                 endDate=CLI.getOptDef("end"),
                 outPath=CLI.getOptDef("outpath"),
                 cuNums=CLI.getOptDef("cunums"),
                 versionStr=CLI.getOptDef("version"),
                 excludedIDs=CLI.getOptDef("exclude"),
                 checkOmetries=CLI.getOptDef("ometries"),
                 isTrialRun=DbSession.isTrialRun,
                 comment=CLI.getArgDef("comment")):
        """
        Initialise the ingest-checking session: connect the curation
        session, resolve the date range and version, and prepare the
        output directory.

        @param beginDate: First date to process, eg. 20050101.
        @type beginDate: str
        @param comment: Descriptive comment as to why curation task is
                        being performed.
        @type comment: str
        @param checkOmetries: Check Astrometry and Photometry tables.
        @type checkOmetries: bool
        @param cuNums: Curation task numbers.
        @type cuNums: list[int]
        @param curator: Name of curator.
        @type curator: str
        @param database: Name of the database to connect to.
        @type database: str
        @param endDate: Last date to process, eg. 20050131.
        @type endDate: str
        @param excludedIDs: Excluded fileIDs.
        @type excludedIDs: list
        @param isTrialRun: If True, do not perform database modifications.
        @type isTrialRun: bool
        @param outPath: Log file directory.
        @type outPath: str
        @param versionStr: Version number.
        @type versionStr: str

        """
        # cuNum=0: this session is a cross-check, not a real curation task.
        super(CheckIngests, self).__init__(cuNum=0,
                                           curator=curator,
                                           comment=comment,
                                           database=database,
                                           autoCommit=False,
                                           isTrialRun=isTrialRun)

        # Normalise user-supplied dates (e.g. semester names or defaults)
        # into concrete begin/end date strings.  Rebinding the locals here
        # matters: attributesFromArguments() below reads locals().
        beginDate, endDate = \
            self.sysc.obsCal.getDatesFromInput(beginDate, endDate)

        # Target type for each argument when copied onto self; CLI options
        # arrive as strings, so e.g. "cuNums" must be coerced to a list.
        typeTranslation = {"curator":str,
                           "database":str,
                           "beginDate":str,
                           "endDate":str,
                           "cuNums":list,
                           "versionStr":str,
                           "excludedIDs":list,
                           "outPath":str,
                           "checkOmetries":bool,
                           "isTrialRun":bool,
                           "comment":str}

        # Copy every __init__ argument onto self (self.beginDate, etc.),
        # cast according to typeTranslation.  getargspec()[0] supplies the
        # argument names; locals() supplies their (possibly rebound) values.
        super(CheckIngests, self).attributesFromArguments(
            inspect.getargspec(CheckIngests.__init__)[0], locals(),
            types=typeTranslation)

        # Default version: the highest CASU version of either boundary date.
        if not self.versionStr:
            self.versionStr = str(max(
                self.sysc.obsCal.maxVersOfDate(self.beginDate),
                self.sysc.obsCal.maxVersOfDate(self.endDate)))

        # Default output path gets a database-specific subdirectory so runs
        # against different load databases don't clobber each other.
        if self.outPath == "./":
            self.outPath = "./check%s" % self.sysc.loadDatabase
        utils.ensureDirExist(self.outPath)
105
106
107
117
118
119
        # NOTE(review): these statements are the body of the session's main
        # method; its "def" line lies outside this view.  The body
        # cross-checks, for the configured date range, the FITS files on
        # disk against FlatFileLookup and Multiframe/ProgrammeFrame (CU3)
        # and, optionally, against the per-programme detection tables (CU4),
        # writing missing-file reports and re-ingest lists.
        Logger.addMessage("Checking Ingests from %s to %s" % (
            self.beginDate, self.endDate))
        Logger.addMessage("Reading disk info...")

        fitsDirs = self.getDiskInfo()

        # Map fileID -> list of FITS file paths found on disk for each
        # date/version directory, skipping explicitly excluded fileIDs.
        diskFileDict = defaultdict(list)
        for dateVersStr in sorted(fitsDirs.invFitsDateDict):
            dirPath = os.path.join(
                fitsDirs.invFitsDateDict[dateVersStr], dateVersStr)
            for fileName in os.listdir(dirPath):
                theFile = File(os.path.join(dirPath, fileName))
                if theFile.fileID not in self.excludedIDs \
                       and ".fit" in theFile.ext:
                    diskFileDict[theFile.fileID].append(theFile.name)
                del theFile

        obsDates = self.sysc.obsCal.getObsDates(self.beginDate, self.endDate,
                                                self.versionStr)
        # Distinct YYYYMM prefixes, used to select monthly detection tables.
        obsMonths = sorted(set(x[:6] for x in obsDates))

        # Per-fileID views of the database metadata:
        ffluDict = defaultdict(list)       # fileID -> [fileName, multiframeID]
        mfidDict = defaultdict()           # multiframeID -> fileID (plain-dict behaviour: no default factory)
        mfDict = defaultdict(list)         # fileID -> [fileName, catName]
        progIDbyFileID = defaultdict(list) # fileID -> [programmeID, ...]
        fileIDbyProgID = defaultdict(list) # programmeID -> [fileID, ...]
        db = DbSession(self.database)
        # Dirty reads: tolerate concurrent ingests while querying.
        db.enableDirtyRead()

        Logger.addMessage("Querying metadata...")
        progress = ForLoopMonitor(obsDates)
        for dateVersStr in obsDates:
            ffluData = db.query(
                "multiframeID, fileName", "FlatFileLookup",
                " AND ".join(["multiframeID>0",
                              "dateVersStr='%s'" % dateVersStr]))
            for multiframeID, fileName in sorted(ffluData):
                theFile = File(fileName)
                if theFile.fileID not in self.excludedIDs:
                    mfidDict[multiframeID] = theFile.fileID
                    ffluDict[theFile.fileID] = [theFile.name, multiframeID]
                del theFile
            mfData = db.query(
                "fileName, catName, programmeID",
                "Multiframe m, programmeFrame p",
                " AND ".join(["m.multiframeID = p.multiframeID",
                              "m.multiframeID>0",
                              "fileName like '%%%s%%'" % dateVersStr,
                              "fileName not like '%%deprecated%%'"]))
            for fileName, catName, progID in sorted(mfData):
                theFile = File(fileName)
                if theFile.fileID not in self.excludedIDs:
                    if self.sysc.emptyFitsCatalogueFileName in catName:
                        # Deliberately skip frames with the placeholder
                        # "empty" catalogue: no real catalogue to check.
                        pass
                    else:
                        mfDict[theFile.fileID].extend([theFile.name, catName])
                    # Programme bookkeeping happens for every non-excluded
                    # frame, even empty-catalogue ones.
                    progIDbyFileID[theFile.fileID].append(progID)
                    fileIDbyProgID[progID].append(theFile.fileID)
                del theFile
            progress.testForOutput()

        # For CU4: determine which raw/photometry/astrometry detection
        # tables to query per programme.  VVV (progID 120) uses monthly
        # split tables, so only tables matching the observed months are kept.
        theSurvs = defaultdict(list)
        photoSurvs = defaultdict(list)
        astroSurvs = defaultdict(list)
        if 4 in self.cuNums:
            if "VVV" in self.database:
                tmpProgIdDict = dict(db.query(
                    "programmeID, detectionTable", "Programme",
                    "programmeID=120"))
            else:
                tmpProgIdDict = dict(db.query(
                    "programmeID, detectionTable", "Programme",
                    "programmeID>=1"))
                if self.sysc.isVSA():
                    # NOTE(review): presumably drops calibration/test
                    # programmes 100/101 for VSA -- confirm.
                    del tmpProgIdDict[100]
                    del tmpProgIdDict[101]

            if "VVV" in self.database:
                theSurvs = {120: [x for x in db.query(
                    "name", "sysobjects", "name like 'vvvDetectionRaw20%'")
                    if any(m in x for m in obsMonths)]}
                if self.checkOmetries:
                    photoSurvs = {120: [x for x in db.query(
                        "name", "sysobjects",
                        "name like 'vvvDetectionPhotometry20%'")
                        if any(m in x for m in obsMonths)]}
                    astroSurvs = {120: [x for x in db.query(
                        "name", "sysobjects",
                        "name like 'vvvDetectionAstrometry20%'")
                        if any(m in x for m in obsMonths)]}
            else:
                for pid in tmpProgIdDict:
                    theSurvs[pid] = [tmpProgIdDict[pid] + "Raw"]
                    if self.checkOmetries:
                        photoSurvs[pid] = [tmpProgIdDict[pid] + "Photometry"]
                        astroSurvs[pid] = [tmpProgIdDict[pid] + "Astrometry"]

        # Count detections per multiframeID in every selected table;
        # counts accumulate across a programme's (possibly monthly) tables.
        detData = defaultdict(lambda : defaultdict(int))
        photoData = defaultdict(lambda : defaultdict(int))
        astroData = defaultdict(lambda : defaultdict(int))
        tmpData = defaultdict()
        Logger.addMessage("Querying catalogues...")
        for pid in sorted(theSurvs):
            for detName in theSurvs[pid]:
                Logger.addMessage("... %s" % detName)
                tmpData = dict(db.query(
                    "multiframeID, count(*)", detName,
                    groupBy="multiframeID"))
                for mfid in tmpData:
                    detData[pid][mfid] += tmpData[mfid]
        if self.checkOmetries:
            for pid in sorted(photoSurvs):
                for detName in photoSurvs[pid]:
                    Logger.addMessage("... %s" % detName)
                    tmpData = dict(db.query(
                        "multiframeID, count(*)", detName,
                        groupBy="multiframeID"))
                    for mfid in tmpData:
                        photoData[pid][mfid] += tmpData[mfid]
            for pid in sorted(astroSurvs):
                for detName in astroSurvs[pid]:
                    Logger.addMessage("... %s" % detName)
                    tmpData = dict(db.query(
                        "multiframeID, count(*)", detName,
                        groupBy="multiframeID"))
                    for mfid in tmpData:
                        astroData[pid][mfid] += tmpData[mfid]

        # CU3 check: every file on disk must appear in FlatFileLookup and
        # Multiframe; for complete 3-file groups the catalogue names must
        # match too.
        misFiles = defaultdict(list)
        if 3 in self.cuNums:
            for fileID in diskFileDict:
                if fileID not in ffluDict:
                    misFiles["fflu"].append((fileID, diskFileDict[fileID]))
                if fileID not in mfDict:
                    misFiles["mf"].append((fileID, diskFileDict[fileID]))
                elif len(diskFileDict[fileID]) == 3 and \
                        sorted(mfDict[fileID]) != sorted(diskFileDict[fileID])[::2]:
                    # [::2] takes every other sorted name -- presumably the
                    # image + catalogue pair out of the 3-file group; confirm.
                    misFiles["cat"].append((fileID, diskFileDict[fileID]))

        # CU4 check: every ingested frame with a real catalogue must have
        # detections in the raw (and optionally photometry/astrometry)
        # tables.  missMfids collects the offending multiframeIDs by
        # category: C=catalogue, R=raw, P=photometry, A=astrometry.
        if 4 in self.cuNums:
            missMfids = defaultdict(list)
            for progID in sorted(theSurvs):
                allMfIds = set()
                for fileID in fileIDbyProgID[progID]:
                    if fileID in ffluDict and \
                           fileID in mfDict and len(mfDict[fileID]) > 1:
                        allMfIds.add(ffluDict[fileID][1])
                        if ffluDict[fileID][1] not in detData[progID] or \
                               detData[progID][ffluDict[fileID][1]] == 0:
                            if any(x in tmpProgIdDict.keys()
                                   for x in progIDbyFileID[fileID]):
                                misFiles["detmf"].append(
                                    (fileID,
                                     ["%s: %d" % tuple(ffluDict[fileID])]))
                                misFiles["detcat"].append(
                                    (fileID,
                                     mfDict[fileID]))

                                missMfids['C'].append(ffluDict[fileID][1])

                if self.checkOmetries:
                    for mfid in sorted(allMfIds):
                        fileID = mfidDict[mfid]
                        if mfid not in photoData[progID] \
                               or photoData[progID][mfid] == 0:
                            misFiles["detphoto"].append(
                                (fileID, mfDict[fileID]))

                            if mfid not in missMfids['P']:
                                missMfids['P'].append(mfid)
                        if mfid not in astroData[progID] \
                               or astroData[progID][mfid] == 0:
                            misFiles["detastro"].append(
                                (fileID, mfDict[fileID]))

                            if mfid not in missMfids['A']:
                                missMfids['A'].append(mfid)
                        if mfid not in detData[progID] \
                               or detData[progID][mfid] == 0:
                            misFiles["detraw"].append(
                                (fileID, mfDict[fileID]))

                            if mfid not in missMfids['R']:
                                missMfids['R'].append(mfid)

        # Write the report plus per-CU re-ingest lists.
        outFile = File(os.path.join(
            self.outPath, "checkIngests_%s_%s_%s_%s.log" % (
                '_'.join(map(str, self.cuNums)),
                self.sysc.loadDatabase, self.beginDate, self.endDate)))
        outFile.wopen()
        if 3 in self.cuNums:
            toing3File = File(os.path.join(
                self.outPath, "toing_3_%s_%s_%s.list" % (
                    self.sysc.loadDatabase, self.beginDate, self.endDate)))
            toing3File.wopen()
        if 4 in self.cuNums:
            toing4File = File(os.path.join(
                self.outPath, "toing_4_%s_%s_%s.list" % (
                    self.sysc.loadDatabase, self.beginDate, self.endDate)))
            toing4File.wopen()

        for misType in misFiles:
            outFile.writetheline("%d missing in %s:" % (len(misFiles[misType]),
                                                        misType.upper()))
            for entry in sorted(misFiles[misType]):
                # "mf" in misType matches "mf", "detmf" -- both are logged;
                # only plain "mf" entries feed the CU3 re-ingest list.
                if "mf" in misType:
                    outFile.writetheline('%s : %s' % (entry[0],
                                                      ','.join(entry[1])))
                    if misType == "mf":
                        toing3File.writetheline('\n'.join(entry[1]))
                if misType in ["detcat", "detraw", "detphoto", "detastro"]:
                    outFile.writetheline('%s : %s' % (entry[0],
                                                      ','.join(entry[1])))
                    toing4File.writetheline('\n'.join(entry[1]))

        # NOTE(review): missMfids is only assigned inside the CU4 branch
        # above, yet it is iterated unconditionally here -- a CU3-only run
        # (cuNums == [3]) would raise NameError.  Confirm whether this loop
        # should be guarded by "if 4 in self.cuNums".
        outFile.writetheline("MFIDs:")
        for misType in missMfids:
            outFile.writetheline(" %s: %s" % (misType,
                ','.join("%d" % i for i in sorted(missMfids[misType]))))
        outFile.close()
        Logger.addMessage("Output written to %s" % outFile.name)
        if 3 in self.cuNums:
            toing3File.close()
            Logger.addMessage("Ingest list written to %s" % (toing3File.name))
        if 4 in self.cuNums:
            toing4File.close()
            Logger.addMessage("Ingest list written to %s" % (toing4File.name))
355
356
357
358
359
if __name__ == "__main__":
    # Script-specific command-line options; generic ones (curator, test,
    # comment, ...) are contributed by CLI/IngCuSession themselves.
    extraOpts = [
        CLI.Option('b', "begindate",
                   "first date to process, eg. 20050101",
                   "DATE", str(IngCuSession.beginDateDef)),
        CLI.Option('e', "enddate",
                   "last date to process, eg. 20050131",
                   "DATE", str(IngCuSession.endDateDef)),
        CLI.Option('o', "outpath",
                   "directory where the output data is written to",
                   "PATH", './'),
        CLI.Option('v', "version",
                   "overwrite CASU version with this version",
                   "STR", ''),
        CLI.Option('x', "cunums",
                   "csv list of CU numbers to check (3, 4)",
                   "LIST", '3',
                   # Accept only comma-separated digits that are all 3 or 4.
                   isValOK=lambda x: (x.replace(',', '').isdigit() and
                                      all(int(y) in (3, 4)
                                          for y in x.replace(',', '')))),
        CLI.Option('O', "ometries",
                   "with CU4 check also Astr- and Phot-ometry tables"),
        CLI.Option('X', "exclude",
                   "fileIDs (<dateVersStr>/<file root without cat>) to "
                   "be excluded",
                   "LIST"),
    ]
    CLI.progOpts += extraOpts

    cmdLine = CLI(CheckIngests, "$Revision: 10232 $")

    # Build the session from the parsed options/arguments and run it.
    session = CheckIngests(cmdLine.getOpt("curator"),
                           cmdLine.getArg("database"),
                           cmdLine.getOpt("begindate"),
                           cmdLine.getOpt("enddate"),
                           cmdLine.getOpt("outpath"),
                           cmdLine.getOpt("cunums"),
                           cmdLine.getOpt("version"),
                           cmdLine.getOpt("exclude"),
                           cmdLine.getOpt("ometries"),
                           cmdLine.getOpt("test"),
                           cmdLine.getArg("comment"))
    session.run()
401