1
2
3
4 """
5 Ingester for CU 1 to 4. Contains classes needed to ingest data and curation
6 history metadata into the database.
7
8 @author: Eckhard Sutorius
9 @org: WFAU, IfA, University of Edinburgh
10
11 @newfield contributors: Contributors, Contributors (Alphabetical Order)
12 @contributors: R.S. Collins
13 """
14
15 from collections import defaultdict
16 import dircache
17 import inspect
18 import os
19
20 from wsatools.CLI import CLI
21 import wsatools.DbConnect.DbConstants as dbc
22 from wsatools.DbConnect.DbSession import DbSession, Ingester, Join, \
23 odbc, SelectSQL
24 from wsatools.DbConnect.IngCuSession import IngCuSession
25 import wsatools.DbConnect.Schema as schema
26 from wsatools.File import File, PickleFile
27 import wsatools.FitsUtils as fits
28 from wsatools.Logger import Logger
29 from wsatools.SystemConstants import DepCodes
30 import wsatools.Utilities as utils
31
32 from invocations.monitoring.CreateMfIdJpegList import CreateMfIdJpegList
33 from helpers.UpdateJpegs import UpdateJpegs
37 """
38 A logger class based on Logger, but appends to the log on __del__ instead
39 of writing.
40
41 @note: Do not set the isVerbose option of this class to False, otherwise
42 old logs will be overwritten and the last log doubled up. This will
43 change once there is a writeMode argument.
44
45 @todo: This class will become obsolete, probably by giving Logger a
46 writeMode argument to the constructor, with a default of 'w'. Need
47 to be sure that the log reset is OK too. If class stays it will be
48 renamed and moved into Logger.py.
49 """
59
60 @staticmethod
61 - def append(fileName, reset=True):
65
66 @staticmethod
67 - def addText(message, alwaysLog=True):
68 """
69 Log a message without timestamp. This is displayed to stdout if
70 in "echoOn" mode, and also written to the log file if not in
71 "isVerbose" mode.
72
73 @param message: The message to log.
74 @type message: str
75 @param alwaysLog: If False, then this message will only be logged when
76 isVerbose is True.
77 @type alwaysLog: bool
78
79 """
80 if Logger.isVerbose or alwaysLog:
81 utils.WordWrapper.indent = 2
82 Logger._logText.append("%s\n" % message)
83 if Logger.echoOn:
84 print("%s" % utils.WordWrapper.wrap(message))
85
86 if not Logger.isVerbose and Logger.pathName:
87 try:
88 file(Logger.pathName, 'a').writelines(Logger._logText[-1])
89 except IOError:
90
91
92 pass
93
97 """
98 Logfile holding the information to ingest csv/binary data and to
99 update the curation histories accordingly.
100
101 """
103 """Initialise.
104 @param fileName: Name of the ingest log file.
105 @type fileName: str
106
107 """
108 super(IngestLogFile, self).__init__(fileName)
109
110 self.prefix = os.path.splitext(os.path.basename(fileName))[0]
111 self.readFilePrefix()
112 self.commonPrefix = "cu%02did%d_%s" % (self.cuNum, self.cuEventID,
113 self.hostName)
114
115
116
118 """Get CU number and eventID from the file prefix.
119 """
120 cuEvent, hostName, self.database = self.prefix.split('_', 2)
121 self.hostName, self.runNo = hostName.partition('-')[::2]
122 if '-' in self.runNo:
123 self.runNo, self.ingMonth = self.runNo.partition('-')[::2]
124 else:
125 self.ingMonth = None
126 self.cuNum, self.cuEventID = map(int, cuEvent[2:].partition("id")[::2])
127
131 """ Ingests products of parallel CU runs by DataBuilder.
132 """
    def __init__(self,
                 curator=CLI.getOptDef("curator"),
                 database=DbSession.database,
                 isTrialRun=DbSession.isTrialRun,
                 logFileName=CLI.getOptDef("readlog"),
                 ingestOneLog=CLI.getOptDef("ingestlog"),
                 cuNums=CLI.getOptDef("cunums"),
                 forceIngest=CLI.getOptDef("forceingest"),
                 autoCommit=CLI.getOptDef("autocommit"),
                 excludedTables=CLI.getOptDef("excluded"),
                 omitObjIDUpdate=CLI.getOptDef("omitobjid"),
                 isParallelRun=CLI.getOptDef("isparallel"),
                 comment=CLI.getArgDef("comment")):
        """
        Initialise a CuIngest session; defaults come from the CLI option
        definitions so this can be driven from the command line.

        @param curator: Name of curator.
        @type curator: str
        @param comment: Descriptive comment as to why curation task is
                        being performed.
        @type comment: str
        @param database: Name of the database to connect to.
        @type database: str
        @param forceIngest: If True and ingestOneLog is True, ingest log file.
        @type forceIngest: bool
        @param logFileName: Full path name of a log file to be read. Excludes
                            ingest.
        @type logFileName: str
        @param ingestOneLog: If True, ingest log file given by logFileName.
        @type ingestOneLog: bool
        @param cuNums: Curation task numbers.
        @type cuNums: list[int]
        @param autoCommit: If True, do not auto-commit transactions. This
                           speeds up transactions through minimal logging.
        @type autoCommit: bool
        @param excludedTables: Exclude these tables from ingest.
        @type excludedTables: list(str)
        @param omitObjIDUpdate: If True, don't update objIDs.
        @type omitObjIDUpdate: bool
        @param isParallelRun: If True, this is one of several parallel ingest
                              runs; any stale database lock file is removed.
        @type isParallelRun: bool
        @param isTrialRun: If True, do not perform database modifications.
        @type isTrialRun: bool

        """
        # Argument-name -> type map used by attributesFromArguments() to cast
        # the (string) CLI values onto self.
        typeTranslation = {"curator":str,
                           "database":str,
                           "cuNums":list,
                           "ingestOneLog":bool,
                           "forceIngest":bool,
                           "autoCommit":bool,
                           "excludedTables":list,
                           "omitObjIDUpdate":bool,
                           "isParallelRun":bool,
                           "isTrialRun":bool,
                           "comment":str}

        self.logFile = (IngestLogFile(logFileName) if logFileName else None)

        # Copy every constructor argument onto self, cast via typeTranslation.
        super(CuIngest, self).attributesFromArguments(
            inspect.getargspec(CuIngest.__init__)[0], locals(),
            types=typeTranslation)

        # Parallel runs clear any stale per-database lock file left in $HOME.
        lockPathName = os.path.join(os.getenv('HOME'),
                                    "dblock-" + self.database)
        if self.isParallelRun and os.path.exists(lockPathName):
            os.remove(lockPathName)

        # cuNum=0: this session drives several CUs rather than a single one.
        super(CuIngest, self).__init__(cuNum=0,
                                       curator=self.curator,
                                       comment=self.comment,
                                       reqWorkDir=False,
                                       keepWorkDir=False,
                                       database=self.database,
                                       autoCommit=self.autoCommit,
                                       isTrialRun=self.isTrialRun)

        # Table-name comparisons are done case-insensitively throughout.
        self.excludedTables = [name.lower() for name in self.excludedTables]

        # "server.database" splits into server and plain database name;
        # a bare database name falls back to the standard load server.
        self._server = self.database.rpartition('.')[0] or self.sysc.loadServer
        self._database = self.database.rpartition('.')[2]

        self.cuNums = self._parseCuNums(self.cuNums)

        self._ingestedLogsPath = self.sysc.ingestedLogsPath(self.dbMntPath)
        utils.ensureDirExist(self._ingestedLogsPath)

        # Ensure every curation server has an ingested-files directory:
        # create it locally when this host serves the share, otherwise probe
        # and create it remotely over the private network.
        for server in self.sysc.curationServers:
            _ingestedFilesPath = self.sysc.ingestedFilesPath(
                self.sysc.dbSharePath(serverName=server))
            if os.getenv("HOST") in _ingestedFilesPath:
                utils.ensureDirExist(_ingestedFilesPath)
            else:
                cmd = self.sysc.privNetCmd(server,
                                           "ls -d %s" % _ingestedFilesPath)
                messages = self.runSysCmd(cmd)[1]
                if messages and "No such file or directory" in messages[0]:
                    cmd = self.sysc.privNetCmd(server,
                                               "mkdir %s" % _ingestedFilesPath)
                    try:
                        self.runSysCmd(cmd)
                    except Exception as error:
                        # Best effort only: report the failure and carry on.
                        print(error)
