1
2
3 """
4 Automatic curation routines. Determines what needs to run on a given
5 programme and automatically calls those curation tasks.
6
7 @author: N.J.G. Cross
8 @org: WFAU, IfA, University of Edinburgh
9
10 @newfield contributors: Contributors, Contributors (Alphabetical Order)
11 @contributors: R.S. Collins
12 """
13
14 from __future__ import division, print_function
15 from future_builtins import map
16
17 from collections import defaultdict
18 import gc
19 import os
20 import time
21
22 from invocations.cu6.cu6 import Cu6
23 from invocations.cu7.cu7 import Cu7
24 from invocations.cu8.cu8 import Cu8
25 from invocations.cu13.cu13 import Cu13
26 from invocations.cu16.cu16 import Cu16
27 from invocations.cu29.cu29 import Cu29
28
29 import wsatools.DataFactory as df
30 import wsatools.DbConnect.CommonQueries as queries
31 from wsatools.DbConnect.CuSession import CuSession
32 import wsatools.DbConnect.DbConstants as dbc
33 from wsatools.DbConnect.DbSession import DbSession, Join, SelectSQL
34 import wsatools.ExternalProcess as extp
35 from wsatools.FitsToDb import ProvenanceFiller
36 import wsatools.FitsUtils as fits
37 from wsatools.Logger import Logger
38 from wsatools.QualityControl import QualityFlagUpdater
39 from wsatools.SourceMerger import SynopticMerger, TilePawPrintMerger
40 from wsatools.ProgrammeBuilder import SynopticSetup, ProgrammeBuilder
41 from wsatools.SystemConstants import DepCodes, SystemConstants
42 import wsatools.Utilities as utils
43
44
45
46
47
def autoCurate(cli, progID, releaseNum=0,
               dateRange=SystemConstants().obsCal.dateRange(),
               redoSources=False, redoProducts=None,
               redoRecal=False, redoNeigh=False, doCu6=False,
               skipMosaicCheck=False, minProdID=None, maxProdID=None,
               fileListForType=None, isRerelease=False, numCPUs=None,
               fields=None):
    """
    Automatically runs the necessary curation tasks to prepare a programme's
    data for release.

    @param cli: Standard command-line options for CuSessions.
    @type cli: CLI
    @param progID: Unique ID for programme, or acronym.
    @type progID: str
    @param releaseNum: Curate products of this specific WFAU-product version
                       number. If zero, the next available release number is
                       determined automatically (non-surveys only).
    @type releaseNum: int
    @param dateRange: Curate data observed between these nights only.
    @type dateRange: DateRange
    @param redoSources: If True, force re-creation of source table.
    @type redoSources: bool
    @param redoProducts: List of specified product types to recreate along with
                         any descendants.
    @type redoProducts: list(str)
    @param redoRecal: If True, force recalibration of deep survey detections
                      from variability analysis.
    @type redoRecal: bool
    @param redoNeigh: If True, force recreation of neighbour tables.
    @type redoNeigh: bool
    @param doCu6: If True, also run CU6 with default options at the end.
    @type doCu6: bool
    @param skipMosaicCheck: If True, force curation, even if mosaic provenance
                            is incomplete.
    @type skipMosaicCheck: bool
    @param minProdID: Minimum product ID to curate for deep stacks.
    @type minProdID: str
    @param maxProdID: Maximum product ID to curate for deep stacks.
    @type maxProdID: str
    @param fileListForType: Append newly created image and catalogue products
                            to file lists written to the file paths given in
                            this dictionary referenced by product type.
    @type fileListForType: dict(str: str)
    @param isRerelease: If True, deprecate existing products of the current
                        release number before proceeding with curation.
    @type isRerelease: bool
    @param numCPUs: Optionally limit number of CPUs to this number.
    @type numCPUs: int
    @param fields: List of fieldIDs to curate.
    @type fields: list(str)

    @return: True, if ready for release.
    @rtype: bool

    """
    Logger.addMessage("Running automatic programme curation")

    # Remember the caller's comment option so it can be restored before every
    # return point.
    cliComment = cli.resetOptArg("comment", CuSession.comment)

    db = DbSession(autoCommit=True, isPersistent=True, cli=cli)
    programme = df.ProgrammeTable(db)
    programmeID = programme.setProgID(progID)
    Logger.addMessage(
        "Programme %s is a %s survey" % (programme.getAcronym().upper(),
        "deep" if programme.isDeep() else "shallow"))

    Logger.addMessage("Determining required products for this programme...")
    numCPUs = Cu13.getUsefulNumCPUs(numCPUs, programme)
    redoProducts = redoProducts or []
    fileListForType = fileListForType or fits.createFileListDict(programme)
    releaseNum = releaseNum or getReleaseNum(programme)
    Logger.addMessage("This will be release number: %s" % releaseNum)

    # On a re-release of a deep programme, clear out the current release's
    # products (deleting the obsolete deep files from disk too).
    if isRerelease and programme.isDeep():
        Logger.addMessage("Deprecating %s products with release number %s..."
                          % (programme.getAcronym().upper(), releaseNum))

        deprecateProducts(releaseNum, db, programme, deleteDeeps=True)

    # Deprecate deep products that have deprecated component frames.
    deprecateDodgyDeeps(db, programmeID)
    Logger.addMessage(
        "Querying database for existing products for this programme...")

    try:
        reqProdsOfType = queries.findRequiredDeepProducts(programme, minProdID,
                                                          maxProdID,
                                                          fieldIDs=fields)
    except queries.ProgrammeNotSetUpError as error:
        raise CuSession.CuError(error)

    Logger.addMessage("Determining synoptic table setup...")
    synSetup = SynopticSetup(programme, dateRange)

    # For each product type, work out which products are already ingested,
    # which exist on disk awaiting ingest, and which still need creating.
    createdImagesOfType = defaultdict(dict)
    ingestProdsOfType = {}
    newProdsOfType = defaultdict(list)
    for pType in reqProdsOfType:
        ingestedProdIDs, numMultiframes = getIngestedProducts(db, programmeID,
            releaseNum, reqProdsOfType[pType], pType)

        # Products to ingest: everything required, minus those already in the
        # database - unless this type is being forcibly reprocessed.
        ingestProdsOfType[pType] = set(reqProdsOfType[pType])
        if pType not in redoProducts:
            ingestProdsOfType[pType] -= ingestedProdIDs

        if numMultiframes < len(reqProdsOfType[pType]):
            Logger.addMessage("Database has an incomplete set of %s products"
                              % pType)

        Logger.addMessage("Searching file system for existing products...")
        createdImagesOfType[pType] = findCreatedProducts(db, programmeID,
            releaseNum, reqProdsOfType, ingestedProdIDs, pType)

        # Products still to be created are those awaiting ingest that do not
        # already exist on disk.
        newProdsOfType[pType] = \
            ingestProdsOfType[pType].difference(createdImagesOfType[pType])

    areIngestProds = \
        any(createdImagesOfType[pType] for pType in createdImagesOfType)

    if areIngestProds:
        Logger.addMessage("Found non-ingested existing products; "
                          "adding to ingest file lists...")

        for pType in createdImagesOfType:
            if createdImagesOfType[pType]:
                fileList = (prod.imageName + '\n'
                            + prod.confName + '\n'
                            + prod.catName + '\n'
                            for prod in createdImagesOfType[pType].values())

                # open() in a context manager (file() was removed in Python 3
                # and left the list file unflushed until GC).
                with open(fileListForType[pType], 'a') as listFile:
                    listFile.writelines(fileList)

    if any(newProdsOfType[pType] for pType in newProdsOfType):
        if releaseNum > 1:
            Logger.addMessage(
                "Deprecating %s products with release numbers < %s..."
                % (programme.getAcronym().upper(), releaseNum))

            deprecateProducts(releaseNum, db, programme, depOld=True)

        comment = "Updating detection quality bit-flags for %s CASU products" \
            % programme.getAcronym().upper()

        Logger.addMessage(comment)
        prepareCu("QualityFlagUpdater", db, cli, comment=comment)
        QualityFlagUpdater.skipProvenanceCheck = True
        QualityFlagUpdater(str(programmeID), cli=cli).run()

        if db.sysc.isVSA():
            prepareCu("TilePawPrintMerger", db, cli)
            TilePawPrintMerger.dateRange = dateRange
            TilePawPrintMerger(str(programmeID), cli=cli).run()

        Logger.addMessage("Attempting to create missing required deep "
                          "products:")

        for pType in newProdsOfType:
            if newProdsOfType[pType]:
                Logger.addMessage(pType.title() + "s: " +
                                  utils.numberRange(newProdsOfType[pType]))

        prepareCu("Cu13", db, cli,
            comment="Creating deep products for release %s of " % releaseNum
                    + programme.getAcronym().upper())

        creator = Cu13(str(programmeID), cli=cli)
        creator.breakIntoParts = True
        creator.dateRange = dateRange
        creator.doQFlagCheck = False
        creator.fileListForType = fileListForType
        creator.newProductsOfType = createdImagesOfType
        creator.numCPUs = numCPUs
        creator.releaseNum = releaseNum
        creator.reprocessTypes = redoProducts
        creator.reqProdsOfType = newProdsOfType
        creator.useDeep = True
        creator.run()

        # Drop from the ingest set any product that Cu13 failed to create.
        for pType in ingestProdsOfType:
            ingestProdsOfType[pType] -= \
                newProdsOfType[pType].difference(creator.newProductsOfType[pType])

        del creator

    for pType in ingestProdsOfType:
        if ingestProdsOfType[pType]:
            areIngestProds = True
            Logger.addMessage(
                "Deprecating %ss to be ingested with product IDs: " % pType
                + utils.numberRange(ingestProdsOfType[pType]))

            deprecateProducts(releaseNum, db, programme,
                              (pType, ingestProdsOfType[pType]))

    fileLists = [fileList for fileList in fileListForType.values()
                 if os.path.exists(fileList)]

    if areIngestProds:
        if not ingestDeepStacks(db, fileLists, releaseNum) \
                or programme.isNonSurvey():

            # Either ingest must happen via the regular cron job, or this is
            # a non-survey where curation stops after ingest; restore the
            # caller's comment option before bailing out.
            cli.resetOptArg("comment", cliComment)
            return False

    isNewData = areIngestProds
    isNewDeepData = areIngestProds
    if not areIngestProds:
        # No new products were ingested this run; check whether the database
        # contains frames that have not yet been source-merged.
        Logger.addMessage("Querying existing source data...")
        mergedMFIDs = getMergedMFIDs(db, programme)
        fTypes = \
            queries.getFrameSelection(programme.getAttr("sourceProdType"),
                                      deepOnly=True)

        multiframeIDs = db.query("Multiframe.multiframeID",
            fromStr=Join(["Multiframe", "ProgrammeFrame"], "multiframeID"),
            whereStr="programmeID=%d AND releaseNum=%d AND productID>0 AND %s "
                     "AND deprecated=0" % (programmeID, releaseNum, fTypes))

        isNewDeepData = len(set(multiframeIDs) - mergedMFIDs) > 0
        fTypes = \
            queries.getFrameSelection(programme.getAttr("sourceProdType"))

        multiframeIDs = db.query("Multiframe.multiframeID",
            fromStr=Join(["Multiframe", "ProgrammeFrame"], "multiframeID"),
            whereStr="programmeID=%d AND releaseNum=%d AND productID>0 AND %s "
                     "AND deprecated=0" % (programmeID, releaseNum, fTypes))

        isNewData = len(set(multiframeIDs) - mergedMFIDs) > 0

    if programme.isDeep() and isNewDeepData:
        # New deep data: complete Provenance, tidy broken detector frames and
        # re-flag quality before source merging.
        if not programme.isNonSurvey():
            prepareCu("ProvenanceFiller", db, cli,
                comment="Updating Provenance for new deep stacks in "
                        + programme.getAcronym().upper())

            ProvenanceFiller.combiFrameTypes = {
                'undef': ['deep%', 'mosaic%'],
                'nOffsets': ['tiledeep%']}

            ProvenanceFiller.isQuickRun = True
            ProvenanceFiller(cli=cli).run()

        deprecateMissingDeepFrames(db, programmeID)

        comment = "Updating detection quality bit-flags for %s WFAU products"
        comment %= programme.getAcronym().upper()
        Logger.addMessage(comment)
        prepareCu("QualityFlagUpdater", db, cli, comment=comment)
        QualityFlagUpdater.isDeepOnly = True
        QualityFlagUpdater.skipProvenanceCheck = skipMosaicCheck
        QualityFlagUpdater(str(programmeID), cli=cli).run()

        if db.sysc.isVSA():
            prepareCu("TilePawPrintMerger", db, cli)
            TilePawPrintMerger.deepOnly = True
            TilePawPrintMerger(str(programmeID), cli=cli).run()

    doSources = redoSources or isNewData
    doSeaming = doSources or db.queryEntriesExist(programme.getMergeLogTable(),
        where="newFrameSet=%s" % dbc.yes())

    if doSources or doSeaming:
        prepareCu("Cu7", db, cli)
        Cu7.continueCompleteReseam = True
        Cu7.createNewTable = doSources
        Cu7.dateRange = dateRange
        Cu7(str(programmeID), cli=cli).run()

    reqProducts = reqProdsOfType[programme.getAttr("sourceProdType")]
    minProdID = min(reqProducts)
    maxProdID = max(reqProducts)
    doRecal = programme.isDeep() and (redoRecal or isUncalibrated(programme))

    if doRecal and programme.getAttr("epochFrameType") == "stack":
        prepareCu("Cu8", db, cli,
            comment="Relative recalibration of %s intermediate stacks"
                    % programme.getAcronym().upper())

        calPath = (db.sysc.calibrationDir() if db.isRealRun else
                   os.path.join(db.sysc.testOutputPath(), "calibrations"))

        utils.ensureDirExist(calPath)

        Cu8.copyFits = not db.isRealRun
        Cu8.dateRange = dateRange
        Cu8.releaseNum = releaseNum
        if not db.isRealRun:
            Cu8.fromMf = (0,)

        Cu8.reqProdRange = (minProdID, maxProdID)
        Cu8.tablePathName = os.path.join(calPath, "wfau-p%s-r%s-%s.log"
            % (programmeID, releaseNum, time.strftime('%Y%m%d')))

        Cu8(str(programmeID), cli=cli).run()

    isNewSources = doSources or doSeaming
    if doCu6 and programme.isSynoptic():
        doSynoptic = isNewSources or doRecal
        synDateRange = synSetup.getTimeRange(isCorrelated=True)
        if not doSynoptic:
            Logger.addMessage("Querying existing synoptic data...")
            intMFIDs = queries.getIntStacks(db, programmeID, reqProducts,
                programme.getAttr("sourceProdType"), synDateRange,
                intFrameType=programme.getAttr("epochFrameType"))

            mergedMFIDs = getMergedMFIDs(db, programme, isSynoptic=True)
            doSynoptic = len(intMFIDs - mergedMFIDs) > 0

        if doSynoptic:
            prepareCu("SynopticMerger", db, cli,
                comment="Producing synoptic source table of correlated passbands")

            SynopticMerger.minProdID = \
                min(reqProdsOfType[programme.getAttr("epochFrameType")])

            SynopticMerger.maxProdID = \
                max(reqProdsOfType[programme.getAttr("epochFrameType")])

            SynopticMerger.dateRange = synDateRange
            SynopticMerger(str(programmeID), cli=cli).run()

    if programme.isDeep():
        propagateDeprecations(programme)

    doCu29 = db.sysc.isVSA() and (isNewSources
                                  or isCuTaskIncomplete(programme, cuNum=29))
    if doCu29:
        prepareCu("Cu29", db, cli)
        Cu29(str(programmeID), cli=cli).run()

    if doCu29 or not programme.isNonSurvey():
        prepareCu("SurveyParser", db, cli)
        pb = ProgrammeBuilder(str(programmeID),
                              database=db.server + "." + db.database,
                              isTrialRun=True, userName=db.userName)

        pb.createSchemaOnly = True
        pb.run()
        del pb

    if redoNeigh or isNewSources or isCuTaskIncomplete(programme, cuNum=16):
        prepareCu("Cu16", db, cli)
        Cu16.dateRange = dateRange
        Cu16(str(programmeID), cli=cli).run()

    if doCu6 and programme.isDeep():
        prepareCu("Cu6", db, cli)
        Cu6.minProdID = minProdID
        Cu6.maxProdID = maxProdID
        Cu6.dateRange = dateRange
        Cu6(progID, cli=cli).run()

    cli.resetOptArg("comment", cliComment)
    Logger.addMessage("Automatic programme curation complete")

    # Equality, not identity: programmeID and the stored UDS programme ID are
    # integers, and "is" comparison between ints is implementation-dependent.
    # (UDS is presumably excluded from automatic release - confirm.)
    return programmeID != db.sysc.scienceProgs.get("UDS")
