1
2
3
4 """
5 Fills an empty database with the latest data in the WSA/VSA. Will also
6 update the database schema and/or recreate the database from scratch. By
7 default, the full date range of B{non-deprecated} data is synchronised.
8
9 @note: For the detection tables of deep surveys, i.e. UDS and DXS, only the
10 detections from the deep stacks are synchronised, as these are the
11 only ones that contribute to source tables. If you specify the
12 -a/--all option, then all detections are included.
13
14 @todo: Due to the way indices are defined this may be faster if we were able
15 to make the primary keys the real primary keys, so instead of just
16 querying MFD on multiframeID, do it on the seemingly unnecessary
17 multiframeID and extNum. However, not sure how to do within a NOT IN
18 (SELECT attr FROM...) context.
19 @todo: Optimise date range updates.
20
21 @author: R.S. Collins
22 @org: WFAU, IfA, University of Edinburgh
23 """
24
25 from __future__ import division, print_function
26 from future_builtins import map, zip
27 import os
28
29 from wsatools.CLI import CLI
30 import wsatools.CSV as csv
31 import wsatools.DataFactory as df
32 from wsatools.DbConnect.CuSession import CuSession
33 import wsatools.DbConnect.DbConstants as dbc
34 from wsatools.DbConnect.DbSession import Database, DbSession, odbc, \
35 SelectSQL, bulkCopy, Ingester, \
36 Outgester
37 import wsatools.DbConnect.Schema as schema
38 from wsatools.Logger import Logger
39 from wsatools.SystemConstants import SystemConstants
40
41
"""
Updates a database with the latest schema and data from another database up
to the given CU level (0 = curation-data; 1 = transfer-data; 3 = metadata;
4 = detections; 7 = sources; 16 = neighbours).

"""

# Curation task number assigned to this session.
cuNum = 22
# Commit every statement immediately instead of in a single transaction.
_autoCommit = True
# Keep the session open across the curation of several programmes.
_isPersistent = True

# Include deprecated data and all deep-survey detections (-a/--all option).
copyAllData = False
# Highest curation level to synchronise (see class docstring for values).
cuLevel = 3
# Observation date range to copy; defaults to the archive's full range and
# is overridden by the -b/--begin and -e/--end command-line options.
dateRange = CuSession.sysc.obsCal.dateRange()
# Programmes excluded from the synchronisation (-x/--exclude option).
excludeProgs = tuple()
# Always copy via the SQL interface, never bulk copy (-s/--sql_copy option).
forceSQL = False
# "server.database" path of the source database to copy data from.
fromDbPathName = None
# Always copy MergeLog table data (-M/--mergelog option).
includeMergeLog = False
# Only mirror metadata/curation tables for a release db (-m option).
isMetadataMirror = False
# Recreate the destination database from scratch (-o/--overwrite option).
overwriteDb = False
# Skip synchronisation of the Provenance table (-n/--noprov option).
skipProvenance = False

# DbSession connection to the source database; initialised in _onRun().
_fromDb = None
82
83
84
""" Synchronises the database.

    Builds the list of table schemas to copy for the requested CU level,
    opens connections to the source database(s), then creates/updates and
    fills each table in turn via _updateTable().
"""
# Normalise the programme selection to a set of programme IDs: either the
# single ID supplied on the command line or the full (or non-survey) list.
self.programmeID = set([self.programmeID] if self.programmeID else
    self.programme.getProgIDList(onlyNonSurvey=self.onlyNonSurveys))

if self.excludeProgs:
    # Remove explicitly excluded programmes (-x option).
    self.programmeID -= \
        set(self.programme.setProgID(prog) for prog in self.excludeProgs)

Logger.addMessage("Parsing schemas...")

idxInfo = schema.parseIndices(self.sysc.indexScript)
# Stored procedures/functions and survey views are (re)created at the end.
objectSchema = (schema.parseFuncProcs(self.sysc.procScript)
    + schema.parseFuncProcs(self.sysc.surveyProcScript)
    + schema.parseViews(self.sysc.surveyViewScript))

