1
2
3 """
4 Curation session. Provides a class design framework that performs all the
5 actions necessary of any data curation activity for the archive where a
6 log is required to be recorded. Also provides a framework for updating the
7 database correctly. All curation tasks should inherit from this class.
8
9 Usage
10 =====
11
12 Inheriting from CuSession
13 -------------------------
14
15 Simplest, basic design (see L{CuSession} class documentation for explanation
16 of individual aspects)::
17
18 from wsatools.CLI import CLI
19 from wsatools.DbConnect.CuSession import CuSession
20 from wsatools.Logger import Logger
21
22 class MyCu33(CuSession):
23 ''' The first sentence of this description should describe the
24 script's purpose for the command-line interface help screen, and
25 thus be written with the mandatory arguments in context.
26 '''
27 # Override these parameters if defaults not satisfactory
28 cuNum = 33
29 _autoCommit = True
30 _useWorkDir = True
31 _cleanUpDirs = True
32
33 def _onRun(self):
34 ''' Tasks this CU performs.
35 '''
36 # perform tasks
37
38 # Executing script
39 cli = CLI(MyCu33.__name__, "$Revision: 9646 $", MyCu33.__doc__)
40 Logger.addMessage(cli.getProgDetails())
41 cu = MyCu33(cli=cli)
42 cu.run()
43
44 Individual curation tasks that inherit this class should override the
45 _onRun() method to define their behaviour, and then invoke the
46 CuSession.run() method to perform the curation task, which performs the
47 exception handling for _onRun(). Override _onException() to perform tidy-up
48 tasks following an exception. For example, to prevent files on the catalogue
49 load server share directory from being deleted on curation failure,
50 override _onException() to set the shareFileID member variable to None::
51
52 def _onException(self):
53 CuSession._onException(self)
54 self.shareFileID = None
55
56 If the task has mandatory arguments then these should be defined by
57 overriding the constructor. Always call the L{CuSession.__init__()} method
58 first, and expand upon its interface.
59
60 Programme curation considerations
61 ---------------------------------
62
63 If curating a single programme then initialise the curation session with
64 that programme's ID. For curating multiple programmes add programmeIDs to
65 the programmeID member variable at the appropriate time somewhere within
66 the _onRun() method, e.g.::
67
68 def _onRun(self):
69 self.programmeID.add(101)
70
71 If you don't wish for the ProgrammeCurationHistory to be ever updated on
72 curation failure, even with success=no flags, then override the
73 _onException() method to reset the programmeID member variable::
74
75 def _onException(self):
76 CuSession._onException(self)
77 self.programmeID = set()
78
79 @author: R.S. Collins
80 @org: WFAU, IfA, University of Edinburgh
81
82 @warning: If a CuError is thrown the ACH is not updated, but the log-file is
83 still written. On the next run this log-file will be overwritten,
84 unless the log-file mode is append (like for parallel CUs) - which
85 could be confusing, might be a good idea to change this
86 behaviour...
87
88 @todo: Should the success flag be in ACH, and the PCH not updated at all if
89 rolledback = True, in all cases?
90 """
91
92 from __future__ import division, print_function
93
94 import os
95 import shutil
96 import time
97
98 from wsatools.CLI import CLI
99 import wsatools.DataFactory as df
100 import wsatools.DbConnect.DbConstants as dbc
101 from wsatools.DbConnect.DbSession import DbSession, odbc
102 from wsatools.ExternalProcess import Error as ExtError
103 from wsatools.Logger import Logger
104 from wsatools.SystemConstants import SystemConstants
105 import wsatools.Utilities as utils
109 """
110 A curation session. Any data curation activity for the archive where a
111 log is required to be recorded, should either be designed as a class that
112 inherits from this class or else an instance of this class should be invoked
113 by that curation script. This class design ensures a log is correctly
114 written, errors are correctly handled, temporary files are correctly
115 managed and that the database ArchiveCurationHistory and
116 ProgrammeCurationHistory tables are updated if required. Also a database
117 connection, temporary working directory, and programme table objects are
118 provided if required.
119
120 @warning: The destructor won't be called if the instance of a CuSession
121 object is copied to a member variable that stores it (if it is used
122 only temporarily then that's OK - either the member variable uses
123 the reference to the CuSession temporarily or the reference to
124 the CuSession is passed to a temporary variable). This is because
125 a circular reference occurs on destruction.
126
127 @note: In the destructor you cannot order the destruction of the archive
128 and log objects by just calling del. Therefore, I've left their
129 order of destruction undefined - the order is not important.
130
131 @group Nested Errors and Exceptions: CuError
132
133 """
135 """ A curation error instead of a programming error. If this is thrown
136 no exception trace is displayed and the ArchiveCurationHistory is
137 not updated. Just designed for an error that prevents a curation
138 task from even beginning.
139 """
140 pass
141
142
143
144
145 _numSessions = 0
146
147
148
149
150 eTypes = set()
151
152 sysc = SystemConstants()
153
154
155
156
157
158 cuNum = dbc.smallIntDefault()
159
160 isDayStampedLog = False
161
162 _autoCommit = DbSession._autoCommit
163
164 _cleanUpDirs = True
165
166 _isPersistent = DbSession._isPersistent
167
168 _reqDbLock = True
169
170 _reqCuEventID = False
171
172 _useWorkDir = False
173
174
175
176
177
178
179
180 comment = 'A descriptive comment'
181
182 curator = os.getenv('USER')
183
184
185
186
187 archive = None
188
189 cuEventID = None
190
191 onlyNonSurveys = False
192
193 onlySurveys = False
194
195 programme = None
196
197 programmeID = None
198
199 resultsFilePathName = dbc.charDefault()
200
201 shareFileID = ''
202
203
204
205
206
207 _cuTable = None
208
209 _lockPathName = None
210
211 _log = None
212
213 _rolledBack = None
214
215 _success = None
216
217
218
219
220 CLI.progArgs.append(CLI.Argument('comment', comment, isOptional=True))
221 CLI.progOpts.append(
222 CLI.Option('c', 'curator', 'username of curator', 'NAME', curator))
223
224
225
229 """
230 Writes a database lock and opens a connection. Obtains a unique
231 curation event ID and prepares a Logger object to hold the verbose
232 record of this curation event, whilst reserving a placeholder in the
233 database ArchiveCurationHistory to log the record of this curation
234 event. A temporary working directory is created, if required, and all
235 member variables are initialised.
236
237 @param programmeID: Optionally supply the unique identifier of a
238 programme or survey name initials to curate
239 selected programme. Set to 'NONSURVEYS' if just
240 curating non-surveys or 'SURVEYS' for surveys only.
241 @type programmeID: str
242 @param curator: Name of curator.
243 @type curator: str
244 @param comment: Descriptive comment as to why curation task is
245 being performed.
246 @type comment: str
247 @param database: Name of the database to connect to.
248 @type database: str
249 @param isTrialRun: If True, do not perform database modifications,
250 just print the SQL statement to the terminal.
251 @type isTrialRun: bool
252 @param userName: Optionally override default database username.
253 @type userName: str
254 @param cli: Optionally initialise using the settings supplied
255 by the command-line options.
256 @type cli: L{CLI.CLI}
257
258 """
259 if CuSession._numSessions > 0:
260 Logger.addMessage(
261 "<Info> Waiting for old CuSessions to be deleted...")
262 time.sleep(30)
263 if CuSession._numSessions > 0:
264
265
266
267 Logger.addMessage(
268 "<Warning> Cannot have more than one CuSession object "
269 "running concurrently. Delete the old object before "
270 "creating the new object.")
271 else:
272 CuSession._numSessions += 1
273
274 if cli:
275 curator = cli.getOpt("curator")
276 comment = cli.getArg("comment")
277 database = cli.getArg("database")
278 isTrialRun = cli.getOpt("test")
279 userName = cli.getOpt("user")
280
281
282
283
284 serverDb = (database if '.' in database else
285 SystemConstants(database).loadServer + '.' + database)
286
287
288 programmeID = programmeID.replace('/', '')
289 self._lockPathName = os.path.join(self.sysc.sysPath,
290 "dblock-%sCU%sp%s" % (serverDb, self.cuNum, programmeID or 'ALL'))
291
292 try:
293 if self._reqDbLock and not isTrialRun:
294 self._prepareDbLock()
295
296 self.archive = DbSession(database, self._autoCommit, isTrialRun,
297 userName, self._isPersistent)
298 except Exception as error:
299
300
301
302 raise SystemExit(error)
303
304 CuSession.sysc = self.archive.sysc
305
306
307 self.curator = curator
308 if not isTrialRun:
309 self.cuEventID = self._getNextCuEventID()
310 Logger.addMessage("Starting cuEventID: %s" % self.cuEventID)
311
312
313 dayStamp = time.ctime().split()[0] if self.isDayStampedLog else ''
314 self._log = Logger(fileName='%s_cu%sid%s%s.log' %
315 (self.archive.database, self.cuNum, self.cuEventID, dayStamp))
316
317 elif self._reqCuEventID:
318 self.cuEventID = 123456789
319 Logger.addMessage("Starting TRIAL cuEventID: %s" % self.cuEventID)
320
321 self.shareFileID = '%sCuID%06d' % (self.archive.database,
322 self.cuEventID if self.cuEventID else 0)
323
324
325 if self.cuNum != dbc.smallIntDefault():
326 try:
327 self._cuTable = df.CurationTaskTable(self.archive, self.cuNum)
328 Logger.addMessage("Running Curation Use case:")
329 Logger.addMessage(self._cuTable.getDescription())
330 except ValueError as error:
331 Logger.addMessage("<Warning> %s" % error)
332 self._cuTable = None
333
334
335 self.onlyNonSurveys = (programmeID.upper() == 'NONSURVEYS')
336 self.onlySurveys = (programmeID.upper() == 'SURVEYS')
337 self.programme = df.ProgrammeTable(self.archive)
338 if not programmeID or self.onlyNonSurveys or self.onlySurveys:
339 self.programmeID = set()
340 progAcronym = (programmeID or "ALL").upper()
341 if not programmeID:
342 Logger.addMessage("for all programmes")
343 elif self.onlyNonSurveys:
344 Logger.addMessage("for all non-survey programmes")
345 else:
346 Logger.addMessage("for all survey programmes")
347 else:
348 self.programmeID = self.programme.setProgID(programmeID)
349 self.onlyNonSurveys = self.programme.isNonSurvey()
350 self.onlySurveys = not self.programme.isNonSurvey()
351 progAcronym = self.programme.getAcronym(self.programmeID).upper()
352 Logger.addMessage("for programme %s: " % progAcronym
353 + self.programme.getName())
354
355 Logger.addMessage("on curation server: " + os.getenv('HOST'))
356
357 self.comment = "[%s] " % progAcronym + (("Running CU%s" % self.cuNum)
358 if comment is CuSession.comment else comment)
359
360
361 if self._useWorkDir:
362 workSpace = self.sysc.tempWorkPath()
363 if os.path.exists(workSpace):
364 os.rename(workSpace, workSpace + '.old')
365 Logger.addMessage('<Warning> curationClientWorkPath %s exists,'
366 ' moved to %s.old' % (workSpace, workSpace))
367 os.mkdir(workSpace)
368
369
370
464
465
466
468 """ Executes the _onRun() method of the curation session within a
469 try block, so that all the standard exceptions may be dealt with
470 correctly.
471
472 @todo: Replace init and del with a context manager for database
473 connections and locks, implemented as a with-block in this
474 function.
475 """
476 try:
477 self._onRun()
478 self.archive.commitTransaction()
479 self._rolledBack = False
480 self._success = True
481 except BaseException as error:
482
483 with utils.noInterrupt():
484 stdErrMsg = CuSession.logException(error)
485 if error.args and self._log:
486 stdErrMsg += ": see log " + self._log.pathName
487
488 self._onException()
489 if isinstance(error, CuSession.CuError) and \
490 not any(self.archive._isLogReq.values()):
491 self._success = None
492
493
494 raise SystemExit(stdErrMsg)
495
496
497
498 @staticmethod
500 """
501 Logs the given trapped exception.
502
503 @param error: Exception caught.
504 @type error: Exception
505
506 @return: Derived error type (for full list see L{CuSession.eTypes}).
507 @rtype: str
508
509 """
510 wantTrace = False
511 if isinstance(error, SystemExit):
512 eType = SystemExit.__name__
513 elif isinstance(error, odbc.Error):
514 eType = "SQL or Database Error"
515 elif isinstance(error, ExtError):
516 eType = "External-process Error"
517 elif isinstance(error, CuSession.CuError):
518 eType = "Curation Failure"
519 elif isinstance(error, KeyboardInterrupt):
520 eType = KeyboardInterrupt.__name__
521
522 wantTrace = (os.getenv("USER") == "scos")
523 if not wantTrace:
524 prompt = "Would you like to see trace (yes/no)? >"
525 answer = None
526 while answer is None:
527 try:
528 answer = raw_input(prompt)
529 except KeyboardInterrupt:
530 print('')
531 except EOFError:
532
533 answer = "no"
534 break
535 wantTrace = not answer or 'y' in answer.lower()
536 else:
537 sysErrors = (OSError, IOError, DbSession.CatalogueServerError)
538 eType = ("System Failure" if isinstance(error, sysErrors) else
539 "Programming Error")
540 wantTrace = True
541
542 if not str(error).startswith(tuple(CuSession.eTypes)):
543
544
545 Logger.addExceptionMessage(error, eType, wantTrace)
546
547
548 CuSession.eTypes.add(eType)
549
550 return eType
551
552
553
578
579
580
582 """ Curation sessions that inherit from this class should override this
583 method, executing within all of the tasks that required of this
584 curation session.
585 """
586 pass
587
588
589
591 """ Attempts to rollback transaction and sets the database state flags,
592 so that the destructor realises the curation session failed.
593 Can override in subclasses to tidy up run tasks specific to that CU
594 following an exception, but always call this method.
595 """
596 self._success = False
597 self._rolledBack = not any(self.archive._isLogReq.values())
598
599 self.archive.rollbackTransaction()
600
601
602
def _getNextCuEventID(self):
    """
    Obtains the next available cuEventID and reserves it by committing a
    placeholder row to the ArchiveCurationHistory table, so that the ID
    cannot be hijacked by a parallel process.

    @return: The reserved curation event ID.
    @rtype:  int

    @raise SystemExit: If the database is mounted read-only or the user has
                       no write permission on it.

    """
    archive = self.archive
    cuEventID = self._getNextID(dbc.curationEventUIDAttributeName(),
                                dbc.archiveCurationHistoryTableName())

    if self.sysc.isVSA():
        # Move up to the next multiple of ten and use the units digit to tag
        # the database: 2 for VSAVVV, 1 for anything else.
        dbID = 2 if archive.database == "VSAVVV" else 1
        cuEventID = 10 * (cuEventID // 10 + 1) + dbID

    # Remember this database's _isLogReq flag; it is put back in the finally
    # clause whatever happens to the placeholder insert.
    # NOTE(review): presumably the insert/commit can alter the flag - confirm.
    savedLogReq = archive._isLogReq[archive.database]

    try:
        placeholderRow = [cuEventID, self.cuNum, dbc.charDefault(),
                          dbc.charDefault(), utils.makeMssqlTimeStamp(),
                          self.curator,
                          "Placeholder for CU currently in progress",
                          dbc.yes()]

        archive.insertData(dbc.archiveCurationHistoryTableName(),
                           rowData=placeholderRow, enforcePKC=True)

        archive.commitTransaction()

    except odbc.InternalError as error:
        if "database is read-only" in str(error):
            raise SystemExit(
                "<ERROR> Cannot curate databases that are mounted read-only."
                " Even if the CU only performs outgests it still needs write"
                " permissions to construct temporary views to enable BCP to"
                " work.")
        raise

    except odbc.ProgrammingError as error:
        if "permission was denied" in str(error):
            raise SystemExit("<ERROR> You do not have write permission "
                             "on database " + archive.database)
        raise

    finally:
        archive._isLogReq[archive.database] = savedLogReq

    return cuEventID
649
650
651
652 - def _getNextID(self, attrName, tableName, where=''):
653 """
654 Obtain next available ID number in the db for a given ID attribute.
655
656 @param attrName: Name of ID attribute.
657 @type attrName: str
658 @param tableName: Name of the table where attribute resides.
659 @type tableName: str
660 @param where: Optional SQL WHERE clause.
661 @type where: str
662
663 @return: Next available ID number.
664 @rtype: int
665
666 """
667 maxID = self.archive.queryAttrMax(attrName, tableName, where)
668
669 if maxID:
670 return maxID + 1
671 else:
672 return 1
673
674
675
684
685
686
688 """
689 Adds an entry for this curation session to the ArchiveCurationHistory
690 database table.
691
692 """
693 row = self.cuEventID, self.cuNum, self._log.pathName, \
694 self.resultsFilePathName, utils.makeMssqlTimeStamp(), \
695 self.curator, self.comment, (dbc.yes() if self._rolledBack
696 else dbc.no())
697
698 Logger.addMessage("Updating archive curation history with this row:")
699 Logger.addMessage(repr(row))
700
701 self._scrubACHPlaceHolder()
702 self.archive.insertData(dbc.archiveCurationHistoryTableName(), row)
703
704 if self._cuTable:
705 Logger.addMessage("completed " + self._cuTable.getDescription())
706
707
708
710 """
711 Adds an entry for this curation session to the ProgrammeCurationHistory
712 database table.
713
714 """
715 if not self.programmeID:
716 return
717 elif type(self.programmeID) is not set:
718 self.programmeID = [self.programmeID]
719
720 if len(self.programmeID) > 1:
721 Logger.addMessage(
722 "for programmes: " + utils.numberRange(self.programmeID))
723 else:
724 for progID in self.programmeID:
725 Logger.addMessage("for programme %s: %s" %
726 (progID, self.programme.getName(programmeID=progID)))
727
728 tableName = dbc.programmeCurationHistoryTableName()
729 row = self.cuEventID, utils.makeMssqlTimeStamp(), \
730 (dbc.yes() if self._success else dbc.no())
731
732 Logger.addMessage("Updating programme curation history with this row:")
733 Logger.addMessage(repr(row))
734
735 for progID in self.programmeID:
736 self.archive.insertData(tableName, (progID,) + row)
737
738
739
741 """
742 Writes database lock. If lock already present then pause and wait for
743 database to unlock.
744
745 """
746 numTries = 0
747 maxTries = 100
748 delay = 2
749 while os.path.exists(self._lockPathName):
750 time.sleep(delay)
751 numTries += 1
752 if numTries == 1:
753 print("A lock file exists: " + self._lockPathName)
754 print("End the task that owns the lock and/or manually "
755 "delete the lock file and this curation session will "
756 "automatically start.")
757 elif numTries > maxTries:
758 raise Exception(
759 "Timed out in waiting for database lock to be freed")
760
761 file(self._lockPathName, 'w').write("DB locked by CU%s. PID = %s.%s" %
762 (self.cuNum, os.getenv('HOST'), os.getpid()))
763
764
765
766
767
768
769