1
2
3
4 """
5 Finalise the catalogue data ingests by creating the constraints for the
6 monthly VVV tables and deprecating old detections.
7
8 @author: Eckhard Sutorius
9 @org: WFAU, IfA, University of Edinburgh
10 """
11
12 from collections import defaultdict
13 import os
14 import string
15
16 from wsatools.CLI import CLI
17 import wsatools.DbConnect.DbConstants as dbc
18 from wsatools.DbConnect.DbSession import DbSession, odbc, SelectSQL
19 import wsatools.DbConnect.CommonQueries as queries
20 import wsatools.DbConnect.Schema as schema
21 import wsatools.DataFactory as df
22 from wsatools.File import File, PickleFile
23 from wsatools.Logger import Logger
24 from wsatools.SystemConstants import SystemConstants
25 import wsatools.Utilities as utils
26
# Name template of the pickle files that cache query results; the
# placeholder takes the database name. Callers prefix it with "Mfid" or
# "Objid".
pickledDataFile = "DetData_%s.pcl"

# Directory the generated constraint SQL script is written to.
sqlOutDir = "/mnt/ramses12/"

# Constraint name templates: "mf" takes (split table name, month string),
# "obj" takes the month string only.
constraintName = {
    "mf": "multiframeID%s%s",
    "obj": "objID%s",
}

# Head of the ALTER TABLE statement; placeholders are
# (detection table name, month string, constraint name). The CHECK
# expression is appended by the caller.
constraintStr = "ALTER TABLE %s%s WITH CHECK ADD CONSTRAINT %s CHECK "

# CHECK clause for an inclusive ID range; takes (lower bound, upper bound).
constraintRange = {
    "mf": "(multiframeID >= %s AND multiframeID <= %s)",
    "obj": "(objID >= %d AND objID <= %d)",
}

# CHECK clause for a single ID value.
constraintAdd = {
    "mf": "(multiframeID = %s)",
    "obj": "(objID = %d)",
}
38
39
def _normaliseCheckText(text):
    """
    Reduce a CHECK expression to a form that can be compared as a string.

    Removes dots, blanks, square brackets and parentheses, so that the text
    generated here and the text read from syscomments can be compared.
    Works for both byte and unicode strings.

    @param text: The CHECK expression.
    @type  text: str
    @return: The text without the characters ". []()".
    @rtype:  str

    """
    return "".join(char for char in text if char not in ". []()")


def _mergeAdjacentRanges(ranges):
    """
    Merge (min, max) ranges where one starts directly after the previous.

    @param ranges: Inclusive (min, max) integer pairs in any order.
    @type  ranges: list(tuple)
    @return: Sorted list of merged (min, max) pairs; empty for empty input.
    @rtype:  list(tuple)

    """
    merged = []
    for low, high in sorted(ranges):
        if merged and low == merged[-1][1] + 1:
            merged[-1] = (merged[-1][0], high)
        else:
            merged.append((low, high))
    return merged


def _mfidClauses(mfidList):
    """
    Build the multiframeID CHECK clauses for every month.

    Walks the multiframeIDs in ascending order and closes a clause every
    time the month changes, so a month whose IDs are interleaved with
    another month's gets several clauses.

    @param mfidList: (multiframeID, month string) pairs.
    @type  mfidList: list(tuple)
    @return: CHECK clauses keyed by month string.
    @rtype:  defaultdict(list)

    """
    clauses = defaultdict(list)
    rangeMonth = rangeStart = rangeEnd = None
    for mfid, monthStr in sorted(mfidList):
        if monthStr != rangeMonth:
            if rangeMonth is not None:
                appendConstrain("mf", clauses, rangeMonth,
                                rangeStart, rangeEnd)
            rangeMonth, rangeStart = monthStr, mfid
        # always track the end, also for the very first ID of a range
        rangeEnd = mfid

    if rangeMonth is not None:
        appendConstrain("mf", clauses, rangeMonth, rangeStart, rangeEnd)
    return clauses


