1
2
3 """
4 Merges detections over all passbands into a binary source table. Can also
5 're-seam' the source table, marking duplicate sources as either being
6 primary or secondary depending upon their quality.
7
8 @warning: The order of sequences is very important in various parts of this
9 code. The numbering of passbands is sorted into filterID order
10 (with multiple epochs in-between e.g. Z, K_1, K_2, H_1, H_2) in
11 some places and by order in the table schema (e.g. mergeLog) in
12 other places. This is essentially the same, as the schema order is
13 created by a script that does indeed follow filterID order. The
14 numbering of passbands is important for cameo schemas & epochs for
15 merging, and detection outgests for pairing. Also, presently the
16 outgested detections for reseaming only must be in declination
17 order, despite being resorted into that order by the pairing code,
18                because they are later read in by the Python code, which expects
19 them to be in declination order. This could be recoded easily.
20
21 @todo: Implement View, FrameSetView, and FrameView objects.
22 @todo: Remove sort on outgest during reseaming and alter _analyseSeams() to
23 remove requirement that pointers are sorted by declination, which
24 will also speed up _getFrameSetSources() query as no sort by dec
25 will be required.
26 @todo: Make SourceMerger a base class, that CU6,7,9 have like an Outgester
27 class.
28
29 @author: N.C. Hambly
30 @org: WFAU, IfA, University of Edinburgh
31
32 @newfield contributors: Contributors, Contributors (Alphabetical Order)
33 @contributors: R.S. Collins
34 """
35
36 from __future__ import division, print_function
37 from future_builtins import map, zip
38
39 from collections import defaultdict, namedtuple, Counter
40 from itertools import groupby
41 import math
42 import numpy
43 from operator import attrgetter, itemgetter
44 import os
45 import shutil
46
47 import wsatools.Astrometry as astro
48 from wsatools.CLI import CLI
49 import wsatools.CSV as csv
50 import wsatools.DataFactory as df
51 import wsatools.DbConnect.CommonQueries as queries
52 import wsatools.DbConnect.DbConstants as dbc
53 from wsatools.DbConnect.DbSession import Join, Ingester, odbc, Outgester, \
54 SelectSQL
55 from wsatools.DbConnect.CuSession import CuSession
56 import wsatools.DbConnect.Schema as schema
57 from wsatools.Logger import Logger, ForLoopMonitor
58 import wsatools.pairing as pairing
59 from wsatools.SystemConstants import DepCodes
60 import wsatools.Utilities as utils
61 import wsatools.WfcamsrcInterface as wfcamsrc
62 from wsatools.ProgrammeBuilder import SynopticSetup
63
64
66 """ Merges detections over all passbands into a binary source table.
67 """
68
69
70
71 commencementMessage = ("Source merging... Sit back and relax. Progress "
72 "estimates are for this stage only (ingest and "
73 "seaming are still to come).")
74 """ Message to log at the commencement of source merging.
75 """
76 freeCacheInterval = 1000
77 """ Number of frame sets between calling FREEPROCCACHE during reseaming.
78 """
79 minEpochSep = 183
80 """ Minimum epoch separation in days before a proper motion solution is
81 worthwhile calculating.
82 """
83 seamErrorBitMask = 0xffffff00
84 """ Error bit mask for removing innocuous information bits from error
85 bit masks prior to seaming.
86 """
87 seamingPairCriterion = 0.8
88 """ The match radius for associating the same source in overlap regions
89 between frame sets in all mosaiced surveys. This parameter is passed
90 directly to the C pairing applications, and is an angular radius
91 expressed in arcseconds. Originally (e.g. EDR) this was set to 1.0
92 arcsec; discussions with SJW etc. suggest this was somewhat overly
93 generous so it has been reduced to sqrt(2.0) times the typical
94 worst centroiding error, which is equal to about 2pix, or 0.8 arcsec.
95 At the same time, duplicate detection in the seaming algorithm has been
96 relaxed to not insist on the same passband set for duplicates.
97 """
98 swVersion = 1.0
99 """ The merge software version. Keeps track of any changes after
100 deployment; if changes to this module (or any module used by this
101 one) require a complete remerging of all merged source tables in the
102 archive, then increment this merge software version number. NB: use
103 with caution, since the resulting catalogue processing will take a
104 long time if the archive is large.
105 """
106
107 _autoCommit = True
108 _isPersistent = True
109 _isVerbose = True
110 _useWorkDir = True
111 _useExtNum = True
112
113
114
115
116
117 continueCompleteReseam = False
118 """ Should only be used to continue reseaming a source table from scratch
119 following a crash. This marks frame sets as old whilst reseaming,
120 without the safety catch of resetting all frame sets to new initially.
121 """
122 dateRange = CuSession.sysc.obsCal.dateRange()
123 """ Range of observation dates to merge. Use begin dates with caution, as
124 for real runs you always need this to be the early default value.
125 """
126 doCompleteReseam = False
127 """ Reset the merge-log to mark all frame sets as new, and reseam the
128 entire source table. This allows us to mark frame sets as old whilst we
129 are reseaming, which is unsafe if we were to be reseaming partial data.
130 """
131
132
133
134 _extractor = None
135 """ Name of source catalogue extraction tool used to create this
136 programme's detections.
137 """
138 _filterPasses = None
139 """ A list of filterIDs and number of passes for this programme.
140 """
141 _frameSetIDName = ''
142 """ Name of the column in the source table that identifies the unique ID of
143 the frame set for a given source.
144 """
145 _idxInfo = None
146 """ A dictionary of schema.Index object lists referenced by table name.
147 """
148 _mergeLog = ''
149 """ Name of the merge log table for this programme.
150 """
151 _mfidIndices = None
152 """ List of column indices in the merge log table that give the
153 multiframeIDs to each passband's image.
154 """
155 _newFrameSetIdx = 0
156 """ The column index in the merge log table that contains the newFrameSet
157 attribute.
158 """
159 _numFilterCol = 0
160 """ The number of columns in the merge log that contain passband-specific
161 attribute values, for a given passband.
162 """
163 _numHeaderCol = 0
164 """ The number of columns in the merge log table before the repeating
165 set of columns for passband-specific attributes begin.
166 """
167 _outgester = None
168 """ Outgester object for outgesting detections and sources for reseaming.
169 """
170 _progSchema = None
171 """ Dictionary of programme's table schema referenced by table name.
172 """
173 _passbands = None
174 """ List of short-names for passbands of this programme, e.g. k_2.
175 """
176 _sourceIDName = ''
177 """ Name of the column in the source table for the unique source ID.
178 """
179 _sourceTable = ''
180 """ Name of the table to ingest merged sources for this programme.
181 """
182
183
184
185 CLI.progArgs.append(CLI.Argument('programmeID', 'LAS'))
186 CLI.progOpts.append(CLI.Option('v', 'verbose', 'display details for every '
187 'frame set currently being processed'))
188
189
190
216
217
218
245
246
247
249 """
250 Merge source detections from different passbands to create the source
251 table for ingest.
252
253 Method: For every detector frame set:
254 1) Obtain the detection list in each passband, bulk-outgest of binary
255 numerical data into the curation-client/load-server share file system;
256 2) Create the sets of pair pointer files, forwards and backwards, for
257 each distinct pair of passbands;
258 3) Create the set of merged pointers based on "hand-shake" pairing
259 between any pair of pointer sets (i.e. ensuring consistency of pair
260 pointers between the passbands);
261 4) Create the merged source list according to the schema prescription,
262 appending new sources to an accumulating binary file for ingest back
263 into the database.
264
265 Low-level C/C++ application codes are extensively used to do the heavy
266 CPU/IO work.
267
268 @param newFrameSets: List of frame sets to be merged.
269 @type newFrameSets: list(self.FrameSet)
270 @param matchRadius: Pair criterion in arcseconds.
271 @type matchRadius: float
272
273 @return: Pair of file names for the binary source table and the CSV
274 formatted merge-log table for ingest. Also returns a status to
275 say whether the source merging was complete or not.
276 @rtype: str, str, bool
277
278 """
279 if not self._passbands:
280 raise SourceMerger.CuError("This survey has no passbands defined.")
281
282
283 if "eBV" in self._progSchema[self._sourceTable].columns:
284 self._updateAebvCSVFile()
285
286 joinAlias = "Raw"
287 detTables = [(self.programme.getDetectionTable(), joinAlias),
288 (self.programme.getAstrometryTable(), "Astro"),
289 (self.programme.getPhotometryTable(), "Photo")]
290
291 commonCols = utils.getDuplicates(utils.unpackList(
292 map(str, self._progSchema[table].columns)
293 for table, _alias in detTables))
294
295 detSchema = self._progSchema[self.programme.getAttr('detectionTable')]
296 detColumns = ', '.join(('%s.%s' % (joinAlias, column)
297 if str(column) in commonCols else str(column))
298 for column in detSchema.columns)
299
300 detTables = Join([(self._outgester.tablePath(table), alias)
301 for table, alias in detTables],
302 ["multiframeID", "extNum", "seqNum"])
303
304 Logger.addMessage("Outgesting default row", alwaysLog=self._isVerbose)
305 defRowQuery = SelectSQL(detColumns, detTables,
306 where="%s.multiframeID=%s AND %s.extNum=%s" %
307 (joinAlias, dbc.intDefault(), joinAlias, dbc.tinyIntDefault()))
308
309 defRowPathName = self._outgester.outgestQuery(defRowQuery)
310 if os.path.getsize(defRowPathName) == 0:
311 raise SourceMerger.CuError("Default detection row is empty.")
312
313
314 posColData, outgestRowSize = self._getPositionColumnData()
315
316
317
318 workPath = self.sysc.tempWorkPath()
319 fileIDs = []
320 outgestFiles = []
321 cameoFiles = []
322 for passband in self._passbands:
323 fileID = passband + "_detections"
324 fileIDs.append(fileID)
325
326 outgestPathName = self._outgester.getFilePath(fileID)
327 outgestFiles.append(outgestPathName)
328
329 cameoPathName = os.path.join(workPath, "cameo_%s.dat" % passband)
330 cameoFiles.append(cameoPathName)
331 data = ["%s %s %s" % (outgestPathName, passband, outgestRowSize)]
332 data += self._getCameoPassbandData(passband)
333 file(cameoPathName, 'w').writelines(line + '\n' for line in data)
334
335 if len(cameoFiles) is 1:
336 cameoFiles *= 2
337
338
339 mergeTablePathName = os.path.join(workPath, "mergetable.dat")
340
341 resultsPathName = os.path.join(workPath, self._sourceTable + ".nat")
342
343
344 firstID = self._getNextID(self._sourceIDName, self._sourceTable,
345 addProgID=True)
346
347 Logger.addMessage("Creating a look-up dictionary of frame epochs",
348 alwaysLog=self._isVerbose)
349 epochDict = self._createEpochDict()
350 Logger.addMessage("Creating a look-up dictionary of frame pixel scales",
351 alwaysLog=self._isVerbose)
352 pixScaleDict = self._createPixScaleDict()
353 Logger.addMessage(self.commencementMessage, alwaysLog=self._isVerbose)
354
355 failures = []
356 progress = ForLoopMonitor(newFrameSets)
357 for frameCount, frameSet in enumerate(newFrameSets):
358 Logger.addMessage("Processing frame set %s of %s" % (frameCount + 1,
359 len(newFrameSets)), alwaysLog=False)
360
361 Logger.addMessage("Outgesting detections...", alwaysLog=False)
362 try:
363
364
365 inputList = []
366 for mfidIndex, fileID in zip(self._mfidIndices, fileIDs):
367 if frameSet[mfidIndex] == dbc.intDefault():
368
369 os.system(
370 "touch " + self._outgester.getFilePath(fileID))
371
372 outgestPathName = ''
373 else:
374 whereStr = "%s.multiframeID=%s AND %s.seqNum > 0" \
375 % (joinAlias, frameSet[mfidIndex], joinAlias)
376
377
378 if self._useExtNum:
379 whereStr += " AND %s.extNum=%s" \
380 % (joinAlias, frameSet[mfidIndex + 1])
381
382 detTable = self.programme.getMonthlyDetectionTables(
383 self.dateRange,
384 excludeDeeps=isinstance(self, SynopticMerger),
385 mfIDs=[frameSet[mfidIndex]])
386
387 outDetTables = detTables
388 if "Raw20" in detTable:
389 outDetTables = [(detTable, joinAlias),
390 (detTable.replace("Raw", "Astrometry"), "Astro"),
391 (detTable.replace("Raw", "Photometry"), "Photo")]
392
393 outDetTables = \
394 Join([(self._outgester.tablePath(table), alias)
395 for table, alias in outDetTables],
396 ["multiframeID", "extNum", "seqNum"])
397
398 outgestPathName = self._outgester.outgestQuery(
399 SelectSQL(detColumns, outDetTables, whereStr), fileID)
400
401 inputList.append(tuple([outgestPathName] + posColData))
402
403 except Exception as error:
404
405
406 Logger.addExceptionMessage(error)
407 Logger.addMessage("<Warning> Frame set %s is incomplete. "
408 "Skipping..." % frameSet.ID)
409 failures.append(frameSet)
410 else:
411 Logger.addMessage("Pairing & merging detections...",
412 alwaysLog=False)
413
414
415 if len(inputList) is 1:
416
417
418 inputList *= 2
419 mergeList = range(2)
420 else:
421 mergeList = range(len(self._passbands))
422
423
424
425
426 pairing.multiPair(inputList, matchRadius, workPath)
427
428
429 path, mergeTableFileName = os.path.split(mergeTablePathName)
430 numMerged = pairing.merge(mergeList, path, mergeTableFileName)
431 Logger.addMessage("Frame set UID %s contains %s merged sources"
432 % (frameSet.ID, numMerged), alwaysLog=False)
433
434 if numMerged > 0:
435
436
437
438
439 for filePathName in outgestFiles:
440 if os.path.getsize(filePathName) == 0:
441 os.remove(filePathName)
442 shutil.copy(defRowPathName, filePathName)
443
444
445 epochs = [epochDict.get(frameSet[mfdIdx],
446 dbc.realDefault())
447 for mfdIdx in self._mfidIndices]
448
449 uniqueEpochs = set(epochs)
450 uniqueEpochs.discard(dbc.realDefault())
451
452 isLongBaseline = (len(uniqueEpochs) > 1
453 and max(uniqueEpochs) - min(uniqueEpochs)
454 > self.minEpochSep)
455
456 centrePos = ((frameSet.ra, frameSet.dec)
457 if isLongBaseline else None)
458
459
460 pixScales = [pixScaleDict.get(frameSet[mfdIdx],
461 dbc.realDefault())
462 for mfdIdx in self._mfidIndices]
463
464 if len(epochs) is 1:
465 epochs *= 2
466 pixScales *= 2
467
468 Logger.addMessage("Appending to source table binary "
469 "file...", alwaysLog=False)
470
471 wfcamsrc.mergeDetections(mergeTablePathName,
472 resultsPathName, self._sourceTable,
473 self.programme.getSchemaScript(), cameoFiles, epochs,
474 pixScales, self.cuEventID, firstID, frameSet.ID,
475 self._extractor, self._frameSetIDName,
476 self._sourceIDName, centrePos, self.minEpochSep)
477
478 firstID += numMerged
479
480
481
482
483 for filePathName in outgestFiles:
484 try:
485 os.remove(filePathName)
486 except OSError as error:
487 if "No such file" not in str(error):
488 raise
489
490 if self._isVerbose:
491 progress.testForOutput("Completed frameSetID %s" % frameSet.ID)
492
493
494
495
496 mergeLogPathName = os.path.join(self.sysc.tempWorkPath(),
497 'mergeLog.csv')
498 csv.File(mergeLogPathName,
499 'w+').writelines(set(newFrameSets) - set(failures))
500
501 if failures:
502 Logger.addMessage("<WARNING> Not all frame sets were merged.")
503
504 return resultsPathName, mergeLogPathName, not failures
505
506
507
509 """
510 Reseam the merged source list for a given programme. Where the same
511 sources have been observed over multiple frame sets, this routine
512 identifies the best source candidate in overlap regions that produce
513 non-unique source pairs. The priOrSec flag is set to the frameSetID
514 of the frame set containing the best source, so primary sources are
515 those with (priOrSec = 0 OR priOrSec = frameSetID).
516
517 Method: identifies every current frame set for the programme that is
518 new or has a new adjacent frame; for old current frame sets that have
519 a new adjacent frame, resets the seam flags to default (ie. not seamed)
520 for safety; for every central frame / adjacent frames group, creates
521 lists of sources with attribute subselection required for determination
522 of seamless list; produces pair pointers between the central frame
523 sources and those in adjacent frames; for every central frame record
524 chooses the best source on the basis of a seaming algorithm which
525 incorporates such considerations as distance from optical axis, pixel
526 processing quality considerations etc.; sets all central frame set
527 sources to seam = 0 (i.e. inclusive); and finally resets all central
528 frame set sources that have duplicates in adjacent frame sets to have a
529 seam flag that either points to the central frame or the adjacent
530 frame, depending on which contains the source that is to be considered
531 the best.
532
533 """
534 if self.doCompleteReseam:
535 Logger.addMessage("Marking all frame sets as new")
536 self.archive.update(self._mergeLog, "newFrameSet=%s" % dbc.yes())
537
538 Logger.addMessage("Preparing for seaming stage")
539
540
541 sourceRowData = self._getSrcPositionColumnData()
542
543
544
545
546
547
548
549
550
551 adjacentFrameTol = 2.2 * self._getMedianImageExtent()
552 currentFrameSets = self._getExistingFrameSets()
553 numNew = self.archive.queryNumRows(self._mergeLog,
554 whereStr="newFrameSet=%s" % dbc.yes())
555 Logger.addMessage("...of which %s are marked as new" % numNew)
556
557 self._addSeamingIndices()
558
559 Logger.addMessage("Reseaming sources...")
560 progress = ForLoopMonitor(currentFrameSets)
561 for setCount, centralFrameSet in enumerate(currentFrameSets):
562 Logger.addMessage("Processing frame set %s of %s" % (setCount + 1,
563 len(currentFrameSets)), alwaysLog=False)
564
565 if setCount % SourceMerger.freeCacheInterval == 0:
566 self.archive.freeProcCache()
567
568
569 adjacentFrames = astro.matchFrames(currentFrameSets, raIndex=2,
570 centrePos=(centralFrameSet.ra, centralFrameSet.dec),
571 tolerance=adjacentFrameTol)
572
573
574 adjacentFrames.remove(centralFrameSet)
575
576 if not adjacentFrames:
577
578
579 if centralFrameSet[self._newFrameSetIdx] is dbc.yes():
580 self.archive.updateEntries(
581 self._sourceTable, "priOrSec=%s" % dbc.no(),
582 [(self._frameSetIDName, centralFrameSet.ID)])
583
584
585
586
587 if adjacentFrames and (
588 centralFrameSet[self._newFrameSetIdx] is dbc.yes()
589 or dbc.yes() in set(frame[self._newFrameSetIdx]
590 for frame in adjacentFrames)):
591
592
593 if centralFrameSet[self._newFrameSetIdx] is dbc.no():
594 self.archive.updateEntries(
595 self._sourceTable, "priOrSec=%s" % dbc.intDefault(),
596 [(self._frameSetIDName, centralFrameSet.ID)])
597
598 try:
599 sourcesPathName = self._outgester.outgestQuery(
600 SelectSQL("ra, dec",
601 table=self._outgester.tablePath(self._sourceTable),
602 where=self._frameSetIDName + "=%s" % centralFrameSet.ID,
603 orderBy="dec"), "sources_0_")
604 except Exception as error:
605 Logger.addExceptionMessage(error)
606 else:
607
608 set0 = tuple([sourcesPathName] + sourceRowData)
609
610
611
612 fileNames = []
613 for j, frame in enumerate(adjacentFrames):
614 sourcesPathName = self._outgester.outgestQuery(
615 SelectSQL("ra, dec",
616 table=self._outgester.tablePath(self._sourceTable),
617 where=self._frameSetIDName + "=%s" % frame.ID,
618 orderBy="dec"), "sources_%s_" % (j + 1))
619 set1 = tuple([sourcesPathName] + sourceRowData)
620 pFilePath = os.path.join(self.sysc.tempWorkPath(),
621 "pointers_%s_%s.dat")
622 pFile0 = pFilePath % (0, j + 1)
623 radius = SourceMerger.seamingPairCriterion
624 pairing.pairOff(set0, set1, pFile0, radius)
625
626
627 fileNames.append(pFile0)
628
629
630
631
632
633 opticalAxes = [(math.radians(centralFrameSet.ra),
634 math.radians(centralFrameSet.dec))]
635 opticalAxes += [(math.radians(frameSet.ra),
636 math.radians(frameSet.dec))
637 for frameSet in adjacentFrames]
638
639
640
641
642
643
644 seamResults = self._analyseSeams(centralFrameSet.ID,
645 adjacentFrames, fileNames, opticalAxes)
646
647
648
649 self.archive.updateEntries(
650 self._sourceTable, "priOrSec=%s" % dbc.no(),
651 [(self._frameSetIDName, centralFrameSet.ID)])
652
653
654
655
656 for frameSetID, sourceList in seamResults.iteritems():
657 self.archive.updateEntries(
658 self._sourceTable, "priOrSec=%s" % frameSetID,
659 [(self._sourceIDName, sourceList)])
660
661
662
663 if self.doCompleteReseam or self.continueCompleteReseam:
664 self.archive.updateEntries(
665 self._mergeLog, "newFrameSet=%s" % dbc.no(),
666 [(self._frameSetIDName, centralFrameSet.ID)])
667
668 progress.testForOutput("Completed frameSetID %s"
669 % centralFrameSet.ID)
670
671 Logger.addMessage("done.")
672 Logger.addMessage("Resetting individual new frame flags "
673 "after seaming:")
674 Logger.addMessage("No. of frame sets with flags reset to old: %s" %
675 self.archive.update(self._mergeLog,
676 [("new%sframe" % passband, str(dbc.no()))
677 for passband in self._passbands]))
678
679
680 if not self.doCompleteReseam and not self.continueCompleteReseam:
681 self.archive.update(self._mergeLog,
682 [("newFrameSet", str(dbc.no()))])
683
684
685
687 """ To optimise seaming we need to apply an index to the Source table
688 frameSetID, ra, dec columns (the CU7 index), which are outgested,
689 as well as a covering index on the ppErrBits and seqNum attributes
690 that are queried many times (the CU7_new index).
691 """
692 indexExists = False
693 for index in self._idxInfo[self._sourceTable]:
694 if "CU7" in index.name:
695 indexExists = True
696 self.archive.addIndex(index)
697
698 if not indexExists:
699 Logger.addMessage("<WARNING> This programme has no CU7 indices "
700 "defined in %s." % self.sysc.indexScript)
701
702
703
704 - def _analyseSeams(self, centralSetID, adjacentFrames, fileNames,
705 opticalAxes):
706 """
707 Analyse a set of merged sources in an overlap region. Sources that are
708 paired between a central frame set and it's adjacent frame sets. The
709 required seaming data are first extracted from the source table; then
710 the function examines each central frame set merged source in turn,
711 checking the adjacent frame set pairs and noting any identical
712 record that should be used in preference to the central record. The
713 function returns a data object containing lists of sources for each
714 frame set ID (central and adjacents) that need their seam index
715 attribute set to indicate the presence of a secondary identical source
716 or a preferred primary source.
717
718 @todo: Could speed things up by looking at function calls within loops
719 consider passing entire lists to functions instead.
720
721 @param centralSetID: The UID of the central fame set.
722 @type centralSetID: int
723 @param adjacentFrames: The merge log rows of all adjacent frames.
724 @type adjacentFrames: list(tuple)
725 @param fileNames: Filenames of outgest source and pointer files.
726 @type fileNames: list(str)
727 @param opticalAxes: Celestial coordinate pairs of the camera optical
728 axis.
729 @type opticalAxes: list((float, float))
730
731 @return: A duplicate sources dictionary; consisting of a list of source
732 IDs referenced by a certain frame set ID, for which every
733 source in the list must have their seaming flag value set to.
734 @rtype: defaultdict(int:list(int))
735
736 """
737
738
739
740
741 pointers = \
742 [numpy.fromfile(fileName, numpy.int32) for fileName in fileNames]
743 sourceData = [self._getFrameSetSources(centralSetID)]
744 sourceData.extend(
745 self._getFrameSetSources(frameSet[0]) for frameSet in adjacentFrames)
746
747
748
749
750
751
752 if len(pointers) != len(sourceData) - 1:
753 raise SourceMerger.CuError(
754 "Inconsistent data set during seam analysis: \n"
755 " - no. of pointer files: %s\n" % len(pointers) +
756 " - no. of adjacent source datasets: %s" % (len(sourceData) - 1))
757
758 if any(len(ptrAry) != len(pointers[0])
759 for ptrAry in pointers[1:]):
760 raise SourceMerger.CuError(
761 "Inconsistent data set during seam analysis: \n"
762 " - pointer files have inconsistent lengths.")
763
764 if len(sourceData[0]) != len(pointers[0]):
765 raise SourceMerger.CuError(
766 "Inconsistent data set during seam analysis: \n"
767 " - central frame set source data and pointer files have\n"
768 " inconsistent lengths: central frame set sources: %s; no."
769 " of pointers: %s" % (len(sourceData[0]), len(pointers[0])))
770
771
772 duplicates = defaultdict(list)
773 for i in range(len(pointers[0])):
774
775 records = [sourceData[0][i]]
776 frameSetIDs = [centralSetID]
777 axes = [opticalAxes[0]]
778
779 for j in range(len(pointers)):
780 pp = pointers[j][i]
781
782 if pp > -1:
783 records.append(sourceData[j + 1][pp])
784 frameSetIDs.append(adjacentFrames[j][0])
785 axes.append(opticalAxes[j + 1])
786
787
788
789
790 if len(records) > 1:
791
792
793 offset = 4
794 numPerPass = 2
795
796
797 indices = range(offset,
798 offset + numPerPass * len(self._passbands),
799 numPerPass)
800 numDetsList = [
801 sum(1 for i in indices if record[i] != dbc.intDefault() and
802 record[i - 1] <= 0xffffffff ^ SourceMerger.seamErrorBitMask)
803 for record in records]
804
805 bestID = self._pickBestDuplicate(records, axes, frameSetIDs,
806 numDetsList)
807
808 duplicates[bestID].append(records[0][0])
809
810 return duplicates
811
812
813
815 """ @return: A dictionary of frame epochs referenced by multiframeID.
816 @rtype: dict(int, float)
817 """
818 return dict(self.archive.query("Multiframe.multiframeID, mjdObs",
819 Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
820 "Multiframe.multiframeID > 0 AND frameType LIKE '%stack%' AND " +
821 "programmeID=%s" % self.programmeID))
822
823
824
826 """ @return: A dictionary of frame pixel scales referenced by
827 multiframeID.
828 @rtype: dict(int, float)
829 """
830
831 keyValuePairs = "Multiframe.multiframeID, xPixSize"
832 return dict(self.archive.query(keyValuePairs,
833 Join(["Multiframe", "CurrentAstrometry", "ProgrammeFrame"],
834 ["multiframeID"]),
835 "Multiframe.multiframeID > 0 AND frameType LIKE '%stack%' AND " +
836 "programmeID=%s" % self.programmeID, groupBy=keyValuePairs))
837
838
839
841 """
842 Returns the cameo schema for this passband. The schema describes the
843 contents of the individual detection records, i.e. the positions of
844 quantities to be merged and their units. NB: if the base schema for the
845 detection tables is changed, then this function may need to be updated
846 to reflect those changes.
847
848 @param pb: Short name of passband.
849 @type pb: str
850
851 @return: Cameo schema attributes for this passband: Name, first byte on
852 the record, SQL data type, and units.
853 @rtype: list(str)
854
855 """
856 cameoData = []
857 detSchema = self._progSchema[self.programme.getAttr('detectionTable')]
858
859
860 detColumns = dict((name.lower(), detSchema.attribute[name])
861 for name in detSchema.attribute)
862
863
864 for colName in ("ra", "dec", "xErr", "yErr"):
865 column = detColumns[colName.lower()]
866 colPos = detSchema.getBytePos(colName)
867 colUnit = column.tag.get("--/U", dbc.charDefault()).title()
868 colData = [colName, str(colPos), column.dataType, colUnit]
869 cameoData.append(' '.join(colData))
870
871 for column in self._progSchema[self._sourceTable].columns:
872 if column.name.startswith(pb) and "mag" in column.name.lower():
873 colName = (column.name[len(pb):]
874 if not column.name.endswith("SerMag2DErr") else
875 "SerFit2DChi2")
876 colPos = detSchema.getBytePos(colName)
877 colUnit = column.tag.get("--/U", "mag")
878 colData = [column.name, str(colPos), column.dataType, colUnit]
879 if colPos:
880 cameoData.append(' '.join(colData))
881
882
883
884 for colName in ("HlCorSMjRadAs", "Gausig", "Ell", "PA", "ErrBits",
885 "Deblend", "Class", "ClassStat", "ppErrBits",
886 "SeqNum", "ObjID", "ExtNum", "Mjd", "AverageConf"):
887 try:
888 column = detColumns[colName.lower()]
889 except KeyError:
890 pass
891 else:
892 colPos = detSchema.getBytePos(colName)
893 colUnit = column.tag.get("--/U", dbc.charDefault()).title()
894 colData = [colName, str(colPos), column.dataType, colUnit]
895 cameoData.append(pb + ' '.join(colData))
896
897 return cameoData
898
899
900
902 """ @return: All frame sets in MergeLog for this programme.
903 @rtype: list(tuple)
904 """
905 Logger.addMessage("Creating list of existing frame sets...")
906 frameSets = self.archive.query(
907 selectStr="*",
908 fromStr=self._mergeLog,
909 whereStr="deprecated = 0 ORDER BY dec",
910 ResultsTuple=self.FrameSet)
911
912 Logger.addMessage("...number of existing frame sets: %s "
913 % len(frameSets))
914
915 return frameSets
916
917
918
920 """
921 Get merged source data from a given frame set for the purposes of
922 reseaming. The function makes a selection of attributes from those
923 available for each merged source; those attributes are for use by the
924 seaming algorithm to decide which is the best record amongst a set of
925 duplicates. This function also returns two scalar values which encode
926 the offset and number of attributes per passband as dictated by the
927 selection.
928
929 @param frameSetID: The UID of the source frame set.
930 @type frameSetID: int
931
932 @return: Attributes for the set of sources in the specified frame set.
933 @rtype: list(tuples)
934
935 """
936 return self.archive.query(
937 selectStr=self._sourceIDName + ", ra, dec, " +
938 ", ".join("%sppErrBits, %sSeqNum" %
939 (pb, pb) for pb in self._passbands),
940 fromStr=self._sourceTable,
941 whereStr=self._frameSetIDName + "=%s ORDER BY dec" % frameSetID)
942
943
944
969
970
971
972 - def _getNextID(self, attrName, tableName, where='', addProgID=False):
973 """
974 Obtains next available ID number in the db for a given ID attribute.
975
976 @param attrName: Name of ID attribute.
977 @type attrName: str
978 @param tableName: Name of the table where attribute resides.
979 @type tableName: str
980 @param where: Optional SQL WHERE clause.
981 @type where: str
982 @param addProgID: If True, prefix initial ID with the programme UID to
983 make ID unique across the entire archive.
984 @type addProgID: bool
985
986 @return: Next available ID number.
987 @rtype: int
988
989 """
990 nextID = super(SourceMerger,
991 self)._getNextID(attrName, tableName, where)
992 if addProgID and nextID is 1:
993 nextID = self.programmeID * 65536L * 65536L + 1L
994 return nextID
995
996
997
999 """ Prepare info for C modules about the detection position column data
1000 from the table schema.
1001 """
1002 detSchema = self._progSchema[self.programme.getAttr('detectionTable')]
1003
1004 posColData = []
1005 for colName in ("ra", "dec"):
1006 column = detSchema.attribute[colName]
1007 colPos = detSchema.getBytePos(column.name)
1008 colSize = self.sysc.sqlDataTypeSize[column.dataType]
1009 colUnit = column.tag.get("--/U").upper()[:3]
1010 posColData.append((colPos, colSize, colUnit))
1011
1012 posColData.append(detSchema.getBytePos())
1013
1014 return posColData, posColData[-1]
1015
1016
1017
1019 """ @return: List of filter IDs and number of passes for the programme.
1020 @rtype: list(tuple(int, int))
1021 """
1022 return self.archive.query(
1023 selectStr="filterID, nPass",
1024 fromStr="RequiredFilters",
1025 whereStr="programmeID=%s ORDER BY filterID ASC" % self.programmeID)
1026
1027
1028
1030 """ Define source position column data of a source table outgest for
1031 reseaming stage. Currently, RA and Dec are all that are required
1032 for the overlap pairing stage and the analysis is done later in
1033 Python by executing a query on the DB to get all required analysis
1034 merged source data.
1035 """
1036 sourceRowData = []
1037 srcSchema = self._progSchema[self._sourceTable]
1038
1039 colPos = 0
1040 for colName in ("ra", "dec"):
1041 column = srcSchema.attribute[colName]
1042 colSize = self.sysc.sqlDataTypeSize[column.dataType]
1043 colUnit = column.tag.get("--/U").upper()[:3]
1044 sourceRowData.append((colPos, colSize, colUnit))
1045 colPos += colSize
1046
1047 sourceRowData.append(colPos)
1048
1049 return sourceRowData
1050
1051
1052
1054 """ @return: True if quality bit flags have been fully updated for this
1055 programme's detection table.
1056 @rtype: bool
1057 """
1058
1059
1060
1061
1062
1063
1064
1065
1066 return not self.archive.queryEntriesExist(
1067 Join(["Multiframe", "ProgrammeFrame",
1068 self.programme.getDetectionTable()],
1069 ["multiframeID"]),
1070 where="utDate BETWEEN '%s' AND '%s'" % self.dateRange
1071 + " AND ppErrBitsStatus=0 AND programmeID=%s" % self.programmeID)
1072
1073
1074
1075 - def _pickBestDuplicate(self, overlapSources, overlapAxes, overlapFrameIDs,
1076 overlapDetectNos):
1077 """
1078 Chooses the best source from a set of duplicates in an overlap region.
1079 Given a list of duplicate sources with positions and qualities, and a
1080 list of their respective image optical axis points, choose the best
1081 image out of the set of duplicates. Originally, this was
1082 based on a seaming algorithm that took into account angular distance
1083 from the axis until the precise meaning of the bit-wise quality
1084 flags was clarified, and used proximity to the optical axis only as
1085 the criterion for judging the best source, choosing only from those
1086 sources having the maximum number of constituent detections.
1087 For DR2 et seq., the seaming algorithm has been modified to:
1088 a) mask off innocuous "information"-only error bits amongst the
1089 available passbands;
1090 b) eliminate any duplicate that has "warning" quality bits set
1091 but bringing all back into contention if this would result in no
1092 primary image;
1093 c) check the filter complement, choosing the merged source with
1094 the most filter coverage;
1095 d) then, and only then, use proximity to the frame set centre if
1096 there is still more than one potential primary; the tangent point
1097 for this test is, therefore, the centre of the frame set and not
1098 the optical axis.
1099
1100 @param overlapSources: Merged source data for the duplicates.
1101 @type overlapSources: list(tuple)
1102 @param overlapAxes: Tangent point for each source.
1103 @type overlapAxes: list((float, float))
1104 @param overlapFrameIDs: UIDs for the frame sets of the sources.
1105 @type overlapFrameIDs: list(int)
1106 @param overlapDetectNos: Counts of the no. of detections in each frame.
1107 @type overlapDetectNos: list(int)
1108
1109 @return: The UID of the frame set which contains the best source.
1110 @rtype: int
1111
1112 """
1113
1114 bitChecks = []
1115 numBadBits = []
1116 for source, frameID in zip(overlapSources, overlapFrameIDs):
1117
1118 sourceBits = [source[3 + (2 * i)] for i in range(len(self._passbands))]
1119 numBadBits.append(0)
1120 for bit in sourceBits:
1121 if bit > 0xffffffff ^ SourceMerger.seamErrorBitMask:
1122 numBadBits[-1] += 1
1123 allBits = reduce(lambda x, y: x | y, sourceBits)
1124 bitChecks.append(not (allBits & SourceMerger.seamErrorBitMask))
1125 if bitChecks[-1]:
1126 bestID = frameID
1127
1128
1129 if bitChecks.count(True) == 1:
1130 return bestID
1131
1132 allBad = True not in bitChecks
1133 maxSep = 2.0 * math.pi
1134 maxDets = max(overlapDetectNos)
1135 minBad = min(nBad for numDets, nBad
1136 in zip(overlapDetectNos, numBadBits)
1137 if numDets == maxDets)
1138
1139 for source, axis, frameID, numDets, bitsOK, nBad in zip(overlapSources,
1140 overlapAxes, overlapFrameIDs, overlapDetectNos, bitChecks,
1141 numBadBits):
1142
1143
1144 if (bitsOK or allBad) and numDets == maxDets and nBad == minBad:
1145
1146
1147
1148
1149 pos = (math.radians(source[1]), math.radians(source[2]))
1150 xi, xn = astro.convSkyToTangent(pos, axis)
1151 sep = max(abs(xi), abs(xn))
1152 if sep < maxSep:
1153 maxSep = sep
1154 bestID = frameID
1155
1156 return bestID
1157
1158
1159
1161 """
1162 Remove merged sources that are based on deprecated frame sets.
1163
1164 @param tableName: Source table to edit.
1165 @type tableName: str
1166
1167 @return: Number of sources deleted.
1168 @rtype: int
1169
1170 """
1171 deprecatedFrameSets = SelectSQL(self._frameSetIDName, self._mergeLog,
1172 where="deprecated > 0")
1173
1174 return self.archive.delete(tableName,
1175 whereStr=self._frameSetIDName + " IN (%s)" % deprecatedFrameSets)
1176
1177
1178
1230
1231
1232
1238
1239
1240
1242 """
1243 Put the latest aebv values from table Filter into aebv.csv for mkmerge.
1244
1245 @todo: Make this method a fundamental part of a new WfcamsrcInterface
1246 class, to replace the WfcamsrcInterface module?
1247 """
1248 Logger.addMessage("Filling aebv.csv with latest values")
1249 filePath = os.path.join(self.sysc.srcPath, 'wfcamsrc', 'aebv.csv')
1250 lines = ('%s,%s\n' % (aFilter.shortName.title(), aFilter.aebv)
1251 for aFilter in self.archive.query("shortName, aebv", "Filter"))
1252
1253 file(filePath, 'w').writelines(lines)
1254
1255
1256
1258 """ Create/update merged synoptic source catalogue for a given programme.
1259 """
1260
1261
1262 commencementMessage = "Synoptic source merging..."
1263 """ Message to log at the commencement of source merging.
1264 """
1265 pairingCriterion = 0.8
1266 """ Tight merging criterion because sources should not move in short space
1267 of time.
1268 """
1269 frameSetChunkSize = 150
1270 """ Number of frame sets in each chunk.
1271 """
1272
1273 cuNum = 6
1274 _isVerbose = False
1275
1276
1277
1278
1279
1280 maxProdID = None
1281 """ Maximum product ID to include in merged frame sets.
1282 """
1283 minProdID = None
1284 """ Minimum product ID to include in merged frame sets.
1285 """
1286 prepareOnly = False
1287 """ Only prepare the frame sets and update mergelog, without source merging?
1288 """
1289 skipQualityCheck = False
1290 """ Override quality bit flag check?
1291 """
1292 recreate = False
1293 """ Recreate from scratch
1294 """
1295
1296
1297
1298
    # NOTE(review): these are *class-level* mutable lists, shared by every
    # instance of the class; the instance methods append to them in place.
    # Safe only if a single merger instance lives per process -- confirm,
    # otherwise initialise them in __init__.
    _goodFrameSets = []
    _incompFrameSets = []
    _deprecFrameSets = []
    _newFrameSets = []