if self.overwriteDb:
    # Fresh database: createDatabase() has already made/loaded the other
    # curation tables, so only FlatFileLookUp needs synchronising here.
    tableSchema = schema.parseTables(self.sysc.curationSchema(),
        ["FlatFileLookUp"])
else:
    tableSchema = schema.parseTables(self.sysc.curationSchema())
    # Curation-history tables are maintained by the CuSession framework
    # itself, so they must not be overwritten from the source db.
    tableSchema.remove("ArchiveCurationHistory")
    tableSchema.remove("ProgrammeCurationHistory")

# NOTE(review): "is 0" tests object identity and only works for small
# interned ints in CPython; "== 0" is the correct comparison here.
if self.cuLevel is 0:
    tableSchema.remove("FlatFileLookUp")

if self.cuLevel >= 3:
    # Metadata level: add the multiframe metadata tables.
    metadataTables = schema.parseTables(self.sysc.metadataSchema())
    if self.skipProvenance:
        metadataTables.remove("Provenance")

    # ESO-keys tables are only copied when all data is requested.
    mfEsoTable = "MultiframeEsoKeys"
    if not self.copyAllData and mfEsoTable in metadataTables:
        metadataTables.remove(mfEsoTable)

    mfdEsoTable = "MultiframeDetectorEsoKeys"
    if (not self.copyAllData or self.isMetadataMirror) \
            and mfdEsoTable in metadataTables:

        metadataTables.remove(mfdEsoTable)

    tableSchema += metadataTables

if self.cuLevel >= 4 or self.includeMergeLog:

    # Gather the per-programme schema scripts for the selected programmes.
    progScripts = set()
    for progID in self.programmeID:
        self.programme.setCurRow(programmeID=progID)
        progScripts.add(self.programme.getSchemaScript())
        if self.cuLevel == 16:
            progScripts.add(self.programme.getAttr("neighboursSchema"))

    if self.sysc.isVSA() \
            and self.sysc.scienceProgs.get("VVV") in self.programmeID:
        # VVV detections are split into monthly schema files.
        monthlySchemas = [os.path.join(self.sysc.sqlMonthlyDetPath(), f)
            for f in os.listdir(self.sysc.sqlMonthlyDetPath())
            if "vvvSchema20" in f]
        for monSch in monthlySchemas:
            # Extract the "YYYYMM" part between "vvvSchema" and the file
            # extension to name the three monthly detection tables.
            month = monSch[monSch.find("vvvSchema") + 9:monSch.rfind('.')]
            detTables = ["vvvDetectionRaw" + month,
                "vvvDetectionAstrometry" + month,
                "vvvDetectionPhotometry" + month]
            tableSchema += schema.parseTables(monSch, detTables)

    if self.includeMergeLog:
        # Merge-log only: keep just the *MergeLog tables from each script.
        mergeLogSchema = []
        for script in progScripts:
            if script.upper() != dbc.charDefault():
                mergeLogSchema += schema.parseTables(script)

        # Iterate over a string snapshot so removal during the loop is safe.
        for table in list(map(str, mergeLogSchema)):
            if 'MergeLog' not in table:
                mergeLogSchema.remove(table)

        tableSchema += mergeLogSchema

    elif self.cuLevel >= 4:

        for script in progScripts:
            if script.upper() != dbc.charDefault():
                tableSchema += schema.parseTables(script)

        if self.cuLevel < 7:
            # Below source level: prune source/merge/variability tables.
            phrases = ['Source', 'MergeLog', 'Synoptic', 'Var',
                'BestMatch', 'BestAper']

            for table in list(map(str, tableSchema)):
                if any(phrase in table for phrase in phrases):
                    tableSchema.remove(table)