241
242
243
245 """ Ingest data.
246 """
247 lockFile = self._createLock()
248 try:
249 ingestLogList = []
250 if self.logFile:
251 ingestLogList.append(self.logFile)
252 elif self.cuNums:
253 ingestLogs = self.inspectInDir()
254 if ingestLogs:
255 for cuNum in self.cuNums:
256 ingestLogList.extend(logFile for logFile in ingestLogs
257 if all(x in logFile.name for x in cuNum))
258 else:
259 IngestLogger.addMessage("No files to ingest.")
260
261 if ingestLogList:
262 for logFile in ingestLogList:
263 try:
264 if self.ingestOneLog:
265 self.parseIngest(logFile)
266 else:
267 IngestLogger.addMessage(
268 os.path.basename(logFile.name) + ":")
269 self.readIngLog(logFile)
270 except IOError:
271 IngestLogger.addMessage("No such file " + logFile.name)
272
273 elif self.cuNums:
274 IngestLogger.addMessage(
275 "No files to ingest for CUs %r" % self.cuNums)
276
277 else:
278 self.parseIngest()
279
280 finally:
281 lockFile.remove()
282
283
284
286 """
287 Create a lock, writing info to sysLog. If lock exists, abort.
288
289 @return: Lock file object.
290 @rtype: File.File
291
292 """
293 utils.ensureDirExist(self.sysc.sysPath)
294 if self.logFile and self.logFile.ingMonth:
295 fileName = 'ingest_%s.%s_%s_lock' % (self._server, self._database,
296 self.logFile.ingMonth)
297 else:
298 fileName = 'ingest_%s.%s_lock' % (self._server, self._database)
299 lockFile = File(os.path.join(self.sysc.sysPath, fileName))
300 lockTime = utils.makeTimeStamp()
301 if lockFile.exists():
302 lockFile.ropen()
303 message = lockFile.readline()
304 IngestLogger.addMessage("<ERROR> A lock file exists in %s at %s" %
305 (lockFile.path, message))
306 raise SystemExit
307 else:
308 lockFile.wopen()
309 lockFile.writeline('%s %s [%d]\n' %
310 (lockTime, self._hostName, os.getpid()))
311 lockFile.close()
312 return lockFile
313
314
315
316 - def ingestCU2(self, ingLogFile, histInfo, _ingestOrder, _fileInfo):
317 """ Ingest CU2 data.
318 """
319 beginDate, endDate, versStr, subDir = histInfo["DCH"][:4]
320 jpgPath = (histInfo["DCH"][4] if len(histInfo["DCH"]) > 4 else None)
321
322 histInfo["DCH"] = self.selectFiles(subDir, beginDate, endDate)
323
324
325 CreateMfIdJpegList(database=self.database,
326 outPath=self.sysc.monitorPath(),
327 subDir=subDir, jpgPath=jpgPath,
328 beginDate=str(beginDate), endDate=str(endDate),
329 versionStr=versStr).run()
330
331
332 jpgidFileName = os.path.join(self.sysc.monitorPath(),
333 "jpgs_%s_%s_%s_%s.log"
334 % (ingLogFile.database, beginDate, endDate, versStr))
335
336
337 self._noData = not os.path.exists(jpgidFileName)
338 if self._noData:
339 print("No data available for %s to %s" % (beginDate, endDate))
340 return
341
342 self._noDBData = os.path.getsize(jpgidFileName) == 0
343 if self._noDBData:
344 print("No data found in DB for %s to %s" % (beginDate, endDate))
345 return
346
347 servers = ("dbload" if int(self.sysc.loadServer[-1]) in [1, 2] else
348 "dbpub")
349
350 UpdateJpegs(archive=self._database,
351 database='%s.%s' % (self._server, self._database),
352 multiDBs="::".join([servers, self._database]),
353 fromJpegList=True, updateFileName=jpgidFileName).run()
354
355
356
357 - def ingestCU3(self, ingLogFile, histInfo, ingestOrder, fileInfo):
375
376
377
378 - def ingestCU4(self, ingLogFile, histInfo, ingestOrder, fileInfo):
379 """ Ingest CU4 data.
380 """
381 ingestDate = os.path.basename(histInfo["DCH"][0])
382
383 self.ingest(ingestOrder, ingLogFile.hostName, fileInfo, ingestDate,
384 isCu3=False, cuEventID=ingLogFile.cuEventID)
385
386
387
389 """ Check flag file to say that the data are already ingested.
390 """
391 for dateDirPath in histInfo["DCH"]:
392 cuedFileName = "CU%02dED_%s" % (
393 ingLogFile.cuNum, ingLogFile.database)
394 cuedFilePath = os.path.join(dateDirPath, cuedFileName)
395 if os.path.exists(cuedFilePath):
396 if self.ingestOneLog and \
397 (self.forceIngest or ingLogFile.runNo > '1'):
398 os.remove(cuedFilePath)
399 else:
400 raise CuIngest.IngCuError("Data already ingested! "
401 "%r exist in %r" % (cuedFileName, dateDirPath))
402
403
404
405 - def tryIngest(self, ingestCU, ingLogFile, histInfo, ingestOrder=None,
406 fileInfo=None, enableRollback=True):
407 """ General ingest function for any CU.
408 """
409 isNotCu2 = enableRollback
410 status = dbc.no()
411 rolledBack = dbc.yes()
412 try:
413 if isNotCu2:
414 self.checkCUEDFile(ingLogFile, histInfo)
415
416 ingestCU(ingLogFile, histInfo, ingestOrder, fileInfo)
417
418 if enableRollback:
419 self.archive.commitTransaction()
420
421 except Exception as error:
422 IngestLogger.addExceptionDetails(error)
423 if enableRollback:
424 self.archive.rollbackTransaction()
425
426 status = dbc.no()
427 rolledBack = dbc.yes()
428
429 else:
430 status = dbc.yes()
431 rolledBack = dbc.no()
432
433 self.updateHistory(histInfo, status, rolledBack)
434
435 return rolledBack
436
437
438
440 """ Update CUed file.
441 """
442 cmd = self.makeSysCmd(
443 ingLogFile.hostName, "ls %s" % self.sysc.dbSharePath(
444 '', ingLogFile.hostName))
445 fileList = self.runSysCmd(cmd)[0]
446 if not self._noData and os.getenv('USER') == 'scos' \
447 and not any(ingLogFile.commonPrefix in logName
448 for logName in fileList):
449 for dateDirPath in histInfo["DCH"]:
450 cuedFile = os.path.join(
451 dateDirPath,
452 "CU%02dED_%s" % (ingLogFile.cuNum, ingLogFile.database))
453
454 file(cuedFile, 'w').write(utils.makeTimeStamp() + '\n')
455
456
457
    def ingestData(self, ingLogFile, ingestOrder, fileInfo, histInfo):
        """ Ingest all files described in given log file.

        @param ingLogFile: Ingest log file describing this run.
        @type ingLogFile: IngestLogFile
        @param ingestOrder: Table names in the order they must be ingested.
        @type ingestOrder: list(str)
        @param fileInfo: Schema and ingest file names per table.
        @type fileInfo: dict
        @param histInfo: Curation history updates ("ACH"/"PCH"/"DCH" keys).
        @type histInfo: dict

        """
        self._ingestedFiles = []
        self._noData = False
        self._noDBData = False
        # Pessimistic default: treat as rolled back until a CU succeeds.
        rolledBack = dbc.yes()
        self._connectToDb()
        try:
            if ingLogFile.cuNum == 1:
                # CU1 only records history; status comes from the log itself.
                status = histInfo["PCH"][0][0]
                histInfo["PCH"] = []
                rolledBack = dbc.no()
                self.updateHistory(histInfo, status, rolledBack)

            elif ingLogFile.cuNum == 2:
                # CU2 manages its own commits, so rollback is disabled.
                rolledBack = self.tryIngest(self.ingestCU2, ingLogFile,
                                            histInfo, enableRollback=False)

            elif ingLogFile.cuNum == 3:
                rolledBack = self.tryIngest(self.ingestCU3, ingLogFile,
                                            histInfo, ingestOrder, fileInfo)

            elif ingLogFile.cuNum == 4:
                rolledBack = self.tryIngest(self.ingestCU4, ingLogFile,
                                            histInfo, ingestOrder, fileInfo)

        finally:
            # House-keeping of the log and data files happens regardless of
            # how the ingest attempt ended.
            if not rolledBack:
                if self._noDBData and ingLogFile.cuNum == 2:
                    # CU2 found no DB entries yet: park the log for a retry.
                    os.rename(ingLogFile.name, ingLogFile.name + ".pending")

                else:
                    # Success: archive the log and move the ingested data
                    # files into the ingested-files area on the share.
                    os.rename(ingLogFile.name,
                              os.path.join(self._ingestedLogsPath,
                                           ingLogFile.base))

                    self._ingestedFilesPath = self.sysc.ingestedFilesPath(
                        self.sysc.dbSharePath(serverName=ingLogFile.hostName))

                    for fileName in self._ingestedFiles:
                        newPath = os.path.join(self._ingestedFilesPath,
                                               os.path.basename(fileName))
                        cmd = self.makeSysCmd(
                            ingLogFile.hostName, "mv %s %s" % (
                                self.sysc.dbSharePath(fileName), newPath))
                        self.runSysCmd(cmd)[0]
                    self.updateCUEDFile(ingLogFile, histInfo)

            else:
                # Failure: mark the log and restore the data files (drop the
                # ".rename" suffix) so they can be re-ingested -- unless the
                # share is full, in which case they are deleted.
                os.rename(ingLogFile.name, ingLogFile.name + ".failed")
                for fileName in self._ingestedFiles:
                    filePath = self.sysc.dbSharePath(fileName)
                    if self.spaceOnFileShare(ingLogFile.hostName):
                        cmd = self.makeSysCmd(
                            ingLogFile.hostName, "mv %s %s" % (
                                filePath, filePath.replace(".rename", '')))
                        self.runSysCmd(cmd)[0]
                    else:
                        IngestLogger.addMessage("<WARNING> No space left on "
                            "%s; data will be deleted." % self.sysc.dbSharePath())

                        cmd = self.makeSysCmd(
                            ingLogFile.hostName, "rm -f %s" % (filePath))
                        self.runSysCmd(cmd)[0]
                        os.remove(filePath)

            print("disconnecting")
            self._disconnectFromDb()