1303
1304
1305
1307 """ Create/update merged source catalogue.
1308 """
1309 super(SynopticMerger, self)._onRun()
1310
1311 self.synSetup = SynopticSetup(self.programme, self.dateRange)
1312
1313 ingSchema = [self._progSchema[self._mergeLog]]
1314 if not self.prepareOnly:
1315 ingSchema.append(self._progSchema[self._sourceTable])
1316 try:
1317 ingester = Ingester(self.archive, ingSchema, self.shareFileID,
1318 skipSchemaCheck=True)
1319 except schema.MismatchError as error:
1320 raise SynopticMerger.CuError(error)
1321
1322 if not self.prepareOnly and not self.skipQualityCheck:
1323 Logger.addMessage("Checking that the quality bit flags are all set")
1324
1325 if not self._isQualityFlaggingComplete():
1326 raise SynopticMerger.CuError("Detection quality bit flags have"
1327 " not been updated for this programme!")
1328
1329 if self.recreate:
1330 Logger.addMessage("Destroying & recreating %s and emptying %s" %
1331 (self._sourceTable, self._mergeLog))
1332 self.archive.dropTable(self._progSchema[self._sourceTable],
1333 self._progSchema.values())
1334 self.archive.truncate(self._mergeLog)
1335 self.archive.createTable(self._progSchema[self._sourceTable],
1336 self._progSchema.values())
1337
1338
1339
1340 self._getMergeLogInfo()
1341 self._makeNewFrameSets()
1342 if self.mergeLogInfo.values():
1343 self._checkMissingFrameSets()
1344
1345 Logger.addMessage("...found %s complete existing frame sets"
1346 % len(self._goodFrameSets))
1347
1348 Logger.addMessage("...found %s existing frame sets that need updating"
1349 % len(self._incompFrameSets))
1350
1351 Logger.addMessage("...found %s existing frame sets that need deprecating"
1352 % len(self._deprecFrameSets))
1353
1354 Logger.addMessage("...found %s new frame sets" % len(self._newFrameSets))
1355
1356
1357 self._deleteOldFrameSets()
1358 if not self._newFrameSets:
1359 Logger.addMessage("No new synoptic source merging required for "
1360 "this programme.")
1361
1362 if self._newFrameSets and not self.prepareOnly:
1363 Logger.addMessage("Synoptic source merging...")
1364
1365
1366
1367
1368 chunkedFrameSets = list(utils.splitList(self._newFrameSets,
1369 chunkSize=SynopticMerger.frameSetChunkSize))
1370
1371 progress = ForLoopMonitor(chunkedFrameSets)
1372 for frameSets in chunkedFrameSets:
1373 progress.preMessage("Preparing chunk")
1374 sourceTablePathName, mergeLogPathName, isComplete = \
1375 self.mergeSources(frameSets, SynopticMerger.pairingCriterion)
1376
1377 if not isComplete:
1378 raise SynopticMerger.CuError("Synoptic merging failed and "
1379 "needs to be re-run.")
1380
1381 try:
1382 ingester.ingestTable(self._mergeLog, mergeLogPathName,
1383 isCsv=True)
1384
1385 ingester.ingestTable(self._sourceTable, sourceTablePathName,
1386 self._idxInfo, isOrdered=True)
1387
1388 except BaseException as error:
1389 Logger.addExceptionMessage(error)
1390 Logger.addMessage(
1391 "Attempting to rollback %s..." % self._mergeLog)
1392
1393 frameSetIDs = (frameSet.ID for frameSet in frameSets)
1394 Logger.addMessage("%s rows removed." %
1395 self.archive.delete(self._mergeLog,
1396 "synframeSetID IN (%s)"
1397 % ','.join(map(str, frameSetIDs))))
1398 raise
1399
1400 elif self._newFrameSets:
1401 self._ingestMergeLogOnly(ingester)
1402
1403 self._insertDefaultRows()
1404
1405
1406
1408 """ Checks frame set against current mergelog
1409 """
1410
1411 matches = []
1412 if not self.mergeLogInfo.values():
1413 self._newFrameSets.append(self.FrameSet(*frameSet))
1414 return
1415
1416 noNonDefFS = 0
1417 for index, shtName in enumerate(self._passbands):
1418 mfID, extNum = frameSet[9 + 3 * index], frameSet[10 + 3 * index]
1419
1420 if mfID > 0: noNonDefFS += 1
1421 if mfID > 0 and mfID in self.mergeLogInfo["%smfid" % shtName]:
1422
1423
1424 mlIndices = [ind for ind, multID
1425 in enumerate(self.mergeLogInfo["%smfid" % shtName])
1426 if mfID == multID]
1427
1428
1429 if extNum in [self.mergeLogInfo["%senum" % shtName][ind]
1430 for ind in mlIndices]:
1431
1432 matchIndex = mlIndices[
1433 [indx for indx, eNum
1434 in enumerate([self.mergeLogInfo["%senum" % shtName][ind]
1435 for ind in mlIndices])
1436 if extNum == eNum][0]]
1437
1438 matches.append((index, matchIndex,
1439 self.mergeLogInfo[self._frameSetIDName.lower()][matchIndex]))
1440
1441 uniqueFSIDs = set(fsID for _idx, _mIdx, fsID in matches)
1442
1443
1444 if len(uniqueFSIDs) == 1:
1445
1446 noNonDefML = len([self.mergeLogInfo["%smfid" % shtName][matches[0][1]]
1447 for shtName in self._passbands
1448 if (self.mergeLogInfo["%smfid" % shtName][matches[0][1]] > 0)])
1449
1450 if len(matches) == noNonDefFS and len(matches) == noNonDefML:
1451
1452 self._goodFrameSets.append(
1453 self.FrameSet(*self.mergeLogInfoList[matches[0][1]]))
1454
1455
1456 elif len(matches) < noNonDefFS or len(matches) < noNonDefML:
1457
1458
1459
1460 self._incompFrameSets.append((list(uniqueFSIDs)[0], frameSet))
1461
1462
1463 elif len(uniqueFSIDs) > 1:
1464
1465 raise SynopticMerger.CuError("Frameset %s in programme %s is drawn"
1466 " from multiple old frame sets: %s"
1467 % (frameSet[0], self.programmeID,
1468 ','.join(map(str, uniqueFSIDs))))
1469
1470
1471 else:
1472 self._newFrameSets.append(self.FrameSet(*frameSet))
1473
1474
1475
1492
1493
1494
1516
1517
1518
1527
1528
1529
1531 """ @return: List of filter IDs and number of passes for the programme.
1532 @rtype: list(tuple(int, int))
1533 """
1534 return self.archive.query(
1535 selectStr="filterID, 1",
1536 fromStr="RequiredFilters",
1537 whereStr="programmeID=%s ORDER BY filterID ASC" %
1538 self.programmeID)
1539
1540
1541
1543 """
1544 Write the updated merge log of new (plus updated) frame sets (new
1545 frame sets flagged using the merge log attribute "new") to a CSV file.
1546
1547 """
1548 mergeLogPathName = \
1549 os.path.join(self.sysc.tempWorkPath(), 'synopticMergeLog.csv')
1550
1551 csv.File(mergeLogPathName, 'w+').writelines(set(self._newFrameSets))
1552
1553
1554 try:
1555 ingester.ingestTable(self._mergeLog, mergeLogPathName, isCsv=True)
1556 except odbc.DatabaseError as error:
1557 if 'STREAM' in str(error):
1558 Logger.addMessage(
1559 "<ERROR> %s ingest failed. " % self._mergeLog +
1560 "Possibly because of a discrepency between the schema "
1561 "and the entries in RequiredFilters.")
1562 raise
1563
1564
1565
1567 """ Inserts default rows for every frame into synoptic source table.
1568 """
1569 Logger.addMessage(
1570 "Inserting default rows into %s..." % self._sourceTable)
1571
1572
1573 missingDefaults = self.archive.query("synFrameSetID", self._mergeLog,
1574 "synFrameSetID NOT IN (%s)" % SelectSQL("synFrameSetID",
1575 table=self._sourceTable, where="synFrameSetID>0 AND synSeqNum<0"))
1576
1577 if not missingDefaults:
1578 Logger.addMessage("done. No missing defaults")
1579 return
1580
1581 nonDefColumns = ["synFrameSetID", "cuEventID"]
1582
1583 nonDefValues = SelectSQL(
1584 select=", ".join(nonDefColumns),
1585 table=self._sourceTable,
1586 where="synFrameSetID IN (%s)" % ','.join(map(str, missingDefaults)),
1587 groupBy=", ".join(nonDefColumns))
1588
1589 selectStr = ", ".join(
1590 (str(column.getDefaultValue()) if column not in nonDefColumns else
1591 "T.%s" % column)
1592 for column in self._progSchema[self._sourceTable].columns)
1593
1594 numRows = self.archive.copyIntoTable(self._sourceTable,
1595 sourceTable="(%s) AS T" % nonDefValues, columns=selectStr)
1596
1597 Logger.addMessage("done. %s rows inserted." % numRows)
1598
1599
1600
1602 """
1603 Creates new frame sets as appropriate based on available intermediate
1604 stack frames.
1605
1606 @return: List of new frame sets.
1607 @rtype: list(tuple)
1608
1609 @FIXME: If incomplete/deprecated, some frameSetIDs can be repeated
1610 """
1611 useSpecialFilter = False
1612 addPars = ''
1613 addTables = ''
1614 addConst = ''
1615 if self.synSetup.isCorSingleEpoch:
1616 bandMergeTime = self.programme.getAttr("bandMergingCriterion")
1617 if bandMergeTime < 0:
1618 bandMergeTime = 0.
1619 Logger.addMessage("Creating a single epoch of correlated frame sets...")
1620 if self.synSetup.specialFilter:
1621 filterTable = df.Table("Filter", self.archive)
1622
1623 useSpecialFilter = True
1624 matchOB = False
1625 matchDur = False
1626 filterTable.setCurRow(shortName='%s' % self.synSetup.specialFilter['Name'])
1627 specialFilterID = filterTable.getAttr("filterID")
1628 if 'OB' in self.synSetup.specialFilter:
1629 matchOB = True
1630 OBfilterNameList = self.synSetup.specialFilter['OB'].split('.')
1631 OBfilterIDList = []
1632 for filtName in OBfilterNameList:
1633 filterTable.setCurRow(shortName='%s' % filtName)
1634 OBfilterIDList.append(filterTable.getAttr("filterID"))
1635 addPars += ',obsID' if (self.sysc.isVSA() or self.sysc.isOSA()) else ',msbID as obsID'
1636 addTables += ',MultiframeEsoKeys as e' if (self.sysc.isVSA() or self.sysc.isOSA()) else ''
1637 addConst += ' and e.multiframeID=m.multiframeID'
                if 'DUR' in self.synSetup.specialFilter:
                    matchDur = True
                    filtDur = float(self.synSetup.specialFilter['DUR'])
                    addPars += ',MAX(totalExpTime) as duration'
                    addTables += ',MultiframeDetector as d'
                    # NOTE(review): operator precedence bug -- this parses as
                    # ("... %s" % ',obsID') if 'OB' in ... else '', so when
                    # 'OB' is absent the *entire* GROUP BY clause is dropped,
                    # not just the obsID column. Probably intended:
                    # "... % (',obsID' if 'OB' in ... else '')". Confirm.
                    addConst += ' and d.multiframeID=m.multiframeID group by m.multiframeID,m.mjdObs,m.filterID%s' % ',obsID' if 'OB' in self.synSetup.specialFilter else ''
