1
2
3
4 """
5 Updates fileName and catName in the given databases from the DB 'disk??'
6 value to the given disk.
7
8 @author: E. Sutorius
9 @org: WFAU, IfA, University of Edinburgh
10 """
11
12 from collections import defaultdict
13 import os
14 import time
15
16 from wsatools.CLI import CLI
17 import wsatools.CSV as csv
18 from wsatools.DbConnect.DbSession import DbSession, Join
19 from wsatools.File import File
20 from wsatools.Logger import Logger, ForLoopMonitor
21 import wsatools.DbConnect.CommonQueries as queries
22 from wsatools.SystemConstants import DepCodes, SystemConstants
23
24
# NOTE(review): the recovered source of this class had lost its indentation,
# its comments and all of its ``class``/``def`` header lines. The headers
# below are reconstructed: names/parameters come from call sites and @param
# docs where available, but the base class and the names readUpdateFile()
# and updateDB() are not visible in the source -- confirm them.
class UpdateFits(object):
    """ Updates fileName and catName in the given databases.
    """
    # Public attributes (also used as CLI defaults in __main__).
    dateVersStr = '20050101'            # default "date" argument = no date
    deprecateFiles = False              # -D: deprecate updated frames
    isTrialRun = False                  # passed on to DbSession
    newDisk = ''                        # e.g. "/diskNN/wsa", built from -k
    outDir = SystemConstants.fitsDir    # -o: new FITS subdirectory
    subDir = SystemConstants.fitsDir    # -s: current FITS subdirectory
    updateFileName = None               # -f: file of old/new path pairs
    updateVersion = ''                  # (oldVersion, newVersion) after init
    userName = DbSession.userName       # database user name

    # Private attributes. These are mutable class-level defaults; __init__
    # gives each instance its own _updateMask so no state is shared.
    _databases = []
    _updateFileDict = {}
    _updateMask = {'disk': None, 'subdir': None, 'version': None, "avail": None}

    def __init__(self, cli):
        """
        Initialise class from command-line options.

        @param cli: Command-line arguments.
        @type  cli: CLI

        @raise Exception: If -A is given without a date, if neither -M nor -d
                          is given, or if -d is not of the form
                          "server.database".

        """
        # Item assignment below must not modify the class-level dict.
        self._updateMask = dict(self._updateMask)

        # Either a single version "N" or a version update "N=>M".
        versNum = cli.getOpt("version")
        if '=>' in versNum:
            oldVers, _sep, newVers = versNum.replace(' ', '').partition("=>")
            self.updateVersion = (oldVers, newVers)
            self._updateMask['version'] = ['_v%s' % oldVers, '_v%s' % newVers]
        else:
            self.updateVersion = (versNum, versNum)

        self.dateReq = cli.getArg("date")
        self.obsCal = SystemConstants(cli.getArg("archive")).obsCal
        if self.dateReq.isdigit():
            # A single night. The class default date means "no date
            # restriction" (None). Compare by value: ``is not`` tests object
            # identity, which differs for an equal string from the command
            # line.
            if self.dateReq != UpdateFits.dateVersStr:
                self.dateVersStr = "%s_v%s" % (self.dateReq,
                                               self.updateVersion[0])
            else:
                self.dateVersStr = None
            self.dateVersStrList = [self.dateVersStr]
        else:
            # A semester: one date-version string per night in it.
            semDays = self.obsCal.getSemDayList(self.dateReq)
            self.dateVersStrList = ["%s_v%s" % (day, self.updateVersion[0])
                                    for day in semDays]
            self.dateVersStr = self.dateVersStrList[0]

        # The old disk (first entry) is filled in per file by updateDB().
        if cli.getOpt("newdisk") != "diskNN":
            self.newDisk = os.sep + os.path.join(cli.getOpt("newdisk"),
                                                 cli.getArg("archive").lower())
            self._updateMask['disk'] = ['', self.newDisk]

        self.subDir = cli.getOpt("subdir")
        self.outDir = cli.getOpt("outdir")
        if self.subDir != self.outDir:
            self._updateMask['subdir'] = [self.subDir, self.outDir]

        self.deprecateFiles = cli.getOpt("deprecate")

        self.checkPixAvailable = cli.getOpt("available")
        if self.checkPixAvailable and not self.dateVersStr:
            raise Exception("Must choose date with -A option")

        self.checkDB = cli.getOpt("masterdb")

        self.updateFileName = cli.getOpt("filename")

        self.isTrialRun = cli.getOpt("test")
        self.userName = cli.getOpt("user")

        # Databases to update: "server::db1,db2" (or "all") via -M, or a
        # single "server.database" via -d. "all" is passed on as an empty set.
        if cli.getOpt("multidbs"):
            server, dbs = cli.getOpt("multidbs").split("::")
            databases = set(dbs.split(',') if dbs != "all" else [])
        elif cli.getOpt("database"):
            dbOption = cli.getOpt("database")
            if '.' not in dbOption:
                raise Exception("-d option must be given as server.database")
            server, db = dbOption.split('.')
            databases = set([db])
        else:
            raise Exception("Must choose either -M or -d options")

        self._databases = getAvailDBs(cli.getArg("archive"), server, databases)

    def getFileNames(self, archive):
        """
        Get all MultiframeIDs with their file names, catalogue names and
        date-version strings.

        Entries are restricted to the requested date(s) when a date was
        given, and to the files listed in the update file when one was read.

        @param archive: Connection to database to query.
        @type  archive: DbSession

        @return: Dictionary giving [fileName, catName, dateVersStr] for every
                 MfID, with the pixel server host name stripped from paths.
        @rtype:  dict(int:list(str))

        """
        # "datedir/file" tails of the update-file paths; empty = no filter.
        compareSet = set('/'.join(entry.rsplit('/', 2)[-2:])
                         for entry in self._updateFileDict)

        mffflu = archive.query(selectStr="name",
                               fromStr="sysobjects",
                               whereStr=' OR '.join([
                                   "name like 'Multiframe'",
                                   "name like 'FlatFileLookUp'"]))

        if 'FlatFileLookUp' in mffflu:
            whereClause = ''
            if self.dateVersStr:
                whereClause = "dateVersStr in ('%s')" % "','".join(
                    self.dateVersStrList)
            results = archive.query(
                selectStr="Multiframe.multiframeID, Multiframe.fileName, "
                          "catName, dateVersStr",
                fromStr=Join(["Multiframe", "FlatFileLookUp"],
                             ["multiframeID"]),
                whereStr=whereClause)

        elif 'Multiframe' in mffflu:
            # No FlatFileLookUp: match the date-version string in the path.
            # Without a date (None) there is no restriction -- matching
            # '%None%' would select nothing.
            whereClause = ''
            if self.dateVersStr:
                whereClause = " OR ".join("fileName like '%%%s%%'" % x
                                          for x in self.dateVersStrList)
            results = archive.query(
                selectStr="multiframeID, fileName, catName, '%s'"
                          % self.dateVersStr,
                fromStr="Multiframe",
                whereStr=whereClause)
        else:
            Logger.addMessage("<ERROR> %s is missing the Multiframe and"
                              " FlatFileLookUp table!" % archive.database)
            results = []

        hostName = archive.sysc.pixelServerHostName
        mfd = defaultdict(list)
        Logger.addMessage("Creating mfID dict...")
        for mfID, fN, cN, dvStr in results:
            # Use the DB date-version string if it appears in the path,
            # otherwise the file's date directory (also when it is NULL).
            if dvStr is None or dvStr not in fN:
                dvStr = fN.rsplit('/', 2)[-2]
            fileName = fN.replace(hostName, '')
            catName = cN.replace(hostName, '')
            if not compareSet \
              or '/'.join(fileName.rsplit('/', 2)[-2:]) in compareSet:
                mfd[mfID] += [fileName, catName, dvStr]
        Logger.addMessage("%d files found." % len(mfd))
        return mfd

    # NOTE(review): run() -- called from __main__ -- is not defined anywhere
    # in the recovered source, and nothing visible sets self.masterMfidDict
    # (read by updateDB). A method appears to be missing here; restore it
    # from version control.

    def updateFromArgs(self, oldFileName, dateVersStr, updateMask):
        """
        Update the given file name from arguments.

        Every set entry of the mask is applied as a string replacement
        old => new. For "version", only the version suffix inside this
        file's date-version string is swapped.

        @param oldFileName: Old file name.
        @type  oldFileName: str
        @param dateVersStr: Date-version string.
        @type  dateVersStr: str
        @param updateMask:  Update mask to use.
        @type  updateMask:  dict(str:list(str))

        @return: New file name.
        @rtype:  str

        """
        newFileName = oldFileName
        for flag, entries in updateMask.items():
            if not entries:
                continue
            if flag == "version":
                oldEntry = dateVersStr
                newEntry = dateVersStr.replace(
                    '_v%s' % self.updateVersion[0],
                    '_v%s' % self.updateVersion[1])
            else:
                oldEntry = entries[0]
                newEntry = entries[1]
            newFileName = newFileName.replace(oldEntry, newEntry)
        return newFileName

    def readUpdateFile(self):
        """
        Create the dictionary containing old/new path relations from a file.

        The paths must all lie in one date directory, which then becomes the
        requested date-version string.

        @raise SystemExit: If the file is missing, or its paths span no or
                           more than one date directory.

        """
        if not os.path.exists(self.updateFileName):
            raise SystemExit(
                "<ERROR> %s does not exist!" % self.updateFileName)

        Logger.addMessage("Updating from " + self.updateFileName)
        # Assumes each row holds an (oldPath, newPath) pair -- dict() needs
        # exactly two columns.
        self._updateFileDict = dict(csv.File(self.updateFileName))

        datedirSet = set(File(entry).subdir for entry in self._updateFileDict)
        if not datedirSet:
            raise SystemExit("<ERROR> No date found in the list!")
        if len(datedirSet) > 1:
            raise SystemExit("<ERROR> More than one day in the given list, "
                             "please split it up!")
        self.dateVersStr = datedirSet.pop()
        self.dateVersStrList = [self.dateVersStr]

    def updateDB(self, database):
        """
        Update the given database.

        New fileName/catName values for the Multiframe entries are taken from
        the update file (-f), from the master database (-m), or otherwise
        derived from the disk/subdir/version/availability masks. Changed
        entries are written via updateDBFileNames() and, with -D, deprecated.

        @param database: Database to update.
        @type  database: str

        """
        Logger.addMessage("Connecting to %s" % database)
        archive = DbSession(database, autoCommit=True,
                            isTrialRun=self.isTrialRun, userName=self.userName)

        Logger.addMessage("Getting file names...")
        mfidDict = self.getFileNames(archive)

        # MfID -> [newFileName, newCatName] for every entry to be changed.
        newMfidDict = defaultdict(list)

        fitsDirWarning = False
        filePathWarning = False
        pfnlaStr = "PixelFileNoLongerAvailable:"
        # Copy the mask lists too: updateMask['disk'][0] is overwritten per
        # file below and must not write through into self._updateMask.
        updateMask = dict((flag, list(entries) if entries else entries)
                          for flag, entries in self._updateMask.items())
        mfIDs = sorted(mfidDict)  # sorted once, not on every iteration
        Logger.addMessage("Checking %s files..." % len(mfidDict))
        progress = ForLoopMonitor(mfIDs)
        for mfID in mfIDs:
            oldDbFileName, oldDbCatName, dateVersStr = mfidDict[mfID]

            if self.updateFileName:
                # New paths are looked up in the provided old/new list.
                try:
                    newFileName = self._updateFileDict[oldDbFileName]
                    if "empty_catalogue.fits" in oldDbCatName \
                      or oldDbCatName.upper() == "NONE":
                        newCatName = oldDbCatName
                    else:
                        newCatName = self._updateFileDict[oldDbCatName]
                    newMfidDict[mfID].extend([newFileName, newCatName])
                except KeyError:
                    Logger.addMessage(
                        "%s in DB but not in provided list." % oldDbFileName)

            elif self.checkDB:
                # New paths are copied from the master DB; None marks a name
                # that already agrees with it.
                if mfID not in self.masterMfidDict:
                    Logger.addMessage(
                        "%s in DB but not in master DB." % oldDbFileName)
                else:
                    masterDbFileName, masterDbCatName, masterDateVersStr = \
                        self.masterMfidDict[mfID]
                    if masterDateVersStr == dateVersStr:
                        newFileName = (masterDbFileName
                                       if oldDbFileName != masterDbFileName
                                       else None)
                        newCatName = (masterDbCatName
                                      if oldDbCatName != masterDbCatName
                                      and not (oldDbCatName == "NONE"
                                               and "empty" in masterDbCatName)
                                      else None)
                        if newFileName or newCatName:
                            newMfidDict[mfID].extend([newFileName, newCatName])

            else:
                # Derive the new paths by applying the update masks.
                oldFile = File(oldDbFileName.replace(pfnlaStr, ''))
                oldDisk = oldFile.diskdir
                oldSubDir = oldFile.commondir
                if updateMask['disk']:
                    updateMask['disk'][0] = oldDisk

                if oldSubDir == self.subDir:
                    if mfID == mfIDs[0]:
                        # Report the requested changes once.
                        text = []
                        if updateMask['disk']:
                            text.append('=>'.join([oldDisk, self.newDisk]))
                        if updateMask['subdir']:
                            text.append('=>'.join([self.subDir, self.outDir]))
                        if updateMask['version']:
                            text.append('=>'.join([
                                'version:' + self.updateVersion[0],
                                self.updateVersion[1]]))
                        if text:
                            Logger.addMessage("Updating %s" % ', '.join(text))
                elif not fitsDirWarning:
                    Logger.addMessage("<WARNING> DB file(s) in different "
                                      "fits directory: %s (%s)"
                                      % (oldSubDir, self.subDir))
                    fitsDirWarning = True

                if self.checkPixAvailable:
                    if os.path.exists(oldFile.name):
                        # File present: strip the unavailability marker.
                        updateMask["avail"] = [pfnlaStr, '']
                    elif pfnlaStr not in oldDbFileName:
                        # File gone: mark the path as unavailable.
                        prefix = "/disk"
                        updateMask["avail"] = [prefix, pfnlaStr + prefix]
                    elif pfnlaStr + pfnlaStr in oldDbFileName:
                        # Collapse a doubled marker.
                        updateMask["avail"] = [pfnlaStr + pfnlaStr, pfnlaStr]
                    else:
                        # Already marked once: no change.
                        updateMask["avail"] = ['', '']

                if oldFile.subdir.rpartition('_v')[2] == self.updateVersion[0] \
                  and dateVersStr in oldFile.name \
                  and oldSubDir == self.subDir:
                    newDbFileName = self.updateFromArgs(
                        oldDbFileName, dateVersStr, updateMask)
                    if os.path.exists(newDbFileName.rpartition(':')[-1]):
                        newDbFileName = newDbFileName.replace(pfnlaStr, '')
                    if "empty_catalogue.fits" in oldDbCatName \
                      or oldDbCatName.upper() == "NONE":
                        newDbCatName = oldDbCatName
                    else:
                        newDbCatName = self.updateFromArgs(
                            oldDbCatName, dateVersStr, updateMask)
                        if os.path.exists(newDbCatName.rpartition(':')[-1]):
                            newDbCatName = newDbCatName.replace(pfnlaStr, '')
                else:
                    # Other version, date or subdirectory: leave unchanged.
                    newDbFileName = oldDbFileName
                    newDbCatName = oldDbCatName

                if newDbFileName != oldDbFileName:
                    newFilePath = newDbFileName.rpartition(':')[-1]
                    if not os.path.exists(newFilePath) \
                      and pfnlaStr not in newDbFileName:
                        # pfnlaStr already ends in ':', so prepend it directly
                        # (joining with ':' produced a doubled colon).
                        newDbFileName = pfnlaStr + newFilePath
                        Logger.addMessage(
                            "<WARNING> Image File missing: %s" % newFilePath)
                        filePathWarning = True
                    newMfidDict[mfID].extend([newDbFileName, newDbCatName])

                    # Check the catalogue path without any marker prefix.
                    newCatPath = newDbCatName.rpartition(':')[-1]
                    if not (newDbCatName.endswith("empty_catalogue.fits")
                            or newDbCatName.upper() == "NONE") \
                      and not os.path.exists(newCatPath):
                        Logger.addMessage(
                            "<WARNING> Catalogue File missing: %s"
                            % newDbCatName)
                        filePathWarning = True
            progress.testForOutput()

        if filePathWarning or fitsDirWarning:
            Logger.addMessage("<WARNING> Check if version is correct or "
                              "new file path exists.")

        if not newMfidDict:
            Logger.addMessage(database + " is up-to-date.")
            return

        Logger.addMessage("Updating %s entries in %s..." %
                          (len(newMfidDict), database))

        # These databases are updated through an SQL file instead of directly.
        isPublicDB = archive.database.upper().startswith(("UKIDSS",
                                                          "WORLD", "TRANSIT"))
        updateFile = None
        if isPublicDB:
            path = os.path.join(
                "/mnt", archive.sysc.loadServer,
                "SqlUpdates", "FITS_PubDB_%s.%s_%sv%s_%s.sql"
                % (archive.server, archive.database,
                   self.dateReq, self.updateVersion[1],
                   time.strftime('%Y%m%d_%H%M')))
            updateFile = File(path)
            updateFile.aopen()
        try:
            updateDBFileNames(archive, newMfidDict, updateFile)
        finally:
            # Close the SQL file even if the update fails part-way.
            if updateFile is not None:
                updateFile.close()

        if self.deprecateFiles:
            if archive.server == archive.sysc.loadServer:
                deprecateFileNames(archive, sorted(newMfidDict))
            else:
                Logger.addMessage("You're not allowed to deprecate data "
                                  "in a public database.")
