1
2
3
4 """
5 Invoke CU19. Prepare release database for a given survey. Also incorporates
6 CUs 18 and 20 from the original design.
7
8 @author: R.S. Collins
9 @org: WFAU, IfA, University of Edinburgh
10
11 @newfield contributors: Contributors, Contributors (Alphabetical Order)
12 @contributors: J. Bryant, N.C. Hambly, E. Sutorius
13
14 @todo: Resolve complication over whether PTS/CAL should be considered
15 non-surveys or not - it may well be easier to create survey like DBs
16 for them (though there is the consideration of deprecation
17 selections).
18 @todo: Create a Cu19._parseSchemas() sub-routine to tidy the code up a bit.
19 @todo: Provide Info message about largest table size.
20 @todo: Automatic determination of initial file-group file sizes. See trac
21 ticket:89 for implementation details.
22 """
23
24 from __future__ import division, print_function
25
26 from email.mime.text import MIMEText
27 import smtplib
28
29 from wsatools.CLI import CLI
30 import wsatools.CSV as csv
31 import wsatools.DataFactory as df
32 import wsatools.DbConnect.CommonQueries as queries
33 from wsatools.DbConnect.CuSession import CuSession
34 import wsatools.DbConnect.DbConstants as dbc
35 from wsatools.DbConnect.DbSession import Database, DbSession, Join, odbc, \
36 SelectSQL, bulkCopy
37 import wsatools.DbConnect.Schema as schema
38 from wsatools.Logger import Logger, ForLoopMonitor
39 from wsatools.SystemConstants import DepCodes
40 import wsatools.Utilities as utils
41
42
class Cu19(CuSession):
    """ Prepare release database for a given survey.
    """
    # NOTE(review): removed a stray leading '-' (leftover diff marker) that
    # broke the class statement.

    # Curation task number for this CU.
    cuNum = 19

    propPeriodOfGrace = 1
    """ Additional proprietary period-of-grace in months. This constant defines
        an additional time interval that is added to the nominal proprietary
        period of proprietary data such that an allowance is made for the
        time-lag between ingest (which is the currently assumed start of the
        proprietary period, as opposed to the date of observation) and release.
        This period of grace is currently set to one month, since this is of
        order the time it will take between ingest and release of data.
    """

    # Commit every database transaction immediately.
    _autoCommit = True

    # Public options -- overridden from the command line in __main__ below.

    # Append tables to a pre-existing release database (-a/--append).
    appendDb = False

    # Range of observation dates to release; defaults to the full range.
    dateRange = CuSession.sysc.obsCal.dateRange()

    # Programmes whose detections are excluded from release (-x/--exclude).
    excludeProgs = tuple()

    # Release data from these fieldIDs only (-l/--fields); '' means all.
    fieldIDs = ''

    # Resume appending to an existing database from this table (-f/--from).
    fromTable = None

    # Copy only survey detection data for these programmes (-i/--insert).
    insertProgs = tuple()

    # When inserting, keep the existing metadata tables (-k/--keep_metadata).
    keepMetadata = False

    # Overwrite any pre-existing release database of the same name (-o).
    overwriteDb = False

    # Chunk number to resume from (VVV detection outgests only, -n/--number).
    resumeChunkNumber = 1

    # Private member data, initialised in __init__() and the run method.
    # NOTE(review): these mutable class-level defaults are rebound (not
    # mutated in place) per instance by the methods below, so cross-instance
    # sharing is not an issue in practice.
    _idxInfo = {}            # Index definitions parsed from the schema scripts
    _isNonSurvey = True      # Is this a non-survey (PTS/CAL-style) release?
    _objectSchema = []       # Parsed views / functions / procedures to create
    _releaseDatabase = None  # Database object describing the release db
    _releaseServer = None    # Server that will host the release db
    _surveyID = 0            # Unique identifier of the survey being released
    _tableList = []          # Names of all tables to copy into the release db
    _tableSchema = {}        # Map of table name -> parsed table schema
    _versionNum = 0          # Release (version) number for this survey
    _workSpace = ''          # Volume path used for temporary outgest files