529
530
531
533 """ Do the appropriate ingest for the different CUs.
534 """
535
536 ingestLogs = self.inspectInDir()
537 if (not ingestLogs and not logFile) \
538 or (self.ingestOneLog and logFile and \
539 not any(logFile.name == inFile.name for inFile in ingestLogs)):
540 IngestLogger.addMessage("No files available for ingest into %s."
541 % self.database)
542 return
543
544 if self.ingestOneLog:
545 ingestLogs = [logFile]
546
547 IngestLogger.addMessage("The following logs are to be ingested: %s" %
548 ','.join([ingLog.name for ingLog in ingestLogs]))
549
550
551 ingCuEventIDs = []
552 for ingLogFile in ingestLogs:
553 ingCuEventIDs.append(ingLogFile.cuEventID)
554 ingestOrder, fileInfo, histInfo = list(ingLogFile.pickleRead())
555 cuLogFileName = histInfo["ACH"][0][2]
556 IngestLogger.dump(file(cuLogFileName, 'a'))
557
558
559 self._loggerFile = IngestLogger(self.createLogFileName(ingCuEventIDs))
560 IngestLogger.dump(file(self._loggerFile.pathName, 'a'))
561
562 for ingLogFile in ingestLogs:
563 IngestLogger.reset()
564 ingestOrder, fileInfo, histInfo = list(ingLogFile.pickleRead())
565 IngestLogger.addMessage("Ingesting for CU%s, CuEventID %s"
566 % (ingLogFile.cuNum, ingLogFile.cuEventID))
567 IngestLogger.addMessage("from " + ingLogFile.name)
568
569 if self.excludedTables:
570 ingestOrder = [table for table in ingestOrder
571 if table.lower() not in self.excludedTables]
572
573
574 if ingLogFile.cuNum == 3:
575 cmd = self.makeSysCmd(
576 ingLogFile.hostName, "ls %s*" % self.sysc.dbSharePath(
577 ingLogFile.commonPrefix, ingLogFile.hostName))
578 fileList = self.runSysCmd(cmd)[0]
579 if not all(any(x in y for y in fileList) for x in ingestOrder):
580 IngestLogger.addMessage(
581 "Not all files available for ingest!")
582 os.rename(ingLogFile.name, ingLogFile.name + ".failed")
583 break
584
585 self.ingestData(ingLogFile, ingestOrder, fileInfo, histInfo)
586
587
588 cuLogFileName = histInfo["ACH"][0][2]
589 IngestLogger.dump(file(cuLogFileName, 'a'))
590 IngestLogger.dump(file(self._loggerFile.pathName, 'a'))
591 IngestLogger.reset()
592
593
594
596 """
597 Deprecate old versions of frames that are reingested in Multiframe.
598
599 @todo: This function will eventually contain the contents of the SQL
600 fragment, as a DB API update method.
601 """
602
603 IngestLogger.addMessage("Deprecating reprocessed multiframes...")
604 code = str(DepCodes.reprocCASU)
605 fragmentPath = \
606 os.path.join(
607 self.sysc.sqlFragmentPath(), '%s_DeprecateOldFrames.sql' %
608 self.sysc.loadDatabase)
609 sql = file(fragmentPath).read().replace('@mask', code)
610
611 IngestLogger.addMessage("No. of multiframes deprecated: %s" %
612 self.archive._executeScript(sql, wantRowCount=True))
613
614 IngestLogger.addMessage(
615 "Propagating deprecations to MultiframeDetector...")
616
617 num = self.archive.update("MultiframeDetector",
618 "MultiframeDetector.deprecated=MultiframeDetector.deprecated+" + code,
619 where="MultiframeDetector.multiframeID=M.multiframeID AND "
620 "MultiframeDetector.deprecated<%s AND M.deprecated>=%s" %
621 (code, code),
622 fromTables="MultiframeDetector, Multiframe AS M")
623
624 IngestLogger.addMessage("No. of frames deprecated: %s" % num)
625
626 if not self.sysc.isOSA():
627 IngestLogger.addMessage("Setting unfilteredID entries...")
628 numMFs, numMFDs = self.setUnfilteredIDs(code)
629 IngestLogger.addMessage("No. of multiframes/detectors deprecated: "
630 "%s/%s" % (numMFs, numMFDs))
631
632
633
635 """
636 Set unfilteredID entries.
637
638 @param deprCode: Deprecation code.
639 @type deprCode: str
640 @return: Number of updated entries in Multiframe and MultiframeDetector.
641 @rtype: int, int
642
643 """
644 filteredPairs = SelectSQL(
645 select="F.multiframeID AS filteredID, "
646 "U.multiframeID AS unfilteredID",
647 table="Multiframe AS F, Multiframe AS U",
648 where="U.frameType NOT LIKE 'filt%' AND (U.frameType LIKE "
649 "'%stackconf' AND F.frameType LIKE 'filt%stackconf' OR "
650 "U.frameType LIKE '%stack' AND F.frameType LIKE 'filt%stack')"
651 " AND U.obsNum=F.obsNum AND U.utDate=F.utDate AND " +
652 "U.deprecated<%s" % deprCode)
653
654 num = self.archive.update("Multiframe", "unfilteredID=T.unfilteredID",
655 where="Multiframe.unfilteredID=%s AND multiframeID=T.filteredID"
656 % dbc.intDefault(),
657 fromTables="(%s) AS T" % filteredPairs)
658
659 IngestLogger.addMessage("No. of new filtered multiframes: %s" % num)
660
661 IngestLogger.addMessage("Deprecating unfiltered multiframes...")
662
663 unfilteredIDs = SelectSQL("unfilteredID", "Multiframe",
664 where="unfilteredID!=%s" % dbc.intDefault())
665
666 numMFs = self.archive.update("Multiframe",
667 entryList="deprecated=%s" % DepCodes.reprocAltn,
668 where="deprecated=%s AND multiframeID IN (%s)"
669 % (DepCodes.nonDep, unfilteredIDs))
670
671 numMFDs = self.archive.update("MultiframeDetector",
672 entryList="deprecated=%s" % DepCodes.reprocAltn,
673 where="deprecated=%s AND multiframeID IN (%s)"
674 % (DepCodes.nonDep, unfilteredIDs))
675
676 return numMFs, numMFDs
677
678
679
    def ingest(self, ingestOrder, ingestHost, fileInfo, ingestDate,
               isCu3=True, cuEventID=0):
        """
        Ingest the data file.

        @param ingestOrder: List of table names, giving order in which ingest
                            occurs.
        @type ingestOrder: list
        @param ingestHost: Host name of the database share holding the files
                           to ingest.
        @type ingestHost: str
        @param fileInfo: Dictionary containing schema and ingest file names.
        @type fileInfo: dict(str: dict(str: str))
        @param ingestDate: Date of ingested data.
        @type ingestDate: str
        @param isCu3: If True, expects a CU3 ingest of a CSV file, else
                      expects a CU4 binary file ingest, where indices
                      must remain in place for objID updates.
        @type isCu3: bool
        @param cuEventID: The cuEventID of the ingested data.
        @type cuEventID: int

        """
        IngestLogger.addMessage("Parsing schemas")
        # Group the tables by the schema script that defines them.
        tableSchema = defaultdict(list)
        for tableName in set(ingestOrder):
            tableSchema[fileInfo[tableName]["schema"]].append(tableName)

        ingestSchema = []
        for scriptName in tableSchema:
            # Non-survey and monthly schema scripts live in other directories.
            if scriptName.startswith("%sNS" % self.sysc.loadDatabase):
                script = os.path.join("NonSurvey", scriptName)
            elif "MonthlySQL" in scriptName:
                script = os.path.join(self.dbMntPath, scriptName)
            else:
                script = scriptName
            ingestSchema += schema.parseTables(script, tableSchema[scriptName])

        try:
            ingester = Ingester(self.archive, ingestSchema)
        except schema.MismatchError as error:
            # Re-raise as the session's own error type.
            raise IngCuSession.IngCuError(error)

        # CU3 drops indices before a CSV bulk load; CU4 (binary) creates them
        # because objID updates need the indices in place.
        IngestLogger.addMessage(("Dropping" if isCu3 else "Creating") +
                                " ingest table indices")

        idxInfo = schema.parseIndices(self.sysc.indexScript)
        idxInfo.update(schema.parseIndices(self.sysc.nsIndexScript))
        if any("vvv" in tableName for tableName in ingestOrder):
            # VVV monthly detection tables keep their index SQL separately.
            self.checkVVVIndices()
            idxInfo.update(schema.parseIndices(self.sysc.sqlMonthlyDetPath(
                "VSAVVV_Indices.sql")))

        for tableName in ingestOrder:
            if isCu3:
                self.archive.dropObjects(idxInfo[tableName])
            else:
                self.archive.createObjects(idxInfo[tableName])

            if "vvv" in tableName:
                self.dropVVVConstraints(tableName)

        self.archive.commitTransaction()

        IngestLogger.addMessage("Ingesting %s:" % ingestDate)
        for tableName in ingestOrder:
            if self.isTrialRun:
                # Trial runs only report what would be ingested.
                print("ingesting %s here" % tableName)
            else:
                ingFileName = self.sysc.dbSharePath(
                    fileInfo[tableName]["ingfile"], ingestHost)
                try:
                    ingester.ingestTable(
                        tableName, ingFileName, isCsv=isCu3,
                        deleteFile=False, isOrdered=not isCu3)

                except OSError:
                    IngestLogger.addMessage(ingFileName + " not found.")
                    raise

                # Remember the share-side file for post-ingest house-keeping.
                self._ingestedFiles.append(ingester.fileOnShare)

                if cuEventID and "DetectionRaw" in tableName \
                        and not (self.omitObjIDUpdate and "vvv" in tableName):
                    self.updateObjIDs(tableName, cuEventID)