436
437
438
# NOTE(review): this def line was missing from the recovered source; name and
# parameters are taken from its call site and @param docs.
def deprecateFileNames(archive, mfIDList):
    """
    Deprecate the frames in the multiframeID list.

    Adds DepCodes.reprocCASU to the deprecated column of Multiframe and
    MultiframeDetector. Rows whose deprecated value is already
    >= DepCodes.reprocCASU are left untouched.

    @param archive:  Connection to database to update.
    @type  archive:  DbSession
    @param mfIDList: List of MultiframeIDs.
    @type  mfIDList: list

    """
    if not mfIDList:
        # Nothing to do; "multiframeID IN ()" would be invalid SQL.
        Logger.addMessage("No frames to deprecate.")
        return

    entries = "deprecated=deprecated+%s" % DepCodes.reprocCASU
    rows = "deprecated<%s AND multiframeID IN (%s)" % \
        (DepCodes.reprocCASU, ','.join(map(str, mfIDList)))

    Logger.addMessage("%s deprecated in Multiframe." %
                      archive.update("Multiframe", entries, rows))

    Logger.addMessage("%s deprecated in MultiframeDetector." %
                      archive.update("MultiframeDetector", entries, rows))
459
460
461
# NOTE(review): this def line was missing from the recovered source; name and
# parameters are taken from its call site and @param docs.
def getAvailDBs(archive, mainServer, databases=None):
    """
    Get all or selected DBs from given server.

    @param archive:    The archive to be updated, e.g. WSA.
    @type  archive:    str
    @param mainServer: The server to query for databases, can be the name or
                       the synonym, e.g. dbload or dbpub (for all public
                       servers).
    @type  mainServer: str
    @param databases:  Optionally request specific database names. Only for
                       "dbpub" does an empty/None selection return all
                       databases; other servers return just the requested
                       names.
    @type  databases:  sequence(str)

    @return: Sorted list of "server.database" names.
    @rtype:  list(str)

    @raise SystemExit: If mainServer is not known.

    """
    sysc = SystemConstants(archive)
    databases = databases or ()

    if mainServer == "dbpub":
        # Every public server, filtered by archive-specific name patterns.
        availDatabases = []
        for server in sysc.publicServers:
            pubDBs = set(queries.getAllDBs(server))
            if sysc.isVSA():
                pubDBs = set(db for db in pubDBs
                             if not db.startswith(("UKIDSS", "TRANSIT",
                                                   "WORLD")))
            else:
                pubDBs = set(db for db in pubDBs
                             if not (db.startswith("V")
                                     or db.endswith("SIAP")))
            if databases:
                pubDBs.intersection_update(databases)
            availDatabases.extend(server + '.' + db for db in pubDBs)

    elif mainServer == "dbload":
        # NOTE(review): an empty selection ("all") yields no databases here.
        availDatabases = [sysc.loadServer + '.' + db for db in databases]

    # list() so this also works if publicServers is a tuple.
    elif mainServer in list(sysc.publicServers) + [sysc.loadServer]:
        availDatabases = [mainServer + '.' + db for db in databases]

    else:
        raise SystemExit("<ERROR> %s not known." % mainServer)

    return sorted(availDatabases)
