Package helpers :: Module UpdateWSATimestamp
[hide private]

Source Code for Module helpers.UpdateWSATimestamp

  1  #! /usr/bin/env python 
  2  #------------------------------------------------------------------------------ 
  3  #$Id: UpdateWSATimestamp.py 7245 2010-07-20 12:52:03Z RossCollins $ 
  4  """ 
  5     Update/synchronise the filetimestamps (WSATimestamp) with CASU's FITS files 
  6     (via monitoring/lsCasuDirs.py). 
  7   
  8     @author: E.T.W. Sutorius 
  9     @org:    WFAU, IfA, University of Edinburgh 
 10  """ 
 11  #------------------------------------------------------------------------------ 
 12  import dircache 
 13  import mx.DateTime as mxTime 
 14  import os 
 15  import time 
 16   
 17  from   wsatools.CLI                 import CLI 
 18  import wsatools.DbConnect.DbConstants   as dbc 
 19  from   wsatools.DbConnect.DbSession import DbSession 
 20  from   wsatools.File                import File, PickleFile 
 21  import wsatools.FitsUtils               as fits 
 22  from   wsatools.Logger              import Logger 
 23  import wsatools.DbConnect.CommonQueries as queries 
 24  import wsatools.SystemConstants         as sysc 
 25  #------------------------------------------------------------------------------ 
 26  # global vars 
 27  availPubDBs = {} 
 28  dataBasesDef = {"dbload":["WSA"], "dbpub":["all"]} 
 29  dataBases = {} 
 30  testRun = False 
 31  fitsOnly = False 
 32  dbsOnly = False 
 33  localOnly = False 
 34   
 35  #------------------------------------------------------------------------------ 
 36   
 37  CLI.progArgs.remove("database") 
 38  CLI.progOpts.remove("user") 
 39  CLI.progOpts += [CLI.Option('b', "begin", 
 40                              "first date to process, eg. 20050101", 
 41                              "DATE", '20050101'), 
 42                   CLI.Option('e', "end", 
 43                              "last date to process, eg. 20050131", 
 44                              "DATE", '20050131'), 
 45                   CLI.Option('v', "version", 
 46                              "version number of the data", 
 47                              "STR", '1'), 
 48                   CLI.Option('d', "db", 
 49                              "specifies the databases that needs updating", 
 50                              "Server::Database", 'dbload::WSA,dbpub::all'), 
 51                   CLI.Option('f', "fitsonly", 
 52                              "update FITS files only"), 
 53                   CLI.Option('s', "dbsonly", 
 54                              "update DB entries only"), 
 55                   CLI.Option('l', "localonly", 
 56                              "update FITS files from FITS key WSA_TIME only, " 
 57                              "'timestamplog' should contain FITS directory," 
 58                              "eg. /disk01/wsa/ingest/fits/20050101_v1")] 
 59  CLI.progArgs += [CLI.Argument('timestamplog','casuDirsStats.log')] 
 60   
 61  #------------------------------------------------------------------------------ 
 62   
def getMfIDs(archive, dateVersStr):
    """
    Fetch the Multiframe rows whose fileName contains the given date
    directory and index them by the file's relative path.

    @param archive:     Connection to database to query.
    @type  archive:     DbSession
    @param dateVersStr: The datedir string, YYYYMMDD_vV.
    @type  dateVersStr: str

    @return: Dict keyed by os.path.join(subdir, base) of each row's fileName
             (as split by wsatools.File); every value is the complete query
             row: fileName, multiframeID, fileTimeStamp, catName.
    @rtype:  dict

    """
    likeClause = "fileName LIKE '%" + dateVersStr + "%'"
    rows = archive.query("fileName, multiframeID, fileTimeStamp, catName",
                         fromStr="Multiframe", whereStr=likeClause)

    def relPath(row):
        # row[0] is the fileName column of the query above
        wsaFile = File(row[0])
        return os.path.join(wsaFile.subdir, wsaFile.base)

    # later rows overwrite earlier ones that map to the same key
    return dict((relPath(row), row) for row in rows)