if self.sysc.isVSA() \
        and self.sysc.scienceProgs.get("VVV") in self.programmeID:

    # VVV data lives in its own load database; open a second connection.
    dbs = ['.'.join(item)
        for item in self.sysc.ingestDbForServer.items()]

    vvvDatabase = [db for db in dbs if "VVV" in db].pop()
    self._fromDbVVV = DbSession(vvvDatabase,
        autoCommit=self._autoCommit,
        isTrialRun=self.archive.isTrialRun,
        userName=dbc.loadServerRwUsername())

    self._fromDbVVV.enableDirtyRead()

self._fromDb = DbSession(self.fromDbPathName,
    autoCommit=self._autoCommit, isTrialRun=self.archive.isTrialRun,
    userName=dbc.loadServerRwUsername())

self._fromDb.enableDirtyRead()

Logger.addMessage("Synchronising tables...")

for table in tableSchema:
    try:
        self.archive.checkSchema([table])
    except schema.MismatchError as error:
        # Destination table is missing or has a different schema:
        # (re)create it before copying data in.
        if not self.overwriteDb:
            Logger.addMessage("Schema mis-match: %s. "
                "Dropping and recreating table %s in database %s" %
                (error.errorMsg, table, self.archive.database))
        else:
            Logger.addMessage("Creating table %s..." % table)
        try:
            if self.overwriteDb:
                self.archive.createTable(table)
            else:
                self.archive.createTable(table, tableSchema,
                    overWrite=True)
        except Exception:
            Logger.addMessage("<ERROR> Cannot update %s" % table)
            raise

    # Monthly VVV detection tables (name contains "vvvDetection" + a
    # "20.."-dated suffix) are not row-synchronised here.
    if not ("vvvDetection" in table.name and "20" in table.name):
        self._updateTable(table)

    if self.isMetadataMirror and idxInfo[table.name]:
        Logger.addMessage("Applying indices...")
        self.archive.createObjects(idxInfo[table.name])

Logger.addMessage("Creating any new views and stored db functions...")
self.archive.createObjects(objectSchema, ignoreNS=True)

if self.isMetadataMirror:
    self.archive.updateStatistics()

Logger.addMessage("done!")
240
241
242
"""
Update contents of table.

Builds a WHERE clause restricting the copy to rows that are missing from
the destination, belong to the selected programmes, fall inside the
requested date range and (unless -a/--all) are not deprecated; then copies
the rows either by bulk outgest/ingest or through the SQL interface.

@param table: Table details.
@type table: Schema.Table

"""
Logger.addMessage("Updating table %s..." % table)

# VVV tables come from the dedicated VVV load database connection.
fromDb = (self._fromDb if not table.name.lower().startswith("vvv") else
    self._fromDbVVV)

fromDb.enableDirtyRead()

sourceTable = table.name
# NOTE(review): sourceTable == table.name here, so this always yields the
# literal "Multiframe" - presumably intentional, but worth confirming.
mfTable = sourceTable.replace(table.name, "Multiframe")
alias = ''
columns = "*"
# Key used to detect rows already present in the destination table.
key = ('multiframeID' if 'multiframeID' in table.columns else
    table.primaryKey().split(',')[0])

where = ''
if not self.overwriteDb:
    # Only copy rows whose key is not already in the destination.
    currentData = SelectSQL(key, self.archive.tablePath(table.name))
    where = "%s NOT IN (%s)" % (key, currentData)

if self.programmeID != set(self.programme.getProgIDList()):
    # A subset of programmes was requested: build a programme filter and
    # the corresponding multiframe sub-query on ProgrammeFrame.
    progSelection = ("programmeID IN (%s, %s)" %
        (dbc.intDefault(), ', '.join(map(str, self.programmeID))))
    progMfs = SelectSQL("multiframeID",
        table=self._fromDb.tablePath("ProgrammeFrame"),
        where=progSelection)
else:
    progSelection = ""

# Multiframe attributes (utdate, deprecated) are needed unless every row
# over the full date range is being copied.
needMultiframeAttrs = \
    not (self.copyAllData and self.dateRange == SyncDb.dateRange)

