Package wsatools :: Package DbConnect :: Module IngIngester
[hide private]

Source Code for Module wsatools.DbConnect.IngIngester

   1  #! /usr/bin/env python 
   2  #------------------------------------------------------------------------------ 
   3  #$Id: IngIngester.py 10204 2014-01-27 11:55:39Z EckhardSutorius $ 
   4  """ 
   5     Ingester for CU 1 to 4. Contains classes needed to ingest data and curation 
   6     history metadata into the database. 
   7   
   8     @author: Eckhard Sutorius 
   9     @org:    WFAU, IfA, University of Edinburgh 
  10   
  11     @newfield contributors: Contributors, Contributors (Alphabetical Order) 
  12     @contributors: R.S. Collins 
  13  """ 
  14  #------------------------------------------------------------------------------ 
  15  from   collections import defaultdict 
  16  import dircache 
  17  import inspect 
  18  import os 
  19   
  20  from   wsatools.CLI                    import CLI 
  21  import wsatools.DbConnect.DbConstants      as dbc 
  22  from   wsatools.DbConnect.DbSession    import DbSession, Ingester, Join, \ 
  23                                                odbc, SelectSQL 
  24  from   wsatools.DbConnect.IngCuSession import IngCuSession 
  25  import wsatools.DbConnect.Schema           as schema 
  26  from   wsatools.File                   import File, PickleFile 
  27  import wsatools.FitsUtils                  as fits 
  28  from   wsatools.Logger                 import Logger 
  29  from   wsatools.SystemConstants        import DepCodes 
  30  import wsatools.Utilities                  as utils 
  31   
  32  from invocations.monitoring.CreateMfIdJpegList import CreateMfIdJpegList 
  33  from helpers.UpdateJpegs import UpdateJpegs 
#------------------------------------------------------------------------------

class IngestLogger(Logger):
    """
    A logger class based on Logger, but appends to the log on __del__ instead
    of writing.

    @note: Do not set the isVerbose option of this class to False, otherwise
           old logs will be overwritten and the last log doubled up. This will
           change once there is a writeMode argument.

    @todo: This class will become obsolete, probably by giving Logger a
           writeMode argument to the constructor, with a default of 'w'. Need
           to be sure that the log reset is OK too. If class stays it will be
           renamed and moved into Logger.py.
    """
    def __del__(self):
        """ Appends all messages that have been currently logged.
        """
        try:
            Logger.dump(file(Logger.pathName, 'a'))
        except IOError as error:
            # Write to home-dir if log directory is unavailable
            self._revertToHomePath(error)
            Logger.dump(file(Logger.pathName, 'a'))

    #--------------------------------------------------------------------------

    @staticmethod
    def append(fileName, reset=True):
        """ Append the currently buffered log messages to the given file.

        @param fileName: Path of the log file to append to.
        @type fileName: str
        @param reset: If True, clear the message buffer afterwards.
        @type reset: bool

        """
        Logger.dump(file(fileName, 'a'))
        if reset:
            Logger.reset()

    #--------------------------------------------------------------------------

    @staticmethod
    def addText(message, alwaysLog=True):
        """
        Log a message without timestamp. This is displayed to stdout if
        in "echoOn" mode, and also written to the log file if not in
        "isVerbose" mode.

        @param message: The message to log.
        @type message: str
        @param alwaysLog: If False, then this message will only be logged when
                          isVerbose is True.
        @type alwaysLog: bool

        """
        if Logger.isVerbose or alwaysLog:
            utils.WordWrapper.indent = 2
            Logger._logText.append("%s\n" % message)
            if Logger.echoOn:
                print("%s" % utils.WordWrapper.wrap(message))
            # Update log file if defined and the log isn't verbose
            if not Logger.isVerbose and Logger.pathName:
                try:
                    file(Logger.pathName, 'a').writelines(Logger._logText[-1])
                except IOError:
                    # If there's an error do nothing now, the log is still
                    # written upon destruction
                    pass
93
#------------------------------------------------------------------------------

class IngestLogFile(PickleFile):
    """
    Pickled log file describing one partial ingest: which csv/binary data
    files to load and how to update the curation histories accordingly.

    """
    def __init__(self, fileName):
        """Initialise, deriving the CU attributes from the file name.
        @param fileName: Name of the ingest log file.
        @type fileName: str

        """
        super(IngestLogFile, self).__init__(fileName)

        baseName = os.path.basename(fileName)
        self.prefix = os.path.splitext(baseName)[0]
        self.readFilePrefix()
        self.commonPrefix = ("cu%02did%d_%s"
                             % (self.cuNum, self.cuEventID, self.hostName))

    #--------------------------------------------------------------------------

    def readFilePrefix(self):
        """Parse CU number, event ID, host name, run number, optional ingest
        month and database name out of the file prefix
        (cuNNidEE_host[-run[-month]]_database).
        """
        cuEvent, hostPart, self.database = self.prefix.split('_', 2)
        self.hostName, _, runPart = hostPart.partition('-')
        if '-' in runPart:
            self.runNo, _, self.ingMonth = runPart.partition('-')
        else:
            self.runNo = runPart
            self.ingMonth = None
        cuStr, _, eventStr = cuEvent[2:].partition("id")
        self.cuNum = int(cuStr)
        self.cuEventID = int(eventStr)