def _readMfids(archive, detTabs, detTableName, readFromFile):
    """
    Get the multiframeIDs and their detection counts.

    Either reads them from the pickled data file or queries every monthly
    table and then writes the result to that file.

    @param archive:      The DB connection.
    @type  archive:      DbSession
    @param detTabs:      Names of the tables to query.
    @type  detTabs:      list(str)
    @param detTableName: Table name without the month suffix.
    @type  detTableName: str
    @param readFromFile: If True read from existing data file.
    @type  readFromFile: bool
    @return: List of (multiframeID, month string) and the number of
             detections per multiframeID.
    @rtype:  tuple(list, dict)

    """
    Logger.addMessage("Reading multiframeIDs")
    fileName = "Mfid%s" % (pickledDataFile % archive.database)
    if readFromFile:
        # NOTE(review): unpickling executes arbitrary code; the data file
        # must only ever come from a previous run of this script.
        Logger.addMessage(" from %s..." % fileName)
        mfidList, detCounts = list(PickleFile(fileName).pickleRead())
        return mfidList, detCounts

    mfidList = []
    detCounts = {}
    for table in sorted(detTabs):
        Logger.addMessage(" from %s..." % table)
        monthStr = table.replace(detTableName, '')
        if monthStr:
            results = archive.query("multiframeID, count(*)", table,
                                    "objID>=0", groupBy="MultiframeID",
                                    orderBy="MultiframeID")
            for mfid, counts in results:
                mfidList.append((mfid, monthStr))
                detCounts[mfid] = counts

    PickleFile(fileName).pickleWrite(mfidList, detCounts)
    return mfidList, detCounts


def _readObjidRanges(archive, detTabs, detTableName, readFromFile):
    """
    Get the minimal and maximal objID of every multiframe per month.

    Either reads them from the pickled data file or queries every monthly
    Raw table and then writes the result to that file.

    @param archive:      The DB connection.
    @type  archive:      DbSession
    @param detTabs:      Names of the tables to query.
    @type  detTabs:      list(str)
    @param detTableName: Table name without the month suffix.
    @type  detTableName: str
    @param readFromFile: If True read from existing data file.
    @type  readFromFile: bool
    @return: Lists of (multiframeID, min objID, max objID) keyed by month.
    @rtype:  dict

    """
    fileName = "Objid%s" % (pickledDataFile % archive.database)
    Logger.addMessage("Reading objIDs")
    if readFromFile:
        # NOTE(review): see _readMfids about trusting the pickle file.
        Logger.addMessage(" from %s..." % fileName)
        return list(PickleFile(fileName).pickleRead())[0]

    objidDict = defaultdict(list)
    for table in sorted(name for name in detTabs if "Raw" in name):
        Logger.addMessage(" from %s..." % table)
        monthStr = table.replace(detTableName, '')
        if monthStr:
            results = archive.query(
                "multiframeID, MIN(objID), MAX(objID)", table,
                "objID>=0", groupBy="multiframeID",
                orderBy="MIN(objID)")
            for mfid, minobj, maxobj in results:
                objidDict[monthStr].append((mfid, minobj, maxobj))

    PickleFile(fileName).pickleWrite(objidDict)
    return objidDict