98
99
100
        # NOTE(review): the "def __init__(...)" header line is missing from
        # this listing; per the call site in __main__ the signature is
        # __init__(self, surveyID, release, curator, comment, database,
        #          isTrialRun, userName) -- confirm against revision history.
        """
        Initialises member data and prepares database connection.

        @param surveyID:   The unique identifier for the survey to release,
                           or db name e.g. 1 for UKIDSS, 106 for TRANSIT.
        @type surveyID:    int or str
        @param release:    Optionally override default name of release
                           database.
        @type release:     str
        @param curator:    Name of curator.
        @type curator:     str
        @param comment:    Descriptive comment as to why curation task is
                           being performed.
        @type comment:     str
        @param database:   Name of the database to connect to.
        @type database:    str
        @param isTrialRun: If True, do not perform database modifications,
                           just print the SQL statement to the terminal.
        @type isTrialRun:  bool
        @param userName:   Optionally override default database username.
        @type userName:    str
        """
        super(Cu19, self).__init__(curator=curator, comment=comment,
            database=database, isTrialRun=isTrialRun, userName=userName)

        # Identify the requested survey, accepting either a numeric surveyID
        # or a database-name prefix; abort cleanly if it is unknown.
        surveyTable = df.Table('Survey', self.archive)
        if type(surveyID) is int or surveyID.isdigit():
            try:
                surveyTable.setCurRow(surveyID=int(surveyID))
            except ValueError:
                Logger.addMessage("<ERROR> Survey ID %s does not exist. "
                    "Please select from %s" % (surveyID,
                    ', '.join(surveyTable.getAttr('dbNamePrefix'))))
                raise SystemExit
        else:
            try:
                surveyTable.setCurRow(dbNamePrefix=surveyID.upper())
            except ValueError:
                Logger.addMessage("<ERROR> Survey %s does not exist. "
                    "Please select from %s" % (surveyID.upper(),
                    ', '.join(surveyTable.getAttr('dbNamePrefix'))))
                raise SystemExit

        Logger.addMessage("of survey: " + surveyTable.getAttr('name'))

        self._surveyID = surveyTable.getAttr('surveyID')
        # Surveys beyond the non-survey offset, plus TRANSIT (106) and 200,
        # are treated as non-survey releases.
        self._isNonSurvey = (self._surveyID > dbc.nonSurveyProgrammeOffset() or
            self._surveyID == 106 or self._surveyID == 200)
        self._versionNum = self._getNextID('releaseNum', 'Release',
            'surveyID=%s' % self._surveyID)

        # Programmes that make up this survey.
        self.programmeID = set(self.archive.query("programmeID",
            "SurveyProgrammes", "surveyID=%s" % self._surveyID))

        if not self.programmeID:
            Logger.addMessage("<ERROR> There are no programmes specified for "
                "this survey in table SurveyProgrammes")
            raise SystemExit

        gap = "\n "
        Logger.addMessage("The following programmes are part of this survey:" +
            gap + gap.join(self.programme.getAttr('title', programmeID=pID)
            for pID in self.programmeID))

        # Release database name/server: an explicit "server.dbName" release
        # argument overrides the default of archive server + survey prefix.
        if release:
            self._releaseServer, releaseDbName = (release.split('.')
                if '.' in release else (self.archive.server, release))
        else:
            self._releaseServer = self.archive.server
            releaseDbName = surveyTable.getAttr('dbNamePrefix')

        Logger.addMessage("Name of database to be released: " + releaseDbName)

        # Design the release database layout: small single-volume db for
        # non-surveys; multi-filegroup db (special-cased for VVV) otherwise.
        if self._isNonSurvey:
            dbVolume = self.sysc.catServerVolumes(self._releaseServer)[0]
            self._releaseDatabase = Database(releaseDbName, volumes=[dbVolume],
                primarySize="100 MB", logSize="10 MB", filegrowth="25%")

            Logger.addMessage("Will create database in "
                + self._releaseDatabase._folders[0])
        else:
            surveys = ' '.join(self.programme.getAcronym(progID)
                for progID in self.programmeID)

            filegroups = self.sysc.metadataFileGroups
            filegroups += self.sysc.surveyFileGroups(surveys)

            # Larger initial sizes on the load-db server.
            pSize, fgSize = (("4 GB", "40 GB") if self.archive.isLoadDb else
                ("16 MB", "16 MB"))

            if self.sysc.scienceProgs.get("VVV") in self.programmeID:
                filegroups = [("vvvDetection", "520 GB"),
                              ("vvvSource", fgSize),
                              ("vvvCuration", fgSize),
                              ("vvvIndices", fgSize)]
            else:
                filegroups = list(zip(filegroups, [fgSize] * len(filegroups)))

            Logger.addMessage("Filegroup size = " + fgSize)
            self._releaseDatabase = Database(releaseDbName, primarySize=pSize,
                volumes=self.sysc.catServerVolumes(self._releaseServer),
                dbDir=releaseDbName,
                filegroups=filegroups)

        # Pick the best volume on the release server for temporary outgests.
        adminDb = DbSession(self._releaseServer + '.' + self.sysc.adminDatabase,
            autoCommit=True, isTrialRun=self.archive.isTrialRun,
            userName=self.archive.userName)

        self._workSpace = adminDb.uncPath(adminDb.getBestVolume())

        Logger.addMessage("Using volume %s for temporary outgests"
            % self._workSpace)