767
768
770 """ Drop the constraints on monthly VVV tables.
771 """
772 for constraint in self.archive.query(
773 "name", "sysobjects", " AND ".join([
774 "xtype = 'C'",
775 "(name like 'multiframeID%20%' or name like 'objID%20%')"])):
776 if tableName.partition("Detection")[2] in constraint or \
777 constraint == "objID%s" % tableName.partition("DetectionRaw")[2]:
778 IngestLogger.addMessage("Dropping constraint %s on %s." %
779 (constraint, tableName))
780 try:
781 self.archive._executeScript(
782 "ALTER TABLE %s DROP CONSTRAINT %s" %
783 (tableName, constraint))
784 except odbc.ProgrammingError as error:
785 if "does not exist" not in str(error) and \
786 "Could not drop constraint" not in str(error):
787 raise
788
789
790
792 """ Checks if the SQL for all objID indices exist for monthly tables.
793 """
794 idxText = ''.join([
795 "if not exists (select * from sysindexes where name = 'idx_vvvDetectionRaw_objID')\n",
796 "begin\n",
797 "print 'idx_vvvDetectionRaw_objID ...'\n",
798 "select getdate()\n",
799 "create NONCLUSTERED INDEX idx_vvvDetectionRaw_objID on vvvDetectionRaw (objID) on vvvIndices_FG\n",
800 "print '... finished.'\n",
801 "select getdate()\n", "end\n", "go\n"])
802
803 monthlyTables = self.archive.query(
804 "name", "sysobjects", " AND ".join([
805 "xtype = 'U'", "name like '%vvvDetectionRaw%'"]),
806 orderBy="name")
807
808 indicesSql = File(self.sysc.sqlMonthlyDetPath("VSAVVV_Indices.sql"))
809 indicesSql.ropen()
810 availIndices = indicesSql.readlines(findValues="NONCLUSTERED INDEX")
811 indicesSql.close()
812 indicesSql.aopen()
813 for tableName in monthlyTables:
814 if not any(tableName in line for line in availIndices):
815 indicesSql.writetheline(idxText.replace("vvvDetectionRaw",
816 tableName))
817 indicesSql.close()
818
819
820
822 """ @return: Ingest logs in dbShare that need ingesting.
823 @rtype: list(str)
824 """
825 try:
826 listdir = dircache.listdir(self.dbMntPath)
827 logfiles = [os.path.join(self.dbMntPath, f) for f in listdir
828 if f.startswith("cu") and os.path.splitext(f)[1] == ".log"]
829 except Exception as error:
830 IngestLogger.addExceptionDetails(error)
831
832 ratedLogs = utils.Ratings()
833 CURatings = {1:0, 2:3, 3:1, 4:2}
834 for logFile in logfiles:
835 tmpLog = IngestLogFile(logFile)
836 if tmpLog.cuNum in CURatings.keys() \
837 and tmpLog.database == self._database \
838 and tmpLog.cuEventID > 0:
839 ratedLogs[tmpLog] = CURatings[tmpLog.cuNum]
840 del tmpLog
841
842 return ratedLogs.keys()
843
844
845
847 """
848 Read the given ingest log file and print its content
849 to the screen. No ingest takes place.
850
851 @param logFile: IngestLogFile object
852 @type logFile: File
853
854 """
855 fileContent = list(logFile.pickleRead())
856 for index, entry in enumerate(fileContent):
857 print("%s: %s" % (index, repr(entry)))
858
859
860
862 """ Re-allocate DXS_SV frames to the main programme.
863 """
864 IngestLogger.addMessage("Re-allocating DXS_SV frames...")
865
866 num = self.archive.copyIntoTable("ProgrameFrame",
867 Join([("Multiframe", 'M'), ("ProgrammeFrame", 'P')], ["multiframeID"]),
868 columns="104, P.multiframeID, -9999, P.ppErrBitsStatus",
869 where="programmeID=14 AND frameType LIKE 'deepleavstack%' AND "
870 "M.multiframeID NOT IN (%s)" %
871 SelectSQL("multiframeID", "ProgrammeFrame", "programmeID=104"))
872
873 IngestLogger.addMessage("No. of DXS_SV frames re-allocated: %s" % num)
874
875
876
893
894
895
897 """
898 Selects all FITS files in given sub-directory of all disks where the
899 date is within the given range.
900
901 @param subDir: Common RAID system sub-directory name to search in
902 for each disk.
903 @type subDir: str
904 @param beginDate: First night to include.
905 @type beginDate: int
906 @param endDate: Last night to include.
907 @type endDate: int
908
909 @return: Set of FITS file paths.
910 @rtype: set(str)
911
912 @todo: Useful for FitsUtils?
913
914 """
915 fitsFiles = set()
916 for disk in self.sysc.availableRaidFileSystem():
917 try:
918 fitsFiles.update(os.path.join(disk, subDir, x)
919 for x in dircache.listdir(os.path.join(disk, subDir))
920 if x[:8].isdigit() and '_v' in x and '.' not in x
921 and beginDate <= int(x[:8]) <= endDate)
922 except OSError:
923 pass
924
925 return fitsFiles
926
927
928
939
940
941
956
957
958
    def updateHistory(self, historyUpdate, status, rolledBack, comment=""):
        """
        Update Archive and Programme Curation History with data
        from the logfile.

        @param historyUpdate: Dictionary containing the updates for the
                              Archive and the Programme CurationHistory.
        @type historyUpdate: dict
        @param status: Ingest status for the ProgrammeCurationHistory.
        @type status: int
        @param rolledBack: Flag to indicate if the transaction has been
                           rolled back.
        @type rolledBack: int
        @param comment: Optional extra text appended to the CU2 history
                        comment.
        @type comment: str

        """
        try:
            for curUpdate in historyUpdate["ACH"]:
                row = curUpdate[:6]
                preComment = ''
                # curUpdate[1] is the CU number; each CU gets its own prefix.
                if curUpdate[1] == 1:
                    # CU1: -1 = no data, 0 = failed, 1 = transferred.
                    preComment = {-1: "no data: ", 0: "failed: ",
                                  1: "transferred: "}[status]
                elif curUpdate[1] == 2 and self._noData:
                    preComment = "no data: "
                elif curUpdate[1] == 2 and self._noDBData:
                    preComment = "pending: "
                else:
                    preComment = self._historyCommentPrefix(status, rolledBack)
                if curUpdate[1] == 2 and comment:
                    curUpdate[6] += comment
                row.extend([preComment + curUpdate[6], rolledBack])
                self._updateArchiveCurationHistory(row)
            self.archive.commitTransaction()
        except KeyError:
            IngestLogger.addMessage("<ERROR>: "
                "Can't update ArchiveCurationHistory, no data found!")
        except IndexError:
            IngestLogger.addMessage("<ERROR>: "
                "Can't update ArchiveCurationHistory, no data available!")

        try:
            for progUpdate in historyUpdate["PCH"]:
                # Last element of each PCH row carries the ingest status.
                progUpdate[-1] = status
                self._updateProgrammeCurationHistory(progUpdate)
            self.archive.commitTransaction()
        except KeyError:
            IngestLogger.addMessage("<ERROR>: "
                "Can't update ProgrammeCurationHistory, no data found!")
        except IndexError:
            IngestLogger.addMessage("<Warning>: "
                "Can't update ProgrammeCurationHistory, no data available!")