def _objidClauses(archive, detTabs, detTableName, objidDict, detCounts):
    """
    Build the objID CHECK clauses for every month that has a table.

    @param archive:      The DB connection.
    @type  archive:      DbSession
    @param detTabs:      Names of the existing monthly tables.
    @type  detTabs:      list(str)
    @param detTableName: Table name without the month suffix.
    @type  detTableName: str
    @param objidDict:    (multiframeID, min objID, max objID) per month.
    @type  objidDict:    dict
    @param detCounts:    Number of detections per multiframeID.
    @type  detCounts:    dict
    @return: CHECK clauses keyed by month string.
    @rtype:  defaultdict(list)

    """
    clauses = defaultdict(list)
    for monthStr in objidDict:
        table = detTableName + monthStr
        if table not in detTabs:
            continue

        monthRange = []
        for mfid, minobj, maxobj in objidDict[monthStr]:
            if maxobj - minobj + 1 == detCounts[mfid]:
                # as many detections as IDs in the span: no gaps
                monthRange.append((minobj, maxobj))
            else:
                # the span has gaps, so get the exact ranges from the DB
                for begObjid, endObjid in getObjIDRanges(archive, mfid,
                                                         table):
                    monthRange.append((begObjid, endObjid))

        for objidMin, objidMax in _mergeAdjacentRanges(monthRange):
            appendConstrain("obj", clauses, monthStr, objidMin, objidMax)
    return clauses


def _writeConstraint(constraintFile, dbConstraints, writeAll, tableName,
                     monthStr, name, clauses):
    """
    Write one ALTER TABLE statement unless the DB already has it.

    @param constraintFile: The opened output file.
    @type  constraintFile: File
    @param dbConstraints:  Normalised CHECK texts of the existing
                           constraints keyed by constraint name.
    @type  dbConstraints:  dict
    @param writeAll:       If True write the statement unconditionally.
    @type  writeAll:       bool
    @param tableName:      Table name without the month suffix.
    @type  tableName:      str
    @param monthStr:       The month suffix of the table.
    @type  monthStr:       str
    @param name:           Name of the constraint.
    @type  name:           str
    @param clauses:        The CHECK clauses to be OR-ed together.
    @type  clauses:        list(str)

    """
    statement = constraintStr % (tableName, monthStr, name)
    statement += '(' + " OR ".join(clauses) + ')'
    # compare only the expression following the final CHECK keyword
    checkText = _normaliseCheckText(statement.rpartition("CHECK")[2])
    if writeAll or dbConstraints.get(name) != checkText:
        constraintFile.writetheline(statement)