127
128 #------------------------------------------------------------------------------ 129 130 -class CuIngest(IngCuSession):
131 """ Ingests products of parallel CU runs by DataBuilder. 132 """
    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 isTrialRun=DbSession.isTrialRun,
                 logFileName=CLI.getOptDef("readlog"),
                 ingestOneLog=CLI.getOptDef("ingestlog"),
                 cuNums=CLI.getOptDef("cunums"),
                 forceIngest=CLI.getOptDef("forceingest"),
                 autoCommit=CLI.getOptDef("autocommit"),
                 excludedTables=CLI.getOptDef("excluded"),
                 omitObjIDUpdate=CLI.getOptDef("omitobjid"),
                 isParallelRun=CLI.getOptDef("isparallel"),
                 comment=CLI.getArgDef("comment")):
        """
        @param curator: Name of curator.
        @type curator: str
        @param comment: Descriptive comment as to why curation task is
                        being performed.
        @type comment: str
        @param database: Name of the database to connect to.
        @type database: str
        @param forceIngest: If True and ingestOneLog is True, ingest log file.
        @type forceIngest: bool
        @param logFileName: Full path name of a log file to be read. Excludes
                            ingest.
        @type logFileName: str
        @param ingestOneLog: If True, ingest log file given by logFileName.
        @type ingestOneLog: bool
        @param cuNums: Curation task numbers.
        @type cuNums: list[int]
        @param autoCommit: If True, do not auto-commit transactions. This
                           speeds up transactions through minimal logging.
                           NOTE(review): wording contradicts the option name;
                           confirm the intended sense against DbSession.
        @type autoCommit: bool
        @param excludedTables: Exclude these tables from ingest.
        @type excludedTables: list(str)
        @param omitObjIDUpdate: If True, don't update objIDs.
        @type omitObjIDUpdate: bool
        @param isParallelRun: If True, remove a pre-existing database lock.
        @type isParallelRun: bool
        @param isTrialRun: If True, do not perform database modifications.
        @type isTrialRun: bool

        """
        # Casts applied by attributesFromArguments below.
        typeTranslation = {"curator":str,
                           "database":str,
                           "cuNums":list,
                           "ingestOneLog":bool,
                           "forceIngest":bool,
                           "autoCommit":bool,
                           "excludedTables":list,
                           "omitObjIDUpdate":bool,
                           "isParallelRun":bool,
                           "isTrialRun":bool,
                           "comment":str}

        self.logFile = (IngestLogFile(logFileName) if logFileName else None)

        # Set self.<argName> for every constructor argument, cast via
        # typeTranslation.
        super(CuIngest, self).attributesFromArguments(
            inspect.getargspec(CuIngest.__init__)[0], locals(),
            types=typeTranslation)

        # Parallel runs must not trip over a stale per-database lock file.
        lockPathName = os.path.join(os.getenv('HOME'),
                                    "dblock-" + self.database)
        if self.isParallelRun and os.path.exists(lockPathName):
            os.remove(lockPathName)

        # Initialise parent class
        super(CuIngest, self).__init__(cuNum=0,
                                       curator=self.curator,
                                       comment=self.comment,
                                       reqWorkDir=False,
                                       keepWorkDir=False,
                                       database=self.database,
                                       autoCommit=self.autoCommit,
                                       isTrialRun=self.isTrialRun)

        self.excludedTables = [name.lower() for name in self.excludedTables]

        # Split an optional "server.database" specification; default to the
        # load server when no server part is given.
        self._server = self.database.rpartition('.')[0] or self.sysc.loadServer
        self._database = self.database.rpartition('.')[2]

        self.cuNums = self._parseCuNums(self.cuNums)

        self._ingestedLogsPath = self.sysc.ingestedLogsPath(self.dbMntPath)
        utils.ensureDirExist(self._ingestedLogsPath)

        # Ensure every curation server has an ingestedFiles directory:
        # locally via ensureDirExist, remotely via the private network.
        for server in self.sysc.curationServers:
            _ingestedFilesPath = self.sysc.ingestedFilesPath(
                self.sysc.dbSharePath(serverName=server))
            if os.getenv("HOST") in _ingestedFilesPath:
                utils.ensureDirExist(_ingestedFilesPath)
            else:
                cmd = self.sysc.privNetCmd(server,
                                           "ls -d %s" % _ingestedFilesPath)
                messages = self.runSysCmd(cmd)[1]
                if messages and "No such file or directory" in messages[0]:
                    cmd = self.sysc.privNetCmd(server,
                                               "mkdir %s" % _ingestedFilesPath)
                    try:
                        self.runSysCmd(cmd)
                    except Exception as error:
                        # Best effort only: report and carry on.
                        print(error)
233 234 #-------------------------------------------------------------------------- 235
236 - def __del__(self):
237 try: 238 IngestLogger.dump(file(self._loggerFile.pathName, 'a')) 239 except: 240 pass
241 242 #-------------------------------------------------------------------------- 243
    def _onRun(self):
        """ Ingest data: acquire the database ingest lock, gather the
        relevant ingest log files, then either ingest them or just display
        their content, and always release the lock.
        """
        lockFile = self._createLock()
        try:
            ingestLogList = []
            if self.logFile:
                # An explicit log file was given on the command line.
                ingestLogList.append(self.logFile)
            elif self.cuNums:
                ingestLogs = self.inspectInDir()
                if ingestLogs:
                    for cuNum in self.cuNums:
                        # NOTE(review): each cuNum is itself iterated here, so
                        # _parseCuNums presumably yields strings/sequences of
                        # file-name fragments rather than ints -- confirm.
                        ingestLogList.extend(logFile for logFile in ingestLogs
                                             if all(x in logFile.name
                                                    for x in cuNum))
                else:
                    IngestLogger.addMessage("No files to ingest.")

            if ingestLogList:
                for logFile in ingestLogList:
                    try:
                        if self.ingestOneLog:
                            self.parseIngest(logFile)
                        else:
                            # Read-only mode: just display the log's content.
                            IngestLogger.addMessage(
                                os.path.basename(logFile.name) + ":")
                            self.readIngLog(logFile)
                    except IOError:
                        IngestLogger.addMessage("No such file " + logFile.name)

            elif self.cuNums:
                IngestLogger.addMessage(
                    "No files to ingest for CUs %r" % self.cuNums)

            else:  # Ingest everything for this database
                self.parseIngest()

        finally:
            # Always release the ingest lock, even on failure.
            lockFile.remove()