445
446
447
456
457
458
459
461 """
462 Deprecate deep stack detector frames with no components.
463
464 @param db: Connection to database where deprecations are to be made.
465 @type db: DbSession
466 @param programmeID: Only deprecrate frames belonging to this unique
467 programme ID.
468 @type programmeID: int
469
470 """
471 Logger.addMessage(
472 "Deprecating missing detector frames of newly ingested deep stacks...")
473 Logger.addMessage("...%s deep detector frames deprecated." %
474 db.update("MultiframeDetector", "deprecated=103",
475 where="totalExpTime=0 AND frameType LIKE 'deep%stack' AND "
476 + "programmeID=%s AND MultiframeDetector.deprecated=0"
477 % programmeID,
478 fromTables=Join(["Multiframe", "ProgrammeFrame", "MultiframeDetector"],
479 ["multiframeID"])))
480
481
482
484 """
485 Deprecates deeps where some components have been deprecated
486
487 @param db: Connection to database where deprecations are to be made.
488 @type db: DbSession
489 @param programmeID: UID of current programme
490 @type programmeID: int
491
492
493 """
494
495
496 dodgyDeepMfIDs = SelectSQL(
497 "distinct md.multiframeID",
498 "ProgrammeFrame as p,Multiframe as md,Provenance as v,"
499 "Multiframe as mo",
500 "p.programmeID=%s AND p.multiframeID=md.multiframeID AND "
501 "md.frameType like '%%deep%%stack' AND md.frameType not like "
502 "'%%tile%%' AND md.frameType not like '%%mosaic%%' AND "
503 "md.deprecated IN (0,255) AND md.multiframeID=v.combiframeID AND "
504 "v.multiframeID=mo.multiframeID AND mo.deprecated>0 AND "
505 "mo.deprecated NOT IN (51)" % programmeID)
506
507 nStacks = db.update("Multiframe", [("deprecated", DepCodes.constitDep)],
508 "multiframeID in (%s)" % dodgyDeepMfIDs)
509 Logger.addMessage("Deprecated %s deep stacks with deprecated components"
510 % nStacks)
511 if 'tile' in db.sysc.productTypes:
512
513 dodgyDeepMfIDs = SelectSQL(
514 "distinct md.multiframeID",
515 "ProgrammeFrame as p,Multiframe as md,Provenance as v,"
516 "Multiframe as mo",
517 "p.programmeID=%s AND p.multiframeID=md.multiframeID AND "
518 "md.frameType like 'tile%%deep%%stack' AND md.frameType not "
519 "like '%%mosaic%%' AND "
520 "md.deprecated IN (0,255) AND md.multiframeID=v.combiframeID AND "
521 "v.multiframeID=mo.multiframeID AND mo.deprecated>0 AND "
522 "mo.deprecated NOT IN (51,255)" % programmeID)
523
524 nTiles = db.update("Multiframe", [("deprecated", DepCodes.constitDep)],
525 "multiframeID in (%s)" % dodgyDeepMfIDs)
526 Logger.addMessage("Deprecated %s deep tiles with deprecated components"
527 % nTiles)
528
529
def deprecateProducts(releaseNum, db, programme, prodIDs=None, depOld=False,
                      deleteDeeps=False):
    """
    Deprecates products of given release numbers for the current programme.
    Optionally supply a specific list of product IDs to deprecate.

    @param releaseNum: Release number currently being curated.
    @type releaseNum: int
    @param db: Connection to database where deprecations are to be made.
    @type db: DbSession
    @param programme: Programme table for current database with the current row
                      set to that of the programme for which the products will
                      be deprecated.
    @type programme: df.ProgrammeTable
    @param prodIDs: List of product IDs to deprecate with prodType.
    @type prodIDs: tuple(prodType, list(int))
    @param depOld: If True, deprecate files with release numbers older than
                   the current release, else just deprecate files with the
                   current release number.
    @type depOld: bool
    @param deleteDeeps: If True, also delete the obsolete deep product files
                        from disk and mark their Multiframe fileName entries
                        as no longer available.
    @type deleteDeeps: bool

    """
    # Nothing older than release 1 can exist, so nothing to deprecate.
    if depOld and releaseNum <= 1:
        return

    if prodIDs:
        prodType, pIDs = prodIDs
        frameTypePredicate = queries.getFrameSelection(prodType, noDeeps=False,
                                                       selType='%stack%')
    else:
        prodType = ''
        frameTypePredicate = "frameType LIKE '%deep%'"

    progPredicate = " AND programmeID=%s" % programme.getAttr("programmeID")
    productPredicate = (" AND productID IN (%s)" % ','.join(map(str, pIDs))
                        if prodIDs else '')

    # Either every previous release (depOld) or just files stamped with the
    # current release version in their file names.
    releasePredicate = (" AND releaseNum BETWEEN 1 AND " + str(releaseNum - 1)
                        if depOld else
                        " AND releaseNum=%d and fileName like '%%_v%d%%'" %
                        (releaseNum, releaseNum))

    # Multiframe IDs of the deep products to deprecate.
    dmfIDs = SelectSQL("Multiframe.multiframeID",
        table=Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
        where="deprecated=0 AND " + frameTypePredicate + progPredicate
              + releasePredicate + productPredicate)

    if deleteDeeps:
        for mfID, fileName in db.query("multiframeID, fileName", "Multiframe",
            whereStr="multiframeID IN (%s)" % dmfIDs):

            # Strip any leading "server:" prefix to get the local path.
            fName = fileName.split(":")[-1]
            if os.path.exists(fName):
                if (db.sysc.testOutputPath() not in fName
                        and os.getenv('USER') != 'scos'):
                    Logger.addMessage("<WARNING> Cannot delete %s, because it "
                                      "is write-protected" % fName)
                else:
                    Logger.addMessage("Deleting %s, which is obsolete" % fName)
                    os.remove(fName)
                    db.update("Multiframe",
                        "fileName='PixelFileNoLongerAvailable:%s'" % fName,
                        where="multiframeID=%s" % mfID)

    numDep = db.update("Multiframe", "deprecated=%s" % DepCodes.reprocWFAU,
        where="multiframeID IN (%s) and deprecated=0" % dmfIDs)

    Logger.addMessage("...%s products deprecated." % numDep)
    if numDep:
        Logger.addMessage("Propagating deprecations to MultiframeDetector...")
        numDep = db.update("MultiframeDetector",
            entryList="MultiframeDetector.deprecated=%s" % DepCodes.reprocWFAU,
            where="Multiframe.deprecated=%s AND " % DepCodes.reprocWFAU
                  + "MultiframeDetector.deprecated!=%s" % DepCodes.reprocWFAU,
            fromTables=Join(["Multiframe", "MultiframeDetector"],
                            ["multiframeID"]))

        Logger.addMessage("...%s frames deprecated." % numDep)

    # Intermediate (non-deep) frames carry no version stamp in their file
    # names, so for the current release match on the release number alone.
    if not depOld:
        releasePredicate = " AND releaseNum=%d" % releaseNum

    Logger.addMessage("Resetting intermediate %s product assignments..."
                      % prodType)

    # Intermediate frames are not deprecated; their ProgrammeFrame product
    # assignments are reset to defaults instead.
    imfIDs = SelectSQL("Multiframe.multiframeID",
        table=Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
        where="frameType NOT LIKE '%%deep%%'%s" % (" AND " + frameTypePredicate if prodIDs else "")
              + releasePredicate + progPredicate + productPredicate)

    numRes = db.update("ProgrammeFrame",
        entryList=[("productID", dbc.intDefault()),
                   ("releaseNum", dbc.smallIntDefault())],
        where="multiframeID IN (%s)" % imfIDs)

    Logger.addMessage("...%s productIDs reset" % numRes)