if table.name == "Provenance":

    # Provenance has no multiframe attributes of its own: first build a
    # filter on Multiframe, then require both ends of each provenance
    # link to be in that multiframe selection.
    if progSelection:
        if where and not where.endswith(" AND "):
            where += " AND "
        where += "multiframeID IN (%s)" % progMfs

    if self.dateRange != SyncDb.dateRange:
        if where and not where.endswith(" AND "):
            where += " AND "

        dStr = "utdate BETWEEN '%s' AND '%s'" % self.dateRange
        # Frames stamped with the default (sentinel) date are always kept.
        where += "(utdate='%s' OR %s)"\
            % (self.sysc.obsCal.lateDefaultDateTime, dStr)

    if not self.copyAllData:
        if where and not where.endswith(" AND "):
            where += " AND "
        where += "deprecated = 0"

    currentMfs = \
        SelectSQL("multiframeID", fromDb.tablePath("Multiframe"), where)

    where = ("combiframeID IN (%s) AND " % currentMfs +
        "multiframeID IN (%s)" % currentMfs)

    if not self.overwriteDb:
        # Exclude provenance links already present in the destination.
        where += " AND NOT (combiframeID IN (%s)"\
            " AND multiframeID IN (%s))" % (
            SelectSQL("combiframeID", self.archive.tablePath("Provenance")),
            SelectSQL("multiframeID", self.archive.tablePath("Provenance")))

elif table.name.endswith("TilePawPrints"):

    # Only copy paw-print rows for tile sets already in the destination.
    currentTiles = SelectSQL("tileSetID",
        self.archive.tablePath(table.name.replace("PawPrints", "Set")))

    where = "tileSetID IN (%s)" % currentTiles

elif progSelection and 'programmeID' in table.columns and \
        not needMultiframeAttrs:
    # Table can be filtered on programmeID directly; no Multiframe join.
    where += " AND " if where else ""
    where += progSelection

# NOTE(review): "and" binds tighter than "or", so this reads as
# A or (B and C); the intended grouping may be (A or B) and C - confirm.
elif ('multiframeID' in table.columns or table.name.endswith("TileSet")
        and (needMultiframeAttrs or 'programmeID' not in table.columns)):

    # Join to Multiframe so utdate/deprecated filters can be applied.
    alias = "This"
    columns = "%s.%s" % (alias, columns)
    if where:
        where = "%s.%s" % (alias, where)

    if table.name != "Multiframe":
        mfAlias = 'M'
        # TileSet tables reference Multiframe through tlmfID instead.
        mfIdName = ("multiframeID"
            if not table.name.endswith("TileSet") else "tlmfID")
        sourceTable = \
            "%s AS %s, %s AS %s" % (sourceTable, alias, mfTable, mfAlias)
        if where and not where.endswith(" AND "):
            where += " AND "
        where += "%s.%s = %s.multiframeID" % (alias, mfIdName, mfAlias)
        if progSelection:
            where += " AND "
            where += (progSelection if 'programmeID' in table.columns
                else "%s.multiframeID IN (%s)" % (mfAlias, progMfs))
    else:
        # Copying Multiframe itself: no join needed.
        mfAlias = alias
        sourceTable = "%s AS %s" % (sourceTable, mfAlias)
        if progSelection:
            if where and not where.endswith(" AND "):
                where += " AND "
            where += "%s.multiframeID IN (%s)" % (mfAlias, progMfs)

    if self.dateRange != SyncDb.dateRange:
        if where and not where.endswith(" AND "):
            where += " AND "

        dStr = "utdate BETWEEN '%s' AND '%s'" % self.dateRange
        # Frames stamped with the default (sentinel) date are always kept.
        where += "(%s.utdate='%s' OR %s.%s)"\
            % (mfAlias, self.sysc.obsCal.lateDefaultDateTime, mfAlias,
               dStr)

    if not self.copyAllData:
        if where and not where.endswith(" AND "):
            where += " AND "
        where += "%s.deprecated = 0" % mfAlias
        # Deep surveys: only deep-stack detections feed the source tables,
        # so restrict to deep (or default) frame types (see module @note).
        if table.name.startswith(("dxsDetection", "udsDetection",
                "videoDetection")):
            where += \
                " AND (%s.frameType LIKE '%%deep%%' OR %s.frameType=%r)"\
                % (mfAlias, mfAlias, dbc.charDefault())