282 283 #-------------------------------------------------------------------------- 284
285 - def _createLock(self):
286 """ 287 Create a lock, writing info to sysLog. If lock exists, abort. 288 289 @return: Lock file object. 290 @rtype: File.File 291 292 """ 293 utils.ensureDirExist(self.sysc.sysPath) 294 if self.logFile and self.logFile.ingMonth: 295 fileName = 'ingest_%s.%s_%s_lock' % (self._server, self._database, 296 self.logFile.ingMonth) 297 else: 298 fileName = 'ingest_%s.%s_lock' % (self._server, self._database) 299 lockFile = File(os.path.join(self.sysc.sysPath, fileName)) 300 lockTime = utils.makeTimeStamp() 301 if lockFile.exists(): 302 lockFile.ropen() 303 message = lockFile.readline() 304 IngestLogger.addMessage("<ERROR> A lock file exists in %s at %s" % 305 (lockFile.path, message)) 306 raise SystemExit 307 else: 308 lockFile.wopen() 309 lockFile.writeline('%s %s [%d]\n' % 310 (lockTime, self._hostName, os.getpid())) 311 lockFile.close() 312 return lockFile
313 314 #-------------------------------------------------------------------------- 315
    def ingestCU2(self, ingLogFile, histInfo, _ingestOrder, _fileInfo):
        """ Ingest CU2 data: build the multiframeID/JPEG list for the given
        date range, then update the JPEG information in the database.
        (_ingestOrder and _fileInfo are unused; they keep the common
        signature expected by tryIngest.)
        """
        beginDate, endDate, versStr, subDir = histInfo["DCH"][:4]
        # Optional fifth element: explicit jpg directory.
        jpgPath = (histInfo["DCH"][4] if len(histInfo["DCH"]) > 4 else None)

        # Replace the range specification by the actual set of date
        # directories found on disk (used later for the CUED flag files).
        histInfo["DCH"] = self.selectFiles(subDir, beginDate, endDate)

        # create list of mfids and jpgs
        CreateMfIdJpegList(database=self.database,
                           outPath=self.sysc.monitorPath(),
                           subDir=subDir, jpgPath=jpgPath,
                           beginDate=str(beginDate), endDate=str(endDate),
                           versionStr=versStr).run()

        # the MfIdJpegList file name
        jpgidFileName = os.path.join(self.sysc.monitorPath(),
                                     "jpgs_%s_%s_%s_%s.log"
                                     % (ingLogFile.database, beginDate, endDate, versStr))

        # Update JPEGs in database; the flags below steer how ingestData
        # disposes of the ingest log (pending vs. ingested).
        self._noData = not os.path.exists(jpgidFileName)
        if self._noData:
            print("No data available for %s to %s" % (beginDate, endDate))
            return

        self._noDBData = os.path.getsize(jpgidFileName) == 0
        if self._noDBData:
            print("No data found in DB for %s to %s" % (beginDate, endDate))
            return

        # NOTE(review): load servers whose name ends in 1 or 2 map to
        # "dbload", all others to "dbpub" -- confirm against SystemConstants.
        servers = ("dbload" if int(self.sysc.loadServer[-1]) in [1, 2] else
                   "dbpub")

        UpdateJpegs(archive=self._database,
                    database='%s.%s' % (self._server, self._database),
                    multiDBs="::".join([servers, self._database]),
                    fromJpegList=True, updateFileName=jpgidFileName).run()
354 355 #-------------------------------------------------------------------------- 356
    def ingestCU3(self, ingLogFile, histInfo, ingestOrder, fileInfo):
        """ Ingest CU3 data: load the CSV files into the given tables, then
        run the post-ingest curation steps (deprecations, DXS SV frame
        re-allocation, product-processing clean-up, flag reset).

        @param ingLogFile: Ingest log describing this curation event.
        @type ingLogFile: IngestLogFile
        @param histInfo: Curation history updates keyed by DCH/ACH/PCH.
        @type histInfo: dict
        @param ingestOrder: Table names in ingest order.
        @type ingestOrder: list(str)
        @param fileInfo: Schema and ingest file names per table.
        @type fileInfo: dict

        """
        ingestDate = os.path.basename(histInfo["DCH"][0])
        # ingest the csv file
        self.ingest(ingestOrder, ingLogFile.hostName, fileInfo, ingestDate,
                    isCu3=True)

        # deprecate old frames
        self.deprecateOldFrames()

        # re-allocate DXS SV frames to DXS, but only when the programme
        # history contains an entry for programme 14 (presumably DXS --
        # see reAllocateDXSSVFrames).
        if any(entry[0] == 14 for entry in histInfo["PCH"]):
            self.reAllocateDXSSVFrames()

        self.removeProductProcessingLines()
        # clear the newly-ingested flags for this curation event
        self.setNewlyIngested(ingLogFile.cuEventID)
375 376 #-------------------------------------------------------------------------- 377
    def ingestCU4(self, ingLogFile, histInfo, ingestOrder, fileInfo):
        """ Ingest CU4 data: load the binary detection files and update the
        object IDs for this curation event.

        @param ingLogFile: Ingest log describing this curation event.
        @type ingLogFile: IngestLogFile
        @param histInfo: Curation history updates keyed by DCH/ACH/PCH.
        @type histInfo: dict
        @param ingestOrder: Table names in ingest order.
        @type ingestOrder: list(str)
        @param fileInfo: Schema and ingest file names per table.
        @type fileInfo: dict

        """
        ingestDate = os.path.basename(histInfo["DCH"][0])
        # ingest the binary file and update the objectIDs
        self.ingest(ingestOrder, ingLogFile.hostName, fileInfo, ingestDate,
                    isCu3=False, cuEventID=ingLogFile.cuEventID)