228
229
230
232 """
233 Verify, freeze, backup and release database for a given survey.
234
235 """
236
237 Logger.addMessage("Parsing schemas...")
238
239
240 self._idxInfo = schema.parseIndices(self.sysc.indexScript)
241 self._objectSchema = schema.parseFuncProcs(self.sysc.procScript)
242 if self._isNonSurvey:
243 self._idxInfo.update(schema.parseIndices(self.sysc.nsIndexScript))
244 else:
245 self._objectSchema += (
246 schema.parseFuncProcs(self.sysc.surveyProcScript)
247 + schema.parseViews(self.sysc.surveyViewScript))
248
249 if self.sysc.isVSA():
250
251 progAcronym = \
252 self.programme.getAcronym(list(self.programmeID)[0]).lower()
253
254
255 self._objectSchema = [obj for obj in self._objectSchema
256 if "TilePawTDOnly" not in obj.name
257 or obj.name.startswith(progAcronym)]
258
259 if self.insertProgs:
260 progIDs = \
261 set(self.programme.setProgID(prog) for prog in self.insertProgs)
262
263 self.insertProgs = tuple(self.programme.getAcronym(progID).upper()
264 for progID in progIDs)
265 Logger.addMessage(
266 "Only these programmes will be inserted into existing database: "
267 + ", ".join(self.insertProgs))
268 else:
269 progIDs = utils.arbSort(self.programmeID,
270 [self.sysc.scienceProgs.get("GPS")], isFullKeySet=False)
271
272
273 self.excludeProgs = \
274 tuple(self.programme.getAcronym(self.programme.setProgID(prog))
275 for prog in self.excludeProgs)
276
277
278
279 parsedScripts = set([dbc.charDefault().upper(),
280 dbc.charDefault().lower()])
281 tableSchema = []
282 neighboursSchema = []
283 for progID in progIDs:
284 self.programme.setCurRow(programmeID=progID)
285
286
287 progScript = self.programme.getSchemaScript()
288 if progScript not in parsedScripts:
289 tableSchema += schema.parseTables(progScript)
290 parsedScripts.add(progScript)
291
292
293 neighboursScript = self.programme.getAttr("neighboursSchema")
294 if neighboursScript not in parsedScripts:
295 neighboursSchema += schema.parseTables(neighboursScript)
296 parsedScripts.add(neighboursScript)
297
298 if self.programme.getAcronym() not in self.excludeProgs:
299 tableSchema.extend(table for table in neighboursSchema
300 if table.name.startswith(self.programme.getAcronym()))
301
302 if self.keepMetadata:
303
304 self._tableList = map(str, tableSchema)
305
306
307 tableSchema += schema.parseTables(self.sysc.metadataSchema())
308 tableSchema += schema.parseTables(self.sysc.curationSchema())
309 tableSchema += schema.parseTables(self.sysc.calibrationSchema())
310 if not self.keepMetadata:
311
312 self._tableList = map(str, tableSchema)
313
314
315 self._tableSchema = dict((table.name, table) for table in tableSchema)
316
317
318 Logger.addMessage("Checking that the database schema is correct.")
319 try:
320 self.archive.checkSchema(tableSchema, releasedOnly=True)
321 except schema.MismatchError as error:
322 raise Cu19.CuError(error)
323
324 fromTable = self.fromTable
325 if fromTable and fromTable.startswith("vvvDetectionRaw"):
326 fromTable = "vvvDetection"
327
328 if fromTable and fromTable not in self._tableList:
329 raise Cu19.CuError("Unknown table: " + self.fromTable)
330
331
332
333 adminDb = DbSession(self._releaseServer + '.' + self.sysc.adminDatabase,
334 autoCommit=True, isTrialRun=self.archive.isTrialRun,
335 userName=self.archive.userName)
336
337 if self.appendDb:
338 Logger.addMessage(
339 "Updating database %s..." % self._releaseDatabase.name)
340 try:
341 self._releaseDatabase.attach(adminDb)
342 except odbc.DatabaseError as error:
343 if "activation error. The physical file name" in str(error):
344 raise Cu19.CuError(
345 "Cannot append to database %s as it does not exist."
346 % self._releaseDatabase.name)
347 raise
348 except Database.ExistsError:
349 pass
350 else:
351 Logger.addMessage(
352 "Creating database %s..." % self._releaseDatabase.name)
353 for _attempt in range(2):
354 try:
355 self._releaseDatabase.create(adminDb, self.overwriteDb)
356 except odbc.DatabaseError as error:
357 if "file names listed could not be created" in str(error) \
358 and not self.overwriteDb:
359 try:
360 self._releaseDatabase.attach(adminDb)
361 except odbc.DatabaseError:
362 raise odbc.DatabaseError(error)
363 else:
364 raise
365 except Database.ExistsError as error:
366 raise Cu19.CuError(error)
367 else:
368 break
369
370
371
372 self._releaseDatabase.setBulkLoadOptions(adminDb)
373
374
375 for index in self._idxInfo["ProgrammeFrame"]:
376 self.archive.addIndex(index)
377
378 self._copyOutData()
379
380 if self._releaseServer == self.archive.server:
381 self._releaseDatabase.setReleaseOptions(adminDb)
382 try:
383 self._releaseDatabase.detach(adminDb)
384 except odbc.DatabaseError as error:
385 Logger.addExceptionWarning(error)
386 Logger.addMessage(
387 "Database %s prepared, but still needs to be detached prior "
388 "to copy to the public server." % self._releaseDatabase.name)
389 else:
390 Logger.addMessage(
391 "Database %s prepared, detached and ready for copy to the "
392 "public server." % self._releaseDatabase.name)
393
394 else:
395 releaseServers = [self._releaseServer]
396 if self._releaseServer == "ramses13":
397 self._releaseDatabase.setReleaseOptions(adminDb)
398 else:
399 Logger.addMessage("Mirroring database...")
400 releaseServers.append(self._releaseDatabase.mirror(adminDb))
401 Logger.addMessage("done.")
402
403 if self._isNonSurvey and self._surveyID not in (106, 200):
404 self._setUserNames(releaseServers)
405
406 if self.archive.isRealRun:
407 self._sendMail()
408
409
410
417
418
419
        # NOTE(review): the "def _copyOutData(self)" header line is missing
        # from this listing (name and arity per the call in the run method).
        """ Fills the release database with the correct data.
        """
        releaseDb = DbSession(
            self._releaseServer + '.' + self._releaseDatabase.name,
            autoCommit=True, isTrialRun=self.archive.isTrialRun,
            userName=self.archive.userName)

        if self.insertProgs:
            # Re-inserting selected programmes: drop their existing tables
            # (and the metadata/curation tables unless keepMetadata is set).
            Logger.addMessage(
                "Removing existing %s%s tables..." % (("metadata, curation and "
                if not self.keepMetadata else ''), ", ".join(self.insertProgs)))

            existingTables = [self._tableSchema[table]
                for table in self._tableList if releaseDb.existsTable(table)]

            releaseDb.dropObjects(existingTables)

        if self.appendDb:
            # Resuming: drop the partially-copied resume table (unless it is
            # a Detection/Source table, which is resumed mid-table instead).
            if self.fromTable and (not self.fromTable.endswith("Detection")
                and not self.fromTable.endswith("Source") or
                'X' in self.fromTable) and releaseDb.existsTable(self.fromTable):
                Logger.addMessage(
                    "Removing existing %s table..." % self.fromTable)
                releaseDb.dropObjects([self._tableSchema[self.fromTable]])

            # Skip tables that are already fully present in the release db.
            Logger.addMessage(
                "Determining list of remaining tables to copy...")
            existingTables = [table for table in self._tableList
                if releaseDb.existsTable(table)]

            fromTable = self.fromTable
            if fromTable and fromTable.startswith("vvvDetectionRaw"):
                fromTable = "vvvDetection"

            # NOTE(review): requires _tableList to be a real list (.remove).
            for table in existingTables:
                if table != fromTable:
                    self._tableList.remove(table)

        self._copyOutTableData(releaseDb)

        Logger.addMessage("Resetting empty catalogue filenames to default...")
        releaseDb.update("Multiframe", "catName=%r" % dbc.charDefault(),
            where="catName LIKE '%empty%'")

        Logger.addMessage("Creating any views and stored db functions...")
        if not releaseDb.createObjects(self._objectSchema, ignoreNS=True,
            releasedOnly=True):
            msg = "Not all views and functions could be created!"
            if not self.overwriteDb:
                # When appending, failures are expected for existing objects.
                Logger.addMessage("<Info> %s This may just be because they "
                    "already exist." % msg)
            else:
                Logger.addMessage("<WARNING> " + msg)

        # Record this release in both the release db and the source archive.
        Logger.addMessage("Updating release history with this row:")
        row = (self._surveyID, self._versionNum, utils.makeMssqlTimeStamp(),
               self.comment, self._releaseDatabase.name)
        Logger.addMessage(repr(row))
        releaseDb.insertData(dbc.releaseTableName(), row)
        self.archive.insertData(dbc.releaseTableName(), row)

        releaseDb.updateStatistics()
        if not self._isNonSurvey or self._surveyID in (106, 200):
            # Grant read-only access to the public login for this archive.
            releaseDb.grantAccess("wsaro" if self.sysc.isWSA() else
                self.programme.getAcronym().lower() + "ro")