86 87 #------------------------------------------------------------------------------ 88
def readCasuStamps(inFileName, dateVersList):
    """
    Read the file information of all CASU files from a log file.

    The log is read through PickleFile; its first pickled object is used as
    a dict of disks, each mapping date directories to per-file records.

    @param inFileName:   Log containing CASU file data.
    @type  inFileName:   str
    @param dateVersList: List containing dateVersStr.
    @type  dateVersList: list(str)

    @return: Dict of files (dateVersStr/fileName) and their timestamp
             (date, time), plus the list of date directories that were not
             found on any disk.
    @rtype:  tuple(dict(str:tuple(str, str)), list(str))

    """
    Logger.addMessage(' '.join(["reading", inFileName, "..."]))
    # NOTE(review): the log is unpickled -- only feed this files written by
    # a trusted source (monitoring/lsCasuDirs.py according to the module doc).
    casuDirs = list(PickleFile(inFileName).pickleRead())[0]

    casuFileDict = {}
    notAvailDateVersList = []
    for dateVersStr in dateVersList:
        foundOnAnyDisk = False
        for diskInfo in casuDirs.values():
            if dateVersStr not in diskInfo:
                continue

            foundOnAnyDisk = True
            for fileName, fileInfo in diskInfo[dateVersStr].items():
                # Fields 6 and 7 hold the file's date and time; presumably
                # "YYYY-MM-DD" and "HH:MM:SS[.fff]" -- TODO confirm against
                # the log writer. Reduced here to YYMMDD and HHMMSS.
                dateStr = fileInfo[6].replace('-', '')[2:]
                timeStr = fileInfo[7].replace(':', '').partition('.')[0]
                casuFileDict[os.path.join(dateVersStr, fileName)] = \
                    (dateStr, timeStr)

        if not foundOnAnyDisk:
            Logger.addMessage(' '.join([dateVersStr, "not available at CASU"]))
            notAvailDateVersList.append(dateVersStr)

    return casuFileDict, notAvailDateVersList
124 125 #------------------------------------------------------------------------------ 126
def updateDBdict(dbdict, dblist):
    """
    Record, for every public server host, which of the requested databases
    it holds.

    Each host's entry in dbdict is reset first. A requested name that is
    listed in availPubDBs for the host is appended; the special name "all"
    replaces the host's entry with a copy of everything in availPubDBs.

    @param dbdict: Dict of servers and their databases (modified in place).
    @type  dbdict: dict
    @param dblist: List of databases.
    @type  dblist: list

    @return: Dict of servers and their databases (the same dbdict object).
    @rtype:  dict(str:list(str))

    """
    for server in sysc.publicServerHosts():
        chosen = dbdict[server] = []
        for name in dblist:
            if name in availPubDBs[server]:
                chosen.append(name)
            elif name == "all":
                # copy, so later appends never touch availPubDBs itself
                chosen = dbdict[server] = availPubDBs[server][:]

    return dbdict
145 146 #------------------------------------------------------------------------------ 147
def updateFitsStamp(fileName, timeStamp):
    """
    Set a file's modification time from its WSA timestamp.

    The first six characters of the stamp (the date directory part) are
    skipped; the remainder is parsed as YYMMDDHHMMSS and interpreted as
    local time. The access time is set to the current time.

    @param fileName:  FITS file to be updated.
    @type  fileName:  str
    @param timeStamp: WSA timestamp (datedir+filetimestamp), 18 digits.
                      May be given as a string or as an integer; an integer
                      is zero-padded back to 18 digits.
    @type  timeStamp: str or long

    @raise ValueError: If the stamp does not parse as YYMMDDHHMMSS.

    """
    try:
        fileStamp = timeStamp[6:]
    except TypeError:
        # Numeric stamp: a leading zero (years 2000-2009) is lost in an
        # integer, so restore the full 18-digit form before slicing.
        fileStamp = ("%018d" % timeStamp)[6:]

    # if file was modified, set the modification date back
    # to the original one
    modtime = time.mktime(time.strptime(fileStamp, '%y%m%d%H%M%S'))
    os.utime(fileName, (time.time(), modtime))