1644
1645 else:
1646 bandMergeTime = self.programme.getAttr("bandMergingCriterion")
1647 if bandMergeTime < 0:
1648 msg = "No time specified for band pass merging"
1649 raise SynopticMerger.CuError(msg)
1650
1651 Logger.addMessage("Creating correlated frame sets with spacing %s "
1652 "minutes..." % bandMergeTime)
1653
1654 bandMergeTime /= 1440
1655 nextFrameSetID = \
1656 self._getNextID(self._frameSetIDName, self._mergeLog, addProgID=True)
1657
1658 curAstHdr = ['centralRA', 'centralDec', 'cx', 'cy', 'cz', 'htmID',
1659 'multiframeID', 'extNum']
1660
1661 frameHdr = ['filterID', 'mjdObs', 'filename', 'confID', 'raBase',
1662 'decBase']
1663
1664 dPre = ", D."
1665 fPre = ", F."
1666 reqOffsetPos = self.sysc.isVSA() \
1667 and self.programme.getAttr("epochFrameType") == 'stack'
1668
1669
1670 pointings = self.archive.query("DISTINCT ra, dec, stackRadius"
1671 + (",offsetPos" if reqOffsetPos else ""),
1672 fromStr="Required" + self.programme.getAttr("epochFrameType"),
1673 whereStr="programmeID=%s AND productID BETWEEN %s AND %s"
1674 % (self.programmeID, self.minProdID, self.maxProdID))
1675 for point in pointings:
1676 mfIDs = set()
1677 for filterID, _nPass in self._filterPasses:
1678 mfIDs.update(
1679 queries.getAllFieldObs(self.archive, point.ra, point.dec,
1680 60 * point.stackRadius, filterID, self.programmeID,
1681 frameType=self.programme.getAttr("epochFrameType"),
1682 dateRange=self.synSetup.dateRangeOfTable[self.synSetup.corBMTable],
1683 offsetPos=(point.offsetPos if reqOffsetPos else None),
1684 noDeeps=True))
1685
1686
1687
1688 intStacks = self.archive.query(
1689 selectStr="mjdObs, m.filterID, m.multiframeID%s" % addPars,
1690 fromStr="Multiframe as m%s" % addTables,
1691 whereStr="m.multiframeID IN (%s) %s" % (", ".join(map(str, mfIDs)), addConst),
1692 orderBy="mjdObs")
1693
1694 frameSetIDs = []
1695 if self.synSetup.isCorSingleEpoch:
1696
1697
1698
1699
1700
1701
1702 if useSpecialFilter:
1703 dataInFormat = False
1704 if matchOB:
1705
1706 completeOBs = []
1707 iStacksOB = sorted(intStacks, key=attrgetter("obsID"))
1708 for obsID, obStacks in groupby(iStacksOB, attrgetter("obsID")):
1709 obStacks = list(obStacks)
1710 if len(set(OBfilterIDList).difference(set([obStk.filterID for obStk in obStacks]))) == 0:
1711 completeOBs.append(obsID)
1712 if completeOBs:
1713 dataInFormat = True
1714 if matchDur:
1715 durStacks = [epStack for epStack in intStacks
1716 if epStack.duration == filtDur and epStack.filterID == specialFilterID]
1717 if durStacks:
1718 dataInFormat = True
1719 if dataInFormat:
1720 mustHaveFrames = []
1721
1722 hasCompleted = False
1723 constrainFilters = []
1724 for selType in self.synSetup.specialFilter['order']:
1725 if selType == 'OB' and completeOBs and not hasCompleted:
1726 if len(completeOBs) == 1:
1727 obsID = completeOBs.pop()
1728 mustHaveFrames.extend([epStack for epStack in intStacks
1729 if epStack.obsID == obsID])
1730 constrainFilters = OBfilterIDList
1731 hasCompleted = True
1732 else:
1733
1734 if matchDur:
1735 obsIDDur = [epStack.obsID for epStack in intStacks
1736 if epStack.obsID in completeOBs
1737 and epStack.duration == filtDur
1738 and epStack.filterID == specialFilterID]
1739 if len(obsIDDur) == 1:
1740 obsID = obsIDDur.pop()
1741 mustHaveFrames.extend([epStack for epStack in intStacks
1742 if epStack.obsID == obsID])
1743 constrainFilters = OBfilterIDList
1744 hasCompleted = True
1745 else:
1746 mustHaveFrames.extend([epStack for epStack in intStacks
1747 if epStack.obsID in completeOBs])
1748 constrainFilters = OBfilterIDList
1749 hasCompleted = True
1750 if selType == 'DUR' and durStacks and not hasCompleted:
1751 mustHaveFrames.extend(durStacks)
1752 constrainFilters.append(specialFilterID)
1753 hasCompleted = True
1754
1755 intStacks = [epStack for epStack in intStacks
1756 if (epStack.filterID not in constrainFilters or
1757 epStack.multiframeID in [frame.multiframeID
1758 for frame in mustHaveFrames])]
1759
1760
1761
1762 initPos = 0
1763 possFrameSets = []
1764 while initPos < len(intStacks):
1765 initMjdObs = intStacks[initPos][0]
1766
1767 numObs = defaultdict(int)
1768 groupNos = []
1769 groupInd = []
1770 goodGroup = True
1771 allFiltersUsed = False
1772 for ii, stkInf in enumerate(intStacks):
1773 if stkInf.mjdObs >= initMjdObs and goodGroup:
1774 numObs[stkInf.filterID] += 1
1775
1776 if not allFiltersUsed:
1777 if len(numObs) > 1:
1778 groupNos.append((stkInf.multiframeID, stkInf.filterID, stkInf.mjdObs))
1779 else:
1780 groupNos = [(stkInf.multiframeID, stkInf.filterID, stkInf.mjdObs)]
1781 groupInd.append(ii)
1782 allFiltersUsed = set(fID for fID, _nP in self._filterPasses).difference(set(filtID for filtID in numObs)) == set()
1783
1784 possFrameSets.append((groupNos, allFiltersUsed))
1785 initPos = max(groupInd) + 1
                # NOTE(review): "possFrameSets > 1" compares a *list* with an
                # int, which under Python 2's mixed-type ordering is always
                # True; "len(possFrameSets) > 1" was presumably intended.
                # Harmless only because the reduction below also works for a
                # single candidate -- confirm and fix.
                if possFrameSets > 1:
                    # Choose the grouping with the shortest time span,
                    # preferring groupings that used every required filter.
                    durationList = []
                    areAFU = len([fSet for fSet, afu in possFrameSets if afu]) > 0
                    for fSet, allFiltUsed in possFrameSets:
                        if areAFU and allFiltUsed:
                            mjdObsList = [mjdObs for _mfID, _fID, mjdObs in fSet]
                            durationList.append(max(mjdObsList) - min(mjdObsList))
                        elif not areAFU:
                            mjdObsList = [mjdObs for _mfID, _fID, mjdObs in fSet]
                            durationList.append(max(mjdObsList) - min(mjdObsList))
                    index = [idx for idx, dur in enumerate(durationList) if dur == min(durationList)].pop()
                    possFrameSets = [possFrameSets[index]]