def createConstraints(archive, readFromFile=False, month=None,
                      writeAll=False):
    """
    Create script to attach constraints to monthly VVV detection tables.

    For each of the Raw, Astrometry and Photometry tables the multiframeID
    constraints of every month are written, for the Raw tables also the
    objID constraints. Constraints whose text is already in the database
    are skipped unless writeAll is set. The multiframeIDs are only queried
    for the Raw tables; the other split tables read them from the data
    file.

    @param archive:      The DB connection.
    @type  archive:      DbSession
    @param readFromFile: If True read from existing data file else query DB.
    @type  readFromFile: bool
    @param month:        Only use tables whose month suffix starts with
                         this string; all "20..." tables if not given.
    @type  month:        str
    @param writeAll:     If True also write constraints already in the DB.
    @type  writeAll:     bool

    """
    detSuffix = month or "20"

    # CHECK constraints already defined in the database
    constraintRows = archive.query(
        "name, text", "sysobjects o, syscomments c", " AND ".join([
            "o.id=c.id", "xtype = 'C'",
            "(name like 'multiframeID%20%' or name like 'objID%20%')"]))
    dbConstraints = {}
    for name, text in constraintRows:
        dbConstraints[name] = _normaliseCheckText(text)

    timestamp = utils.makeMssqlTimeStamp()
    constraintFile = File(os.path.join(
        sqlOutDir, "MakeConstraints_%s.sql" % timestamp[:timestamp.find(':')]))
    constraintFile.wopen()
    try:
        constraintFile.writetheline("use VSAVVV")
        for splitTable in ["Raw", "Astrometry", "Photometry"]:
            # the Raw pass provides the data file for the other two tables
            useFile = readFromFile or splitTable != "Raw"
            detTableName = "%sDetection%s" % ("vvv", splitTable)
            constraintFile.writetheline(
                '--- %s mfID constraints ---' % splitTable)

            Logger.addMessage(
                "Getting monthly %sDetection tables..." % splitTable)
            detTabs = archive.query(
                "name", "sysobjects",
                "name like '%s%s%%'" % (detTableName, detSuffix))

            mfidList, detCounts = _readMfids(archive, detTabs, detTableName,
                                             useFile)
            Logger.addMessage("%d mfIDs found" % len(mfidList))

            Logger.addMessage("Getting the multiframeID constraint ranges...")
            mfClauses = _mfidClauses(mfidList)
            del mfidList

            for monthStr in sorted(mfClauses):
                _writeConstraint(
                    constraintFile, dbConstraints, writeAll, detTableName,
                    monthStr, constraintName["mf"] % (splitTable, monthStr),
                    mfClauses[monthStr])

            if splitTable == "Raw":
                constraintFile.writetheline('--- objID constraints ---')
                objidDict = _readObjidRanges(archive, detTabs, detTableName,
                                             useFile)
                Logger.addMessage("%d objIDs found" % sum(
                    len(val) for val in objidDict.values()))

                objClauses = _objidClauses(archive, detTabs, detTableName,
                                           objidDict, detCounts)
                for monthStr in sorted(objClauses):
                    # rows holding the default objID have to pass as well
                    appendConstrain("obj", objClauses, monthStr,
                                    dbc.intDefault(), dbc.intDefault())
                    _writeConstraint(
                        constraintFile, dbConstraints, writeAll,
                        detTableName, monthStr,
                        constraintName["obj"] % monthStr,
                        sorted(objClauses[monthStr]))

            # NOTE(review): the nesting level of this line could not be
            # recovered from the source; it is assumed to end the batch of
            # every split table - confirm against the repository version.
            constraintFile.writetheline("GO")
    finally:
        # do not leave the script file open if a query fails
        constraintFile.close()

    Logger.addMessage("Updates written to %s" % constraintFile.name)
236
237
238
def createViews(archive, surveyName):
    """
    Create the views for the monthly split tables.

    For each of the <survey>DetectionRaw, -Astrometry and -Photometry
    tables a view of that name is (re-)created as the UNION ALL of all
    monthly tables, unless the text of the existing view already contains
    every monthly table name.

    @param archive:    The DB connection.
    @type  archive:    DbSession
    @param surveyName: Name of the survey; used in lower case as table
                       name prefix.
    @type  surveyName: str

    """
    detTableList = ["%sDetection%s" % (surveyName.lower(), splitTable)
                    for splitTable in ['Raw', 'Astrometry', 'Photometry']]

    try:
        for detTableName in detTableList:
            Logger.addMessage("Creating VIEW %s" % detTableName)
            viewID = archive.query(
                "id", "sysobjects",
                "xtype = 'v' and name like '%s%%'" % detTableName,
                firstOnly=True, default=None)
            viewText = (archive.query(
                "text", "syscomments", "id=%d" % viewID, firstOnly=True)
                        if viewID else '')
            dettabs = archive.query(
                "name", "sysobjects", "name like '%s%%'" % detTableName)

            detMonths = []
            viewExists = True
            for table in dettabs:
                # an empty suffix is the view itself, not a monthly table
                detMon = table.replace(detTableName, '')
                if detMon:
                    detMonths.append(detMon)
                    if table not in viewText:
                        viewExists = False

            if not viewExists:
                view = schema.View()
                view.name = detTableName
                view.definition = " UNION ALL ".join(
                    str(SelectSQL('*', detTableName + detMon))
                    for detMon in sorted(detMonths))
                archive.createObjects([view], overWrite=True)
                archive.commitTransaction()
            else:
                Logger.addMessage("View already exists.")

    except odbc.ProgrammingError as error:
        if "permission was denied" in str(error):
            raise SystemExit("<ERROR> You do not have write permission on "
                             "database %s" % archive)
        # any other programming error must not be dropped silently
        raise