385 386 #-------------------------------------------------------------------------- 387
388 - def checkCUEDFile(self, ingLogFile, histInfo):
389 """ Check flag file to say that the data are already ingested. 390 """ 391 for dateDirPath in histInfo["DCH"]: 392 cuedFileName = "CU%02dED_%s" % ( 393 ingLogFile.cuNum, ingLogFile.database) 394 cuedFilePath = os.path.join(dateDirPath, cuedFileName) 395 if os.path.exists(cuedFilePath): 396 if self.ingestOneLog and \ 397 (self.forceIngest or ingLogFile.runNo > '1'): 398 os.remove(cuedFilePath) 399 else: 400 raise CuIngest.IngCuError("Data already ingested! " 401 "%r exist in %r" % (cuedFileName, dateDirPath))
402 403 #-------------------------------------------------------------------------- 404
405 - def tryIngest(self, ingestCU, ingLogFile, histInfo, ingestOrder=None, 406 fileInfo=None, enableRollback=True):
407 """ General ingest function for any CU. 408 """ 409 isNotCu2 = enableRollback 410 status = dbc.no() 411 rolledBack = dbc.yes() 412 try: 413 if isNotCu2: 414 self.checkCUEDFile(ingLogFile, histInfo) 415 416 ingestCU(ingLogFile, histInfo, ingestOrder, fileInfo) 417 418 if enableRollback: 419 self.archive.commitTransaction() 420 421 except Exception as error: 422 IngestLogger.addExceptionDetails(error) 423 if enableRollback: 424 self.archive.rollbackTransaction() 425 426 status = dbc.no() 427 rolledBack = dbc.yes() 428 429 else: 430 status = dbc.yes() 431 rolledBack = dbc.no() 432 433 self.updateHistory(histInfo, status, rolledBack) 434 435 return rolledBack
436 437 #-------------------------------------------------------------------------- 438
439 - def updateCUEDFile(self, ingLogFile, histInfo):
440 """ Update CUed file. 441 """ 442 cmd = self.makeSysCmd( 443 ingLogFile.hostName, "ls %s" % self.sysc.dbSharePath( 444 '', ingLogFile.hostName)) 445 fileList = self.runSysCmd(cmd)[0] 446 if not self._noData and os.getenv('USER') == 'scos' \ 447 and not any(ingLogFile.commonPrefix in logName 448 for logName in fileList): 449 for dateDirPath in histInfo["DCH"]: 450 cuedFile = os.path.join( 451 dateDirPath, 452 "CU%02dED_%s" % (ingLogFile.cuNum, ingLogFile.database)) 453 454 file(cuedFile, 'w').write(utils.makeTimeStamp() + '\n')
455 456 #-------------------------------------------------------------------------- 457
    def ingestData(self, ingLogFile, ingestOrder, fileInfo, histInfo):
        """ Ingest all files described in given log file, then dispose of the
        log and the data files according to the outcome: success moves them
        to the ingested directories, failure renames the log to *.failed and
        restores (or deletes) the data files.

        @param ingLogFile: Ingest log describing this curation event.
        @type ingLogFile: IngestLogFile
        @param ingestOrder: Table names in ingest order.
        @type ingestOrder: list(str)
        @param fileInfo: Schema and ingest file names per table.
        @type fileInfo: dict
        @param histInfo: Curation history updates keyed by DCH/ACH/PCH.
        @type histInfo: dict

        """
        self._ingestedFiles = []
        self._noData = False
        self._noDBData = False
        rolledBack = dbc.yes()
        self._connectToDb()
        try:
            if ingLogFile.cuNum == 1:  # update the curation histories
                # CU1 ingests nothing; just record the transfer status.
                status = histInfo["PCH"][0][0]
                histInfo["PCH"] = []
                rolledBack = dbc.no()
                self.updateHistory(histInfo, status, rolledBack)

            elif ingLogFile.cuNum == 2:
                # CU2 (JPEGs) cannot be rolled back as a transaction.
                rolledBack = self.tryIngest(self.ingestCU2, ingLogFile,
                                            histInfo, enableRollback=False)

            elif ingLogFile.cuNum == 3:
                rolledBack = self.tryIngest(self.ingestCU3, ingLogFile,
                                            histInfo, ingestOrder, fileInfo)

            elif ingLogFile.cuNum == 4:
                rolledBack = self.tryIngest(self.ingestCU4, ingLogFile,
                                            histInfo, ingestOrder, fileInfo)

        finally:
            # move the ingest log out of the way, so it wouldn't be picked up
            # the next time
            if not rolledBack:
                if self._noDBData and ingLogFile.cuNum == 2:
                    # DB rows not there yet: retry this log later.
                    os.rename(ingLogFile.name, ingLogFile.name + ".pending")

                else:  # mv into ingestedLogs dir
                    os.rename(ingLogFile.name,
                              os.path.join(self._ingestedLogsPath,
                                           ingLogFile.base))

                    self._ingestedFilesPath = self.sysc.ingestedFilesPath(
                        self.sysc.dbSharePath(serverName=ingLogFile.hostName))

                    # Move the successfully ingested data files off the share
                    # into the ingestedFiles directory on the source host.
                    for fileName in self._ingestedFiles:
                        newPath = os.path.join(self._ingestedFilesPath,
                                               os.path.basename(fileName))
                        cmd = self.makeSysCmd(
                            ingLogFile.hostName, "mv %s %s" % (
                                self.sysc.dbSharePath(fileName), newPath))
                        self.runSysCmd(cmd)[0]
                    self.updateCUEDFile(ingLogFile, histInfo)

            else:
                # Failure: mark the log and put the partially processed data
                # files back in place (dropping the ".rename" suffix), or
                # delete them if the share has run out of space.
                os.rename(ingLogFile.name, ingLogFile.name + ".failed")
                for fileName in self._ingestedFiles:
                    filePath = self.sysc.dbSharePath(fileName)
                    if self.spaceOnFileShare(ingLogFile.hostName):
                        cmd = self.makeSysCmd(
                            ingLogFile.hostName, "mv %s %s" % (
                                filePath, filePath.replace(".rename", '')))
                        self.runSysCmd(cmd)[0]
                    else:
                        IngestLogger.addMessage("<WARNING> No space left on "
                            "%s; data will be deleted." % self.sysc.dbSharePath())

                        cmd = self.makeSysCmd(
                            ingLogFile.hostName, "rm -f %s" % (filePath))
                        self.runSysCmd(cmd)[0]
                        os.remove(filePath)

            print("disconnecting")
            self._disconnectFromDb()