1800
1801 fSet, _allFiltUsed = possFrameSets.pop()
1802 groupNos = self._selectBestMfIDs(fSet)
1803 frameSetIDs.append(','.join(map(str, groupNos)))
1804 else:
1805
1806
1807 initPos = 0
1808 while initPos < len(intStacks):
1809
1810
1811 initMjdObs = intStacks[initPos][0]
1812 finalMjdObs = initMjdObs + bandMergeTime
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832 numObs = defaultdict(int)
1833 groupNos = []
1834 groupInd = []
1835 goodGroup = True
1836 for ii, (mjdObs, filterID, mfID) in enumerate(intStacks):
1837 if mjdObs >= initMjdObs and goodGroup:
1838 numObs[filterID] += 1
1839 goodGroup = numObs[filterID] <= 1
1840 if goodGroup and mjdObs <= finalMjdObs:
1841 groupNos.append(mfID)
1842 groupInd.append(ii)
1843
1844 frameSetIDs.append(','.join(map(str, groupNos)))
1845 initPos = max(groupInd) + 1
1846
1847
1848 for frameIDs in frameSetIDs:
1849 allFrames = self.archive.query(
1850 selectStr="D." + dPre.join(curAstHdr) + fPre + fPre.join(frameHdr),
1851 fromStr="Multiframe AS F, MultiframeDetector AS MFD, "
1852 "CurrentAstrometry AS D",
1853 whereStr="D.multiframeID=F.multiframeID AND F.%s"
1854 " AND D.multiframeID=MFD.multiframeID"
1855 " AND D.extNum=MFD.extNum AND MFD.%s"
1856 " AND D.multiframeID IN (%s)"
1857 % (DepCodes.selectNonDeprecated,
1858 DepCodes.selectNonDeprecated, frameIDs))
1859
1860
1861 allFrames.sort(key=attrgetter("extNum"))
1862 for _extNum, group in groupby(allFrames, attrgetter("extNum")):
1863 frames = list(group)
1864 if frames:
1865 frameSet = [nextFrameSetID, self.cuEventID]
1866
1867
1868
1869 frameSet.extend(frames[0][:6])
1870 frameSet.append(numpy.mean([f.mjdObs for f in frames]))
1871
1872
1873
1874
1875 for filterID, _nPass in self._filterPasses:
1876 multiframeID = dbc.intDefault()
1877 extNum = dbc.tinyIntDefault()
1878 newFrame = dbc.tinyIntDefault()
1879 for frame in frames:
1880 if filterID == frame.filterID:
1881 multiframeID = frame.multiframeID
1882 extNum = frame.extNum
1883 newFrame = dbc.no()
1884
1885 frameSet += [multiframeID, extNum, newFrame]
1886
1887 frameSet += \
1888 [SynopticMerger.swVersion, dbc.yes(), dbc.no()]
1889 if self.synSetup.isCorSingleEpoch:
1890 duration = (max(f.mjdObs for f in frames) -
1891 min(f.mjdObs for f in frames))
1892 frameSet.append(duration)
1893
1894 self._checkFrameSet(frameSet)
1895 nextFrameSetID += 1
1896
1897
1898
1900 """ Chooses best mfIDs to match time info
1901 """
1902
1903 filtCount = Counter([fID for _mfID, fID, _mjdObs in frameSetInfo])
1904 singles = [fID for fID in filtCount if filtCount[fID] == 1]
1905 multis = [fID for fID in filtCount if filtCount[fID] > 1]
1906 groupNos = []
1907 mjds = []
1908 for filterID in singles:
1909 groupNos.extend([mfID for mfID, fID, _mjdObs in frameSetInfo if fID == filterID])
1910 mjds.extend([mjdObs for _mfID, fID, mjdObs in frameSetInfo if fID == filterID])
1911
1912
1913 for filterID in multis:
1914 meanGroupMjd = numpy.mean(mjds)
1915 bestMfID = sorted([(mfID, abs(mjdObs - meanGroupMjd))
1916 for mfID, fID, mjdObs in frameSetInfo if fID == filterID],
1917 key=itemgetter(1))[0][0]
1918
1919 groupNos.append(bestMfID)
1920 mjds.extend([mjdObs for mfID, _fID, mjdObs in frameSetInfo if mfID == bestMfID])
1921 return groupNos
1922
1923
1929
1930
1931
1933 """ Create/update merged tile-pawprint catalogue for a given programme.
1934 """
1935
1936
1937
1938 commencementMessage = "Tile-pawprint source merging..."
1939 """ Message to log at the commencement of source merging.
1940 """
1941 pairingCriterion = 0.8
1942 """ Tight merging criterion because sources should not move in short space
1943 of time.
1944 """
1945 _useExtNum = False
1946 """ Override useExtNum selection
1947 """
1948
1949 cuNum = 26
1950 frameSetChunkSize = 150
1951 _isVerbose = False
1952
1953
1954
1955
1956
1957 deepOnly = False
1958 """ Match deep tile and pawprint data only
1959 """
1960 prepareOnly = False
1961 """ Only prepare the tile sets and update mergelog, without source merging?
1962 """
1963 skipQualityCheck = False
1964 """ Override quality bit flag check?
1965 """
1966 recreate = False
1967 """ Recreate tables
1968 """
1969 addDefaults = False
1970 """ Add defaults only
1971 """
1972 newTileList = None
1973 """ Create table for these multiframes only
1974 """
1975
1976
1978 """ Create/update merged tile<->pawprint catalogue.
1979 """
1980
1981 super(TilePawPrintMerger, self)._onRun()
1982
1983 ingSchema = [self._progSchema[self._mergeLog]]
1984 if not self.prepareOnly:
1985 ingSchema.append(self._progSchema[self._sourceTable])
1986
1987 try:
1988 ingester = Ingester(self.archive, ingSchema, self.shareFileID,
1989 skipSchemaCheck=True)
1990 except schema.MismatchError as error:
1991 raise TilePawPrintMerger.CuError(error)
1992
1993 if self.addDefaults:
1994 self._insertDefaultRows()
1995
1996 if self.recreate and not self.addDefaults:
1997 Logger.addMessage("Destroying & recreating %s and emptying %s" %
1998 (self._sourceTable, self._mergeLog))
1999 self.archive.dropTable(self._progSchema[self._sourceTable],
2000 self._progSchema.values())
2001 self.archive.truncate(self._mergeLog)
2002 self.archive.createTable(self._progSchema[self._sourceTable],
2003 self._progSchema.values())
2004
2005
2006
2007 newTileSets = []
2008 if not self.addDefaults:
2009 newTileSets = self._makeNewTileSets()
2010
2011 Logger.addMessage("...found %s new tile sets" % len(newTileSets))
2012 if not newTileSets:
2013 Logger.addMessage("No new tile<->pawprint merging required for "
2014 "this programme.")
2015
2016 if self.prepareOnly:
2017 self._ingestMergeLogOnly(ingester, newTileSets)
2018 else:
2019 chunkedTileSets = list(utils.splitList(newTileSets,
2020 chunkSize=TilePawPrintMerger.frameSetChunkSize))
2021
2022 progress = ForLoopMonitor(chunkedTileSets)
2023 for tileSets in chunkedTileSets:
2024 progress.preMessage("Preparing chunk")
2025
2026
2027
2028 mergedTablePathName, mergeLogPathName, isComplete = \
2029 self.mergeSources(tileSets, self.pairingCriterion)
2030
2031 if not isComplete:
2032 raise TilePawPrintMerger.CuError("Tile-Pawprint merging "
2033 "failed and needs to be re-run.")
2034
2035 try:
2036 ingester.ingestTable(self._mergeLog, mergeLogPathName,
2037 isCsv=True)
2038
2039 ingester.ingestTable(self._sourceTable, mergedTablePathName,
2040 self._idxInfo, isOrdered=True)
2041
2042 except BaseException as error:
2043 Logger.addExceptionMessage(error)
2044 Logger.addMessage(
2045 "Attempting to rollback %s..." % self._mergeLog)
2046
2047 tileSetIDs = (tileSet.ID for tileSet in tileSets)
2048 Logger.addMessage("%s rows removed."
2049 % self.archive.delete(self._mergeLog,
2050 "tileSetID IN (%s)" % ','.join(map(str, tileSetIDs))))
2051
2052 raise
2053
2054 self._insertDefaultRows()
2055
2056
2057
2059 """ Write the updated merge log of new (plus updated) frame sets (new
2060 frame sets flagged using the merge log attribute "new") to a CSV
2061 file)
2062 """
2063 mergeLogPathName = os.path.join(self.sysc.tempWorkPath(),
2064 'tilePawPrintMergeLog.csv')
2065
2066 csv.File(mergeLogPathName, 'w+').writelines(set(newTileSets))
2067 try:
2068 ingester.ingestTable(self._mergeLog, mergeLogPathName, isCsv=True)
2069 except odbc.DatabaseError as error:
2070 if 'STREAM' in str(error):
2071 Logger.addMessage("<ERROR> %s ingest failed. " % self._mergeLog
2072 + "Possibly because of a discrepency between the schema "
2073 "and the entries in RequiredFilters.")
2074 raise
2075
2076
2077
2079 """
2080 Creates new tile sets as appropriate based on available intermediate
2081 stack frames.
2082
2083 @return: List of new tile sets.
2084 @rtype: list(tuple)
2085
2086 """
2087 Logger.addMessage("Creating list of tile sets to be merged...")
2088 mlCols = self.archive.queryColumnNames(self._mergeLog)
2089 expNOffsets = len([col for col in mlCols
2090 if ('mfID' in col and 'tl' not in col)])
2091
2092 nextFrameSetID = \
2093 self._getNextID(self._frameSetIDName, self._mergeLog, addProgID=True)
2094
2095 curAstHdr = ['centralRA', 'centralDec', 'cx', 'cy', 'cz', 'htmID',
2096 'multiframeID']
2097
2098 frameType = 'tile%deep%stack' if self.deepOnly else '%tile%stack'
2099 dateSel = ("utDate BETWEEN '%s' AND '%s' AND " % self.dateRange
2100 if not self.deepOnly else "")
2101
2102 selString = ''
2103 if self.newTileList:
2104 selString = 'F.multiframeID IN (%s) AND ' \
2105 % ','.join(map(str, self.newTileList))
2106
2107 tiles = self.archive.query(
2108 selectStr="D." + ", D.".join(curAstHdr),
2109 fromStr="Multiframe as F,ProgrammeFrame as p,CurrentAstrometry as D",
2110 whereStr="programmeID=%s and F.multiframeID=p.multiframeID and "
2111 "frameType like '%s' and F.deprecated=0 and "
2112 "D.multiframeID=F.multiframeID and D.extNum=2 and %s %s"
2113 "F.multiframeID NOT IN (%s)"
2114 % (self.programmeID, frameType, dateSel, selString,
2115 SelectSQL("tlmfID", self.programme.getTileSetTable(),
2116 "deprecated=0")))
2117
2118 frameSets = []
2119 for tile in tiles:
2120 mfIDs = self.archive.query("multiframeID", "Provenance",
2121 whereStr="combiframeID=%s" % tile.multiframeID)
2122
2123 if len(mfIDs) != expNOffsets:
2124 Logger.addMessage("<WARNING> Tile %s does not have expected "
2125 "number of offsets: %s out of expected %s"
2126 % (tile.multiframeID, len(mfIDs), expNOffsets))
2127 continue
2128
2129
2130 pawPFrames = self.archive.query("multiframeID, offsetID",
2131 fromStr="Multiframe",
2132 whereStr="multiframeID IN (%s)" % ','.join(map(str, mfIDs)),
2133 orderBy="offsetID")
2134
2135 frameSet = [nextFrameSetID, self.cuEventID]
2136
2137
2138 frameSet.extend(tile)
2139 frameSet.extend(frame.multiframeID for frame in pawPFrames)
2140 frameSet += [TilePawPrintMerger.swVersion, dbc.yes(), dbc.no()]
2141 frameSets.append(self.FrameSet(*frameSet))
2142 nextFrameSetID += 1
2143
2144 return frameSets
2145
2146
2147
2153
2154
2155
2157 """ Initialisation routine for this programme's merge-log and passband
2158 data.
2159
2160 @todo: This will be a *lot* simpler when I've implemented the
2161 FrameSetView class.
2162 """
2163
2164 mlCols = self.archive.queryColumnNames(self._mergeLog)
2165 if not mlCols:
2166 Logger.addMessage("<ERROR> Merge-log table %s not found"
2167 % self._mergeLog)
2168 raise SystemExit
2169
2170 expNOffsets = sum('mfID' in col and 'tl' not in col for col in mlCols)
2171
2172 self._passbands = ["tl"]
2173 for offset in range(expNOffsets):
2174 self._passbands += ["o%d" % (offset + 1)]
2175
2176 mlCols[0] = "ID"
2177 self.FrameSet = namedtuple(self._mergeLog, mlCols)
2178
2179
2180 for index, attr in enumerate(mlCols):
2181 if attr.endswith('mfID'):
2182 break
2183
2184 self._numHeaderCol = index
2185
2186
2187 for index, attr in enumerate(mlCols[self._numHeaderCol + 1:]):
2188 if attr.endswith('mfID') or attr == 'mergeSwVersion':
2189 break
2190
2191 self._numFilterCol = index + 1
2192
2193 finalIdx = self._numHeaderCol + len(self._passbands) * self._numFilterCol
2194 self._mfidIndices = \
2195 range(self._numHeaderCol, finalIdx, self._numFilterCol)
2196
2197 self._newFrameSetIdx = finalIdx + 1
2198
2199
2200
2202 """ Inserts default row into TileSet and TilePawPrints table.
2203 """
2204 if not self.archive.queryEntriesExist(self._mergeLog, "tileSetID<0"):
2205 Logger.addMessage("Inserting default row into %s..."
2206 % self._mergeLog)
2207
2208 self.archive.insertData(self._mergeLog, [column.getDefaultValue()
2209 for column in self._progSchema[self._mergeLog].columns])
2210
2211 if not self.prepareOnly and \
2212 not self.archive.queryEntriesExist(self._sourceTable, "tileSetID<0"):
2213
2214
2215 nonDefColumns = ["cuEventID", "tlExtNum", "o1ExtNum", "o2ExtNum",
2216 "o3ExtNum", "o4ExtNum", "o5ExtNum", "o6ExtNum"]
2217
2218 nonDefValues = SelectSQL(
2219 select="cuEventID, 0 AS tlExtNum, 0 AS o1ExtNum, 0 AS o2ExtNum,"
2220 "0 AS o3ExtNum, 0 AS o4ExtNum, 0 AS o5ExtNum, "
2221 "0 AS o6ExtNum",
2222 table=self._mergeLog,
2223 where="tileSetID<0")
2224
2225 Logger.addMessage("Inserting default row into %s..."
2226 % self._sourceTable)
2227
2228 selectStr = ", ".join(
2229 (str(column.getDefaultValue()) if column not in nonDefColumns else
2230 "T.%s" % column)
2231 for column in self._progSchema[self._sourceTable].columns)
2232
2233 numRows = self.archive.copyIntoTable(self._sourceTable,
2234 sourceTable="(%s) AS T" % nonDefValues, columns=selectStr)
2235
2236 Logger.addMessage("done. %s rows inserted." % numRows)
2237
2238
2239 missingDefaults = self.archive.query("tileSetID", self._mergeLog,
2240 "tileSetID>0 AND tileSetID NOT IN (%s)" % SelectSQL("tileSetID",
2241 table=self._sourceTable, where="tileSetID>0 AND tileSetSeqNum<0"))
2242
2243
2244 if missingDefaults:
2245 nonDefColumns = ["tileSetID", "cuEventID"]
2246
2247 nonDefValues = SelectSQL(
2248 select=", ".join(nonDefColumns),
2249 table=self._sourceTable,
2250 where="tileSetID IN (%s)" % ','.join(map(str, missingDefaults)),
2251 groupBy=", ".join(nonDefColumns))
2252
2253 selectStr = ", ".join(
2254 (str(column.getDefaultValue()) if column not in nonDefColumns else
2255 "T.%s" % column)
2256 for column in self._progSchema[self._sourceTable].columns)
2257
2258 numRows = self.archive.copyIntoTable(self._sourceTable,
2259 sourceTable="(%s) AS T" % nonDefValues, columns=selectStr)
2260
2261 Logger.addMessage("done. %s rows inserted." % numRows)
2262
2263
2264