279
280
281
def getObjIDRanges(archive, mfID, tableName):
    """
    Get the ranges of consecutive objIDs of one multiframe.

    The query subtracts the objID from its dense rank (ordered by objID);
    that difference is the same for all rows of an unbroken run of objIDs,
    so grouping by it gives one (MIN, MAX) pair per run. Only rows with
    objid>0 are taken into account.

    @param archive:   The DB connection.
    @type  archive:   DbSession object
    @param mfID:      The MultiframeID.
    @type  mfID:      int
    @param tableName: Name of the processed table.
    @type  tableName: str
    @return: The result of the query as returned by the DB connection;
             the caller iterates it as (min objID, max objID) pairs
             ordered by the minimal objID.

    """
    rangeQuery = (
        "WITH objidCTE AS ("
        "SELECT objID, DENSE_RANK() OVER (ORDER BY objID) - objID AS grp "
        "FROM %s WHERE multiframeID=%d and objid>0) "
        "SELECT MIN(objID), MAX(objID) FROM objidCTE "
        "GROUP BY grp order by MIN(objID)") % (tableName, mfID)
    return archive._executeScript(rangeQuery, wantResult=True)
302
303
304
def appendConstrain(attr, conDict, month, conOn, conOff):
    """
    Add one CHECK clause to the constraints of a month.

    A single value clause is added if start and end are equal, otherwise
    a range clause.

    @param attr:    Attribute short for which the constraint is built
                    (key into the clause templates).
    @type  attr:    str
    @param conDict: The constraint dictionary (month -> list of clauses).
    @type  conDict: dict
    @param month:   The processed month.
    @type  month:   str
    @param conOn:   Start value of the constraint.
    @type  conOn:   int
    @param conOff:  End value of the constraint.
    @type  conOff:  int

    """
    monthClauses = conDict[month]
    if conOn != conOff:
        clause = constraintRange[attr] % (conOn, conOff)
    else:
        clause = constraintAdd[attr] % conOn
    monthClauses.append(clause)
327
328
329
358
359
360
def updateVVVObjIDs(archive):
    """
    Replace the negative objIDs in the monthly Raw tables by new values.

    Since the objectIDs couldn't be calculated beforehand they have to be
    updated separately based on their value and the maximal value found in
    the database: every row with objID<0 gets the next free number above
    the current maximum.

    @param archive: The DB connection.
    @type  archive: DBSession object

    """
    Logger.addMessage("Updating objIDs...")

    # first pass: smallest objID per table and the overall maximum
    maxObjId = 0
    minObjIdOfTable = {}
    rawTables = archive.query("name", "sys.tables",
                              "name like '%%%s%%'" % "Raw20")
    for detTable in sorted(rawTables):
        minObj, maxObj = archive.query("MIN(objID), MAX(objID)", detTable,
                                       "seqNum>-99999990",
                                       firstOnly=True, default=(0, 0))
        minObjIdOfTable[detTable] = minObj
        maxObjId = max(maxObjId, maxObj)

    # The UPDATE sets every selected row to the running counter plus one.
    # NOTE(review): the ORDER BY inside the CTE is presumably meant to fix
    # the order in which the numbers are handed out - confirm.
    updateScript = (
        "DECLARE @myvar bigint; set @myvar = %d; "
        "WITH updCTE as (SELECT TOP 9223372036854775807 "
        "multiframeID, extNum, seqNum, objID "
        "FROM %s WHERE objID<0 AND seqNum>-99999990 "
        "ORDER BY multiframeID, extNum, seqNum)"
        "UPDATE updCTE SET @myvar = objID = @myvar + 1")

    # second pass: only tables that contain negative objIDs are updated
    for detTable in sorted(minObjIdOfTable):
        if minObjIdOfTable[detTable] < 0:
            archive._executeScript(updateScript % (maxObjId, detTable),
                                   wantRowCount=True)
            archive.commitTransaction()
            Logger.addMessage("...%s updated." % detTable)
            # the next table continues above the numbers just assigned
            newMax = archive.queryAttrMax("objID", detTable)
            maxObjId = max(maxObjId, newMax)