495
496
497
        # NOTE(review): the "def _getDateRangeSQL(self)" header line is
        # missing from this listing (name and arity per the call sites).
        """ Returns the date range selection clause for this survey release.
        """
        # Select frames observed within the release date range, plus frames
        # carrying the "late" default timestamp (i.e. no real date set).
        return " AND (utDate BETWEEN '%s' AND '%s'" % self.dateRange \
            + " OR utDate='%s')" % self.sysc.obsCal.lateDefaultDateTime
503
504
505
516
517
518
        # NOTE(review): the "def _copyOutTableData(self, releaseDb)" header
        # line is missing from this listing (per the call in _copyOutData).
        """
        Copies out release table data into the new release db. To replicate all
        necessary load database objects and data that are relevant to that
        release. Called within the context of the release DB, having first
        created the release database to receive the replicated data.

        @param releaseDb: A connection to the database to be released.
        @type  releaseDb: DbSession

        @note: Uses BCP native binary outgests and ingests in preference to the
               MS SQL Server T-SQL statement SELECT INTO, as performance tests
               have revealed this to be much faster.
        """
        # Common selection clauses for this release.
        dateRange = self._getDateRangeSQL()
        depClause = self._getDeprecationClause()
        progIDs = csv.join(self.programmeID)
        if not self.sysc.isWSA():
            # Non-WSA archives also release the calibration programme.
            progIDs += ",%s" % dbc.calibrationProgrammeID()

        progClause = "programmeID IN (%s)" % progIDs

        if self.fieldIDs:
            if len(self.programmeID) > 1:
                raise Cu19.CuError("The fieldIDs option cannot be applied to "
                    "multi-programme surveys")

            # Field-limited release: pre-select the multiframes, sources and
            # frame sets belonging to the requested fields.
            releaseMfs = queries.getMultiframes(self.programme, self.dateRange,
                self.fieldIDs, depClause)

            sourceIDs = queries.getSourceIDs(self.programme, self.fieldIDs)
            fieldFrameSets = \
                queries.getFrameSetIDs(self.programme, self.fieldIDs)

            # Calibration frames are always included (plus the default row).
            calMfs = SelectSQL("multiframeID",
                table=self.archive.tablePath("ProgrammeFrame"),
                where="programmeID=%s OR multiframeID=%s"
                % (dbc.calibrationProgrammeID(), dbc.intDefault()))

        else:
            # Whole-survey release: all undeprecated in-range multiframes of
            # the survey's programmes (plus the default row).
            progMfs = SelectSQL("multiframeID",
                table=self.archive.tablePath("ProgrammeFrame"),
                where=progClause + " OR multiframeID=%s" % dbc.intDefault())

            releaseMfs = SelectSQL("multiframeID",
                table=self.archive.tablePath("Multiframe"),
                where="multiframeID IN (%s) AND %s%s" %
                (progMfs, depClause, dateRange))

        releaseTables = [table for table in self._tableList
            if self._tableSchema[table].releasable]

        Logger.addMessage("Copying %s tables into database %s" %
            (len(releaseTables), releaseDb.database))

        # Copy each releasable table, choosing a selection clause (and
        # possibly a chunked outgest) appropriate to the table type.
        for count, tableName in enumerate(releaseTables):
            Logger.addMessage("Outgesting %s/%s %s"
                % (count + 1, len(releaseTables), tableName))

            tableSchema = self._tableSchema[tableName]
            if self._isNonSurvey:
                # Non-survey dbs have a single default filegroup.
                tableSchema.fileGroup = ''

            # Defaults: straight copy of the whole table in pk order.
            columns = '*'
            srcTable = self.archive.tablePath(tableName)
            whereStr = ''
            whereChunks = []
            orderBy = tableSchema.primaryKey() + " ASC"

            # Detection tables (but not cross-match 'X' tables) get special
            # treatment; VVV detections are copied month-by-month instead.
            isDetTable = \
                tableName.endswith("Detection") and 'X' not in tableName

            isVVVDetTable = (isDetTable and not self.fieldIDs
                and self.sysc.scienceProgs.get("VVV") in self.programmeID)

            if isDetTable and not isVVVDetTable:
                if self.fieldIDs:
                    whereStr, whereChunks = \
                        self._splitOutgest(releaseDb, tableSchema,
                            whereStr="(multiframeID=%s OR multiframeID IN (%s))" \
                            % (dbc.intDefault(), releaseMfs),
                            reqDefRow=True)

                else:
                    # Full release: reassemble the detection table from its
                    # split Raw/Astrometry/Photometry components.
                    columns, srcTable, whereStr, orderBy, detProgs = \
                        self._joinDetectionTables(tableName)

                    pkJoin = Join([tableName + "Raw", "Multiframe",
                        "MultiframeDetector", "ProgrammeFrame"],
                        ["multiframeID"])

                    pkWhere = "%sRaw.extNum=MultiframeDetector.extNum AND " \
                        "MultiframeDetector.%s%s AND (programmeID IN (%s) OR " \
                        "ProgrammeFrame.multiframeID=%s)" \
                        % (tableName, depClause, dateRange, csv.join(detProgs),
                        dbc.intDefault())

                    whereStr, whereChunks = \
                        self._splitOutgest(releaseDb, tableSchema, whereStr,
                            specialJoin=(pkJoin, pkWhere, tableName + "Raw", "R"),
                            reqDefRow=True)

            elif tableName.endswith("Source") and "Synoptic" not in tableName \
                    and 'X' not in tableName:

                if tableName.startswith(self.excludeProgs):
                    # Excluded programme: copy no rows (sourceID=0 never true).
                    whereStr = "sourceID=0"
                else:
                    if self.fieldIDs:
                        whereStr = "frameSetID IN (%s)" % fieldFrameSets

                    whereStr, whereChunks = \
                        self._splitOutgest(releaseDb, tableSchema, whereStr)

            elif self.fieldIDs \
                    and tableName.endswith(("Variability", "VarFrameSetInfo")):

                whereStr = "frameSetID IN (%s)" % fieldFrameSets

            elif tableName.endswith("SynopticSource") and 'X' not in tableName:

                if self.fieldIDs:
                    whereStr = "synFrameSetID IN (%s)" \
                        % queries.getSynFrameSetIDs(self.programme,
                        self.dateRange, self.fieldIDs)

                whereStr, whereChunks = \
                    self._splitOutgest(releaseDb, tableSchema, whereStr)

            elif self.fieldIDs and "BestMatch" in tableName:

                whereStr = "sourceID IN (%s)" % sourceIDs

            elif tableName.endswith("MergeLog"):

                whereStr = depClause
                if self.fieldIDs and "Synoptic" not in tableName:
                    whereStr += " AND frameSetID IN (%s)" % fieldFrameSets

            elif tableName.endswith("TileSet"):

                whereStr = "tlmfID IN (%s)" % releaseMfs

            elif tableName.endswith("TilePawPrints"):

                tileSetTable = \
                    self.archive.tablePath(tableName.replace("PawPrints", "Set"))

                whereStr = "tileSetID IN (%s)" % SelectSQL(
                    select="tileSetID",
                    table=tileSetTable,
                    where="tlmfID IN (%s)" % releaseMfs)

            elif tableName == "Multiframe":

                whereStr = (releaseMfs.whereStr if not self.fieldIDs else
                    "(multiframeID IN (%s) OR multiframeID IN (%s))"
                    % (releaseMfs, calMfs))

            elif tableName in ("MultiframeDetector", "MultiframeEsoKeys",
                               "FlatFileLookUp"):

                # Join to Multiframe so the deprecation/date-range clauses
                # can be applied at frame level.
                alias = "This"
                columns = alias + ".*"
                srcTable = Join(
                    [(self.archive.tablePath(tableName), alias),
                     (self.archive.tablePath("Multiframe"), 'M')],
                    ["multiframeID"])

                depAlias = alias if tableName == "MultiframeDetector" else "M"
                whereStr = "%s.%s" % (depAlias, depClause)
                if not self.fieldIDs:
                    whereStr += " AND M.multiframeID IN (%s)%s" \
                        % (progMfs, dateRange)
                else:
                    whereStr += " AND (M.multiframeID IN (%s) OR "\
                        "M.multiframeID IN (%s))" % (releaseMfs, calMfs)

                orderBy = tableSchema.primaryKey(alias) + " ASC"

            elif tableName in ("CurrentAstrometry", "PreviousMFDZP",
                               "MultiframeDetectorEsoKeys") \
                    or tableName.endswith("AstrometricInfo"):

                # Extension-level tables: join via MultiframeDetector too.
                alias = "This"
                columns = alias + ".*"
                srcTable = Join(
                    [(self.archive.tablePath(tableName), alias),
                     (self.archive.tablePath("Multiframe"), 'M'),
                     (self.archive.tablePath("MultiframeDetector"), 'D')],
                    ["multiframeID"])

                whereStr = "%s.extNum=D.extNum AND D.%s" % (alias, depClause)
                if not self.fieldIDs:
                    whereStr += " AND M.multiframeID IN (%s)%s" \
                        % (progMfs, dateRange)
                else:
                    whereStr += " AND (M.multiframeID IN (%s) OR "\
                        "M.multiframeID IN (%s))" % (releaseMfs, calMfs)

                orderBy = tableSchema.primaryKey(alias) + " ASC"

            elif tableName == "ArchiveCurationHistory":

                whereStr = "rolledBack=0 AND cuEventID IN (%s)" % SelectSQL(
                    select="cuEventID",
                    table=self.archive.tablePath("ProgrammeCurationHistory"),
                    where=progClause)

            elif tableName.startswith("Survey") or tableName == "Release":

                whereStr = "surveyID=%s" % self._surveyID

            elif tableName == "Provenance":

                if not self.fieldIDs:
                    whereStr = "combiframeID IN (%s) AND multiframeID IN (%s)"\
                        % (releaseMfs, releaseMfs)
                else:
                    whereStr = "(combiframeID IN (%s) OR combiframeID IN (%s))"\
                        " AND (multiframeID IN (%s) OR multiframeID IN (%s))"\
                        % (releaseMfs, calMfs, releaseMfs, calMfs)

            elif "Programme" in tableName or tableName.startswith("Required")\
                    or tableName == "ProductLinks":

                whereStr = progClause
                if tableName == "Programme":
                    whereStr += " OR programmeID = %s" % dbc.intDefault()
                elif tableName == "ProgrammeFrame":
                    whereStr = "multiframeID IN (%s)" % releaseMfs
                    if self.fieldIDs:
                        whereStr += " OR " + calMfs.whereStr

            elif self.fieldIDs and any(phrase in tableName
                    for phrase in ['X', "Neighbours"]):

                whereStr = "masterObjID IN (%s)" % sourceIDs

            # Outgest/ingest the selected rows, chunk by chunk if the table
            # was split; VVV detection tables are handled separately below.
            progress = ForLoopMonitor(whereChunks)
            chunks = (whereChunks or [whereStr]) if not isVVVDetTable else []
            for whereStr in chunks:
                progress.preMessage("Outgesting chunk")

                bulkCopy(query=SelectSQL(columns, srcTable, whereStr, orderBy),
                    tableSchema=tableSchema,
                    fromDb=self.archive,
                    toDb=releaseDb,
                    fileTag=self.shareFileID,
                    isSmallTable=not isDetTable)

            if isVVVDetTable:
                # Copy month-by-month to keep chunk sizes manageable.
                self._bulkCopyVVVDetTable(tableSchema, releaseDb)

            # Build the indices on the newly copied table.
            for index in self._idxInfo[tableName]:
                if tableName.startswith("vvv"):
                    releaseDb.goOffline()

                releaseDb.addIndex(index, ignoreNS=True, releasedOnly=True,
                    usingDefaultFG=self._isNonSurvey)

            releaseDb.createStatistics([tableSchema])