635
636
def findCreatedProducts(db, programmeID, releaseNum, reqProdsOfType,
                        ingestedIDs, prodType):
    """
    Searches file system to find existing products from the list provided of
    given file type and release number for the given programme.

    @param db: Connection to database containing the observations.
    @type db: DbSession
    @param programmeID: Unique ID of desired programme's products.
    @type programmeID: int
    @param releaseNum: Release number of desired products.
    @type releaseNum: int
    @param reqProdsOfType: List of required productIDs for each product type.
    @type reqProdsOfType: list(int)
    @param ingestedIDs: List of already ingested productIDs for product type.
    @type ingestedIDs: list(int)
    @param prodType: Product type.
    @type prodType: str

    @return: Details of products already created reference by productID.
    @rtype: dict(int: Cu13.ProductDetails)

    """
    filterTable = df.Table("Filter", db)
    imageProds = {}
    prodInfo = queries.getProductInfo(db, programmeID, prodType,
                                      reqProdsOfType[prodType])

    # Products still being worked on: completion bit (2**30 = 1073741824,
    # presumably the "finished" flag - confirm) not yet set...
    currentlyProcessingProducts = db.query(
        "fileName",
        "ProductProcessing",
        "bitProcessingFlag<1073741824 AND programmeID=%s AND productType='%s'"
        % (programmeID, prodType))

    # ...and products flagged as fully processed.
    currentlyFinishedProducts = db.query(
        "fileName",
        "ProductProcessing",
        "bitProcessingFlag=1073741824 AND programmeID=%s AND productType='%s'"
        % (programmeID, prodType))

    for productID in reqProdsOfType[prodType]:
        if productID not in ingestedIDs:
            prodFilePath = fits.findProduct(programmeID, releaseNum, productID,
                                            prodType, prodInfo, filterTable,
                                            not db.isRealRun, db.sysc)

            # A product only counts as created if the image, its confidence
            # map and its catalogue all exist AND processing is flagged done.
            if (prodFilePath and os.path.exists(fits.getConfMap(prodFilePath))
                    and os.path.exists(fits.getCatalogue(prodFilePath))
                    and prodFilePath in currentlyFinishedProducts):

                imageProds[productID] = Cu13.ProductDetails(productID,
                    multiframeID=dbc.intDefault(),
                    imageName=prodFilePath,
                    confName=fits.getConfMap(prodFilePath),
                    catName=fits.getCatalogue(prodFilePath))

            # An image exists on disk but is neither finished nor currently
            # being processed: an orphaned partial product - remove it so it
            # can be recreated cleanly.
            elif prodFilePath and prodFilePath not in currentlyProcessingProducts:
                Logger.addMessage("Incomplete product %s: %d formed that is "
                                  "not in ProductProcessing"
                                  " Deleting %s" % (prodType, productID, prodFilePath))

                os.remove(prodFilePath)
                if os.path.exists(fits.getConfMap(prodFilePath)):
                    os.remove(fits.getConfMap(prodFilePath))

                if os.path.exists(fits.getCatalogue(prodFilePath)):
                    os.remove(fits.getCatalogue(prodFilePath))
    return imageProds