try:
    # Big (non-curation) tables are bulk-copied unless -s/--sql_copy;
    # FlatFileLookUp is always bulk-copied.
    if not self.forceSQL and (table.name == "FlatFileLookUp"
            or table.schemaFileName != self.sysc.curationSchema()):

        orderBy = table.primaryKey(alias) + " ASC"
        sourceTable = ', '.join(fromDb.tablePath(tableName)
            for tableName in csv.values(sourceTable))

        Logger.addMessage("Outgesting %s..." % table)
        bulkCopy(query=SelectSQL(columns, sourceTable, where, orderBy),
            tableSchema=table,
            fromDb=fromDb,
            toDb=self.archive,
            fileTag=self.shareFileID)
    else:
        numRows = fromDb.copyIntoTable(
            destinationTable=self.archive.tablePath(table.name),
            sourceTable=sourceTable,
            columns=columns,
            where=where)
        if numRows:
            Logger.addMessage("%s copied" % numRows)

except (odbc.DatabaseError, Ingester.IngestError) as error:
    # Schema-difference errors are downgraded to a warning (except for
    # metadata mirrors, which must be exact); anything else propagates.
    ignoreCases = ["Column name or number", "Invalid object name"]

    if (isinstance(error, Ingester.IngestError)
            or any(case in str(error) for case in ignoreCases)) \
            and Outgester.tempViewName not in str(error):

        Logger.addMessage(
            "<WARNING> Cannot update table %s because %s schema differs "
            "from that of the destination database." %
            (self.archive.tablePath(table.name), fromDb.database))

        if self.isMetadataMirror:
            raise
    else:
        raise
421
""" Don't update programme curation history on exception - this is just
    to keep the exception message clear in the displayed log. Following
    exception the script will probably be re-run anyway.
"""
# Standard CuSession exception handling (logging, clean-up) first.
super(SyncDb, self)._onException()
# Emptying the programme set stops ProgrammeCurationHistory updates.
self.programmeID = set()
429
430
431
"""
Creates a new database, overwriting the old one, with all necessary tables
and initial curation data.

@param database: Database object to create.
@type database: Database
@param server: Name of server to create database on.
@type server: str
@param cli: Command-line options.
@type cli: CLI

@return: The server name and Database object passed in.

"""
# Connection to the source database the initial data is copied from.
fromDb = DbSession(cli.getArg("from_database"), autoCommit=True,
    isTrialRun=cli.getOpt('test'), userName=dbc.loadServerRwUsername())

fromDb.enableDirtyRead()

Logger.addMessage("Parsing schemas...")
curationTables = schema.parseTables(SyncDb.sysc.curationSchema())
multiframeTables = schema.parseTables(SyncDb.sysc.metadataSchema())
mirrorTables = curationTables + multiframeTables
if cli.getOpt("metadata_mirror"):

    # A metadata mirror must match the source schema exactly, so fail
    # early if the source database disagrees with the schema scripts.
    Logger.addMessage("Checking for database schema mismatches...")
    try:
        fromDb.checkSchema(mirrorTables)
    except schema.MismatchError as error:
        raise SystemExit("<ERROR> %s" % error)

# Database creation must go through the server's admin database.
adminDb = DbSession(server + '.' + SyncDb.sysc.adminDatabase, autoCommit=True,
    isTrialRun=cli.getOpt('test'), userName=dbc.loadServerRwUsername())