797
798
799
        # NOTE(review): the "def _bulkCopyVVVDetTable(self, tableSchema,
        # releaseDb)" header line is missing from this listing (parameters per
        # the call in _copyOutTableData).
        """ Copy VVV detection table by individual monthly table chunks.
        """
        # Columns present in the monthly Raw tables take the raw alias.
        rawColumns = [str(column) for column
            in self._tableSchema[tableSchema.name + "Raw"].columns]

        rawAlias = 'R'
        columns = ', '.join(('%s.%s' % (rawAlias, column)
            if str(column) in rawColumns else str(column))
            for column in tableSchema.columns)

        # Join conditions tying Raw/Astrometry/Photometry rows to the same
        # detection, excluding deprecated extensions.
        joinStr = ("D.extNum=R.extNum AND A.extNum=R.extNum AND "
            "P.extNum=R.extNum AND A.seqNum=R.seqNum AND P.seqNum=R.seqNum"
            " AND D." + self._getDeprecationClause())

        # Last non-deep monthly table needs an explicit end-date cut.
        lastMonthTable = \
            self.programme.getMonthlyDetectionTables(self.dateRange,
            excludeDeeps=True)[-1]

        detTables = self.programme.getMonthlyDetectionTables(self.dateRange)
        if self.fromTable and self.fromTable in detTables:
            # Resume from the requested monthly table onwards.
            detTables = detTables[detTables.index(self.fromTable):]

        progress = ForLoopMonitor(detTables)
        for detTable in detTables:
            progress.preMessage("Outgesting %s table" % detTable)
            srcTables = [
                (self.archive.tablePath(detTable), rawAlias),
                (self.archive.tablePath(detTable.replace("Raw", "Astrometry")),
                 'A'),
                (self.archive.tablePath(detTable.replace("Raw", "Photometry")),
                 'P'),
                (self.archive.tablePath("MultiframeDetector"), 'D')]

            pkSrcTables = [detTable, "MultiframeDetector"]
            pkWhere = detTable + ".extNum=MultiframeDetector.extNum AND " \
                "MultiframeDetector." + self._getDeprecationClause()

            whereStr = joinStr
            if detTable == lastMonthTable:
                # Apply the release end-date cut to the final month.
                endDateClause = " AND utDate <= '%s'" % self.dateRange.end
                srcTables.append((self.archive.tablePath("Multiframe"), 'M'))
                whereStr += endDateClause
                pkSrcTables.append("Multiframe")
                pkWhere += endDateClause

            pkJoin = Join(pkSrcTables, "multiframeID")
            # Offset for progress reporting when resuming part-way through.
            chunkOffset = self.resumeChunkNumber - 1

            whereStr, whereChunks = \
                self._splitOutgest(releaseDb, tableSchema, whereStr,
                    specialJoin=(pkJoin, pkWhere, detTable, "R"),
                    isResumable=False)

            chunkProgress = ForLoopMonitor(whereChunks)
            for whereStr in (whereChunks or [whereStr]):
                chunkProgress.preMessage("Outgesting chunk", chunkOffset)

                query = SelectSQL(columns, Join(srcTables, "multiframeID"),
                    whereStr, orderBy=tableSchema.primaryKey(rawAlias))

                bulkCopy(query, tableSchema, self.archive, releaseDb,
                    self.shareFileID, drive=self._workSpace,
                    isSmallTable=False)

        # Finally, insert the default row into the assembled table.
        defs = (column.getDefaultValue() for column in tableSchema.columns)
        releaseDb.insertData(tableSchema, defs)