1010
1011
1012
1014 """
1015 Since the objectIDs couldn't be calculated beforehand they have to be
1016 updated separately based on their value and the maximal value found in
1017 the database.
1018
1019 @param tableName: Name of the table to update.
1020 @type tableName: str
1021 @param cuEventID: The cuEventID of the ingested data.
1022 @type cuEventID: int
1023
1024 """
1025 IngestLogger.addMessage("Updating objIDs...")
1026 if "Raw20" not in tableName:
1027 maxObjId = self.archive.queryAttrMax("objID", tableName)
1028 num = self.archive.update(tableName, "objID=%s-objID" % maxObjId,
1029 where="cuEventID=%s AND objID<0 AND seqNum>-99999990" % cuEventID)
1030
1031 self.archive.commitTransaction()
1032 IngestLogger.addMessage("... %s rows updated." % num)
1033
1034 return
1035
1036 maxObjId = 0
1037 detTabs = defaultdict()
1038 for detTable \
1039 in self.archive.query("name", "sys.tables", "name LIKE '%Raw20%'"):
1040
1041 minObj, maxObj = self.archive.query("MIN(objID), MAX(objID)",
1042 detTable, "seqNum>-99999990", firstOnly=True, default=(0, 0))
1043
1044 detTabs[detTable] = minObj
1045 maxObjId = max(maxObjId, maxObj)
1046
1047 for detTable in sorted(detTabs):
1048 if detTabs[detTable] < 0:
1049 sql = "DECLARE @myvar bigint; set @myvar = %d; "\
1050 "UPDATE %s SET @myvar = objID = @myvar + 1 "\
1051 "WHERE objID<0 AND seqNum>-99999990" % (maxObjId, detTable)
1052
1053 self.archive._executeScript(sql, wantRowCount=True)
1054 self.archive.commitTransaction()
1055 Logger.addMessage("...%s updated." % detTable)
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091