705
706
707
709 """
710 Searches database for all existing products ingested in the database for
711 the given programme and date range. If deep product IDs provided then just
712 these existing products of the given release number are retrieved,
713 otherwise all non-deprecated observations with catalogues of the given type
714 are retrieved.
715
716 @param db: Connection to database containing the observations.
717 @type db: DbSession
718 @param programmeID: Unique ID of desired programme's observations.
719 @type programmeID: int
720 @param releaseNum: Release number of data to retrieve.
721 @type releaseNum: int
722 @param prodIDs: List of product IDs for all required deep products.
723 @type prodIDs: list(int)
724 @param productType: Frame type string pattern of required product.
725 @type productType: str
726
727 @return: Set of product IDs already assigned in the database and the total
728 number of multiframes with assigned product IDs.
729 @rtype: tuple(set(int), int)
730
731 """
732 products = db.query("Multiframe.multiframeID, productID",
733 fromStr=Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
734 whereStr="programmeID=%s AND releaseNum=%s AND productID IN (%s)"
735 " AND %s AND confID>0 AND %s"
736 % (programmeID, releaseNum, ','.join(map(str, prodIDs)),
737 queries.getFrameSelection(productType),
738 DepCodes.selectNonDeprecated))
739
740
741 numMultiframes = len(set(product.multiframeID for product in products))
742
743 return set(product.productID for product in products), numMultiframes
744
745
746
747
749 """
750 Retrieves all multiframe IDs listed in the merge log (synoptic if
751 requested) of the given programme.
752
753 @param db: Connection to database containing desired merge log.
754 @type db: DbSession
755 @param programme: Programme table for current database with the current
756 row set to the desired programme.
757 @type programme: df.ProgrammeTable
758 @param isSynoptic: If True, retrieve multiframe IDs from the synoptic merge
759 log.
760 @type isSynoptic: bool
761
762 @return: All multiframe IDs listed in the programme's merge log table.
763 @rtype: set(int)
764
765 """
766 bandFrames = df.PassbandList(programme, isSynoptic).getMfCols()
767
768 mergeTable = (programme.getMergeLogTable() if not isSynoptic else
769 programme.getSynopticMergeLogTable())
770
771 mergedMFIDs = set()
772 for mfIDs in db.query(', '.join(bandFrames), mergeTable):
773 if len(bandFrames) > 1:
774 mergedMFIDs.update(mfIDs)
775 else:
776 mergedMFIDs.add(mfIDs)
777
778 mergedMFIDs.discard(dbc.intDefault())
779
780 return mergedMFIDs
781
782
783
785 """
786 Queries database to find the next available release number for a given
787 survey.
788
789 @param programme: Database Programme table information with the row set
790 to the current programme.
791 @type programme: DataFactory.ProgrammeTable
792
793 @return: Next available release number.
794 @rtype: int
795
796 """
797 if not programme.isNonSurvey():
798 raise SystemExit("<ERROR> When curating survey programmes, the "
799 "release number must be expressly stated as the relevant "
800 "WFAU-product version number, which must not be zero.")
801
802 db = programme._db
803 progID = programme.getAttr("programmeID")
804 relNum = db.queryAttrMax("releaseNum", "Release", "surveyID=%s" % progID)
805
806 return (relNum + 1 if relNum else 1)
807
808
809
811 """
812 Ingests into given database the deep stack products listed in the given
813 file lists.
814
815 @param db: Connection to ingest database.
816 @type db: DbSession
817 @param fileLists: List of paths to files containing the list of deep stack
818 product files to ingest.
819 @type fileLists: list(str)
820 @param releaseNum: Release number of data to ingest.
821 @type releaseNum: int
822
823 @return: True, if stacks and catalogues are ingested.
824 @rtype: bool
825
826 """
827 if db.database.startswith(db.sysc.loadDatabase):
828 for filePath in fileLists:
829 if filePath and os.path.exists(filePath):
830 Logger.addMessage(
831 "File list %s must be ingested as part of the ongoing "
832 "chron ingestion job to avoid ingesting files with the same "
833 "multiframe ID" % filePath)
834 return False
835
836 Logger.addMessage("Ingesting deep stacks and catalogues...")
837
838
839 dbScript = os.path.join(db.sysc.curationCodePath(),
840 'invocations', 'cu0', 'DataBuilder.py')
841
842 inScript = os.path.join(db.sysc.curationCodePath(),
843 'invocations', 'cu0', 'IngestCUFiles.py')
844
845
846 dbCmd = "%s -f -o '%s' -r -v %s" % (dbScript, db.sharePath(), releaseNum)
847 inCmd = "%s %s.%s" % (inScript, db.server, db.database)
848
849 for filePath in fileLists:
850 if filePath and os.path.exists(filePath):
851
852 outputDirs = set(os.path.dirname(line) for line in file(filePath))
853 for path in outputDirs:
854 for cuNum in [3, 4]:
855 flagPath = \
856 os.path.join(path, 'CU0%sED_%s' % (cuNum, db.database))
857 if os.path.exists(flagPath):
858 Logger.addMessage("Removing old flag: " + flagPath)
859 os.remove(flagPath)
860
861
862 subDir = 'products/' + getProductType(filePath)
863 args = " -x 3 -l %s -s %s %s.%s"
864 args %= (filePath, subDir, db.server, db.database)
865 extp.run(dbCmd + args, parseStdOut=False)
866 extp.run(inCmd, parseStdOut=False)
867
868
869 for path in outputDirs:
870 for cuNum in [3]:
871 flagPath = \
872 os.path.join(path, 'CU0%sED_%s' % (cuNum, db.database))
873 if not os.path.exists(flagPath):
874 Logger.addMessage("Writing: " + flagPath)
875 file(flagPath, 'w').write('')
876
877
878 args = " -x 4 -l %s -s %s %s.%s"
879 args %= (filePath, subDir, db.server, db.database)
880 extp.run(dbCmd + args, parseStdOut=False)
881 extp.run(inCmd, parseStdOut=False)
882
883 return True
884
885
886
888 """ Checks type of product being ingested
889 """
890 prodTypes = set(fileName.split('products')[1].split('/')[0]
891 for fileName in file(filePath))
892
893 if len(prodTypes) != 1:
894 raise Exception("Incorrect number of product types")
895
896 return prodTypes.pop()
897
898
899
900
902 """
903 Tests whether a programme has had the given CU task run for the current
904 release.
905
906 @param programme: Programme table for current database with the current
907 row set to the programme to test.
908 @type programme: df.ProgrammeTable
909 @param cuNum: Unique CU task ID for the CU to test for incompleteness.
910 @type cuNum: int
911 @param refCuNum: Unique CU task ID for the CU that should always occur
912 sometime prior to the CU being tested on a given
913 release run.
914 @type refCuNum: int
915
916 @return: True, if given programme has had a successful CU run for the
917 current release.
918 @rtype: bool
919
920 """
921 curationHistory = programme._db.query("cuID",
922 Join(["ArchiveCurationHistory", "ProgrammeCurationHistory"],
923 ["cuEventID"]),
924 whereStr="cuID IN (%s, %s) AND programmeID=%s AND status=%s AND "
925 "rolledBack=%s ORDER BY ArchiveCurationHistory.cuEventID"
926 % (refCuNum, cuNum, programme.getAttr("programmeID"),
927 dbc.yes(), dbc.no()))
928
929 return refCuNum in curationHistory and curationHistory[-1] != cuNum
930
931
932
934 """
935 Tests a deep programme to see if CU8 needs to be applied to it.
936
937 @param programme: Programme table for current database with the current
938 row set to the programme to test.
939 @type programme: df.ProgrammeTable
940
941 @return: True, if given programme has not had a successful CU8 run
942 following a deep catalogue ingest.
943 @rtype: bool
944
945 """
946 return isCuTaskIncomplete(programme, cuNum=8)
947
948
949
950
951
952
953
955 """
956 Propagate deprecations from MultiframeDetector to the detection table of
957 the given programme.
958
959 @param programme: Programme table for current database with the current row
960 set to that of the programme where the deprecations
961 require propagating.
962 @type programme: df.ProgrammeTable
963
964 """
965 Logger.addMessage("Updating deprecated flag for table %s..."
966 % programme.getDetectionTable())
967
968 numRows = 0
969 for table in programme.getMonthlyDetectionTables():
970 tableJoin = \
971 Join([table, "MultiframeDetector"], ["multiframeID", "extNum"])
972
973
974
975
976 numRows += programme._db.update(table,
977 entryList=table + ".deprecated=MultiframeDetector.deprecated",
978 where="MultiframeDetector.%s and MultiframeDetector.deprecated>0"
979 % DepCodes.selectNonDepAndTest,
980 fromTables=tableJoin)
981
982
983
984 numRows += programme._db.update(table,
985 entryList="%s.deprecated=%s" % (table, DepCodes.propDepDet),
986 where="MultiframeDetector.deprecated BETWEEN %s AND %s AND "
987 "%s.deprecated!=%s AND MultiframeDetector.%s"
988 % (DepCodes.nonDep + 1, DepCodes.reprocCASU - 1, table,
989 DepCodes.propDepDet, DepCodes.selectDepAndNotTest),
990 fromTables=tableJoin)
991
992
993
994 numRows += programme._db.update(table,
995 entryList="%s.deprecated=MultiframeDetector.deprecated" % table,
996 where=table + ".deprecated!=MultiframeDetector.deprecated AND "
997 "MultiframeDetector.deprecated>=%s" % DepCodes.reprocCASU,
998 fromTables=tableJoin)
999
1000 Logger.addMessage("%s rows affected." % numRows)
1001
1002
1003