#------------------------------------------------------------------------------
# Entry point for script.
if __name__ == '__main__':
    cli = CLI("UpdateWSATimestamp", "$Revision: 7245 $", __doc__)
    Logger.setEchoOn()
    Logger.addMessage(cli.getProgDetails())

    beginDate = int(cli.getOpt("begin"))
    endDate = int(cli.getOpt("end"))
    startMJD = int(mxTime.strptime(str(beginDate), "%Y%m%d").mjd)
    endMJD = int(mxTime.strptime(str(endDate), "%Y%m%d").mjd)
    versionNum = cli.getOpt("version")
    testRun = cli.getOpt("test")
    fitsOnly = cli.getOpt("fitsonly")
    dbsOnly = cli.getOpt("dbsonly")
    localOnly = cli.getOpt("localonly")

    # one date directory name (YYYYMMDD_vV) per day of the requested range
    dateVersStrList = []
    for mjd in range(startMJD, endMJD + 1):
        date = mxTime.DateTimeFromMJD(mjd)
        dateVersStrList.append("%04d%02d%02d_v%s"
                               % (date.year, date.month, date.day, versionNum))

    # Parse "server::db[,db...][,server::db...]". The option is split on
    # commas first, so an entry without "::" is a further database for the
    # server named by the preceding entry.
    dbOpt = cli.getOpt("db")
    if "::" not in dbOpt:
        print("server ('dbload' or 'dbpub') or database not specified.")
        raise SystemExit

    dataBasesDef = {}
    server = None
    for servDb in dbOpt.split(','):
        if "::" in servDb:
            server, database = servDb.split("::", 1)
            dataBasesDef[server] = [database]
        elif server is not None:
            dataBasesDef[server].append(servDb)
        else:
            print("server ('dbload' or 'dbpub') or database not specified.")
            raise SystemExit

    if localOnly:
        # 'timestamplog' lists FITS directories, one per line; every *.fit
        # file in them gets its modification time reset from its own
        # WSA_TIME header keyword. No database or CASU log is consulted.
        timestampLog = File(cli.getArg('timestamplog'))
        timestampLog.ropen()
        dirList = timestampLog.readlines()
        timestampLog.close()
        for directory in dirList:
            # drop the line terminator; ignore blank lines
            directory = directory.strip()
            if not directory:
                continue

            Logger.addMessage("Updating %s..." % directory)
            for fileName in sorted(os.listdir(directory)):
                if fileName.endswith(".fit"):
                    filePath = os.path.join(directory, fileName)
                    newTimeStamp = fits.open(filePath)[0].header["WSA_TIME"]
                    if testRun:
                        print("Updating: %s" % filePath)
                        print("with %s" % newTimeStamp)
                    else:
                        updateFitsStamp(filePath, newTimeStamp)

        Logger.addMessage("Finished FITS file timestamp update.")
    else:
        # Everything below needs the CASU log and the archive; it is kept
        # inside this branch so that a --localonly run never reaches names
        # that are only defined here.

        # get the CASU timestamps from the file
        Logger.addMessage("Reading in the CASU timestamps")
        casuStampsDict, notAvailDateVersList = readCasuStamps(
            cli.getArg('timestamplog'), dateVersStrList)
        availDateVersStrList = \
            sorted(set(dateVersStrList).difference(notAvailDateVersList))
        if not availDateVersStrList:
            Logger.addMessage(' '.join([
                "No data available from", str(beginDate), "to",
                str(endDate), "version", versionNum]))
            raise SystemExit

        # get the DBs from all public servers
        for server in dataBasesDef:
            if server == "dbload":
                dataBases[sysc.loadServerHost()] = dataBasesDef[server]
            elif server == "dbpub":
                for hostname in sysc.publicServerHosts():
                    availPubDBs[hostname] = queries.getAllDBs(hostname)
                dataBases = updateDBdict(dataBases, dataBasesDef[server])
            else:
                print("%s  not known." % server)
                raise SystemExit

        # Get MultiframeIDs and filenames
        Logger.addMessage(
            "Getting multiframeIDs from %s to %s version %s from WSA" %
            (beginDate, endDate, versionNum))

        archive = DbSession()
        mfIDDict = dict((dateVersStr, getMfIDs(archive, dateVersStr))
                        for dateVersStr in availDateVersStrList)
        del archive

        # Update database fileTimeStamp entries
        if not fitsOnly:
            for hostname in dataBases:
                for db in dataBases[hostname]:
                    archive = DbSession(hostname + '.' + db,
                                        userName=dbc.loadServerRwUsername(),
                                        isTrialRun=testRun)

                    for dateVersStr in availDateVersStrList:
                        Logger.addMessage(
                            "Updating fileTimeStamp for " + dateVersStr)

                        for fileName in mfIDDict[dateVersStr]:
                            if fileName in casuStampsDict:
                                # YYMMDD of the date dir + CASU date + time
                                newTimeStamp = (
                                    os.path.dirname(fileName)[2:8] +
                                    casuStampsDict[fileName][0] +
                                    casuStampsDict[fileName][1])
                                mfID = mfIDDict[dateVersStr][fileName][1]
                                archive.update("Multiframe",
                                    "fileTimeStamp=%s" % newTimeStamp,
                                    where="multiframeID=%s" % mfID)

                    del archive

            Logger.addMessage("Finished database fileTimeStamp update.")

        # Update FITS file entries and timestamps
        if not dbsOnly:
            for dateVersStr in availDateVersStrList:
                Logger.addMessage(' '.join([
                    "Updating FITS file timestamps for", dateVersStr]))
                for fileName in mfIDDict[dateVersStr]:
                    if fileName not in casuStampsDict:
                        Logger.addMessage(' '.join(
                            [fileName, "not available at CASU"]))
                        continue

                    # query row: fileName, multiframeID, fileTimeStamp,
                    # catName
                    mfEntry = mfIDDict[dateVersStr][fileName]
                    if "PixelFileNoLongerAvailable:" in mfEntry[0]:
                        Logger.addMessage(' '.join(
                            [fileName, "not available at WFAU"]))
                        continue

                    # database paths may carry a "prefix:"; keep the part
                    # after the last colon
                    fullFilePath = mfEntry[0].rpartition(':')[2]
                    newTimeStamp = ''.join([
                        os.path.dirname(fileName)[2:8],
                        casuStampsDict[fileName][0],
                        casuStampsDict[fileName][1]])

                    fullCatPath = mfEntry[3].rpartition(':')[2]
                    # "datedir/file" form, matching the casuStampsDict keys
                    catName = os.path.join(
                        os.path.basename(os.path.dirname(fullCatPath)),
                        os.path.basename(fullCatPath))
                    hasCatalogue = \
                        "FitsIO/empty_catalogue.fits" not in fullCatPath
                    if hasCatalogue:
                        newCatTimeStamp = ''.join([
                            os.path.dirname(catName)[2:8],
                            casuStampsDict[catName][0],
                            casuStampsDict[catName][1]])
                    else:
                        newCatTimeStamp = ''

                    if testRun:
                        print("Updating: %s with %s"
                              % (fullFilePath, newTimeStamp))
                        print("%s > %s"
                              % (fileName, casuStampsDict[fileName]))
                        if hasCatalogue:
                            print("Updating: %s with %s"
                                  % (fullCatPath, newCatTimeStamp))
                    else:
                        # open file in update mode and write wsaTimestamp
                        fits.writeToFitsHdu(
                            fits.open(fullFilePath, "update"), (0,),
                            "WSA_TIME", newTimeStamp, "WSA time stamp",
                            redoing=True)

                        # writing the keyword touched the file; put the
                        # modification time back
                        updateFitsStamp(fullFilePath, newTimeStamp)
                        if hasCatalogue:
                            updateFitsStamp(fullCatPath, newCatTimeStamp)

            Logger.addMessage("Finished FITS file update.")

#------------------------------------------------------------------------------
# Change log:
#
# 17-Jan-2008, ETWS: First version.
# 23-Jan-2008, ETWS: Fixed for files that are not available anymore.
# 24-Jan-2008, ETWS: Included possibility to only update the DBs.
#  8-Jan-2008, ETWS: Included possibility to only update FITS files from
#                    its WSA_TIME keyword.