529 530 #-------------------------------------------------------------------------- 531
    def parseIngest(self, logFile=None):
        """ Do the appropriate ingest for the different CUs: collect the
        pending ingest logs (or use the given one), create per-CU and overall
        log files, then ingest each log in turn.

        @param logFile: Optional single ingest log to process (used with
                        ingestOneLog).
        @type logFile: IngestLogFile

        """
        # is data available?
        ingestLogs = self.inspectInDir()
        if (not ingestLogs and not logFile) \
           or (self.ingestOneLog and logFile and \
               not any(logFile.name == inFile.name for inFile in ingestLogs)):
            IngestLogger.addMessage("No files available for ingest into %s."
                                    % self.database)
            return

        if self.ingestOneLog:
            ingestLogs = [logFile]

        IngestLogger.addMessage("The following logs are to be ingested: %s" %
                                ','.join([ingLog.name for ingLog in ingestLogs]))

        # create separate logs for each partial ingest
        ingCuEventIDs = []
        for ingLogFile in ingestLogs:
            ingCuEventIDs.append(ingLogFile.cuEventID)
            ingestOrder, fileInfo, histInfo = list(ingLogFile.pickleRead())
            # histInfo["ACH"][0][2] holds the CU's own log file path.
            cuLogFileName = histInfo["ACH"][0][2]
            IngestLogger.dump(file(cuLogFileName, 'a'))

        # create an overall cu0 logfile
        self._loggerFile = IngestLogger(self.createLogFileName(ingCuEventIDs))
        IngestLogger.dump(file(self._loggerFile.pathName, 'a'))

        for ingLogFile in ingestLogs:
            IngestLogger.reset()
            ingestOrder, fileInfo, histInfo = list(ingLogFile.pickleRead())
            IngestLogger.addMessage("Ingesting for CU%s, CuEventID %s"
                                    % (ingLogFile.cuNum, ingLogFile.cuEventID))
            IngestLogger.addMessage("from " + ingLogFile.name)

            if self.excludedTables:
                ingestOrder = [table for table in ingestOrder
                               if table.lower() not in self.excludedTables]

            # check if files are available
            if ingLogFile.cuNum == 3:
                cmd = self.makeSysCmd(
                    ingLogFile.hostName, "ls %s*" % self.sysc.dbSharePath(
                        ingLogFile.commonPrefix, ingLogFile.hostName))
                fileList = self.runSysCmd(cmd)[0]
                # Every table in the ingest order must have a matching file
                # on the share, otherwise abandon the remaining logs.
                if not all(any(x in y for y in fileList) for x in ingestOrder):
                    IngestLogger.addMessage(
                        "Not all files available for ingest!")
                    os.rename(ingLogFile.name, ingLogFile.name + ".failed")
                    break

            self.ingestData(ingLogFile, ingestOrder, fileInfo, histInfo)

            # Append to the cu's log file
            cuLogFileName = histInfo["ACH"][0][2]
            IngestLogger.dump(file(cuLogFileName, 'a'))
            IngestLogger.dump(file(self._loggerFile.pathName, 'a'))
            IngestLogger.reset()
592 593 #-------------------------------------------------------------------------- 594
    def deprecateOldFrames(self):
        """
        Deprecate old versions of frames that are reingested in Multiframe.

        @todo: This function will eventually contain the contents of the SQL
               fragment, as a DB API update method.
        """
        # @TODO: get rid of this sqlFragment...
        IngestLogger.addMessage("Deprecating reprocessed multiframes...")
        code = str(DepCodes.reprocCASU)
        fragmentPath = \
            os.path.join(
                self.sysc.sqlFragmentPath(), '%s_DeprecateOldFrames.sql' %
                self.sysc.loadDatabase)
        # Substitute the deprecation code into the SQL fragment.
        sql = file(fragmentPath).read().replace('@mask', code)

        IngestLogger.addMessage("No. of multiframes deprecated: %s" %
                                self.archive._executeScript(sql, wantRowCount=True))

        IngestLogger.addMessage(
            "Propagating deprecations to MultiframeDetector...")

        # Add the code to detectors of deprecated multiframes that do not
        # carry it yet.
        num = self.archive.update("MultiframeDetector",
            "MultiframeDetector.deprecated=MultiframeDetector.deprecated+" + code,
            where="MultiframeDetector.multiframeID=M.multiframeID AND "
            "MultiframeDetector.deprecated<%s AND M.deprecated>=%s" %
            (code, code),
            fromTables="MultiframeDetector, Multiframe AS M")

        IngestLogger.addMessage("No. of frames deprecated: %s" % num)

        if not self.sysc.isOSA():
            IngestLogger.addMessage("Setting unfilteredID entries...")
            numMFs, numMFDs = self.setUnfilteredIDs(code)
            IngestLogger.addMessage("No. of multiframes/detectors deprecated: "
                                    "%s/%s" % (numMFs, numMFDs))
631 632 #-------------------------------------------------------------------------- 633
    def setUnfilteredIDs(self, deprCode):
        """
        Set unfilteredID entries: link each filtered stack/stackconf frame to
        its unfiltered counterpart, then deprecate the unfiltered frames.

        @param deprCode: Deprecation code.
        @type deprCode: str
        @return: Number of updated entries in Multiframe and MultiframeDetector.
        @rtype: int, int

        """
        # Pair filtered (F) and unfiltered (U) frames of the same observation
        # and night, matching stack with filt...stack and stackconf with
        # filt...stackconf.
        filteredPairs = SelectSQL(
            select="F.multiframeID AS filteredID, "
                   "U.multiframeID AS unfilteredID",
            table="Multiframe AS F, Multiframe AS U",
            where="U.frameType NOT LIKE 'filt%' AND (U.frameType LIKE "
                  "'%stackconf' AND F.frameType LIKE 'filt%stackconf' OR "
                  "U.frameType LIKE '%stack' AND F.frameType LIKE 'filt%stack')"
                  " AND U.obsNum=F.obsNum AND U.utDate=F.utDate AND " +
                  "U.deprecated<%s" % deprCode)

        # Fill in unfilteredID where it still holds the default value.
        num = self.archive.update("Multiframe", "unfilteredID=T.unfilteredID",
            where="Multiframe.unfilteredID=%s AND multiframeID=T.filteredID"
            % dbc.intDefault(),
            fromTables="(%s) AS T" % filteredPairs)

        IngestLogger.addMessage("No. of new filtered multiframes: %s" % num)

        IngestLogger.addMessage("Deprecating unfiltered multiframes...")

        unfilteredIDs = SelectSQL("unfilteredID", "Multiframe",
                                  where="unfilteredID!=%s" % dbc.intDefault())

        # Deprecate the unfiltered frames (and their detectors) that are
        # currently non-deprecated.
        numMFs = self.archive.update("Multiframe",
            entryList="deprecated=%s" % DepCodes.reprocAltn,
            where="deprecated=%s AND multiframeID IN (%s)"
            % (DepCodes.nonDep, unfilteredIDs))

        numMFDs = self.archive.update("MultiframeDetector",
            entryList="deprecated=%s" % DepCodes.reprocAltn,
            where="deprecated=%s AND multiframeID IN (%s)"
            % (DepCodes.nonDep, unfilteredIDs))

        return numMFs, numMFDs