509
510
511
# NOTE(review): this def line was missing from the recovered source; name and
# parameters are taken from its call site and @param docs.
def updateDBFileNames(archive, mfDict, updateFile=None):
    """
    Update the database entries for fileName and catName.

    The pixel server host name is put back in front of "/disk" in each path.
    For UKIDSS*/WORLD*/TRANSIT* databases the UPDATE statements are written to
    updateFile instead of being executed.

    @param archive:    Connection to database to update.
    @type  archive:    DbSession
    @param mfDict:     Dictionary giving fileName, catName tuples for every
                       MfID; a falsy name is left unchanged.
    @type  mfDict:     dict(int:list(str))
    @param updateFile: Write to this file for read only DBs (required for
                       those databases).
    @type  updateFile: File obj

    """
    hostDisk = archive.sysc.pixelServerHostName + "/disk"
    isReadOnlyDB = archive.database.upper().startswith(("UKIDSS", "WORLD",
                                                        "TRANSIT"))
    numRows = 0
    for mfID in sorted(mfDict):
        fileName, catName = mfDict[mfID][:2]
        updSql = []
        if fileName:
            updSql.append("fileName='%s'" % fileName.replace("/disk", hostDisk))
        if catName:
            updSql.append("catName='%s'" % catName.replace("/disk", hostDisk))
        if not updSql:
            # Nothing to change; an empty SET clause would be invalid SQL.
            continue

        setStr = ','.join(updSql)
        if isReadOnlyDB:
            numRows += 1
            updateFile.writetheline(
                "UPDATE Multiframe SET %s WHERE multiframeID=%s" % (
                    setStr, mfID))
        else:
            numRows += archive.update("Multiframe", setStr,
                                      "multiframeID=%s" % mfID)

    Logger.addMessage("%s fileNames affected in Multiframe." % numRows)