869
870
871
        # NOTE(review): the "def _joinDetectionTables(self, tableName)" header
        # line is missing from this listing (name/arity per the call site).
        """
        Prepares the necessary query to join the split detection tables to
        create the given detection table.
        """
        joinAlias = 'R'
        joinList = [(self._tableSchema[tableName + "Raw"], joinAlias),
                    (self._tableSchema[tableName + "Astrometry"], 'A'),
                    (self._tableSchema[tableName + "Photometry"], 'P'),
                    (self._tableSchema["Multiframe"], 'M'),
                    (self._tableSchema["MultiframeDetector"], 'D'),
                    (self._tableSchema["ProgrammeFrame"], 'F')]

        # Columns appearing in more than one joined table must be qualified
        # with the Raw-table alias to avoid ambiguity.
        commonCols = utils.getDuplicates(utils.unpackList(
            map(str, table.columns) for table, _alias in joinList))

        columns = ', '.join(('%s.%s' % (joinAlias, column)
            if str(column) in commonCols else str(column))
            for column in self._tableSchema[tableName].columns)

        srcTable = Join([(self.archive.tablePath(table.name), alias)
            for table, alias in joinList],
            ["multiframeID"])

        # Programmes contributing to this table; [0] selects nothing when the
        # table belongs to an excluded programme.
        detProgs = (self.programmeID.intersection(
            self.programme.getProgIDsfromTable(tableName))
            if not tableName.startswith(self.excludeProgs) else [0])

        whereStr = ("D.extNum=R.extNum AND A.extNum=R.extNum AND "
            "P.extNum=R.extNum AND A.seqNum=R.seqNum AND P.seqNum=R.seqNum AND "
            "D.%s AND (F.multiframeID=%s OR programmeID IN (%s))%s" %
            (self._getDeprecationClause(), dbc.intDefault(), csv.join(detProgs),
            self._getDateRangeSQL()))

        orderBy = self._tableSchema[tableName].primaryKey(joinAlias) + " ASC"

        return columns, srcTable, whereStr, orderBy, detProgs