677 678 #-------------------------------------------------------------------------- 679
    def ingest(self, ingestOrder, ingestHost, fileInfo, ingestDate,
               isCu3=True, cuEventID=0):
        """
        Ingest the data file.

        @param ingestOrder: List of table names, giving order in which ingest
                            occurs.
        @type ingestOrder: list
        @param ingestHost: Host of the database share holding the ingest
                           files.
        @type ingestHost: str
        @param fileInfo: Dictionary containing schema and ingest file names.
        @type fileInfo: dict(str: dict(str: str))
        @param ingestDate: Date of ingested data.
        @type ingestDate: str
        @param isCu3: If True, expects a CU3 ingest of a CSV file, else
                      expects a CU4 binary file ingest, where indices
                      must remain in place for objID updates.
        @type isCu3: bool
        @param cuEventID: The cuEventID of the ingested data.
        @type cuEventID: int

        """
        IngestLogger.addMessage("Parsing schemas")
        # Group the tables by the schema script that defines them.
        tableSchema = defaultdict(list)
        for tableName in set(ingestOrder):
            tableSchema[fileInfo[tableName]["schema"]].append(tableName)

        ingestSchema = []
        for scriptName in tableSchema:
            # Non-survey and monthly schema scripts live in other locations.
            if scriptName.startswith("%sNS" % self.sysc.loadDatabase):
                script = os.path.join("NonSurvey", scriptName)
            elif "MonthlySQL" in scriptName:
                script = os.path.join(self.dbMntPath, scriptName)
            else:
                script = scriptName
            ingestSchema += schema.parseTables(script, tableSchema[scriptName])

        try:
            ingester = Ingester(self.archive, ingestSchema)
        except schema.MismatchError as error:
            raise IngCuSession.IngCuError(error)

        # Update indices for all tables prior to the ingest of each table to
        # ensure rollback of ingest can occur.
        IngestLogger.addMessage(("Dropping" if isCu3 else "Creating") +
                                " ingest table indices")

        idxInfo = schema.parseIndices(self.sysc.indexScript)
        idxInfo.update(schema.parseIndices(self.sysc.nsIndexScript))
        if any("vvv" in tableName for tableName in ingestOrder):
            # Monthly VVV tables carry their own index definitions; make sure
            # they all exist before parsing them.
            self.checkVVVIndices()
            idxInfo.update(schema.parseIndices(self.sysc.sqlMonthlyDetPath(
                "VSAVVV_Indices.sql")))

        for tableName in ingestOrder:
            if isCu3:  # Drop any indices lying around for efficient ingest
                self.archive.dropObjects(idxInfo[tableName])
            else:
                # CU4 needs to keep indices in place for efficient objID update
                self.archive.createObjects(idxInfo[tableName])
            # drop constraints on VVV tables
            if "vvv" in tableName:
                self.dropVVVConstraints(tableName)

        self.archive.commitTransaction()  # Otherwise we'll be deadlocked.

        # Ingest!
        IngestLogger.addMessage("Ingesting %s:" % ingestDate)
        for tableName in ingestOrder:
            if self.isTrialRun:
                print("ingesting %s here" % tableName)
            else:
                ingFileName = self.sysc.dbSharePath(
                    fileInfo[tableName]["ingfile"], ingestHost)
                try:
                    # CU3 files are CSV; CU4 binary files are pre-ordered.
                    ingester.ingestTable(
                        tableName, ingFileName, isCsv=isCu3,
                        deleteFile=False, isOrdered=not isCu3)

                except OSError:
                    IngestLogger.addMessage(ingFileName + " not found.")
                    raise

                # Remember which files were loaded for later disposal.
                self._ingestedFiles.append(ingester.fileOnShare)

                # CU4 only: update object IDs for detection tables, unless
                # explicitly omitted for VVV.
                if cuEventID and "DetectionRaw" in tableName \
                   and not (self.omitObjIDUpdate and "vvv" in tableName):
                    self.updateObjIDs(tableName, cuEventID)
766 767 #-------------------------------------------------------------------------- 768
    def dropVVVConstraints(self, tableName):
        """ Drop the constraints on monthly VVV tables.

        @param tableName: Name of the monthly (YYYYMM-suffixed) VVV table.
        @type tableName: str
        """
        # Candidate constraints: names starting multiframeID/objID followed
        # by a 20xx month suffix, as listed in sysobjects.
        for constraint in self.archive.query(
                "name", "sysobjects", " AND ".join([
                "xtype = 'C'",
                "(name like 'multiframeID%20%' or name like 'objID%20%')"])):
            # Match the constraint's month suffix against this table's.
            if tableName.partition("Detection")[2] in constraint or \
               constraint == "objID%s" % tableName.partition("DetectionRaw")[2]:
                IngestLogger.addMessage("Dropping constraint %s on %s." %
                                        (constraint, tableName))
                try:
                    self.archive._executeScript(
                        "ALTER TABLE %s DROP CONSTRAINT %s" %
                        (tableName, constraint))
                except odbc.ProgrammingError as error:
                    # Ignore "already gone" errors; re-raise anything else.
                    if "does not exist" not in str(error) and \
                       "Could not drop constraint" not in str(error):
                        raise
788 789 #-------------------------------------------------------------------------- 790
    def checkVVVIndices(self):
        """ Checks if the SQL for all objID indices exist for monthly tables,
        appending a create-if-missing block to VSAVVV_Indices.sql for every
        monthly vvvDetectionRaw table that lacks one.
        """
        # Template block for one table's objID index; "vvvDetectionRaw" is
        # substituted by the actual monthly table name below.
        idxText = ''.join([
            "if not exists (select * from sysindexes where name = 'idx_vvvDetectionRaw_objID')\n",
            "begin\n",
            "print 'idx_vvvDetectionRaw_objID ...'\n",
            "select getdate()\n",
            "create NONCLUSTERED INDEX idx_vvvDetectionRaw_objID on vvvDetectionRaw (objID) on vvvIndices_FG\n",
            "print '... finished.'\n",
            "select getdate()\n", "end\n", "go\n"])

        # All user tables whose name contains vvvDetectionRaw.
        monthlyTables = self.archive.query(
            "name", "sysobjects", " AND ".join([
            "xtype = 'U'", "name like '%vvvDetectionRaw%'"]),
            orderBy="name")

        indicesSql = File(self.sysc.sqlMonthlyDetPath("VSAVVV_Indices.sql"))
        indicesSql.ropen()
        # Existing CREATE INDEX lines in the script.
        availIndices = indicesSql.readlines(findValues="NONCLUSTERED INDEX")
        indicesSql.close()
        indicesSql.aopen()
        for tableName in monthlyTables:
            if not any(tableName in line for line in availIndices):
                indicesSql.writetheline(idxText.replace("vvvDetectionRaw",
                                                        tableName))
        indicesSql.close()