551
552
553
554
555
if __name__ == '__main__':
    # The generic positional "database" argument is replaced by the -d/-M
    # options defined below.
    CLI.progArgs.remove("database")
    CLI.progArgs += [
        CLI.Argument("archive", "WSA",
                     isValOK=lambda x: x.lower() in ['wsa', 'vsa', 'osa']),
        CLI.Argument("date", UpdateFits.dateVersStr, isOptional=True,
                     isValOK=CLI.isDateOK)
    ]
    CLI.progOpts += [
        # UpdateFits splits this value into exactly "server.database".
        CLI.Option('d', 'database',
                   "update individual database given as 'server.database'",
                   "NAME", isValOK=lambda x: x.count('.') == 1),
        CLI.Option('A', 'available',
                   "check if the pixel file is available and rename filename "
                   "accordingly in Multiframe"),
        CLI.Option('D', 'deprecate',
                   "set the deprecated flag in Multiframe and "
                   "MultiframeDetector"),
        CLI.Option('M', 'multidbs',
                   "'server::database' where server can be 'dbload' or 'dbpub' "
                   "and database a comma-separated list of DBs or 'all'",
                   "LIST", isValOK=lambda x: "::" in x),
        CLI.Option('f', 'filename', "File containing old/new file path pairs",
                   "FILE"),
        CLI.Option('k', 'newdisk', "new disk containing the data", "DISK",
                   "diskNN",
                   isValOK=lambda x: x.startswith("disk") and x[4:].isdigit()),
        CLI.Option('m', 'masterdb',
                   "check against this master DB.",
                   "DB", None),
        CLI.Option('o', 'outdir',
                   "new destination subdirectory containing FITS date "
                   "directories", "DIR", UpdateFits.outDir),
        CLI.Option('s', 'subdir',
                   "subdirectory containing FITS date directories",
                   "DIR", UpdateFits.subDir),
        CLI.Option('v', 'version', "version number or update from version N "
                   "to version M ('N=>M', partitions on '=>')", "VERSION",
                   '1', isValOK=lambda x: x.replace('.', '').isdigit()
                   or "=>" in x)
    ]

    cli = CLI(UpdateFits, "$Revision: 9564 $")
    Logger.isVerbose = False
    Logger.addMessage(cli.getProgDetails())
    logFileName = "updateFitsFileNames_%s.log" % (time.strftime('%Y%m%d_%H%M'))
    log = Logger(logFileName, path=os.curdir)

    UpdateFits(cli).run()
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623