912
913
914
916 """ Sends e-mail notification to ETWS about successful release.
917 """
918 text = '%s at %s by %s: "%s"\n' % (self._releaseDatabase,
919 utils.makeMssqlTimeStamp(), self.curator, self.comment)
920
921 msg = MIMEText(text, 'plain')
922 msg['Subject'] = "DB Release"
923 msg['From'] = "etws@roe.ac.uk"
924 msg['To'] = "etws@roe.ac.uk"
925 s = smtplib.SMTP('localhost')
926 s.sendmail("etws@roe.ac.uk", ["etws@roe.ac.uk"], msg.as_string())
927 s.quit()
928
929
930
950
951
952
953 - def _splitOutgest(self, releaseDb, table, whereStr='', specialJoin=None,
954 reqDefRow=False, isResumable=True):
955 """
956 Splits the outgest of the given table over several chunks defined by
957 a where clause. NB: Try to avoid the iResumable=False option in a
958 production environment as it is not a reliable way to carry out
959 operations.
960
961 """
962
963 pk = csv.values(table.primaryKey())[0]
964 maxPK = (0 if not isResumable or not releaseDb.existsTable(table.name)
965 else releaseDb.queryAttrMax(pk, table.name)) or 0
966
967 pkRemaining = "%s>%s" % (pk, maxPK)
968 if whereStr:
969 originalWhereStr = whereStr
970 whereStr += " AND "
971
972 if not specialJoin:
973 srcTable = self.archive.tablePath(table)
974 pkWhere = whereStr + pkRemaining
975 groupBy = pk
976 else:
977 srcTable, pkWhere, pkTable, pkAlias = specialJoin
978 pkWhere += " AND " + pkTable + '.' + pkRemaining
979 groupBy = pkTable + '.' + pk
980 pk = pkAlias + '.' + pk
981 pkRemaining = pkAlias + '.' + pkRemaining
982
983
984
985 chunkSize = 1e8
986 if ',' in table.primaryKey():
987 pkCounts = self.archive.queryNumRows(srcTable, pkWhere, groupBy)
988
989 pkRanges = utils.groupByCounts(pkCounts, groupSize=chunkSize)
990 else:
991
992 pkQuery = "MAX(%s) AS pkMax, MIN(%s) AS pkMin" % (pk, pk)
993 pkRange = self.archive.query(pkQuery, srcTable, pkWhere)[0]
994 pkRanges = []
995 curPk = pkRange.pkMin
996 while True:
997 nextPk = curPk + chunkSize - 1
998 if nextPk >= pkRange.pkMax:
999 pkRanges.append((curPk, pkRange.pkMax))
1000 break
1001 pkRanges.append((curPk, nextPk))
1002 curPk = nextPk + 1
1003
1004 if len(pkRanges) is 1:
1005 pkRanges = []
1006
1007 whereChunks = []
1008 if pkRanges:
1009 if not isResumable:
1010 pkRanges = pkRanges[self.resumeChunkNumber - 1:]
1011
1012
1013 self.resumeChunkNumber = Cu19.resumeChunkNumber
1014
1015 whereChunks = [whereStr + "%s BETWEEN %s AND %s"
1016 % (pk, pkMin, pkMax) for pkMin, pkMax in pkRanges]
1017
1018 if reqDefRow and not maxPK:
1019
1020
1021
1022 releaseDb.createObjects([table])
1023 releaseDb.insertData(table.name,
1024 [c.getDefaultValue() for c in table.columns])
1025
1026 elif maxPK:
1027 whereStr += pkRemaining
1028
1029 elif whereStr:
1030 whereStr = originalWhereStr
1031
1032 return whereStr, whereChunks
1033
1034
1035
1036
1037
if __name__ == '__main__':
    # Define the command-line interface: positional arguments...
    CLI.progArgs += [
        CLI.Argument("surveyID", "U05A100"),
        CLI.Argument("begin_date", "05A", isValOK=CLI.isDateOK),
        CLI.Argument("end_date", "05A", isValOK=CLI.isDateOK)]

    # ...and options.
    CLI.progOpts += [
        CLI.Option('a', "append",
            "append tables to any pre-existing release database of the same name"),
        CLI.Option('f', "from",
            "continue appending to existing database from this table",
            "NAME"),
        CLI.Option('i', "insert",
            "copy only survey detection data for these programmes, e.g. UDS",
            "LIST"),
        CLI.Option('k', "keep_metadata",
            "when inserting keep existing metadata"),
        CLI.Option('l', "fields",
            "release data from these fieldIDs only",
            "LIST"),
        CLI.Option('n', "number",
            "resume from this chunk number (VVV detection outgests only)",
            "NUMBER", str(Cu19.resumeChunkNumber),
            isValOK=lambda val: val.isdigit() and int(val) > 0),
        CLI.Option('o', "overwrite",
            "overwrite any pre-existing release database of the same name"),
        CLI.Option('r', "release",
            "release database name e.g. UKIDSSDR1PLUS",
            "NAME"),
        CLI.Option('v', "verbose",
            "log additional verbose debug messages"),
        CLI.Option('x', "exclude",
            "exclude detections from these programmes, e.g. GPS",
            "LIST")]

    cli = CLI(Cu19, "$Revision: 10301 $")
    Logger.isVerbose = cli.getOpt("verbose")
    Logger.addMessage(cli.getProgDetails())

    # Construct the task from the parsed command line.
    cu19 = Cu19(cli.getArg("surveyID").replace('/', ''), cli.getOpt("release"),
                cli.getOpt("curator"), cli.getArg("comment"),
                cli.getArg("database"), cli.getOpt("test"), cli.getOpt("user"))

    # --insert and --from imply append mode.
    cu19.appendDb = cli.getOpt("append")
    if not cu19.appendDb and (cli.getOpt("insert") or cli.getOpt("from")):
        cu19.appendDb = True

    try:
        cu19.dateRange = cu19.sysc.obsCal.dateRange(cli.getArg("begin_date"),
                                                    cli.getArg("end_date"))
    except Exception as error:
        eType = "Invalid Option"
        Logger.addExceptionMessage(error, eType)
        raise SystemExit(eType + ": see log " + cu19._log.pathName)

    if cli.getOpt("exclude"):
        cu19.excludeProgs = csv.values(cli.getOpt("exclude"))

    cu19.fieldIDs = cli.getOpt("fields")
    cu19.fromTable = cli.getOpt("from")
    cu19.resumeChunkNumber = int(cli.getOpt("number"))
    if cli.getOpt("insert") and not cli.getOpt("from"):
        # --keep_metadata only applies when inserting programme subsets.
        cu19.insertProgs = csv.values(cli.getOpt("insert"))
        cu19.keepMetadata = cli.getOpt("keep_metadata")

    # Appending and overwriting are mutually exclusive; append wins.
    cu19.overwriteDb = cli.getOpt("overwrite") and not cu19.appendDb
    cu19.run()
1107
1108
1109