818 819 #-------------------------------------------------------------------------- 820
821 - def inspectInDir(self):
822 """ @return: Ingest logs in dbShare that need ingesting. 823 @rtype: list(str) 824 """ 825 try: 826 listdir = dircache.listdir(self.dbMntPath) 827 logfiles = [os.path.join(self.dbMntPath, f) for f in listdir 828 if f.startswith("cu") and os.path.splitext(f)[1] == ".log"] 829 except Exception as error: 830 IngestLogger.addExceptionDetails(error) 831 832 ratedLogs = utils.Ratings() 833 CURatings = {1:0, 2:3, 3:1, 4:2} 834 for logFile in logfiles: 835 tmpLog = IngestLogFile(logFile) 836 if tmpLog.cuNum in CURatings.keys() \ 837 and tmpLog.database == self._database \ 838 and tmpLog.cuEventID > 0: 839 ratedLogs[tmpLog] = CURatings[tmpLog.cuNum] 840 del tmpLog 841 842 return ratedLogs.keys()
843 844 #-------------------------------------------------------------------------- 845
846 - def readIngLog(self, logFile):
847 """ 848 Read the given ingest log file and print its content 849 to the screen. No ingest takes place. 850 851 @param logFile: IngestLogFile object 852 @type logFile: File 853 854 """ 855 fileContent = list(logFile.pickleRead()) 856 for index, entry in enumerate(fileContent): 857 print("%s: %s" % (index, repr(entry)))
858 859 #-------------------------------------------------------------------------- 860
    def reAllocateDXSSVFrames(self):
        """ Re-allocate DXS_SV frames to the main programme: copy deep leav
        stack frames of programme 14 into the frame table of programme 104,
        unless already present there.
        """
        IngestLogger.addMessage("Re-allocating DXS_SV frames...")

        # NOTE(review): target table "ProgrameFrame" (single 'm') differs
        # from "ProgrammeFrame" used elsewhere in this method -- confirm this
        # is the actual table name and not a typo.
        num = self.archive.copyIntoTable("ProgrameFrame",
            Join([("Multiframe", 'M'), ("ProgrammeFrame", 'P')], ["multiframeID"]),
            columns="104, P.multiframeID, -9999, P.ppErrBitsStatus",
            where="programmeID=14 AND frameType LIKE 'deepleavstack%' AND "
            "M.multiframeID NOT IN (%s)" %
            SelectSQL("multiframeID", "ProgrammeFrame", "programmeID=104"))

        IngestLogger.addMessage("No. of DXS_SV frames re-allocated: %s" % num)
874 875 #-------------------------------------------------------------------------- 876
878 """ 879 Use newlyIngested flag to select newly ingested filenames 880 Delete entry in ProductProcessing and GroutingLinks 881 """ 882 # @TODO: This would be grossly more efficient as a single SQL statement 883 # Can do immediately for VSA. 884 newFileNames = self.archive.query("fileName", "Multiframe", 885 "newlyIngested=%s" % dbc.yes()) 886 for fileName in newFileNames: 887 self.archive.delete("ProductProcessing", 888 whereStr="fileName='%s'" % fits.stripServer(fileName)) 889 890 if self.sysc.isVSA(): 891 self.archive.delete("GroutingLinks", 892 whereStr="tileName='%s'" % fits.stripServer(fileName))
893 894 #-------------------------------------------------------------------------- 895
896 - def selectFiles(self, subDir, beginDate, endDate):
897 """ 898 Selects all FITS files in given sub-directory of all disks where the 899 date is within the given range. 900 901 @param subDir: Common RAID system sub-directory name to search in 902 for each disk. 903 @type subDir: str 904 @param beginDate: First night to include. 905 @type beginDate: int 906 @param endDate: Last night to include. 907 @type endDate: int 908 909 @return: Set of FITS file paths. 910 @rtype: set(str) 911 912 @todo: Useful for FitsUtils? 913 914 """ 915 fitsFiles = set() 916 for disk in self.sysc.availableRaidFileSystem(): 917 try: 918 fitsFiles.update(os.path.join(disk, subDir, x) 919 for x in dircache.listdir(os.path.join(disk, subDir)) 920 if x[:8].isdigit() and '_v' in x and '.' not in x 921 and beginDate <= int(x[:8]) <= endDate) 922 except OSError: 923 pass 924 925 return fitsFiles
926 927 #-------------------------------------------------------------------------- 928
929 - def setNewlyIngested(self, cuEventID):
930 """ Reset all the 'newlyIngested' flags to zero. 931 """ 932 IngestLogger.addMessage("Resetting the new 'newlyIngested' flags... ") 933 try: 934 IngestLogger.addMessage("...%s reset." % 935 self.archive.update("Multiframe", "newlyIngested=%s" % dbc.no(), 936 where="cuEventID=%s" % cuEventID)) 937 except odbc.DatabaseError as error: 938 IngestLogger.addExceptionDetails(error)
939 940 #-------------------------------------------------------------------------- 941
942 - def spaceOnFileShare(self, server):
943 """ 944 """ 945 cmd = self.makeSysCmd( 946 server, "stat -f -c %%f %s" % self.sysc.dbSharePath('', server)) 947 availSpace = self.runSysCmd(cmd)[0] 948 cmd = self.makeSysCmd( 949 server, "stat -f -c %%S %s" % self.sysc.dbSharePath('', server)) 950 availSpace *= self.runSysCmd(cmd)[0] 951 952 #availSpace = os.statvfs(self.sysc.dbSharePath()).f_bfree 953 #availSpace *= os.statvfs(self.sysc.dbSharePath()).f_frsize 954 955 return availSpace / self.sysc.one_gigabyte >= 50
956 957 #-------------------------------------------------------------------------- 958
    def updateHistory(self, historyUpdate, status, rolledBack, comment=""):
        """
        Update Archive and Programme Curation History with data
        from the logfile.

        @param historyUpdate: Dictionary containing the updates for the
                              Archive and the Programme CurationHistory,
                              keyed "ACH" and "PCH" respectively.
        @type  historyUpdate: dict
        @param status:        Ingest status for the ProgrammeCurationHistory.
        @type  status:        int
        @param rolledBack:    Flag to indicate if the transaction has been
                              rolled back.
        @type  rolledBack:    int
        @param comment:       Optional extra text appended to the history
                              comment of entries with curUpdate[1] == 2.
        @type  comment:       str

        """
        # Archive curation history: each curUpdate row supplies its first six
        # fields verbatim; field 1 appears to encode the curation stage and
        # field 6 holds the free-text comment -- TODO confirm row layout
        # against the code that builds historyUpdate.
        try:
            for curUpdate in historyUpdate["ACH"]:
                row = curUpdate[:6]
                preComment = ''
                if curUpdate[1] == 1:
                    # Prefix reflects the transfer outcome for this status.
                    preComment = {-1: "no data: ", 0: "failed: ",
                                  1: "transferred: "}[status]
                elif curUpdate[1] == 2 and self._noData:
                    preComment = "no data: "
                elif curUpdate[1] == 2 and self._noDBData:
                    preComment = "pending: "
                else:
                    preComment = self._historyCommentPrefix(status, rolledBack)
                if curUpdate[1] == 2 and comment:
                    # NOTE(review): mutates the caller-supplied curUpdate list
                    # in place.
                    curUpdate[6] += comment
                row.extend([preComment + curUpdate[6], rolledBack])
                self._updateArchiveCurationHistory(row)
                self.archive.commitTransaction()
        except KeyError:
            IngestLogger.addMessage("<ERROR>: "
                "Can't update ArchiveCurationHistory, no data found!")
        except IndexError:
            IngestLogger.addMessage("<ERROR>: "
                "Can't update ArchiveCurationHistory, no data available!")

        # Programme curation history: overwrite the final (status) field of
        # each row with the supplied status before writing it back.
        try:
            for progUpdate in historyUpdate["PCH"]:
                progUpdate[-1] = status
                self._updateProgrammeCurationHistory(progUpdate)
                self.archive.commitTransaction()
        except KeyError:
            IngestLogger.addMessage("<ERROR>: "
                "Can't update ProgrammeCurationHistory, no data found!")
        except IndexError:
            IngestLogger.addMessage("<Warning>: "
                "Can't update ProgrammeCurationHistory, no data available!")