395
396
397
398
399
if __name__ == "__main__":
    # Command-line options of this script, added to the standard ones.
    CLI.progOpts += [
        CLI.Option('C', "constraint",
                   "create constraint script"),
        CLI.Option('D', "deprecate",
                   "deprecate old detections"),
        CLI.Option('F', "force",
                   "write constraints even when they are already in the DB"),
        CLI.Option('O', "objid",
                   "update objIDs for VVV"),
        CLI.Option('V', "doviews",
                   "create views for VVV"),
        CLI.Option('m', "month",
                   "create objID constraints only for this month "
                   "(format: YYYYMM)", "NUMBER",
                   isValOK=lambda x: x.isdigit() and len(x) == 6),
        CLI.Option('r', "readfile",
                   "read data from file instead DB"),
        CLI.Option('v', "version", "version number of the data", "STR")]

    cli = CLI("FinaliseCatalogueIngest", "$Revision: 10069 $", __doc__)
    database = cli.getArg("database").split('.')[-1]
    Logger.isVerbose = False

    Logger.addMessage(cli.getProgDetails())
    sysc = SystemConstants(database)
    obsCal = sysc.obsCal

    # semesters: the last three by default, "all", or a comma separated list
    semesterOpt = cli.getOpt("semester")
    if not semesterOpt:
        semesterList = obsCal.getSemList()[-3:]
    elif semesterOpt.lower() == "all":
        semesterList = obsCal.getSemList()
    else:
        semesterList = semesterOpt.split(',')

    version = (cli.getOpt("version") or
               str(obsCal.maxVersOfDate(obsCal.getDates(semesterList[0],
                                                        "%Y%m%d")[0])))

    # without any of the task options every task is run
    doAll = not any([cli.getOpt("deprecate"), cli.getOpt("constraint"),
                     cli.getOpt("objid"), cli.getOpt("doviews")])

    # the VVV tasks run on the given database only if it is a test
    # database, otherwise on the VVV load database
    if "test" in database.lower():
        loadDb = cli.getArg("database")
    else:
        loadDb = "ramses12.VSAVVV"

    if doAll or cli.getOpt("deprecate"):
        for server in sorted(sysc.ingestDbForServer):
            loadDB = '.'.join([server, sysc.ingestDbForServer[server]])
            try:
                archive = DbSession(loadDB, False, cli.getOpt("test"),
                                    cli.getOpt("user"))
                deprecateOldDetections(archive)
                del archive
            except Exception as details:
                Logger.addExceptionMessage(details)
                raise

    if sysc.isVSA() and (doAll or cli.getOpt("doviews")):
        try:
            archive = DbSession(loadDb, False, cli.getOpt("test"),
                                cli.getOpt("user"))
            createViews(archive, "VVV")
            del archive
        except Exception as details:
            Logger.addExceptionMessage(details)
            raise

    if sysc.isVSA() and (doAll or cli.getOpt("constraint")):
        try:
            archive = DbSession(loadDb, False, cli.getOpt("test"),
                                cli.getOpt("user"))
            createConstraints(archive, readFromFile=cli.getOpt("readfile"),
                              month=cli.getOpt("month"),
                              writeAll=cli.getOpt("force"))
            del archive
        except Exception as details:
            Logger.addExceptionMessage(details)
            raise

    if sysc.isVSA() and (doAll or cli.getOpt("objid")):
        try:
            archive = DbSession(loadDb, False, cli.getOpt("test"),
                                cli.getOpt("user"))
            updateVVVObjIDs(archive)
            del archive
        except Exception as details:
            Logger.addExceptionMessage(details)
            raise
505