Logger.addMessage("Recreating database %s..." % database)
database.create(adminDb, overWrite=True)

db = DbSession(cli.getArg("to_database"), autoCommit=True,
    isTrialRun=cli.getOpt('test'), userName=dbc.loadServerRwUsername())

# Grant read access; the load-server account only for non-mirrors.
permittedUsers = ["wsaro"]
if not cli.getOpt("metadata_mirror"):
    permittedUsers.append("ldservro")

for user in permittedUsers:
    db.grantAccess(user)

Logger.addMessage("Creating tables...")
for table in mirrorTables:
    # Mirrors only get releasable tables (minus the detector ESO keys);
    # full databases get everything.
    if table.releasable and not table.name.endswith("DetectorEsoKeys") \
            or not cli.getOpt("metadata_mirror"):
        db.createTable(table)

loadCurationTables(curationTables, fromDb, db, cli)

if int(cli.getArg("cuID")) in (0, 1):

    # Below metadata level: seed the key metadata tables with a single
    # default-valued row each so that foreign references resolve.
    for table in multiframeTables:
        if table.name in ("Multiframe", "MultiframeDetector",
                "CurrentAstrometry", "ProgrammeFrame"):
            Logger.addMessage("%s..." % table)
            defs = [column.getDefaultValue() for column in table.columns]
            Logger.addMessage("%s rows copied." %
                db.insertData(table.name, defs))

if not cli.getOpt("metadata_mirror"):
    Logger.addMessage("Creating survey tables...")
    createSurveyTables(db, int(cli.getArg("cuID")))

return server, database
509
510
511
555
556
557
592
593
594
"""
Loads the curation tables with the initial curation data.

@param curationTables: List of curation table schemas.
@type curationTables: list(schema.Table)
@param fromDb: Database connection from which data should be copied.
@type fromDb: DbSession
@param toDb: Database connection to which data should be copied.
@type toDb: DbSession
@param cli: Command-line options.
@type cli: CLI

"""
for table in curationTables:
    # Curation-history tables and FlatFileLookUp are maintained elsewhere;
    # mirrors additionally only receive releasable tables.
    if (not cli.getOpt("metadata_mirror") or table.releasable) and \
            not table.name.endswith("CurationHistory") \
            and table.name != "FlatFileLookUp":
        Logger.addMessage("%s..." % table)
        try:
            Logger.addMessage("%s rows copied." %
                fromDb.copyIntoTable(toDb.tablePath(table), table))
        except odbc.ProgrammingError as error:
            ignoreCases = ["Column name or number",
                "Invalid object name"]
            if any(case in str(error) for case in ignoreCases):
                # Schema mismatch: let the operator decide whether to
                # continue without this table or abort the run.
                msg = ("Cannot update table %s because %s schema differs "
                    "from that of the destination database."
                    % (toDb.tablePath(table), fromDb))
                if not CLI.isConfirmed(msg):
                    raise SystemExit("<ERROR> " + msg)
                else:
                    Logger.addMessage("<WARNING> " + msg)
            elif "sys.servers" in str(error):
                # Servers not linked for direct SQL copy: fall back to a
                # bulk outgest/ingest through the file system.
                Logger.addMessage("Outgesting %s..." % table)
                bulkCopy(query=SelectSQL('*', fromDb.tablePath(table)),
                    tableSchema=table, fromDb=fromDb, toDb=toDb,
                    fileTag="%s_SyncDb_%s" % (toDb.database,
                        toDb.sysc.tempWorkPath().split('/')[-1]))
            else:
                raise