1010 1011 #-------------------------------------------------------------------------- 1012
1013 - def updateObjIDs(self, tableName, cuEventID):
1014 """ 1015 Since the objectIDs couldn't be calculated beforehand they have to be 1016 updated separately based on their value and the maximal value found in 1017 the database. 1018 1019 @param tableName: Name of the table to update. 1020 @type tableName: str 1021 @param cuEventID: The cuEventID of the ingested data. 1022 @type cuEventID: int 1023 1024 """ 1025 IngestLogger.addMessage("Updating objIDs...") 1026 if "Raw20" not in tableName: 1027 maxObjId = self.archive.queryAttrMax("objID", tableName) 1028 num = self.archive.update(tableName, "objID=%s-objID" % maxObjId, 1029 where="cuEventID=%s AND objID<0 AND seqNum>-99999990" % cuEventID) 1030 1031 self.archive.commitTransaction() 1032 IngestLogger.addMessage("... %s rows updated." % num) 1033 1034 return 1035 1036 maxObjId = 0 1037 detTabs = defaultdict() 1038 for detTable \ 1039 in self.archive.query("name", "sys.tables", "name LIKE '%Raw20%'"): 1040 1041 minObj, maxObj = self.archive.query("MIN(objID), MAX(objID)", 1042 detTable, "seqNum>-99999990", firstOnly=True, default=(0, 0)) 1043 1044 detTabs[detTable] = minObj 1045 maxObjId = max(maxObjId, maxObj) 1046 1047 for detTable in sorted(detTabs): 1048 if detTabs[detTable] < 0: 1049 sql = "DECLARE @myvar bigint; set @myvar = %d; "\ 1050 "UPDATE %s SET @myvar = objID = @myvar + 1 "\ 1051 "WHERE objID<0 AND seqNum>-99999990" % (maxObjId, detTable) 1052 1053 self.archive._executeScript(sql, wantRowCount=True) 1054 self.archive.commitTransaction() 1055 Logger.addMessage("...%s updated." % detTable)
1056 1057 #------------------------------------------------------------------------------ 1058 # Change log: 1059 # 1060 # 30-Nov-2006, ETWS: Original version. 1061 # 9-Jan-2007, ETWS: Added database name to file prefix. 1062 # 30-Jan-2007, ETWS: Bug fixes. 1063 # 7-Feb-2007, ETWS: Included deprecation of old deep stacks and re-allocation 1064 # of DXS SV frames. 1065 # 15-Mar-2007, ETWS: Fixed log writing and bug in deprecateOldDeepStacks. 1066 # 3-May-2007, ETWS: Included CU1 and 2, updated general procedure. 1067 # 9-May-2007, ETWS: Fixed CU2 ingest in case of missing CU3 data. 1068 # 31-May-2007, RSC: Replaced obsolete DataFactory objects with new versions. 1069 # 11-Jun-2007, ETWS: Fixed bugs; replaced isMinLog with autoCommit. 1070 # 12-Jun-2007, ETWS: Replaced left over minlog with autocommit. 1071 # 27-Jul-2007, ETWS: Ordered ingest: first ingest CU1, then CU3, CU4, CU2. 1072 # 3-Aug-2007, ETWS: Excluded file/directory names with different format 1073 # than YYYYMMDD_v# from createFitsDict. 1074 # 27-Aug-2007, ETWS: Updated comments for non-existing data. 1075 # 26-Oct-2007, ETWS: Shortened ingest log file name. 1076 # 2-Nov-2007, ETWS: Moved objID update for CU4 into ingest function. 1077 # 8-Nov-2007, ETWS,RSC: Swapped deprecation of deep stacks with those of old 1078 # frames to progress deprecation into MultiframeDetector. 1079 # 16-Nov-2007, ETWS: Added more logging. 1080 # 28-Jan-2008, ETWS: Adjusted setNewlyIngested to only update for the actual 1081 # cuEventID. 1082 # 14-Feb-2008, RSC: Updated for new DbSession interface, and corrected log 1083 # file name setting. Allowed non-scos users to ingest into 1084 # test databases by removed directory check updates in this 1085 # instance. Revised ingest() schema parsing and ensured 1086 # indices will be dropped for every table. 1087 # 7-Apr-2008, ETWS: Upgraded to use new detection table layout. 1088 # 29-Apr-2008, ETWS: Fixed bug in updateObjIDs. 
1089 # 30-May-2008, ETWS: Fixed comment for new CU4 layout. 1090 # 31-Jul-2008, ETWS: Included forced ingest for CUs 3 and 4. 1091