638
639
671
672
673
674
675
if __name__ == "__main__":
    # Tailor the generic CuSession command line: this script takes source
    # and destination database paths plus an optional CU level instead of
    # the usual comment/database arguments.
    CLI.progArgs.remove("comment")
    CLI.progArgs.remove("database")
    CLI.progArgs += [
        CLI.Argument("from_database", "WSA",
            isValOK=lambda value: value.count('.') <= 1),
        CLI.Argument("to_database", "TestWSA",
            isValOK=lambda value: value.count('.') <= 1
            and not any(value.split('.', 1)[-1].upper().startswith(prefix)
                for prefix in SystemConstants.getAll("loadDatabase"))),
        CLI.Argument("cuID", '3', isOptional=True,
            isValOK=lambda value: value.isdigit())]

    CLI.progOpts.remove("curator")
    CLI.progOpts.remove("user")
    CLI.progOpts += [
        CLI.Option('a', "all",
            "include deprecated data"),
        CLI.Option('b', "begin",
            "observation date of first multiframe to copy e.g. 2004-04-01"
            " or 20040401, or first semester to release e.g. 05A_SV",
            "DATE", isValOK=CLI.isDateOK),
        CLI.Option('e', "end",
            "observation date of last multiframe to copy e.g. 2006-07-31"
            " or 20060731, or final semester to copy e.g. 07A",
            "DATE", isValOK=CLI.isDateOK),
        CLI.Option('M', "mergelog",
            "always copy MergeLog table data"),
        CLI.Option('m', "metadata_mirror",
            "only metadata and curation tables are mirrored for release database"),
        CLI.Option('n', "noprov",
            "don't synchronise Provenance table"),
        CLI.Option('o', "overwrite",
            "start anew, overwriting old database"),
        CLI.Option('p', "progID",
            "copy data for this programme ID only",
            "ID"),
        CLI.Option('s', "sql_copy",
            "just copy through SQL interface, don't do bulk copies"),
        CLI.Option('x', "exclude",
            "excludes non-survey programmes from the world release",
            "LIST")]

    cmdLine = CLI(SyncDb, "$Revision: 10194 $")
    Logger.isVerbose = False
    Logger.addMessage(cmdLine.getProgDetails())

    # Resolve the destination server/database and bind system constants.
    hostName, targetDb = defineDatabase(cmdLine)
    SyncDb.sysc = SystemConstants(targetDb.name)

    if cmdLine.getOpt("overwrite"):
        # Recreate the destination from scratch, archiving the log.
        Logger.archive = True
        createDatabase(targetDb, hostName, cmdLine)

    SyncDb.isDayStampedLog = (cmdLine.getOpt("metadata_mirror")
        and "PROPRIETY" in cmdLine.getArg("to_database").upper())

    # Configure and start the curation session.
    session = SyncDb(
        programmeID=cmdLine.getOpt("progID") or '',
        database=cmdLine.getArg("to_database"),
        comment="Updating database with latest values",
        isTrialRun=cmdLine.getOpt("test"),
        userName=dbc.loadServerRwUsername())

    session.copyAllData = cmdLine.getOpt("all")
    session.cuLevel = int(cmdLine.getArg("cuID"))
    try:
        session.dateRange = session.sysc.obsCal.dateRange(
            cmdLine.getOpt("begin"), cmdLine.getOpt("end"))

    except Exception as error:
        errLabel = "Invalid Option"
        Logger.addExceptionMessage(error, errLabel)
        raise SystemExit(errLabel + ": see log " + session._log.pathName)

    if cmdLine.getOpt("exclude"):
        session.excludeProgs = tuple(
            name.lower() for name in csv.values(cmdLine.getOpt("exclude")))

    session.forceSQL = cmdLine.getOpt("sql_copy")
    session.fromDbPathName = cmdLine.getArg("from_database")
    session.includeMergeLog = cmdLine.getOpt("mergelog")
    session.isMetadataMirror = cmdLine.getOpt("metadata_mirror")
    session.overwriteDb = cmdLine.getOpt("overwrite")
    session.skipProvenance = cmdLine.getOpt("noprov")

    session.run()

    if cmdLine.getOpt("metadata_mirror") and not cmdLine.getOpt("test"):
        # Mirror complete: take it offline and release it.
        session.archive.goOffline()
        releaseDatabase(targetDb, hostName)
777