Source Code for Module wsatools.DbConnect.DbSession

#------------------------------------------------------------------------------
#$Id: DbSession.py 10300 2014-05-01 13:47:13Z RossCollins $
"""
   Database interface. The L{DbSession} class provides a complete database API
   as well as connection management features. A database connection is
   instantiated upon construction of a L{DbSession} object and then
   automatically closed when the object falls out of scope. Existing
   identical connections are reused, invisibly to the user, to reduce the
   number of simultaneous open connections. All access to the database should
   then be via the methods supplied by the L{DbSession} class.
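
   The connection reuse described above can be pictured as a registry keyed
   by the connection settings, with a counter of live sessions per key. The
   following is only a minimal sketch under stated assumptions: PooledSession
   and its close() method are hypothetical names, and sqlite3 stands in for
   the mxODBC connection. It is not the L{DbSession} implementation.

```python
import sqlite3


class PooledSession(object):
    # Hypothetical stand-in for illustration; this is not DbSession itself.
    _connections = {}  # One shared connection per session key
    _numSessions = {}  # Number of live sessions per session key

    def __init__(self, database=":memory:", userName="reader"):
        self._key = database + "|" + userName
        self._isClosed = False
        if self._key in PooledSession._connections:
            # An identical session already exists, so reuse its connection
            PooledSession._numSessions[self._key] += 1
        else:
            PooledSession._connections[self._key] = sqlite3.connect(database)
            PooledSession._numSessions[self._key] = 1

    @property
    def connection(self):
        return PooledSession._connections[self._key]

    def close(self):
        # Release this session; the last one out closes the connection
        if self._isClosed:
            return
        self._isClosed = True
        PooledSession._numSessions[self._key] -= 1
        if PooledSession._numSessions[self._key] == 0:
            PooledSession._connections.pop(self._key).close()
            del PooledSession._numSessions[self._key]


first = PooledSession()
second = PooledSession()
sharesConnection = first.connection is second.connection
first.close()
sessionsLeft = PooledSession._numSessions[second._key]
second.close()
isReleased = second._key not in PooledSession._connections
```

   The sketch uses an explicit close() so that the release order is
   deterministic; the module itself releases connections when the session
   object is destroyed.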

   Usage
   =====

   Database API
   ------------

   Import as::
       from wsatools.DbConnect.DbSession import DbSession

   For a simple default database connection to the WSA::

       db = DbSession("WSA")
       db.update("Multiframe", "deprecated=0", where="multiframeID")
       print(db.queryNumRows("Multiframe"))
       print(db.queryAttrMax("multiframeID", "Multiframe"))

   For joined queries, e.g. a list of the multiframeIDs of all LAS stacks, use
   the L{Join} class::

       from wsatools.DbConnect.DbSession import Join

       mfIDs = db.query("Multiframe.multiframeID",
                table=Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]),
                where="frameType LIKE '%stack' AND programmeID=101")

   For queries where a select statement is used within a where clause, use the
   L{SelectSQL} class::

       from wsatools.DbConnect.DbSession import SelectSQL

       progMfs = SelectSQL("multiframeID", table="ProgrammeFrame",
                           where="programmeID=%s" % self.programmeID)
       fileNames = db.query("fileName", table="Multiframe",
                            where="multiframeID IN %s" % progMfs)

   To perform a safe temporary disconnect from the database, invoke the
   L{DbSession.goOffline()} method. When any L{DbSession} object that uses this
   connection next attempts to access the database, e.g. with a query, the
   connection will be reactivated. If the database is down at that point, the
   session will persistently attempt to reconnect. Hence, you can also use
   this method at sensitive stages of a curation script to ensure the database
   connection remains alive.
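
   The offline-then-reactivate behaviour described above amounts to a lazy
   reconnect on the next database access. The following is only a minimal
   sketch under stated assumptions: LazySession is a hypothetical class,
   sqlite3 stands in for the real server, and the persistent retry on a
   failed reconnect is left out for brevity.

```python
import sqlite3


class LazySession(object):
    # Hypothetical illustration only; not the DbSession implementation.
    def __init__(self, database=":memory:"):
        self._database = database
        self._connection = None
        self.numConnects = 0  # How many times a connection was opened

    def _ensureConnected(self):
        # Reactivate the connection on demand
        if self._connection is None:
            self._connection = sqlite3.connect(self._database)
            self.numConnects += 1
        return self._connection

    def goOffline(self):
        # Commit outstanding work, then drop the connection safely
        if self._connection is not None:
            self._connection.commit()
            self._connection.close()
            self._connection = None

    def query(self, sql):
        return self._ensureConnected().execute(sql).fetchall()


session = LazySession()
firstResult = session.query("SELECT 1")
session.goOffline()
secondResult = session.query("SELECT 2")  # Reconnects transparently
```

   Callers never reconnect explicitly in this sketch; every access path goes
   through the same connection check.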

   Please refer to the L{DbSession} class documentation for the full range of
   API methods and options.

   Ingester
   --------

   Import as::
       from wsatools.DbConnect.DbSession import Ingester

   Before ingesting it is vital to ensure the schema of the ingest file matches
   the table into which it will be ingested. This check occurs when an
   L{Ingester} object is created, so this should be done as early as possible
   in your code, and not unnecessarily within a for-loop. To instantiate an
   L{Ingester} object you must pass it a L{DbSession} connection to the
   database that hosts the table into which the ingest will occur, and a
   schema for the table(s) that will be ingested into during the lifetime of
   this particular L{Ingester} object::

       db = DbSession()
       ingester = Ingester(db, tableSchema)

   The tableSchema is just a list of L{Schema.Table} objects for every table
   that will be ingested this session. Use the L{Schema.parseTables()} method
   to create this schema, and then reduce or expand the list until it contains
   exactly the set of tables to be ingested. To ingest a native binary file
   located in the catalogue server's share path into the table::

       numRows = ingester.ingestTable("lasSource", "lasSource.nat", idxInfo)

   Afterward the ingest file is deleted. The idxInfo is created by
   L{Schema.parseIndices()} and is required if you wish to drop indices for
   the ingest and have them automatically recreated afterward. If the database
   table doesn't already exist, the ingester will create it.
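
   The schema check that precedes an ingest boils down to comparing the
   expected columns with what the database actually holds. The following is
   only a minimal sketch under stated assumptions: findSchemaMismatches is a
   hypothetical helper, the expected schema is a plain list of (name, type)
   pairs rather than L{Schema.Table} objects, and sqlite3 stands in for the
   real server.

```python
import sqlite3


def findSchemaMismatches(connection, tableName, expectedColumns):
    # expectedColumns is a list of (name, dataType) pairs in schema order.
    # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk).
    rows = connection.execute("PRAGMA table_info(%s)" % tableName).fetchall()
    actualTypes = dict((row[1], row[2].lower()) for row in rows)
    expectedNames = [name for name, _dataType in expectedColumns]
    problems = []

    # Schema attributes with no matching database column
    missing = [name for name in expectedNames if name not in actualTypes]
    if missing:
        problems.append("missing in database: " + ", ".join(missing))

    # Database columns that the schema no longer describes
    extra = [row[1] for row in rows if row[1] not in expectedNames]
    if extra:
        problems.append("not in schema: " + ", ".join(extra))

    # Columns present on both sides but with differing data types
    wrongType = [name for name, dataType in expectedColumns
                 if name in actualTypes
                 and actualTypes[name] != dataType.lower()]
    if wrongType:
        problems.append("different data type: " + ", ".join(wrongType))

    return problems


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE lasSource (sourceID bigint, ra float)")
goodSchema = [("sourceID", "bigint"), ("ra", "float")]
staleSchema = [("sourceID", "bigint"), ("ra", "real"), ("dec", "float")]
noProblems = findSchemaMismatches(connection, "lasSource", goodSchema)
problems = findSchemaMismatches(connection, "lasSource", staleSchema)
```

   Running such a check once, before any data is written, is the reason for
   creating the L{Ingester} object early.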

   Please refer to the L{Ingester} class documentation for further options.

   Outgester
   ---------

   Import as::
       from wsatools.DbConnect.DbSession import Outgester, SelectSQL

   To initialise an L{Outgester} object you must supply it with a connection to
   the database where the outgest will occur::

       db = DbSession()
       sourceOutgester = Outgester(db)

   Then to outgest the results of a particular table query to a binary file::

       filePathName = sourceOutgester.outgestQuery(SelectSQL(
         select="*", table="WSA.dbo.Multiframe", where="deprecated = 0"))

   By default the file is outgested to the load server share path, and there is
   a delay following outgest to allow for the NFS to see the shared file. If
   an alternative path is given outside of the share path then there is no
   delay to wait for the NFS to see the file.

   Please refer to the L{Outgester} class documentation for further options.

   Creating and modifying databases
   --------------------------------

   The L{Database} class provides methods to create and modify databases.

   Import as::
       from   wsatools.DbConnect.DbSession import Database, DbSession
       from   wsatools.SystemConstants     import WsaConstants

   To recreate the WSA with standard options on server "pharaoh"::
       WSA = Database(WsaConstants.loadDatabase,
                      filegroups=[("Detection", "4 GB"),
                                  ("Source", "4 GB"),
                                  ("Curation", "4 GB"),
                                  ("Indices", "4 GB")])
       WSA.create(DbSession("pharaoh."+WsaConstants.adminDatabase))

   Please refer to the L{Database} class documentation for further options.

   @author: R.S. Collins
   @org:    WFAU, IfA, University of Edinburgh

   @todo: Make method keyword arguments consistent. The query() interface
          should be attrs, table, where; and this naming convention should be
          stuck to as much as possible in other methods. Should update() use
          attrs or entry instead of entryList?
   @todo: Cut down code duplication between updateEntries and deleteRows
          + possibly between updateEntries + update, ditto for deleteRows
   @todo: Retrospectively upgrade code using queryNumRows() to make use of new
          features if necessary.
   @todo: Upgrade code to make use of join feature, and upgrade documentation
          of query*() methods to reflect this.
   @todo: Upgrade code to make use of query() returning namedtuples.
   @todo: addColumn / dropColumn should be merged with add/drop constraint, and
          so be designed like createObject/dropObject.
   @todo: Password option?
"""
#------------------------------------------------------------------------------
from __future__      import division, print_function
from future_builtins import map, zip

from   collections  import defaultdict, namedtuple
from   itertools    import takewhile
from   keyword      import iskeyword
import mx.ODBC.unixODBC as odbc
from   operator     import itemgetter
import os
import shutil
import sys
import time

from   wsatools.CLI               import CLI
import wsatools.CSV                   as csv
import wsatools.DbConnect.DbConstants as dbc
import wsatools.DbConnect.Schema      as schema
from   wsatools.Logger            import Logger
# @@TODO: WsaConstants imported for backwards compatibility only...
from   wsatools.SystemConstants   import SystemConstants, WsaConstants
import wsatools.Utilities             as utils
#------------------------------------------------------------------------------

class DbSession(object):
    """
    An interface to our database. Manages the mxODBC connection, and provides a
    set of wrappers to common SQL statements.

    @note: Test database connections get read-write user access by default.

    @group Errors & Exceptions: CatalogueServerError, DisconnectError

    """
    class CatalogueServerError(Exception):
        """ An error generated by a command run directly on the catalogue
            server.
        """
        pass

    class DisconnectError(Exception):
        """ Exception thrown if a programmer tries to access a database
            connection that is closed, either to attempt to close it or to
            commit/roll back transactions. This is a programming error rather
            than an exception caused by an unexpected loss of database
            connection.
        """
        def __init__(self, db, msg=''):
            """
            @param db:  Name of database.
            @type  db:  str
            @param msg: Optional specific message for this exception.
            @type  msg: str

            """
            msg = msg.strip().rstrip('.')
            super(DbSession.DisconnectError, self).__init__(
              "%sAlready disconnected from database %s" %
              (msg + '. ' if msg else msg, db))

    #--------------------------------------------------------------------------
    # Class-scope member data - all instances have the same value

    _dbConnections = {}  #: Session manager. Stores one session per database.
    _isLogReq = {}
    """ Dictionary of flags to indicate a database has been modified,
        referenced by database name.
    """
    _numSessions = {}  #: Counter to track number of sessions per database.
    sysc = SystemConstants()  #: System constants for this DbSession.

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)

    #: Name of the database to connect to this session.
    database = WsaConstants.loadDatabase  # @@TODO: Remove default value.
    #: Is this a database connection to the load database?
    isLoadDb = True
    #: Is this a database connection to the load database by the operator?
    isRealRun = True
    #: For statements that modify the database print SQL instead of executing.
    isTrialRun = False
    #: Server that hosts the database to connect to.
    server = WsaConstants.loadServer  # @@TODO: Remove default value.

    #: Username to use for this session's database connection.
    userName = (dbc.loadServerRoUsername() if os.getenv('USER') != 'scos' else
                dbc.loadServerRwUsername())

    #--------------------------------------------------------------------------
    # Define private member variable default values (access prohibited)

    _autoCommit = False
    """ Automatically commit transactions, minimally logged so faster.
    """
    _dbSessionKey = None
    """ A string that defines this connection as a unique type. This is used to
        determine if an identical connection already exists for reuse.
    """
    _fixedCopyBug = False  #: Has workaround for multiple copies been applied?
    _isDeadlocked = False  #: Database is currently deadlocked?
    _isDirty = False  #: Is this connection performing dirty reads?
    _isPersistent = True  #: Persistently reconnect if broken?

    #--------------------------------------------------------------------------
    # Define generic command-line interface settings for a DbSession

    CLI.progArgs.append(CLI.Argument("database", server + '.' + database,
                                     isValOK=lambda x: x.count('.') <= 1))
    CLI.progOpts += [
      CLI.Option('t', "test",
        "test run, don't alter database, just print statements to screen"),
      CLI.Option('u', "user",
        "database username", "NAME", userName)]

    #--------------------------------------------------------------------------

    def __init__(self, database=server + '.' + database, autoCommit=_autoCommit,
                 isTrialRun=isTrialRun, userName=userName,
                 isPersistent=_isPersistent, cli=None):
        """
        Makes connection to the requested database.

        @param database:     Name of the database to connect to, prepend with
                             server name if not load server. For example,
                             "amenhotep.TWOMASS".
        @type  database:     str
        @param autoCommit:   If True, every transaction is instantly committed.
                             This speeds up transactions through minimal
                             logging.
        @type  autoCommit:   bool
        @param isTrialRun:   If True, do not perform database modifications,
                             just print the SQL statement to the terminal.
        @type  isTrialRun:   bool
        @param userName:     Optionally override default database username.
        @type  userName:     str
        @param isPersistent: If True then persistent reconnects are made to the
                             database if an already open connection is dropped.
                             Only applied in autoCommit=True mode.
        @type  isPersistent: bool
        @param cli:          Optionally initialise using the settings supplied
                             by the command-line options.
        @type  cli:          L{CLI.CLI}

        """
        if cli:
            database = cli.getArg("database")
            isTrialRun = cli.getOpt("test")
            userName = cli.getOpt("user")

        if '.' in database:
            self.server, self.database = database.split('.', 1)
        else:
            self.server = SystemConstants(database).loadServer
            self.database = database

        self.sysc = SystemConstants(self.database)

        # True if subclass has changed default behaviour to autoCommit = True,
        # OR if turned on by a constructor argument, otherwise False. NB: This
        # assumes default behaviour is False and that other CUs turn it to True
        self._autoCommit = self._autoCommit or autoCommit
        self._isPersistent = self._isPersistent or isPersistent
        self.isLoadDb = self.database.startswith(self.sysc.loadDatabase)
        self.isRealRun = os.getenv('USER') == 'scos' and self.isLoadDb
        self.isTrialRun = isTrialRun

        # Override default database username for test databases
        self.userName = (dbc.loadServerRwUsername()
                         if (userName is dbc.loadServerRoUsername()
                             and "test" in self.database.lower()) else
                         userName)

        #: Specifies a unique database connection
        self._dbSessionKey = (self.server + self.database + self.userName +
                              str(self._autoCommit))

        # If an identical session already exists - no need for a new connection
        if self._dbSessionKey in DbSession._dbConnections:
            DbSession._numSessions[self._dbSessionKey] += 1
        else:
            self._openConnection()
            DbSession._numSessions[self._dbSessionKey] = 1
            if self.database not in self._isLogReq:
                self._isLogReq[self.database] = False

    #--------------------------------------------------------------------------

    def __del__(self):
        """ Closes database connection, committing uncommitted transactions. """
        # If a connection was successfully made
        if self._dbSessionKey in DbSession._dbConnections:
            DbSession._numSessions[self._dbSessionKey] -= 1
            if DbSession._numSessions[self._dbSessionKey] == 0:
                try:
                    self._closeConnection()
                except DbSession.DisconnectError:
                    pass
                # Clear closed connections from the list
                del DbSession._dbConnections[self._dbSessionKey]

    #--------------------------------------------------------------------------

    def __str__(self):
        """ @return: Database path.
            @rtype:  str
        """
        return self.server + '.' + self.database

    #--------------------------------------------------------------------------

    def addColumn(self, tableName, colName, dataType, defaultStr=None,
                  notNull=True, constraint=None):
        """
        Adds a column to a table in the database.

        @param tableName:  Name of the table.
        @type  tableName:  str
        @param colName:    Name of the new column.
        @type  colName:    str
        @param dataType:   SQL data type of the new attribute.
        @type  dataType:   str
        @param defaultStr: Optional default value, in the form of an SQL string
                           e.g. "'none'" or "0".
        @type  defaultStr: str
        @param notNull:    If True, column disallows null values.
        @type  notNull:    bool
        @param constraint: Optional constraint name. Defaults to
                           tableName + colName + '_cons'.
        @type  constraint: str

        """
        sql = "ALTER TABLE %s ADD %s %s" % (tableName, colName, dataType)
        if notNull:
            sql += " not null"
        if defaultStr:
            constraint = constraint or tableName + colName + '_cons'
            sql += " CONSTRAINT %s DEFAULT %s" % (constraint, defaultStr)

        try:
            self._executeScript(sql)
        except odbc.ProgrammingError as error:
            if notNull:
                raise Exception("Cannot add a column with 'not null' specified"
                  " without specifying default values if data already exists"
                  " in the table. mxODBC says: (%s)" % error)

    #--------------------------------------------------------------------------

    def addIndex(self, indexSchema, ignoreNS=False, releasedOnly=False,
                 usingDefaultFG=False):
        """
        Adds an index to a specified table.

        @param indexSchema:    Object describing the schema of the index to add.
        @type  indexSchema:    L{Schema.Index}
        @param ignoreNS:       If True, upon a "no such" error message reporting
                               non-existent tables, attributes etc. log the
                               message and continue, with False return state (no
                               object created).
        @type  ignoreNS:       bool
        @param releasedOnly:   If True, only create the object if it is to be
                               released (i.e. upper case CREATE statements).
        @type  releasedOnly:   bool
        @param usingDefaultFG: If False, use the file group "Indices_FG".
        @type  usingDefaultFG: bool

        @return: True, if index has been created.
        @rtype:  bool

        """
        if not releasedOnly or indexSchema.releasable:
            Logger.addMessage("Creating index %s on table %s..." %
                              (indexSchema.name, indexSchema.tableName))
            if usingDefaultFG:
                indexSchema.fileGroup = None
        try:
            return self.createObjects([indexSchema], releasedOnly=releasedOnly)
        except odbc.ProgrammingError as error:
            ignoreCases = [" does not exist in the target table.",
                           " table does not exist in database "]
            if ignoreNS and any(case in str(error) for case in ignoreCases):
                Logger.addExceptionWarning(error, "Database message")
                return False
            else:
                raise

    #--------------------------------------------------------------------------

    def alterColumn(self, tableName, colName, dataType, defaultStr=None,
                    notNull=True, constraint=None):
        """
        Alter a column of a table in the database.

        @param tableName:  Name of the table.
        @type  tableName:  str
        @param colName:    Name of the existing column.
        @type  colName:    str
        @param dataType:   Desired SQL data type for the column.
        @type  dataType:   str
        @param defaultStr: Optional default value, in the form of an SQL string
                           e.g. "'none'" or "0".
        @type  defaultStr: str
        @param notNull:    If True, column disallows null values.
        @type  notNull:    bool
        @param constraint: Optional constraint name. Defaults to
                           tableName + colName + '_cons'.
        @type  constraint: str

        @todo: Should become more schema driven together with L{addColumn()}.

        """
        sql = "ALTER TABLE %s ALTER COLUMN %s %s%s" \
              % (tableName, colName, dataType, " not null" if notNull else '')
        if defaultStr:
            constraint = constraint or tableName + colName + '_cons'
            sql += " CONSTRAINT %s DEFAULT %s" % (constraint, defaultStr)

        self._executeScript(sql)

    #--------------------------------------------------------------------------

    def checkConstraints(self, tableName):
        """
        Checks the database constraints on specified table. Logs any constraint
        violations and throws an exception if any are detected.

        @param tableName: Name of the table.
        @type  tableName: str

        """
        # Use a lock file to avoid deadlocks
        lockPathName = os.path.join(self.sysc.sysPath, "conlock-" + self.server)
        if os.path.exists(lockPathName):
            Logger.addMessage("<Info> Another CU is checking constraints on %s"
              ". Waiting for %s to disappear..." % (self.server, lockPathName))
            while os.path.exists(lockPathName):
                time.sleep(10)

        Logger.addMessage(
          "Checking constraints on table %s (may hang if locked out by "
          "other CUs also accessing %s.tempdb)" % (tableName, self.server))

        # Just log warning if log file can't be written?
        file(lockPathName, 'w').write("Locking PID = %s.%s" %
                                      (os.getenv('HOST'), os.getpid()))

        try:
            violations = self._executeScript(
              "DBCC CHECKCONSTRAINTS (%r)" % tableName, wantResult=True)
        finally:
            os.remove(lockPathName)

        if violations:
            # Log the offending violations and then raise an exception:
            Logger.addMessage("<Warning> Constraint check failed.")
            msg = "Constraint violations:"
            violationData = defaultdict(list)
            for table, constraint, case in violations:
                key = (table.strip().strip('[').strip(']'),
                       constraint.strip().strip('[').strip(']'))
                violationData[key].append(case)
            for violation in violationData:
                msg += "\nIn table %r, constraint %r fails in these cases:\n" \
                       % violation + '\n'.join(violationData[violation])

            raise odbc.IntegrityError(msg)

    #--------------------------------------------------------------------------

    def checkSchema(self, tableSchema, releasedOnly=False):
        """
        Compares the given table schema as described in the .sql files to the
        actual database schema to spot inconsistencies. If there are any
        problems a L{Schema.MismatchError} exception is thrown.

        @param tableSchema:  Schema of tables to check as provided by
                             L{Schema.parseTables()}.
        @type  tableSchema:  list(Schema.Table)
        @param releasedOnly: If True, only check released database tables.
        @type  releasedOnly: bool

        """
        # Read in schema attributes from the schema file
        errMsg = ''
        errSchema = []
        for table in tableSchema:
            if not releasedOnly or table.releasable:
                # Obtain list of attributes in present db table
                dbAttrs = self.queryColumnNames(table.name)
                # Dictionary of data types for db attributes
                dbTypes = self.queryDataTypes(table.name)
                # Dictionary of data types for schema attributes
                scTypes = dict((attr.name, attr.dataType)
                               for attr in table.columns)

                # Find any extra schema attributes that are not in the database
                exScAttrs = [attr.name for attr in table.columns
                             if attr.name not in dbAttrs]
                if exScAttrs:
                    errMsg += "\n%s - " % table
                    if len(exScAttrs) == len(table.columns):
                        errMsg += "table does not exist in the database"
                    else:
                        errMsg += (
                          "the following attributes have no matching columns "
                          "in the database: " + ', '.join(exScAttrs))
                    errSchema.append(table)

                # Find any extra database attributes that are not in the schema
                exDbAttrs = [attrName for attrName in dbAttrs
                             if attrName not in table.columns]
                if exDbAttrs:
                    errMsg += (
                      "\n%s - the following database columns no longer exist "
                      "in the current schema: " % table + ', '.join(exDbAttrs))
                    errSchema.append(table)

                # Before the next task remove additional attributes already
                # discovered from the lists
                reScAttrs = [attr.name for attr in table.columns
                             if attr not in exScAttrs]
                reDbAttrs = [attrName for attrName in dbAttrs
                             if attrName not in exDbAttrs]

                # Check that the column order is correct
                colSwapList = [(scAttr, dbAttr) for scAttr, dbAttr
                               in zip(reScAttrs, reDbAttrs)
                               if scAttr != dbAttr]
                if colSwapList:
                    errMsg += (
                      "\n%s - the order of these database columns and schema "
                      "attributes are swapped: " % table +
                      utils.joinNested(colSwapList, subJoinStr="<->"))
                    errSchema.append(table)

                # Finally check that the data-types are the same
                typeDifList = \
                  [(attrName, scTypes.get(attrName), dbTypes.get(attrName))
                   for attrName in reDbAttrs
                   if scTypes.get(attrName) != dbTypes.get(attrName)]

                if typeDifList:
                    errMsg += (
                      "\n%s - these attributes have a different data type in "
                      "the database than described by the current schema: " %
                      table + utils.joinNested(typeDifList, subJoinStr=":"))
                    errSchema.append(table)

        if errMsg:
            raise schema.MismatchError(errMsg, errSchema)

    #--------------------------------------------------------------------------

    def commitTransaction(self):
        """ Manually commit a transaction if auto-commit is switched off. """
        if not self._autoCommit and not self.isTrialRun:
            connection, _cursor = DbSession._dbConnections[self._dbSessionKey]
            if not connection:
                raise DbSession.DisconnectError(self.database,
                                                "Cannot commit transactions.")
            connection.commit()
            self._isLogReq[self.database] = True

    #--------------------------------------------------------------------------

    def copyIntoTable(self, destinationTable, sourceTable, columns='*',
                      where=''):
        """
        Copy a selection of rows from one table into another.

        @param destinationTable: Table to copy rows into.
        @type  destinationTable: str
        @param sourceTable:      Table to copy from, or a L{Join} object of
                                 several tables.
        @type  sourceTable:      str or L{Join}
        @param columns:          Optionally select a subset of columns, as a
                                 comma-separated list.
        @type  columns:          str
        @param where:            Optional SQL where clause defining the
                                 selection of rows to copy.
        @type  where:            str

        @return: Number of rows copied.
        @rtype:  int

        @note: This command is heavily transaction logged.

        """
        sql = "INSERT INTO %s %s" % \
              (destinationTable, SelectSQL(columns, sourceTable, where))

        # NB: Can't do a SETUSER to pubdbrw here if copying to a remote
        # server - SQL Server doesn't allow users set by SETUSER to copy over
        # to remote servers, so instead, must be logged in as pubdbrw already.
        return self._executeScript(sql, wantRowCount=True)

    #--------------------------------------------------------------------------

    def copyTable(self, sourceTable, tableSchema, columns='*', where='',
                  fileGroup='', attachForeignKeys=True):
        """
        Create a new table from the selected contents of another table.

        @param sourceTable:       Table to copy from, or a L{Join} object of
                                  several tables.
        @type  sourceTable:       str or L{Join}
        @param tableSchema:       Schema of table to copy rows into that will be
                                  created in this database.
        @type  tableSchema:       L{Schema.Table}
        @param columns:           Optionally select a subset of columns, as a
                                  comma-separated list.
        @type  columns:           str
        @param where:             Optional SQL where clause defining the
                                  selection of rows to copy.
        @type  where:             str
        @param fileGroup:         Optionally supply a different file group to
                                  create the new table on other than the
                                  default.
        @type  fileGroup:         str
        @param attachForeignKeys: If True, foreign key constraints are attached
                                  to the new table.
        @type  attachForeignKeys: bool

        @return: Number of rows copied.
        @rtype:  int

        @note: This command should be minimally logged.
        @note: This method relies on Microsoft T-SQL, rather than standard SQL.
        @note: Changing file-group is an experimental untested feature so far.

        """
        if not self._fixedCopyBug:
            # Desperate attempt to get around the MS SQL Server bug on multiple
            # re-execution of SELECT INTOs - hint OPTION (KEEP PLAN) doesn't
            # yield consistent results:
            self.freeProcCache()
            self._executeScript("DBCC DROPCLEANBUFFERS")
            self._fixedCopyBug = True

        if fileGroup:
            adminDb = DbSession(self.server + '.' + self.sysc.adminDatabase,
                                autoCommit=True, isTrialRun=self.isTrialRun,
                                userName=self.userName)
            try:
                adminDb._executeScript("ALTER DATABASE %s MODIFY FILEGROUP %s "
                                       "DEFAULT" % (self.database, fileGroup))
            except odbc.ProgrammingError as error:
                if "filegroup already has the 'DEFAULT' property set" \
                  not in str(error):
                    raise

                Logger.addMessage("%s already has the 'DEFAULT' property "
                                  "set in %s" % (fileGroup, self.database))

        try:
            rowsCopied = self._executeScript(
              str(SelectSQL(columns, sourceTable, where)
                  ).replace(" FROM ", " INTO %s FROM " % tableSchema.name, 1),
              wantRowCount=True)
            if fileGroup:
                self.commitTransaction()
        finally:
            if fileGroup:
                adminDb._executeScript("ALTER DATABASE %s MODIFY FILEGROUP"
                                       " [PRIMARY] DEFAULT" % self.database)

        # Attach primary keys and other constraints
        self.createObjects(tableSchema.constraints,
                           releasedOnly=not attachForeignKeys)

        return rowsCopied

    #--------------------------------------------------------------------------

    def createObjects(self, objSchema, overWrite=False, ignoreNS=False,
                      releasedOnly=False, haltOnError=True):
        """
        Create new database objects using the ordered list of object schema
        definitions provided from L{Schema} methods. Tables are created with
        primary key constraints but no foreign key constraints; use
        L{createTable()} instead if they are required.

        @param objSchema:    An ordered list of objects derived from
                             L{Schema._Schema}.
        @type  objSchema:    list(Schema._Schema)
        @param overWrite:    If True, allow pre-existing database objects with
                             the same name to be overwritten.
        @type  overWrite:    bool
        @param ignoreNS:     If True, do not throw an exception if an object
                             could not be created because some other object in
                             the database does not exist. The function will
                             return False - object could not be created, but
                             no warning message will be displayed.
        @type  ignoreNS:     bool
        @param releasedOnly: If True, only create the object if it is to be
                             released (i.e. upper case CREATE statements).
        @type  releasedOnly: bool
        @param haltOnError:  If True, raise an exception as soon as it occurs,
                             otherwise continue with the next object and raise
                             an error at the end.
        @type  haltOnError:  bool

        @return: True, if all objects created successfully.
        @rtype:  bool

        """
        allCreated = True
        lastError = None
        for dbObj in objSchema:
            if not releasedOnly or dbObj.releasable:
                if overWrite:
                    self.dropObjects([dbObj])
                try:
                    self._executeScript(dbObj.createSQL())

                    # Need to set permissions for release DBs too
                    if releasedOnly and isinstance(dbObj, schema.Function):
                        self._executeScript("GRANT EXEC ON %s.%s TO public"
                          % (dbc.loadServerDbsOwner(), dbObj))

                except (odbc.ProgrammingError, odbc.InterfaceError) as error:
                    if ignoreNS:
                        Logger.addMessage("<WARNING> %s" % error,
                                          alwaysLog=False)

                    allCreated = False
                    ignoreCases = ["Invalid object name", "does not exist"]
                    if (not ignoreNS or
                        not any(case in str(error) for case in ignoreCases)) \
                      and (overWrite or
                           "There is already an" not in str(error)
                           and "already exists" not in str(error)):

                        if haltOnError:
                            raise
                        else:
                            Logger.addExceptionWarning(error)
                            Logger.addMessage("continuing...")
                            lastError = error

                except odbc.IntegrityError as error:
                    # If FK constraint already exists then don't raise an error
                    fkMsgParts = ["ALTER TABLE statement conflicted",
                                  "FOREIGN KEY constraint", dbObj.name]

                    if not all(part in str(error) for part in fkMsgParts):
                        if haltOnError:
                            raise
                        else:
                            Logger.addExceptionWarning(error)
                            Logger.addMessage("continuing...")
                            lastError = error

        if lastError:
            raise lastError

        return allCreated

    #--------------------------------------------------------------------------

812 - def createStatistics(self, tables):
813 """ 814 Creates database column statistics for given tables. 815 816 @param tables: List of table schema. 817 @type tables: list(Schema.Table) 818 819 """ 820 sql = "CREATE STATISTICS stats_%s_%s ON %s (%s)" 821 for table in tables: 822 Logger.addMessage("Creating statistics on %s..." % table) 823 for column in table.columns: 824 self._executeScript(sql % (table, column, table, column))
825 826 #-------------------------------------------------------------------------- 827
828 - def createTable(self, tableSchema, dbSchema=[], overWrite=False):
829 """ 830 Create a new database table using the schema provided from 831 L{Schema.parseTables()}, all constraints that reference this table 832 found in the given schema list will be applied. 833 834 @param tableSchema: Parsed schema describing the table that is to be 835 created. 836 @type tableSchema: Schema.Table 837 @param dbSchema: Schemas for every table in the database that may 838 have a foreign key constraint that references 839 the table to be created. 840 @type dbSchema: list(Schema.Table) 841 @param overWrite: If True, allow current db table to be overwritten. 842 @type overWrite: bool 843 844 @return: True, if table really was created. 845 @rtype: bool 846 847 """ 848 if overWrite: 849 self.dropTable(tableSchema, dbSchema) 850 851 # Create the table 852 if self.createObjects([tableSchema]): 853 # Apply this table's foreign key constraints 854 self.createObjects([con for con in tableSchema.constraints 855 if type(con) is schema.ForeignKeyConstraint]) 856 857 # Apply foreign key constraints that reference this table 858 foreignKeys = [] 859 for table in dbSchema: 860 foreignKeys += [con for con in table.constraints 861 if type(con) is schema.ForeignKeyConstraint 862 and con.referenceTable == tableSchema.name] 863 864 self.createObjects(foreignKeys, ignoreNS=True) 865 return True 866 867 return False
868 869 #-------------------------------------------------------------------------- 870
871 - def createUser(self, userName, password):
872 """ 873 Creates a new user account on the current server and grants it 874 read-only access to the current database. 875 876 @param userName: User name for new account. 877 @type userName: str 878 @param password: Password for user name. 879 @type password: str 880 881 """ 882 loginTable = '.'.join( 883 [self.sysc.adminDatabase, dbc.loadServerDbsOwner(), "syslogins"]) 884 # @GOTCHA: Can't use queryEntriesExist() here because the entry that is 885 # returned causes mxODBC to raise an exception! 886 if not self.queryNumRows(loginTable, "loginname='%s'" % userName): 887 self._executeScript("EXEC sp_addlogin %s, %s, %s, us_english" % 888 (userName, password, self.database)) 889 self._isLogReq[self.database] = True 890 891 self.grantAccess(userName) 892 self._executeScript( 893 "EXEC sp_addrolemember db_denydatawriter, " + userName)
894 895 #-------------------------------------------------------------------------- 896
897 - def delete(self, tableName, whereStr=''):
898 """ 899 Deletes rows from a database table (to delete an entire table 900 use truncate() instead) based on a flexible SQL WHERE clause. 901 902 @param tableName: Name of the database table. 903 @type tableName: str 904 @param whereStr: SQL WHERE clause (without the "WHERE"). 905 @type whereStr: str 906 907 @return: Number of rows deleted. 908 @rtype: int 909 910 """ 911 if not whereStr: 912 return self.truncate(tableName) 913 else: 914 return self._executeScript("DELETE FROM %s WHERE %s" % (tableName, 915 whereStr), wantRowCount=True)
916 917 #-------------------------------------------------------------------------- 918
    def deleteRows(self, tableName, rowIndexList=[]):
        """
        Deletes selected rows from a database table. If no rows are selected,
        the entire table is wiped via L{truncate()}.

        @param tableName:    Name of the database table.
        @type  tableName:    str
        @param rowIndexList: List of specific rows to delete, in the form
                             (attrName, PyValue), e.g. [("multiframeId", 101)].
                             If empty, all rows are deleted. If PyValue is a
                             list then executemany is invoked.
        @type  rowIndexList: list(tuple(str, object))

        @return: Number of rows deleted.
        @rtype:  int

        """
        if not rowIndexList:
            return self.truncate(tableName)

        sql = "DELETE FROM " + tableName + self._createWhereStr(rowIndexList)

        # If any row indices are a list then format ready for executemany
        manyList = []
        for _key, index in rowIndexList:
            if type(index) is list:
                manyList += [tuple([value]) for value in index]

        return self._executeScript(sql, manyList, wantRowCount=True)
949 950 #-------------------------------------------------------------------------- 951
    def dropColumn(self, tableName, colName):
        """
        Removes a column from a table in the database, first dropping any
        default constraint on that column.

        @param tableName: Name of the table.
        @type  tableName: str
        @param colName:   Name of the column to remove.
        @type  colName:   str

        """
        constraints = dict(self.query(
          selectStr="C.name, O.name",
          fromStr="syscolumns AS C, sysobjects AS O "
                  "LEFT JOIN sysobjects AS T ON O.parent_obj = T.id",
          whereStr="O.id=C.cdefault AND O.name NOT LIKE '%%dtproper%%' AND "
                   "O.name NOT LIKE 'dt[_]%%' AND T.name = %r " % tableName +
                   "AND (O.name LIKE 'DF__%' OR O.name LIKE '%_cons')"))

        if colName in constraints:
            self._executeScript("ALTER TABLE %s DROP CONSTRAINT %s" %
                                (tableName, constraints[colName]))

        self._executeScript("ALTER TABLE %s DROP COLUMN %s" %
                            (tableName, colName))
976 977 #-------------------------------------------------------------------------- 978
979 - def dropObjects(self, objSchema):
980 """ 981 Drop database objects defined in a list of database L{Schema} objects. 982 Tables cannot be dropped if referenced by foreign key constraints, in 983 this case use L{dropTable()}. 984 985 @param objSchema: An ordered list of objects derived from 986 L{Schema._Schema}. 987 @type objSchema: list(Schema._Schema) 988 989 """ 990 for dbObj in objSchema: 991 try: 992 self._executeScript(dbObj.dropSQL()) 993 except odbc.ProgrammingError as error: 994 # If object already doesn't exist then don't worry about it. 995 if "does not exist" not in str(error) and \ 996 "Could not drop constraint" not in str(error): 997 raise
998 999 #-------------------------------------------------------------------------- 1000
    def dropTable(self, tableSchema, dbSchema):
        """
        Drops a table from the database, first removing any foreign key
        constraints that reference the table.

        @param tableSchema: Parsed schema describing the table that is to be
                            dropped.
        @type  tableSchema: Schema.Table
        @param dbSchema:    Schemas for every table in the database that may
                            have a foreign key constraint that references
                            the table to be dropped.
        @type  dbSchema:    list(Schema.Table)

        """
        # Drop foreign key constraints that reference this table
        foreignKeys = []
        for table in dbSchema:
            foreignKeys += [con for con in table.constraints
                            if type(con) is schema.ForeignKeyConstraint
                            and con.referenceTable == tableSchema.name]
        self.dropObjects(foreignKeys)
        # Drop the table
        self.dropObjects([tableSchema])
1024 1025 #-------------------------------------------------------------------------- 1026
1027 - def enableDirtyRead(self):
1028 """ 1029 Allows you to read from tables that are being written to and hence are 1030 presently locked. Warning, use at your own risk! 1031 1032 """ 1033 self._executeScript("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED") 1034 self._isDirty = True
1035 1036 #-------------------------------------------------------------------------- 1037
1038 - def existsTable(self, tableName):
1039 """ @return: True, if table with name given in tableName parameter 1040 exists in current database. 1041 @rtype: bool 1042 """ 1043 return self.queryColumnNames(tableName) != []
1044 1045 #-------------------------------------------------------------------------- 1046
1047 - def freeProcCache(self):
1048 """ Calls DBCC FREEPROCCACHE. 1049 """ 1050 self._executeScript("DBCC FREEPROCCACHE")
1051 1052 #-------------------------------------------------------------------------- 1053
1054 - def getBestVolume(self):
1055 """ @return: The volume on the catalogue server with the most free 1056 space, e.g. G:\. 1057 @rtype: str 1058 """ 1059 mostSpace = 0 1060 bestVolume = '' 1061 for volume in self.sysc.catServerVolumes(self.server): 1062 line = self.runOnServer("dir " + volume, isTrialRunSafe=True)[-1] 1063 try: 1064 freeSpace = long(line.split()[-3].replace(',', '')) 1065 except ValueError: 1066 raise DbSession.CatalogueServerError("Can't read volume %s. " 1067 "Check SystemConstants.catServerVolumes is correct for %s." 1068 % (volume, self.server)) 1069 1070 freeSpace /= self.sysc.one_gigabyte 1071 if freeSpace > mostSpace: 1072 mostSpace = freeSpace 1073 bestVolume = volume 1074 1075 return bestVolume
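The free-space lookup in getBestVolume() relies on the final line of a Windows `dir` listing, where the third token from the end is the number of free bytes. The following is a minimal standalone sketch of that parsing step; the function names and the sample summary lines are hypothetical, and unlike the method above it compares raw byte counts rather than dividing by `sysc.one_gigabyte` first.

```python
def free_bytes_from_dir(last_line):
    """Parse the free-byte count from the last line of a `dir` listing,
    e.g. '   5 Dir(s)  123,456,789,012 bytes free'."""
    # Third token from the end is the byte count, with thousands separators
    return int(last_line.split()[-3].replace(',', ''))


def pick_best_volume(dir_summaries):
    """Return the volume with the most free space.

    dir_summaries is a list of (volume, last_dir_line) pairs; this input
    shape is an assumption made for the sketch.
    """
    most_space = 0
    best_volume = ''
    for volume, line in dir_summaries:
        free_space = free_bytes_from_dir(line)
        if free_space > most_space:
            most_space = free_space
            best_volume = volume

    return best_volume
```

A line that does not follow this layout makes `int()` raise ValueError, which is the case the method above converts into a CatalogueServerError.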
1076 1077 #-------------------------------------------------------------------------- 1078
    def goOffline(self):
        """
        Temporarily drop the database connection, whilst keeping the DbSession
        object alive. When an attempt to access the database is next made, the
        DbSession will automatically try to reconnect to the database.

        """
        self._closeConnection()
        DbSession._dbConnections[self._dbSessionKey] = (None, None)
1088 1089 #-------------------------------------------------------------------------- 1090
    def grantAccess(self, userName):
        """
        Grants the given user read access to the current database, plus
        permission to view query plans.

        @param userName: User name that will have permissions set.
        @type  userName: str

        """
        try:
            self._executeScript(
              "EXEC sp_grantdbaccess %s, %s" % tuple(2 * [userName]))
            self._isLogReq[self.database] = True
        except odbc.ProgrammingError as error:
            if "already exists" not in str(error):
                raise

        for role in ["db_datareader"]:
            self._executeScript(
              "EXEC sp_addrolemember %s, %s" % (role, userName))
            self._isLogReq[self.database] = True

        self._executeScript("GRANT SHOWPLAN TO " + userName)
1113 1114 #-------------------------------------------------------------------------- 1115
1116 - def insertData(self, tableName, rowData, enforcePKC=False):
1117 """ 1118 Insert some data into the database. 1119 1120 @param tableName: Name of the database table. 1121 @type tableName: str 1122 @param rowData: Sequence of unformatted data values for each column, 1123 e.g. ("astring", 0, 5.4). 1124 @type rowData: sequence 1125 @param enforcePKC: If True, throw an exception if row already exists, 1126 otherwise just log a warning. 1127 @type enforcePKC: bool 1128 1129 @return: Number of rows inserted (either 1 or 0). 1130 @rtype: int 1131 1132 """ 1133 if rowData: 1134 row = "(%s)" % ", ".join(map(self._sqlString, rowData)) 1135 1136 if not rowData or row == "()": 1137 raise Exception( 1138 "No data has been supplied to insert into table %s" % tableName) 1139 1140 try: 1141 return self._executeScript( 1142 "INSERT INTO %s VALUES %s" % (tableName, row), wantRowCount=True) 1143 except odbc.IntegrityError as error: 1144 if not enforcePKC and "Violation of PRIMARY KEY" in str(error): 1145 Logger.addMessage( 1146 "<Warning> data row %s already exists in table %s" % 1147 (row, tableName)) 1148 return 0 1149 else: 1150 raise
1151 1152 #-------------------------------------------------------------------------- 1153
1154 - def query(self, selectStr, fromStr, whereStr='', groupBy='', orderBy='', 1155 firstOnly=False, default=None, ResultsTuple=None, 1156 forceOneProc=False, forcePrOrder=False):
1157 """ 1158 Wrapper function to perform a general select-from-where query. 1159 1160 @param selectStr: SQL SELECT string; comma-separated column names. 1161 @type selectStr: str 1162 @param fromStr: SQL FROM string; single table name or L{Join} object. 1163 @type fromStr: str or L{Join} 1164 @param whereStr: Optional SQL WHERE clause. 1165 @type whereStr: str 1166 @param groupBy: Optional SQL GROUP BY string. 1167 @type groupBy: str 1168 @param orderBy: Optional SQL ORDER BY string. 1169 @type orderBy: str 1170 @param firstOnly: If True, just return the first entry in result set. 1171 @type firstOnly: bool 1172 @param default: Value to return if querying first entry and no 1173 entries found. 1174 @type default: object 1175 @param ResultsTuple: Convert results to this specific namedtuple type. 1176 @type ResultsTuple: type 1177 @param forceOneProc: Force query to execute only on a single-processor 1178 (disable parallelism). 1179 @type forceOneProc: bool 1180 @param forcePrOrder: Force where clause predicates to be evaluated in 1181 the stated order. 1182 @type forcePrOrder: bool 1183 1184 @return: List of tuples (or scalars if only one attribute selected) 1185 containing values for every attribute of every entry. The 1186 tuples will be namedtuples of the database column names, if 1187 possible - i.e. no duplicate column names or functions without 1188 aliases. If firstOnly then a scalar or a tuple (or namedtuple) 1189 is returned. 1190 @rtype: list 1191 1192 @todo: Either replace arguments with one SelectSQL argument or: 1193 replicate SelectSQL.__init__() interface: 1194 selectStr becomes select, fromStr becomes table, whereStr 1195 becomes where - then remove code duplication by creating an 1196 SelectSQL object here. 
        """
        #@@GOTCHA: The firstOnly option is there in case I ever manage to find
        # a reliable way to select just one result via the mxODBC interface,
        # which would be faster than using T-SQL's TOP 1, due to some strange
        # SQL server bug, related to the bookmark look-up bug. However, mxODBC
        # implementation of fetchone() is also buggy. So this just does a TOP 1
        # for now, when fetchone is implemented these next two lines will go.
        if firstOnly:
            selectStr = "TOP 1 %s" % selectStr

        sql = "SELECT %s FROM %s" % (selectStr, fromStr)

        if isinstance(fromStr, Join):
            sql += " WHERE "
            sql += fromStr.whereStr
            if fromStr.whereStr and whereStr \
              and not whereStr.strip().startswith("AND"):
                sql += " AND "

        elif whereStr.startswith(("GROUP BY", "ORDER BY")):
            # @@TODO: Deprecate this clause
            sql += " "

        elif whereStr:
            sql += " WHERE "

        sql += whereStr
        if groupBy:
            sql += " GROUP BY " + groupBy

        if orderBy:
            sql += " ORDER BY " + orderBy

        options = []
        if forceOneProc:
            options.append("MAXDOP 1")

        if forcePrOrder:
            options.append("FORCE ORDER")

        if options:
            sql += " OPTION (%s)" % ', '.join(options)

        result = \
          self._executeScript(sql, wantResult=True, wantAll=not firstOnly)

        if firstOnly and (result is None or result[0] is None):
            # NB: if firstOnly then result is always a tuple or None
            if default is None and ',' in selectStr:
                result = tuple([None] * len(selectStr.split(',')))
            else:
                return default

        if selectStr.endswith('*') or ',' in selectStr:
            # Case of a list of tuples - make a namedtuple if possible
            if not ResultsTuple:
                metadata = self._getCursor().description or []
                columns = [entry[0] for entry in metadata]

                # i.e. there are no unnamed columns, no duplicates and no
                # keywords - this last check can go when we get Python 2.7
                if columns and not any(not column or iskeyword(column)
                                       for column in columns) \
                  and len(columns) == len(set(columns)):

                    ResultsTuple = namedtuple("Query", ' '.join(columns))

            if ResultsTuple:
                if firstOnly:
                    return ResultsTuple(*result)
                else:
                    return [ResultsTuple(*row) for row in result]

            return result

        elif firstOnly:
            # Strip single tuple values down to scalars
            return result[0]

        else:
            # Multiple rows but single column
            return [eachTuple[0] for eachTuple in result]
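The statement assembly performed by query() can be illustrated in isolation. This is a minimal sketch for the plain-string table case only: it leaves out the L{Join} branch and the deprecated "GROUP BY"/"ORDER BY"-in-where handling, and the function name `build_select_sql` and its keyword names are hypothetical rather than part of DbSession.

```python
def build_select_sql(select, table, where='', group_by='', order_by='',
                     first_only=False, force_one_proc=False,
                     force_pr_order=False):
    """Assemble a T-SQL SELECT statement from its separate clauses."""
    if first_only:
        # Mirrors the TOP 1 workaround used in place of fetchone()
        select = "TOP 1 %s" % select

    sql = "SELECT %s FROM %s" % (select, table)
    if where:
        sql += " WHERE " + where

    if group_by:
        sql += " GROUP BY " + group_by

    if order_by:
        sql += " ORDER BY " + order_by

    # Query hints are collected and emitted as one OPTION clause
    options = []
    if force_one_proc:
        options.append("MAXDOP 1")

    if force_pr_order:
        options.append("FORCE ORDER")

    if options:
        sql += " OPTION (%s)" % ', '.join(options)

    return sql
```

For instance, selecting the first multiframeID from Multiframe with `force_one_proc=True` yields a statement ending in `OPTION (MAXDOP 1)`, while passing both hint flags joins them with a comma inside the same OPTION clause.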
1279 1280 #-------------------------------------------------------------------------- 1281
1282 - def queryAllowsNulls(self, queryTable, columns):
1283 """ 1284 Determine whether the table contains nullable columns. 1285 1286 @param queryTable: Name of table to query. 1287 @type queryTable: str 1288 @param columns: Sequence of columns to check. 1289 @type columns: sequence(str) 1290 1291 @return: True if any of the columns are nullable. 1292 @rtype: bool 1293 1294 """ 1295 cursor = self._getCursor() 1296 cursor.columns(table=queryTable) 1297 return any(col[10] is not odbc.SQL.NO_NULLS 1298 for col in cursor.fetchall() if col[3] in columns)
1299 1300 #-------------------------------------------------------------------------- 1301
1302 - def queryAttrMax(self, attrs, table, where=''):
1303 """ 1304 Wrapper to just query the maximum value of the given attributes. 1305 1306 @param attrs: Name of the attributes in the database (comma-separated). 1307 @type attrs: str 1308 @param table: Table with the attributes. 1309 @type table: str or L{Join} 1310 @param where: Optional SQL WHERE clause. 1311 @type where: str 1312 1313 @return: Maximum value of that attribute. Defaults to None. 1314 @rtype: scalar 1315 1316 """ 1317 # @@NOTE: Not using firstOnly=True, for now, as TOP 1 is silly here. 1318 # This also makes supplying defaults somewhat inefficient, so 1319 # leave it up to the user to do, e.g. x = queryAttrMax() or 0. 1320 maxAttrs = ", ".join("MAX(%s)" % attr for attr in csv.values(attrs)) 1321 return self.query(maxAttrs, table, where)[0]
1322 1323 #-------------------------------------------------------------------------- 1324
1325 - def queryColumnNames(self, tableName):
1326 """ 1327 Queries database to produce a list of column names in the table. 1328 1329 @param tableName: Name of the database table. 1330 @type tableName: str 1331 1332 @return: A list of the column names, in order, from the given table. 1333 @rtype: list(str) 1334 1335 """ 1336 return self.query( 1337 selectStr="COLUMN_NAME", 1338 fromStr="INFORMATION_SCHEMA.COLUMNS", 1339 whereStr="TABLE_NAME=%r ORDER BY ORDINAL_POSITION" % tableName)
1340 1341 #-------------------------------------------------------------------------- 1342
1343 - def queryDataTypes(self, table):
1344 """ 1345 Queries database to produce a dictionary of data-types for each column. 1346 1347 @param table: Name of the database table, with optional full-path. 1348 @type table: str 1349 1350 @return: A dictionary where the keywords are the column names and the 1351 values are the data-types of those columns. 1352 @rtype: dict(str:str) 1353 1354 """ 1355 server, database, tableName = self._splitTablePath(table) 1356 1357 if server == self.server: 1358 attrs = self.query( 1359 selectStr="COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH", 1360 fromStr=database + ".INFORMATION_SCHEMA.COLUMNS", 1361 whereStr="TABLE_NAME=" + self._sqlString(tableName)) 1362 1363 return dict((name, dataType + 1364 ('(%s)' % dataSize if dataSize else '')) 1365 for name, dataType, dataSize in attrs) 1366 else: 1367 db = '.'.join([server, database, dbc.loadServerDbsOwner()]) 1368 attrs = self.query( 1369 selectStr="sc.name, st.name, sc.length", 1370 fromStr="%s.sysobjects AS so, %s.syscolumns AS sc, " 1371 "%s.systypes AS st" % tuple([db] * 3), 1372 whereStr="so.id = sc.id AND st.xtype = sc.xtype AND so.name=" + 1373 self._sqlString(tableName)) 1374 1375 return dict((name, dataType + 1376 (('(%s)' % dataSize) if dataType == 'varchar' else '')) 1377 for name, dataType, dataSize in attrs)
1378 1379 #-------------------------------------------------------------------------- 1380
1381 - def queryEntriesExist(self, table, where='', pk=''):
1382 """ 1383 Queries database to determine if any rows exist in the given table 1384 that satisfy the WHERE clause. 1385 1386 @param table: Table to query. A single table name or a L{Join} object. 1387 @type table: str or L{Join} 1388 @param where: Optional WHERE clause. 1389 @type where: str 1390 @param pk: Optionally supply primary key to speed up query. Not 1391 required if table is a Join. 1392 @type pk: str 1393 1394 @return: True, if there are any rows in the table that satisfy the 1395 WHERE clause. 1396 @rtype: bool 1397 1398 """ 1399 columns = pk or '*' 1400 if isinstance(table, Join): 1401 columns = table.joinCols 1402 1403 # This is faster than count(*) and avoids index bugs. 1404 result = self.query(columns, table, where, firstOnly=True) 1405 if not isinstance(result, tuple): 1406 return result is not None 1407 else: 1408 return any(entry is not None for entry in result)
1409 1410 #-------------------------------------------------------------------------- 1411
1412 - def queryNumRows(self, tableName, whereStr='', groupBy='', 1413 distinctAttr=''):
1414 """ 1415 Wrapper to just query the number of rows in a given table. Will also 1416 return the total count for a "SELECT count(*) ... GROUP BY"-type query, 1417 if the "GROUP BY" is specified in the whereStr keyword, rather than the 1418 groupBy keyword. 1419 1420 @param tableName: Table to query. 1421 @type tableName: str 1422 @param whereStr: WHERE string, optional where clause. 1423 @type whereStr: str 1424 @param groupBy: Specify a comma-separated list of columns in which 1425 to group counts by. 1426 @type groupBy: str 1427 @param distinctAttr: With distinct values of this attribute. 1428 @type distinctAttr: str 1429 1430 @return: Number of rows, if no groupBy keyword, otherwise a list of 1431 the groupBy attribute values in ascending order with counts. 1432 @rtype: int or list(tuple) 1433 1434 """ 1435 attr = ('DISTINCT ' + distinctAttr) if distinctAttr else '*' 1436 cmd = "COUNT(%s) AS count" % attr 1437 for _attempt in range(2): 1438 try: 1439 if groupBy: 1440 return self.query(groupBy + ", " + cmd, tableName, whereStr, 1441 groupBy=groupBy, orderBy=groupBy) 1442 else: 1443 results = self.query(cmd, tableName, whereStr) 1444 if len(results) == 1: 1445 return results[0] 1446 else: # "Group by"-type query: length is the desired value 1447 return len(results) 1448 1449 except odbc.ProgrammingError as error: 1450 if "Arithmetic overflow" in str(error): 1451 cmd = "COUNT_BIG(%s) AS count" % attr 1452 else: 1453 raise
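The two-attempt loop in queryNumRows() is a retry pattern: try COUNT first, and fall back to COUNT_BIG only if the server reports an arithmetic overflow. A minimal sketch of that pattern follows. The `ArithmeticOverflow` class and the `run_query` callable are hypothetical stand-ins for the ODBC error and for self.query(), and, unlike the method above, the sketch raises explicitly if the second attempt also fails instead of falling through.

```python
class ArithmeticOverflow(Exception):
    """Hypothetical stand-in for the driver's overflow error."""


def count_rows(run_query, attr='*'):
    """Count rows with COUNT, retrying once with COUNT_BIG on overflow."""
    cmd = "COUNT(%s) AS count" % attr
    for _attempt in range(2):
        try:
            return run_query(cmd)
        except ArithmeticOverflow:
            # COUNT returns a 32-bit int in T-SQL; COUNT_BIG returns bigint
            cmd = "COUNT_BIG(%s) AS count" % attr

    raise ArithmeticOverflow("COUNT_BIG also overflowed")


def fake_query(cmd):
    """Illustrative query runner that overflows unless COUNT_BIG is used."""
    if cmd.startswith("COUNT("):
        raise ArithmeticOverflow(cmd)

    return 3000000000
```

Calling `count_rows(fake_query)` exercises both attempts: the first COUNT raises, and the COUNT_BIG retry returns the large row count.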
1454 1455 #-------------------------------------------------------------------------- 1456
1457 - def queryRowSize(self, queryTable):
1458 """ 1459 Determine the row size of the given table or the select query result. 1460 1461 @param queryTable: Either just a single table name or a select query. 1462 @type queryTable: str or SelectSQL 1463 1464 @return: Size in bytes of a row. 1465 @rtype: int 1466 1467 """ 1468 cursor = self._getCursor() 1469 if isinstance(queryTable, SelectSQL): 1470 self.query(queryTable.selectStr, queryTable.fromStr, 1471 firstOnly=True) 1472 print(cursor.description) 1473 raise Exception("This feature is deprecated due to ODBC " 1474 "inconsistencies. Check that the values printed " 1475 "above are consistent and correct between servers") 1476 # @@TODO: Replace with some kind of parsing of the SQL types 1477 # returned by cursor.description. 1478 return sum(col[4] for col in cursor.description) 1479 else: 1480 cursor.columns(table=queryTable) 1481 return sum(col[7] for col in cursor.fetchall())
1482 1483 #-------------------------------------------------------------------------- 1484
1485 - def renameTable(self, oldName, newName):
1486 """ 1487 Rename the given database table. 1488 1489 @param oldName: Existing database table name. 1490 @type oldName: str 1491 @param newName: Name to give to the database table. 1492 @type newName: str 1493 1494 """ 1495 self._executeScript("sp_RENAME %s, %s" % (oldName, newName))
1496 1497 #-------------------------------------------------------------------------- 1498
    def rollbackTransaction(self):
        """ Roll back a transaction.

            @note: Only applies if auto-commit is switched off.
        """
        if not self._autoCommit:
            connection, _cursor = DbSession._dbConnections[self._dbSessionKey]
            if not connection:
                raise DbSession.DisconnectError(self.database,
                                                "Cannot rollback transactions")
            Logger.addMessage(
              "<Info> Rolling back uncommitted database transactions...")
            connection.rollback()
1512 1513 #-------------------------------------------------------------------------- 1514
1515 - def runOnServer(self, cmd, isTrialRunSafe=False):
1516 """ 1517 Runs a command directly on the database server. 1518 1519 @param cmd: Command-line command to run on the database server. 1520 @type cmd: str 1521 @param isTrialRunSafe: Can this command be run in trial-run mode? 1522 @type isTrialRunSafe: bool 1523 1524 @return: A result set. 1525 @rtype: list(str) 1526 1527 """ 1528 #@@GOTCHA: Need to use '%s' instead of %r because of the '\' character 1529 script = "EXEC %s..xp_cmdshell '%s'" % (self.sysc.adminDatabase, cmd) 1530 isTrialRun = self.isTrialRun 1531 try: 1532 if isTrialRunSafe: 1533 self.isTrialRun = False 1534 1535 results = self._executeScript(script, wantResult=True) 1536 finally: 1537 self.isTrialRun = isTrialRun 1538 1539 # Strip mxODBC tuples and convert 'None's to spaces 1540 results = [(result if result else '') 1541 for result in map(itemgetter(0), results)] 1542 1543 # Strip any final blank line from result 1544 if results and not results[-1]: 1545 results = results[:-1] 1546 1547 # Raise exceptions for cases that are clearly always errors 1548 filesysCmds = \ 1549 ("dir", "move", "copy", "mkdir", "rmdir", "del", "rename") 1550 1551 if results and cmd.lower().strip().startswith(filesysCmds) \ 1552 and any("Access is denied" in result for result in results): 1553 1554 raise DbSession.CatalogueServerError("%s: %s\n%s" 1555 % (self.server, cmd, '\n'.join(results))) 1556 1557 return results
1558 1559 #-------------------------------------------------------------------------- 1560
1561 - def sharePath(self, fileName=''):
1562 """ 1563 Returns the path to the given file in the catalogue server's share 1564 directory for outgests and ingests. 1565 1566 @param fileName: Name of the file on the share. 1567 @type fileName: str 1568 1569 @todo: Deprecate! 1570 1571 """ 1572 return self.sysc.dbSharePath(fileName) + (os.sep if not fileName else '')
1573 1574 #-------------------------------------------------------------------------- 1575
1576 - def shrinkTempDb(self):
1577 """ Shrinks tempdb and its log files. 1578 """ 1579 self._executeScript("DBCC SHRINKDATABASE (tempdb)")
1580 1581 #-------------------------------------------------------------------------- 1582
1583 - def tablePath(self, tableName, ownerName=dbc.loadServerDbsOwner()):
1584 """ 1585 Full database path to the given table. 1586 1587 @param tableName: Name of table. 1588 @type tableName: str 1589 @param ownerName: Optionally specify a different table owner name than 1590 the database owner. 1591 @type ownerName: str 1592 1593 @return: Full path to database table, e.g. ahmose.WSA.dbo.tableName. 1594 @rtype: str 1595 1596 """ 1597 return '.'.join([str(self), ownerName, str(tableName)])
1598 1599 #-------------------------------------------------------------------------- 1600
1601 - def testSharePath(self):
1602 """ 1603 Checks to see if the share path for this database is active. Raises a 1604 SystemExit if there are problems. 1605 1606 """ 1607 testDir = "outgest_test_%s_%s" % (os.getenv("HOST"), os.getpid()) 1608 try: 1609 os.mkdir(self.sharePath(testDir)) 1610 os.rmdir(self.sharePath(testDir)) 1611 except OSError as error: 1612 raise SystemExit("<ERROR> " + str(error))
1613 1614 #-------------------------------------------------------------------------- 1615
1616 - def truncate(self, tableName):
1617 """ 1618 Wipes a database table - use with care! 1619 1620 @param tableName: Name of database table to wipe. 1621 @type tableName: str 1622 1623 @return: Number of rows deleted from table. 1624 @rtype: int 1625 1626 @todo: Deprecate in favour of delete() without where clause? 1627 @todo: Always call truncate over delete from if not where clause, with 1628 the addition of a drop and add constraints? 1629 """ 1630 try: 1631 # Truncate is faster... 1632 return self._executeScript("TRUNCATE TABLE %s" % tableName, 1633 wantRowCount=True) 1634 except odbc.IntegrityError: 1635 # ...but doesn't work if there are foreign key constraints 1636 return self._executeScript("DELETE %s" % tableName, 1637 wantRowCount=True)
1638 1639 #-------------------------------------------------------------------------- 1640
1641 - def uncPath(self, filePathName):
1642 """ 1643 Converts the given catalogue server local path into a UNC path. 1644 1645 @param filePathName: Catalogue server local path. 1646 @type filePathName: str 1647 1648 @return: Catalogue server UNC path. 1649 @rtype: str 1650 1651 """ 1652 return (2 * self.sysc.catServerFileSep + self.server.upper() 1653 + self.sysc.catServerFileSep + filePathName.replace(':', ''))
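As an illustration of the conversion done by uncPath(), here is a standalone sketch. It assumes the catalogue server file separator (`sysc.catServerFileSep`) is a backslash, which is not stated in this listing; the function name `unc_path` and the sample path are hypothetical.

```python
def unc_path(server, file_path_name, file_sep="\\"):
    """Convert a server-local path such as G:\\share\\out.dat to a UNC path."""
    # Two leading separators, the upper-cased server name, then the local
    # path with the drive colon removed so the drive letter names the share
    return (2 * file_sep + server.upper()
            + file_sep + file_path_name.replace(':', ''))
```

With server "ahmose" and local path `G:\share\out.dat`, this produces `\\AHMOSE\G\share\out.dat`. Note that every colon in the path is removed, not only the one after the drive letter.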
1654 1655 #-------------------------------------------------------------------------- 1656
1657 - def update(self, tableName, entryList, where='', fromTables=''):
1658 """ 1659 Update specific entries in database table rows specified with a 1660 flexible SQL WHERE clause. 1661 1662 @param tableName: Name of the database table. 1663 @type tableName: str 1664 @param entryList: List entries to update, in the form (attrName, 1665 SQLvalue) with value as an SQL formatted string, 1666 as defined by L{DbSession._sqlString()}, e.g. 1667 [("skyID", -9999), ("raMoon", "1.01"), 1668 ("trackSys", "'J2000'")]. (NB: this may also be 1669 an explicit string for the SQL SET statement e.g. 1670 "trackSys='J2000'") 1671 @type entryList: list(tuple(str, str)) or str 1672 @param where: SQL WHERE clause (without the "WHERE"). 1673 @type where: str 1674 @param fromTables: Optional source to update from (SQL FROM syntax), 1675 single table name or L{Join} object. 1676 @type fromTables: str or L{Join} 1677 1678 @return: Number of rows updated. 1679 @rtype: int 1680 1681 """ 1682 # Sanity checks 1683 if not entryList: 1684 raise Exception("No entries supplied to update table " + tableName) 1685 1686 sql = "UPDATE " + tableName 1687 sql += " SET " + utils.joinNested(entryList, subJoinStr="=") 1688 if fromTables: 1689 sql += " FROM %s" % fromTables 1690 1691 if where or isinstance(fromTables, Join): 1692 sql += " WHERE " 1693 if isinstance(fromTables, Join): 1694 sql += fromTables.whereStr 1695 if fromTables.whereStr and where: 1696 sql += " AND " 1697 sql += where 1698 1699 numRows = self._executeScript(sql, wantRowCount=True) 1700 if self.isTrialRun: # Return expected number of updates 1701 if isinstance(fromTables, Join): 1702 fromStr = fromTables.fromStr 1703 if fromTables.whereStr and where: 1704 where += " AND " + fromTables.whereStr 1705 else: 1706 where = where or fromTables.whereStr 1707 elif fromTables: 1708 fromStr = fromTables 1709 else: 1710 fromStr = tableName 1711 1712 if tableName not in fromStr: 1713 fromStr += ', ' + tableName 1714 1715 numRows = self.queryNumRows(fromStr, where) 1716 1717 return numRows
1718 1719 #-------------------------------------------------------------------------- 1720
1721 - def updateEntries(self, tableName, entryList, rowIndexList=[]):
1722 """ 1723 Update specific entries in a database table row. 1724 1725 @param tableName: Name of the database table. 1726 @type tableName: str 1727 @param entryList: List entries to update, in the form (attrName, 1728 SQLvalue) with value as an SQL formatted string, 1729 as defined by L{DbSession._sqlString()}, e.g. 1730 [("skyID", -9999), ("raMoon", "1.01"), 1731 ("trackSys", "'J2000'")]. (NB: this may also be 1732 an explicit string for the SQL SET statement e.g. 1733 "trackSys='J2000'") 1734 @type entryList: list(tuple(str, str)) or str 1735 @param rowIndexList: List of specific rows to update, in the form 1736 (attrName, PyValue), e.g. [("multiframeID", 101)], 1737 all rows are updated otherwise. If PyValue is a 1738 list then executemany is invoked. 1739 @type rowIndexList: list(tuple(str, object)) 1740 1741 @return: Number of rows updated. 1742 @rtype: int 1743 1744 """ 1745 # Sanity checks 1746 if not tableName: 1747 raise Exception("No table name has been supplied") 1748 if not entryList: 1749 raise Exception("No entries supplied to update table " + tableName) 1750 1751 sql = "UPDATE " + tableName 1752 sql += " SET " + utils.joinNested(entryList, subJoinStr="=") 1753 manyList = [] 1754 if rowIndexList: 1755 sql += self._createWhereStr(rowIndexList) 1756 # If any row indices are a list then format ready for executemany 1757 for _key, index in rowIndexList: 1758 if type(index) is list: 1759 manyList += [tuple([value]) for value in index] 1760 1761 return self._executeScript(sql, manyList, wantRowCount=True)
1762 1763 #-------------------------------------------------------------------------- 1764
    def updateStatistics(self):
        """ Updates database index statistics.
        """
        Logger.addMessage("Updating database index statistics...")
        self._executeScript("EXEC sp_updatestats")
1770 1771 #-------------------------------------------------------------------------- 1772
    def _closeConnection(self):
        """ Closes a database cursor and connection. """
        try:
            self.commitTransaction()
        except odbc.OperationalError:
            Logger.addMessage(
              "<Warning> Could not commit transactions to " + self.database)
        except DbSession.DisconnectError:  # Remove the commit failure message
            raise DbSession.DisconnectError(self.database)

        connection, cursor = DbSession._dbConnections[self._dbSessionKey]
        if connection:
            try:
                cursor.close()
                connection.close()
            except odbc.OperationalError:
                Logger.addMessage(
                  "<Warning> Cannot disconnect from database: " + self.database)
            else:
                Logger.addMessage(
                  "Disconnected from database: " + self.database)
1794 1795 #-------------------------------------------------------------------------- 1796
    def _createWhereStr(self, rowIndexList):
        """
        Converts a list of tuple pairs of the form (attribute name, value) into
        a correctly formatted SQL WHERE clause. If the row index list contains
        a list of indices for a given attribute, then executemany markers are
        used.

        @param rowIndexList: List of specific rows to update, in the form
                             (attrName, PyValue), e.g. [("multiframeID", 101)].
        @type rowIndexList: list(tuple(str, object))

        @return: An SQL WHERE clause.
        @rtype: str

        """
        whereList = [(key, self._sqlString(index))
                     for key, index in rowIndexList
                     if type(index) is not list]

        # Add executemany markers if there are lists of indices present.
        # Presently, only one substituted quantity per SQL
        # statement is allowed; in dictionary style this is called "x".
        paramStyle = {'qmark': '?', 'pyformat': '%(x)s'}[odbc.paramstyle]
        whereList += [(key, paramStyle) for key, index in rowIndexList
                      if type(index) is list]

        return " WHERE " + utils.joinNested(whereList, subJoinStr="=",
                                            joinStr=" AND ")
1825 1826 #-------------------------------------------------------------------------- 1827
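# Illustration only (not part of wsatools): a minimal standalone sketch of how
# the WHERE clause above is assembled. join_nested() and sql_literal() are
# hypothetical stand-ins; the real utils.joinNested and DbSession._sqlString
# are assumed, not confirmed, to behave this way. Scalar indices are rendered
# as SQL literals and list-valued indices become a single executemany marker.

```python
def join_nested(pairs, sub_join_str="=", join_str=", "):
    """Hypothetical stand-in for utils.joinNested (behaviour assumed)."""
    return join_str.join(sub_join_str.join(str(item) for item in pair)
                         for pair in pairs)


def sql_literal(value):
    """Simplified, hypothetical stand-in for DbSession._sqlString()."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def create_where_str(row_index_list, marker="?"):
    """Build a WHERE clause; list-valued indices become a parameter marker."""
    # Scalar indices are written straight into the clause as SQL literals.
    where_list = [(key, sql_literal(index)) for key, index in row_index_list
                  if not isinstance(index, list)]
    # List indices are left as a marker for cursor.executemany() to fill in.
    where_list += [(key, marker) for key, index in row_index_list
                   if isinstance(index, list)]
    return " WHERE " + join_nested(where_list, "=", " AND ")


if __name__ == "__main__":
    print(create_where_str([("multiframeID", [1, 2, 3]), ("programmeID", 101)]))
```

# The marker defaults to the "qmark" style; a driver using "pyformat" would
# pass marker="%(x)s" instead, as the method above does.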
    def _executeScript(self, script, parameters=None, wantResult=False,
                       wantAll=True, wantRowCount=False):
        """
        Send any SQL script to the server to execute. Specific DBMS errors that
        are just warnings are trapped to prevent processing from halting.

        @param script: The script which is to be processed.
        @type script: str
        @param parameters: Optionally supply parameter set if script contains
                           executemany wildcards.
        @type parameters: list(tuple)
        @param wantResult: If True, return results.
        @type wantResult: bool
        @param wantAll: If wantResult is True and wantAll is True then
                        return all results, otherwise just the first result.
        @type wantAll: bool
        @param wantRowCount: If True, and results aren't returned, then a count
                             of affected rows is returned instead.
        @type wantRowCount: bool

        @return: The result of the executed script (if wantResult), or number
                 of rows affected (if wantRowCount).
        @rtype: list(tuple) or int

        """
        if self.isTrialRun and not script.upper().startswith("SELECT"):
            print(script)
            if wantResult:
                return [] if wantAll else None
            elif wantRowCount:
                return 0
            else:
                return

        # Execute the script, ignoring any innocuous messages coming back
        # from the server (but echoing warning message to standard output):
        cursor = self._getCursor()
        try:
            if parameters:
                cursor.executemany(script, parameters)
            else:
                cursor.execute(script)

        except odbc.Warning as error:
            if "xpsql.cpp" in str(error) and "GetProxyAccount" in str(error):
                Logger.addMessage("<ERROR> Database user name %s does not have"
                  " bcp outgest permissions set on database %s.%s. Please fix "
                  "the database permissions." % (self.userName, self.server,
                                                 self.database))

            if "DBCC execution completed" not in str(error):
                Logger.addExceptionWarning(error, "Database message")
                if "would be truncated" in str(error):
                    Logger.addMessage("Script executed: " + script,
                                      alwaysLog=False)

        except (odbc.OperationalError, odbc.InternalError) as error:
            isDeadlock = "deadlock victim. Rerun the transaction" in str(error)
            brokenMsgs = ["Function sequence error",  # Only InternalError case
                          "Read from SQL server failed",
                          "Read from the server failed",
                          "Write to SQL Server failed",
                          "Write to the server failed",
                          "Communication link failure"]

            persist = self._isPersistent and self._autoCommit
            if persist and (any(message in str(error) for message in brokenMsgs)
                            or isDeadlock and not self._isDeadlocked):
                self._isDeadlocked = isDeadlock
                msg = ("Deadlock occurred in" if self._isDeadlocked else
                       "Lost connection to")
                msg += " %s.%s" % (self.server, self.database)
                sys.stderr.write(msg + '. See log.\n')
                Logger.addMessage("<Warning> %s. Reconnecting and resubmitting"
                                  " the last database statement..." % msg)
                self.goOffline()
                return self._executeScript(script, parameters, wantResult,
                                           wantAll, wantRowCount)

            if persist and self._isDeadlocked:
                # Deadlocks should allow quick reconnect so need additional
                # wee delay here, and no logging on subsequent attempts.
                time.sleep(1)
                return self._executeScript(script, parameters, wantResult,
                                           wantAll, wantRowCount)

            if isinstance(error, odbc.OperationalError):
                self._raiseWithScript(error, script)
            else:
                raise

        except (odbc.ProgrammingError, odbc.IntegrityError) as error:
            self._raiseWithScript(error, script)
        else:
            self._isDeadlocked = False

        numRows = max(cursor.rowcount, 0)
        if wantRowCount and not numRows:
            # Could be an integer overflow - run a test
            numRows = max(self._executeScript("SELECT ROWCOUNT_BIG()",
                                     wantResult=True, wantAll=False)[0], 0)

        self._isLogReq[self.database] = self._isLogReq[self.database] or \
          self._autoCommit and (not wantRowCount or numRows) and \
          not script.upper().startswith(("SELECT", "DBCC", "EXEC", "SET"))

        if wantResult:
            # Extract result from the cursor object.
            try:
                if wantAll:
                    return cursor.fetchall()
                else:
                    ####### FETCH ONE TEST CODE #########
                    #result = cursor.fetchone()
                    #cursor.flush()
                    #connection, c = DbSession._dbConnections[self._dbSessionKey]
                    #connection.commit()
                    #return result
                    #@@GOTCHA: We restrict number of rows in the result set
                    # rather than use fetchone() because fetchone() doesn't
                    # seem to be closing queries properly. It might be due to
                    # autocommit not working properly and thus I'd need to
                    # commit() afterward or else execute() isn't doing a
                    # flush() automatically as it should. Setting max number of
                    # rows to return also doesn't work!
                    try:
                        return cursor.fetchall()[0]
                    except IndexError:
                        return
            except odbc.ProgrammingError as error:
                if "missing result set" in str(error):
                    # No results. Return empty result set.
                    return [] if wantAll else None
                else:
                    raise

        elif wantRowCount:
            return numRows
1966 1967 #-------------------------------------------------------------------------- 1968
    def _getCursor(self):
        """
        Method to obtain cursor for current DbSession. If database connection
        is currently offline then an endless reconnect attempt is made.

        @return: The cursor for the current DbSession.
        @rtype: Cursor

        """
        connection, cursor = DbSession._dbConnections[self._dbSessionKey]
        # Infinite-loop whilst attempting to reconnect with database to
        # allow us a chance to reboot a server on a long weekend+ run.
        while not connection:  # due to earlier goOffline() call for any reason
            try:
                self._openConnection()
            except SystemExit:
                Logger.addMessage("<Warning> Cannot connect to database. "
                  "Attempting reconnect in 10 minutes (Ctrl-C to quit)...")
                time.sleep(600)
            else:
                connection, cursor = \
                  DbSession._dbConnections[self._dbSessionKey]

        return cursor
1993 1994 #-------------------------------------------------------------------------- 1995
1996 - def _openConnection(self):
1997 """ Opens a database connection and initiates the cursor. """ 1998 try: 1999 # Get the R/W connection password: 2000 pwFilePath = os.path.join(os.getenv("HOME"), ".load-server-access") 2001 password = file(pwFilePath).read().strip() 2002 params = (self.server, self.database, self.userName, password) 2003 connection = \ 2004 odbc.DriverConnect("DSN=%s;Database=%s;UID=%s;PWD=%s" % params) 2005 2006 Logger.addMessage("Connected to %s.%s as %s" % params[:-1]) 2007 if self._autoCommit: 2008 connection.setconnectoption(odbc.SQL.AUTOCOMMIT, 2009 odbc.SQL.AUTOCOMMIT_ON) 2010 2011 except odbc.OperationalError: 2012 raise SystemExit( 2013 "Cannot access database %s.%s. Probably connection is down." 2014 % (self.server, self.database)) 2015 except odbc.ProgrammingError as error: 2016 if "Login failed for user" in str(error): 2017 raise SystemExit( 2018 "Access denied for username %r to database %s.%s" % 2019 (self.userName, self.server, self.database)) 2020 elif "Cannot open database" in str(error): 2021 raise SystemExit( 2022 "Database %s.%s does not exist or access denied for " 2023 "user %s" % (self.server, self.database, self.userName)) 2024 else: 2025 raise SystemExit("Cannot connect: %s" % error) 2026 2027 cursor = connection.cursor() 2028 if "mx.ODBC.unixODBC" in sys.modules: 2029 # Cursor type conversion to fix a probable bug in the database 2030 # interface layers (FreeTDS?) that affects bigints on Unix clients. 2031 cursor.setconverter(lambda _position, sqlType, sqlLen: 2032 (sqlType, (sqlLen if sqlType != odbc.SQL.BIGINT else 0))) 2033 2034 # Add to connection container for access by future instances 2035 DbSession._dbConnections[self._dbSessionKey] = (connection, cursor)
2036 2037 #-------------------------------------------------------------------------- 2038
    def _raiseWithScript(self, err, script):
        """ Appends the SQL script to an mxODBC exception and raises it.
        """
        try:
            message = err.args[2].replace('[FreeTDS][SQL Server]', '')
        except (IndexError, AttributeError):
            message = str(err)

        raise err.__class__(message + "\nScript executed: " + script)
2048 2049 #-------------------------------------------------------------------------- 2050
    def _splitTablePath(self, tablePath):
        """
        Decomposes a full table path to its individual components.

        @param tablePath: Path to a table in the format "table" or
                          "database.table" or "server.database..table" or
                          "server.database.dbo.table"
        @type tablePath: str

        @return: Server name, database name, table name from given path.
        @rtype: tuple(str, str, str)

        """
        path = tablePath.split('.')
        tableName = path[-1]
        try:
            server = path[-4]
        except IndexError:
            server = self.server
        try:
            database = path[-3]
        except IndexError:
            try:
                database = path[-2]
            except IndexError:
                database = self.database

        return server, database, tableName
2079 2080 #-------------------------------------------------------------------------- 2081
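# Illustration only (not part of wsatools): a standalone sketch of the same
# decomposition, using explicit length checks instead of nested IndexError
# handling. The function name and the default_server / default_database
# arguments are hypothetical; in the class these defaults come from
# self.server and self.database. The server names in the usage are made up.

```python
def split_table_path(table_path, default_server, default_database):
    """Split "server.database.owner.table" style paths into three parts."""
    path = table_path.split('.')
    table_name = path[-1]
    # The server is only present when all four components are given
    # (the owner component may be empty, as in "server.database..table").
    server = path[-4] if len(path) >= 4 else default_server
    if len(path) >= 3:
        database = path[-3]
    elif len(path) == 2:
        database = path[-2]
    else:
        database = default_database
    return server, database, table_name


if __name__ == "__main__":
    for example in ("Multiframe", "WSA.Multiframe",
                    "srv1.WSA..Multiframe", "srv1.WSA.dbo.Multiframe"):
        print(example, "->", split_table_path(example, "srv0", "TESTDB"))
```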
    def _sqlString(self, value):
        """ @return: An SQL string of the correct format for the data type.
            @rtype: str
        """
        if value is None:
            return ""
        elif type(value) is str:
            # default python behaviour for handling apostrophes is not
            # suitable for SQL statements - so this is the work around
            if "'" in value:
                return repr(value).replace("\'", "''").replace('"', "'")
            else:
                return repr(value)
        else:
            # Can't use repr(int) cos SQL doesn't like L suffixes on long ints
            return str(value)
2098
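# Illustration only (not part of wsatools): a standalone sketch of SQL literal
# formatting. Note that this swaps in a different technique from the method
# above: instead of post-processing repr(), it doubles embedded apostrophes
# and wraps the result in single quotes, which also copes with strings that
# contain both quote characters. The function name is hypothetical.

```python
def sql_string(value):
    """Format a Python value as an SQL literal (sketch, not the real method)."""
    if value is None:
        return ""
    if isinstance(value, str):
        # SQL escapes an apostrophe inside a string literal by doubling it.
        return "'" + value.replace("'", "''") + "'"
    return str(value)


if __name__ == "__main__":
    for example in ("J2000", "O'Brien", 101, None):
        print(repr(example), "->", sql_string(example))
```

# Building SQL by string formatting is only reasonable for trusted input; for
# anything else the driver's own parameter substitution is the safer route.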
#------------------------------------------------------------------------------

class Ingester(DbSession):
    """
    A special type of database session, where you wish to ingest data into a
    database table from a native binary or CSV file. Upon initialisation this
    class verifies that the schemas of the database tables are up-to-date.

    @note: Prefer binary files as they ingest faster than CSV files.
    @note: Prefer the ingest data to be pre-sorted by primary key and supply
           isOrdered=True to ingestTable() to speed up ingest. Otherwise, if
           adding more data than already exists, e.g. overwriting the existing
           table or creating a new one, it may be faster to load the data
           without any constraints and then apply the constraints to the table
           after loading.
    @todo: Always parse indices on initialisation of Ingester. Can't drop
           indices at this stage as it would occur too soon. Then give
           ingestTable() a default option of dropIndices=True. Ingester can
           tell from the schema whether it should parse non-survey indices or
           not. Ingester itself could have a dropIndices option at
           initialisation with default of False, for IngIngester to overrule.
    """
    class IngestError(Exception):
        """ Exception thrown when the data cannot be ingested.
        """
        pass
2125 2126 #-------------------------------------------------------------------------- 2127 2128 #: Name of file on share for IngIngester type ingests only. 2129 fileOnShare = None 2130 #: Tag to append to ingest files so they can be cleaned-up later. 2131 fileTag = 'CuID000000' 2132 2133 #-------------------------------------------------------------------------- 2134 # Define private member variable default values (access prohibited) 2135 2136 #: Dictionary of schemas for tables to be ingested into, ref by table name. 2137 _schema = None 2138 2139 #-------------------------------------------------------------------------- 2140
2141 - def __init__(self, dbCon, tableSchema, tag=fileTag, skipSchemaCheck=False, 2142 checkReleasedOnly=False):
2143 """ 2144 Makes connection to the requested database, and checks that the schemas 2145 of the tables supplied in the table list are correct. 2146 2147 @param dbCon: A database connection. 2148 @type dbCon: DbSession 2149 @param tableSchema: Schema for tables to be ingested this session, as 2150 provided by L{Schema.parseTables()}. 2151 @type tableSchema: list(Schema.Table) or dict(str; Schema.Table) 2152 @param tag: A tag string to append to the file name on the 2153 share directory, so file clean up can be rm *tag*. 2154 @type tag: str 2155 @param skipSchemaCheck: If True, don't check the schema for 2156 mismatches - use with extreme caution! 2157 @type skipSchemaCheck: bool 2158 @param checkReleasedOnly: If True, only check the schema of released 2159 tables for mismatches. 2160 @type checkReleasedOnly: bool 2161 2162 """ 2163 super(Ingester, self).__init__( 2164 database=dbCon.server + '.' + dbCon.database, 2165 autoCommit=dbCon._autoCommit, 2166 isTrialRun=dbCon.isTrialRun, 2167 userName=dbCon.userName, 2168 isPersistent=dbCon._isPersistent) 2169 2170 self.fileTag = tag 2171 self._schema = (dict((table.name, table) for table in tableSchema) 2172 if not isinstance(tableSchema, dict) else tableSchema) 2173 2174 if not skipSchemaCheck: 2175 Logger.addMessage("Checking database schema for ingest tables...") 2176 self.checkSchema(self._schema.values(), checkReleasedOnly)
2177 2178 #-------------------------------------------------------------------------- 2179
    def ingestTable(self, tableName, filePathName, idxInfo=None,
                    overWrite=False, isCsv=False, deleteFile=True,
                    isOrdered=False, checkConstraints=True):
        """
        Ingest a binary or CSV flat file table into the database.

        @note: If the table doesn't already exist in the database it will be
        created. However, it won't have foreign key constraints (this is a
        feature intended for release databases made by CU19). If you need
        foreign key constraints it is best to call createTable() prior to
        ingestTable(overWrite=False).

        @todo: If not isOrdered and overWrite=True then we don't apply primary
        key until after ingest. OR even better - if not isOrdered try to create
        table without any constraints; if that succeeds, i.e. there is no
        existing table or overWrite=True, then apply primary key after ingest,
        otherwise don't, but do check foreign keys. May make parts of
        NeighbourTableIngester redundant.

        @param tableName: Name of table to create/update in the database.
        @type tableName: str
        @param filePathName: Name of the ingest file, with optional full
                             catalogue server path or curation server path. If
                             full path is omitted, then file is assumed to
                             be located in the catalogue server's share
                             directory. Otherwise if a different curation
                             server path is supplied the file is automatically
                             moved to the share (assuming the fileTag has been
                             set upon Ingester() initialisation).
        @type filePathName: str
        @param idxInfo: If you wish to drop indices prior to ingest, then
                        supply L{Schema.parseIndices()} information.
        @type idxInfo: defaultdict(str: list(Schema.Index))
        @param overWrite: If True, overwrite an existing table in the
                          database. If False, update the table if it exists,
                          otherwise create the table. NB: Whenever this
                          method creates a table it does so without applying
                          foreign key constraints.
        @type overWrite: bool
        @param isCsv: If True, expects to ingest a CSV ascii file, else
                      expects native binary format.
        @type isCsv: bool
        @param deleteFile: If True, delete ingest file after ingest,
                           regardless of outcome.
        @type deleteFile: bool
        @param isOrdered: If True, the data is ordered by primary key.
        @type isOrdered: bool
        @param checkConstraints: If True, check foreign key constraints during
                                 ingest if any exist, otherwise never check
                                 foreign key constraints.
        @type checkConstraints: bool

        @return: Number of rows ingested.
        @rtype: int

        """
        if not filePathName:
            return 0

        if any(prefix in filePathName
               for prefix in ["cu01id", "cu03id", "cu04id"]):
            fileName = self.fileOnShare = filePathName[:]
            ingHost, sharePath = filePathName.partition("samba/")[::2]
            filePathName = 2 * self.sysc.catServerFileSep \
                + ingHost.replace(os.sep, '').upper() \
                + self.sysc.catServerFileSep \
                + sharePath.replace(os.sep, self.sysc.catServerFileSep)
        else:
            fileName, filePathName = self._normalisePath(filePathName)

        if not fileName:  # File not found, nothing to ingest
            return 0

        Logger.addMessage("Ingesting into %s..." % tableName)
        try:
            ingestSchema = self._schema[tableName]
        except KeyError:
            raise Exception(
              "<ERROR> Cannot ingest %s: Ingester was not initially supplied"
              " with a schema for table %s" % (fileName, tableName))

        isNewTable = overWrite or not self.existsTable(ingestSchema.name)
        if isNewTable:
            isNewTable = self.createObjects([ingestSchema], overWrite)

        # Drop any indices defined on the table before bulk loading
        if not overWrite and idxInfo:
            self.dropObjects(idxInfo[tableName])
            self.commitTransaction()  # Otherwise deadlock occurs

        hints = "firstrow=1"
        if isCsv:
            hints += ", datafiletype='char', fieldterminator=','"\
                     ", rowterminator='0x0A'"
        else:
            hints += ", datafiletype='native', tablock"

        # Verify foreign-key constraints during ingest, unless this method
        # has created a new table in which case they won't have been
        # created anyway.
        checkConstraints = checkConstraints and not isNewTable \
          and any(type(constraint) is schema.ForeignKeyConstraint
                  for constraint in ingestSchema.constraints)

        if checkConstraints:
            hints += ", check_constraints"

        if isOrdered:
            hints += ", order(%s)" % self._schema[tableName].primaryKey()

        sql = "BULK INSERT %s FROM '%s' WITH (%s)" \
              % (tableName, filePathName, hints)

        try:
            try:
                # @HACK: Due to recent slow down in samba shares: time out loop
                timeout = 0
                while True:
                    try:
                        numRowsIngested = \
                          self._executeScript(sql, wantRowCount=True)

                    except odbc.DatabaseError as error:
                        if "does not exist" not in str(error) or timeout == 600:
                            raise

                        time.sleep(1)
                        timeout += 1
                    else:
                        break

            except odbc.ProgrammingError as error:
                if not str(error).startswith('Cannot fetch a row from OLE DB '
                  'provider "BULK" for linked server "(null)".\nScript'):
                    raise

                msg = "Ingest file data does not match the schema of "
                msg += tableName

                if not isCsv:
                    # @TODO: Binary row-by-row ingest
                    raise Ingester.IngestError(msg
                      + ', ' + str(error).split("returned ")[-1]
                      + ". Catalogue server file path: " + filePathName)
                else:
                    Logger.addMessage("<WARNING> " + msg
                                      + ". Attempting ODBC insert...")

                    # Try a row-by-row ingest for debug purposes
                    numRowsIngested = 0
                    for row in csv.File(self.sharePath(fileName)):
                        fRow = [column.parseValue(val) for val, column
                                in zip(row, self._schema[tableName].columns)]
                        try:
                            isIngested = self.insertData(tableName, fRow)
                        except odbc.Error as error:
                            raise Ingester.IngestError(error)

                        if not isIngested:
                            Logger.addMessage("Insert of this row failed: "
                                              + ', '.join(map(str, fRow)))

                        numRowsIngested += isIngested
                        Logger.addMessage("Row number = %s" % numRowsIngested,
                                          alwaysLog=False)
            else:
                # Don't trust ODBC cursor row count as it's not rowcount_big
                numRowsIngested = \
                  max(self._executeScript("SELECT ROWCOUNT_BIG()",
                                          wantResult=True, wantAll=False)[0], 0)

        except:
            if self.sysc.catServerSharePath in filePathName:
                Logger.addMessage("<Info> Ingest file left on file share: "
                                  + self.sharePath(fileName))
            else:
                Logger.addMessage("<Info> Ingest file left on catalogue "
                                  "server: " + filePathName)
            raise

        Logger.addMessage("done! Rows affected = %s" % numRowsIngested)

        self._removeIngestFile(fileName, filePathName, deleteFile)

        return numRowsIngested
2365 2366 #-------------------------------------------------------------------------- 2367
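# Illustration only (not part of wsatools): a standalone sketch of how the
# BULK INSERT statement in ingestTable() is put together from its options.
# The function and argument names are hypothetical; the hint strings are the
# ones used by the method above.

```python
def build_bulk_insert_sql(table_name, file_path, is_csv=False,
                          check_constraints=False, order_by=None):
    """Assemble a SQL Server BULK INSERT statement with its WITH hints."""
    hints = ["firstrow=1"]
    if is_csv:
        hints += ["datafiletype='char'", "fieldterminator=','",
                  "rowterminator='0x0A'"]
    else:
        # Native binary files are loaded under a table lock.
        hints += ["datafiletype='native'", "tablock"]
    if check_constraints:
        hints.append("check_constraints")
    if order_by:
        # Tells the server the file is already sorted on these columns.
        hints.append("order(%s)" % order_by)
    return "BULK INSERT %s FROM '%s' WITH (%s)" \
        % (table_name, file_path, ", ".join(hints))


if __name__ == "__main__":
    print(build_bulk_insert_sql("Multiframe", "mf.dat",
                                order_by="multiframeID"))
```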
    def _moveToShare(self, filePathName):
        """
        Moves a file to the catalogue load server's share directory as mounted
        on the curation server. Works around the share violation problem under
        NFS: a file written to the curation-client / load-server NFS file share
        cannot be accessed immediately after being written unless it is first
        renamed!

        @param filePathName: Path of file to copy over to the share directory.
                             Must be a full path to the file visible from the
                             curation server O/S.
        @type filePathName: str

        @return: New file name in share directory, including any
                 sub-directories, or None if original file does not exist.
        @rtype: str

        """
        if not os.path.exists(filePathName):
            return

        elif self.fileTag is Ingester.fileTag:
            # In this case we are not allowed to rename or move the file, so it
            # must already be tagged and on the share.
            # @@TODO: When the IngIngester code is unified with the rest of the
            # code to use the same tags then we can avoid this condition. Then
            # could have file paths publicly available as member variables.
            return filePathName.replace(self.sharePath(), '')

        if not filePathName.startswith(self.sharePath()):
            shareDisk = self.sharePath().split(os.sep)[1]
            if filePathName.split(os.sep)[1] != shareDisk:
                Logger.addMessage(
                  "Transferring ingest file to %s..." % shareDisk)

            fileName = os.path.basename(filePathName)
            shutil.move(filePathName, self.sharePath(fileName))
            filePathName = self.sharePath(fileName)

        # Append tag as a work-around to wake up windows... hopefully.
        fileRoot, fileExt = os.path.splitext(filePathName)
        taggedPathName = fileRoot + self.fileTag + fileExt
        if self.fileTag not in fileRoot:
            os.rename(filePathName, taggedPathName)
        else:
            os.rename(filePathName, taggedPathName)
            os.rename(taggedPathName, filePathName)
            taggedPathName = filePathName

        return taggedPathName.replace(self.sharePath(), '')
2418 2419 #-------------------------------------------------------------------------- 2420
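# Illustration only (not part of wsatools): a standalone sketch of the naming
# rule used when tagging an ingest file. It only computes the tagged name and
# performs no renames or moves, unlike _moveToShare() above. The function name
# and the example tag value are hypothetical.

```python
import os


def tagged_path(file_path_name, file_tag):
    """Return the path with file_tag inserted before the file extension."""
    file_root, file_ext = os.path.splitext(file_path_name)
    if file_tag in file_root:
        # Already tagged: the name is left unchanged.
        return file_path_name
    return file_root + file_tag + file_ext


if __name__ == "__main__":
    print(tagged_path("/share/mf.dat", "CuID000042"))
```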
2421 - def _normalisePath(self, filePathName):
2422 """ 2423 Move ingest file to a place visible to the catalogue server O/S and 2424 normalise different filePathName inputs into the format: 2425 2426 filePathName = Full path to file from the catalogue server O/S e.g. 2427 H:\dir\subdir\filename.ext 2428 2429 fileName = filename.ext or subdir/filename.ext if file is in a 2430 subdirectory of the catalogue server share directory. 2431 2432 @param filePathName: Any acceptable file path definition allowed by 2433 the Ingester.ingestTable() method. 2434 @type filePathName: str 2435 2436 @return: fileName, filePathName (see description above). 2437 @rtype: tuple(str, str) 2438 2439 """ 2440 if self.sharePath() in filePathName: 2441 curPathName = filePathName 2442 2443 elif self.sysc.catServerFileSep not in filePathName: 2444 # Either no file path or a curation server path 2445 curPathName = (filePathName if os.path.dirname(filePathName) else 2446 self.sharePath(filePathName)) 2447 2448 elif self.sysc.catServerSharePath in filePathName: 2449 # Full load server share path given, probably file is in a 2450 # sub-directory in this case. 2451 curPathName = self.sharePath( 2452 filePathName.replace(self.sysc.catServerSharePath, '') 2453 .replace(self.sysc.catServerFileSep, os.sep)) 2454 2455 else: 2456 # Alternate load server path to the share path given 2457 fileName = filePathName # just used for error message 2458 return fileName, filePathName 2459 2460 fileName = self._moveToShare(curPathName) 2461 if not fileName: 2462 return None, None 2463 else: 2464 filePathName = self.sysc.catServerSharePath \ 2465 + fileName.replace(os.sep, self.sysc.catServerFileSep) 2466 2467 return fileName, filePathName
2468 2469 #-------------------------------------------------------------------------- 2470
2471 - def _removeIngestFile(self, fileName, filePathName, deleteFile):
2472 """ 2473 Remove the ingest file. 2474 2475 @param fileName: Name and sub-dir of file in the curation server 2476 share path. 2477 @type fileName: str 2478 @param filePathName: Full path to file on the catalogue server. 2479 @type filePathName: str 2480 @param deleteFile: If false, then just provide a warning. 2481 @type deleteFile: bool 2482 2483 """ 2484 if deleteFile and not self.isTrialRun: 2485 try: 2486 self.runOnServer("del " + filePathName) 2487 except DbSession.CatalogueServerError: 2488 if self.sysc.catServerSharePath not in filePathName: 2489 raise 2490 2491 # Probably a permissions issue - delete from the Linux side 2492 os.remove(self.sharePath(fileName)) 2493 elif "cu03id" in filePathName or "cu04id" in filePathName: 2494 pass 2495 elif self.sysc.catServerSharePath not in filePathName: 2496 Logger.addMessage("<Info> Ingest file left on catalogue server: " 2497 + filePathName) 2498 elif deleteFile: 2499 Logger.addMessage("<Info> Ingest file left on file share: " 2500 + self.sharePath(fileName))
2501
2502 #------------------------------------------------------------------------------ 2503 2504 -class Outgester(DbSession):
2505 """ 2506 A special type of database session, where you wish to bulk outgest data 2507 from the database to a file. This class ensures the connection has the 2508 correct permissions and is designed to work around the delays in the 2509 appearance of the data on the NFS mounted load server share directory. If 2510 the schema for tables that data will be outgested from is supplied, then 2511 upon initialisation this class verifies that the schemas of the database 2512 tables are up-to-date. Otherwise, the class will query the database 2513 schema directly, which is slower. 2514 2515 """ 2516 #-------------------------------------------------------------------------- 2517 # Define class constants (access as Outgester.varName) 2518 2519 fileTag = 'CuID000000' 2520 """ Tag to append to outgest files so they can be cleaned-up later. 2521 """ 2522 sampleRate = 2 2523 """ Delay in seconds between testing file size on NFS mounted share. 2524 """ 2525 tempViewName = "OutgestTempView" + str(os.getpid()) + os.getenv("HOST") 2526 """ Name of temporary outgest view that's created if the SQL statement is 2527 too long for the BCP statement. Host PID is a safer UID than fileTag. 2528 """ 2529 timeOut = 600 2530 """ Time in seconds before assuming outgest to NFS has failed. 2531 """ 2532 #-------------------------------------------------------------------------- 2533 # Define private member variable default values (access prohibited) 2534 2535 _isBcpDeadlocked = False #: BCP outgest is currently deadlocked? 2536 2537 #-------------------------------------------------------------------------- 2538
    def __init__(self, dbCon, tag=fileTag, honourTrialRun=False):
        """
        Makes connection to the requested database. Supplied database
        connection must be under a username that has bcp rights, e.g.
        ldservrw/pubdbrw. Can't do a SETUSER to make sure we are always
        ldservrw, because not all users have the permissions to perform a
        SETUSER - which kinda defeats the object. If user is not ldservrw or
        pubdbrw a new connection will be made as ldservrw (which is safe as
        Outgester only reads from the database anyway). In this circumstance,
        to prevent too many connections being made/dropped make sure to create
        just a single Outgester object, which is continually used throughout.

        @param dbCon: A database connection.
        @type dbCon: DbSession
        @param tag: A tag string to append to the file name on the
                    share directory, so file clean up can be rm *tag*.
        @type tag: str
        @param honourTrialRun: If True, don't outgest if the database
                               connection is in trial run mode.
        @type honourTrialRun: bool

        """
        super(Outgester, self).__init__(
          database=dbCon.server + '.' + dbCon.database,
          autoCommit=dbCon._autoCommit,
          isTrialRun=dbCon.isTrialRun if honourTrialRun else False,
          userName=(dbc.loadServerRwUsername()
                    if dbCon.userName != "pubdbrw" else dbCon.userName),
          isPersistent=dbCon._isPersistent)

        self._isDirty = dbCon._isDirty
        self.fileTag = tag
2571 2572 #-------------------------------------------------------------------------- 2573
    def getFilePath(self, fileID):
        """ Returns the share path of the file outgested with this fileID.
        """
        return self.sharePath(fileID + self.fileTag + ".dat")
2578 2579 #-------------------------------------------------------------------------- 2580
    def outgestQuery(self, query, fileID='', filePath='', createDtdFile=False,
                     isCsv=False, redirectStdOut=False):
        """
        Bulk outgest the results from an SQL query to a file in the catalogue
        server share directory.

        @note: If data is being copied by bulk outgest/ingest it helps to order
               by the primary key.
        @note: This function may hang for up to 10 minutes after outgest if the
               outgest failed silently (which can happen), whilst waiting for
               the NFS update to time out. I've chosen not to print a message
               saying the outgest is complete and that the function is waiting
               until time-out, because multiple outgests would display too
               much information, and there's no point making this message
               occur in debug mode only because it would be obvious.

        @param query: SQL query to outgest.
        @type query: SelectSQL
        @param fileID: Optionally request a specific unique identifier to be
                       used in the name for the outgest file. The file will
                       be named "[fileID][fileTag].dat". This is useful if
                       multiple outgests from the same table need to be
                       available at the same time.
        @type fileID: str
        @param filePath: Optionally outgest to a path on the catalogue server
                         other than the share path (e.g. "G:\"), or transfer
                         outgest file to a different path on the curation
                         server. Either give full new path name for the file or
                         else just provide the path to the new directory, but
                         make sure that directory exists first.
        @type filePath: str
        @param createDtdFile: If True and the outgest is to a binary file on
                              the share path, a file containing the data type
                              definition of the outgest file will be created
                              with the same name as the outgest file, but with
                              a ".dtd" extension rather than ".dat".
        @type createDtdFile: bool
        @param isCsv: If True, outputs to CSV ascii file, else defaults
                      to native binary format.
        @type isCsv: bool
        @param redirectStdOut: If True, the results of the outgest will be
                               written to a file on the share path rather than
                               returned through mxODBC. This is required to
                               work around a mysterious bug in very large table
                               outgests only and is slightly less efficient.
        @type redirectStdOut: bool

        @return: Full path to the outgest file (unless createDtdFile requested,
                 which returns the row count outgested). The path will be a
                 curation server path if the file is outgest to the share
                 directory, which is the default behaviour, otherwise a
                 catalogue server path is returned.
        @rtype: str (or int)

        """
        if len(query.tables) == 1:
            table = query.tables.values()[0]
        elif query.selectStr.count('.') == 1:
            table = query.tables[query.selectStr.split('.')[0]]
        else:
            tableList = query.tables.values()
            tableList.reverse()
            table = ''.join(tableList)

        drive = ''
        if self.sysc.catServerFileSep in filePath:
            drive, filePath = filePath, drive

        fileName = \
          os.path.basename(self.getFilePath(fileID or table.split('.')[-1]))

        filePathName = (drive or self.sysc.catServerSharePath) + fileName

        queryFmt = '"%s OPTION (MAXDOP 1)"'
        view = schema.View()
        if len(str(query)) < 1000:
            querySQL = (queryFmt % query).replace("'", "''")
        else:
            view.name = Outgester.tempViewName
            # View definition cannot include "order by" or "group by"
            view.definition = \
              str(SelectSQL(query.selectStr, query.fromStr, query.whereStr))

            # Order by can't contain aliases
            order = ', '.join(col.split('.')[-1]
                              for col in csv.values(query.orderByStr))

            tablePath = (self.tablePath(view.name)
              if self.userName not in ["pubdbrw", "ldservro"] else
              self.tablePath(view.name, ownerName=self.userName))

            querySQL = queryFmt % SelectSQL('*', tablePath, orderBy=order)
            self.createObjects([view], overWrite=True)

        if self._isDirty:
            querySQL = querySQL.replace(" WHERE", " WITH (NOLOCK) WHERE")

        try:
            cmd = "bcp %s queryout %s %s -T" % \
              (querySQL, filePathName, '-a' if isCsv else '-n')

            while not self._outgestFile(cmd, redirectStdOut, view):
                # Keep trying until the file exists
                time.sleep(60)

        finally:
            self._isBcpDeadlocked = False
            if view.definition:
                self.dropObjects([view])

        if drive:
            if ':' in filePathName:
                filePathName = self.uncPath(filePathName)
        else:
            filePathName = self.sharePath(fileName)

        if not self.isTrialRun:
            self._waitForNFS(filePathName, query)

        if filePath:
            filePathName = self._transferFile(filePathName, filePath)

        if not isCsv and createDtdFile and not self.isTrialRun:
            dataTypes = self._getDataTypes(query)

            # DTD file is read-only and possibly written by another user
            # so delete
            dtdPathName = filePathName.replace('.dat', '.dtd')
            if os.path.exists(dtdPathName):
                os.remove(dtdPathName)

            file(dtdPathName, 'w').writelines(dataType + '\n'
                                              for dataType in dataTypes)
            os.chmod(dtdPathName, 0444)

            rowSize = sum(self.sysc.sqlDataTypeSize[dataType]
                          for dataType in dataTypes)

            return os.path.getsize(filePathName) // rowSize

        return filePathName

    #--------------------------------------------------------------------------

    def _getDataTypes(self, query):
        """ Returns ordered list of data types returned by the given query,
            either via examining the schema, if available, or by direct
            database table query.

            @todo: Select * queries may not return correctly ordered data
                   types. If so, and order matters, this can be achieved by
                   another database query - which of course will slow the code
                   down a bit.
            @todo: Simplify by just running a test query of one row and then
                   inspecting the cursor like pySQL does? Similar to how
                   DbSession.queryRowSize() works. This will render the above
                   todo obsolete.
        """
        columns = \
          [column.split('.')[-1] for column in csv.values(query.selectStr)]

        # Prepare a dictionary of attributes for all tables queried, assuming
        # duplicated attribute names have same data type.
        attrs = {}
        for table in query.tables.values():
            attrs.update(self.queryDataTypes(table))

        if columns == ['*']:
            if '.' not in query.selectStr:
                columns = attrs.keys()
            else:
                alias = query.selectStr.split('.')[0]
                table = query.tables[alias].split('.')[-1]
                columns = self.queryColumnNames(table)

        return [attrs[column] for column in columns]
2757 2758 #-------------------------------------------------------------------------- 2759
2760 - def _outgestFile(self, cmd, redirectStdOut, view):
2761 """ 2762 Performs outgest to file, ensuring it exists. 2763 2764 @return: True, if outgest file successfully created. 2765 @rtype: bool 2766 2767 """ 2768 if redirectStdOut: 2769 # @TODO: Test performance with and without this option 2770 # #HACK: wired to G for nytro 2771 resultFile = "G:" + self.sysc.catServerFileSep + self.database \ 2772 + self.fileTag + "_outgest.log" 2773 2774 cmd += " -o " + resultFile 2775 2776 resultSet = self.runOnServer(cmd) 2777 2778 if redirectStdOut and not self.isTrialRun: 2779 # @@GOTCHA: Have to examine file on the catalogue server side in 2780 # case of NFS file delays, otherwise it is hard to tell if file is 2781 # empty due to failure or due to a delay of unknown length. 2782 resultSet = self.runOnServer("type " + resultFile) 2783 self.runOnServer("del " + resultFile) 2784 2785 if not self.isTrialRun and (len(resultSet) < 3 2786 or "rows copied" not in resultSet[-3]): 2787 2788 isDeadlock = \ 2789 any("deadlock victim" in result for result in resultSet) 2790 2791 if not self._isBcpDeadlocked and isDeadlock: 2792 Logger.addMessage("Outgest delayed due to deadlock...") 2793 self._isBcpDeadlocked = isDeadlock 2794 2795 if not isDeadlock: 2796 msg = "BCP Error:\n" 2797 msg += ('\n'.join(resultSet) if resultSet else 2798 "Outgest failed. BCP gave no error.") 2799 2800 msg += "\nCommand executed: " + cmd 2801 msg += ('' if not view.definition else 2802 "\n%s = %s" % (view.name, view.definition)) 2803 2804 raise odbc.DatabaseError(msg) 2805 2806 return not self._isBcpDeadlocked

    #--------------------------------------------------------------------------

    def _transferFile(self, filePathName, filePath):
        """ Transfers given file from the share path to the given path on the
            curation server.
        """
        if not os.path.isdir(filePath):
            newFileName = os.path.basename(filePath)
            newFilePathName = filePath
        else:
            newFileName = os.path.basename(filePathName)
            newFilePathName = os.path.join(filePath, newFileName)

        if self.isTrialRun or newFilePathName == filePathName:
            return newFilePathName

        if filePath.split(os.sep)[1] != filePathName.split(os.sep)[1]:
            Logger.addMessage("Transferring outgest file to %s..."
                              % self.sysc.getServerName(filePath))

        if os.path.exists(newFilePathName):
            Logger.addMessage("<Info> File %s already exists, will be "
                              "overwritten." % newFileName)
            os.remove(newFilePathName)

        os.system('cp %s %s' % (filePathName, newFilePathName))

        if (os.path.getsize(filePathName) != os.path.getsize(newFilePathName)):
            os.remove(filePathName)
            raise Exception("Transfer from %s to %s failed"
                            % (self.server, self.sysc.getServerName(filePath)))

        os.remove(filePathName)

        return newFilePathName

    #--------------------------------------------------------------------------

    def _waitForNFS(self, filePathName, query):
        """ Wait until the file on the NFS share is ready to be accessed.
        """
        curTime = time.time()
        fileName = os.path.basename(filePathName)
        cmd = "dir " + self.sysc.catServerSharePath + fileName
        dirResults = [result.split()[2] for result in self.runOnServer(cmd)
                      if result and fileName in result]
        try:
            tableFileSize = int(dirResults[0].replace(',', ''))
        except IndexError:
            raise odbc.OperationalError(
              "SQL outgest query to file %s has failed.\n"
              "Query executed: %s" % (filePathName, query))

        while (not os.path.exists(filePathName) or
               os.path.getsize(filePathName) != tableFileSize):

            time.sleep(Outgester.sampleRate)

            if time.time() - curTime > Outgester.timeOut:
                sizeStr = ("Non-existent" if not os.path.exists(filePathName)
                           else str(os.path.getsize(filePathName)))
                raise odbc.OperationalError(
                  "SQL outgest query to file %s has failed.\n"
                  "Expected file size = %s\n"
                  "Actual size = %s\n"
                  "Query executed: %s" %
                  (filePathName, tableFileSize, sizeStr, query))

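The polling pattern used by _waitForNFS() can be sketched in isolation. This is an illustration only and not part of the module: wait_for_file, sample_rate and time_out are hypothetical stand-ins for the method and for Outgester.sampleRate and Outgester.timeOut, and a plain IOError replaces odbc.OperationalError.

```python
import os
import tempfile
import time


def wait_for_file(path, expected_size, sample_rate=0.1, time_out=2.0):
    """Block until `path` exists with `expected_size` bytes, or time out."""
    start = time.time()
    while (not os.path.exists(path)
           or os.path.getsize(path) != expected_size):
        time.sleep(sample_rate)
        if time.time() - start > time_out:
            raise IOError("File %s not ready after %.1f s" % (path, time_out))
    return True


# Usage: a file that is already complete is accepted without sleeping.
handle, demo_path = tempfile.mkstemp()
os.write(handle, b"12345")
os.close(handle)
ready = wait_for_file(demo_path, 5)
os.remove(demo_path)
```

Comparing against a size reported by the server, rather than only testing for existence, is what lets the caller distinguish a partially synchronised file from a finished one.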
#------------------------------------------------------------------------------

class Database(object):
    """
    Defines a database object, and provides methods to create and modify
    databases through administrative database connections. First, define with
    L{Database.__init__()}, then create the database with L{Database.create()}.

    @group Nested Error Exceptions: ExistsError

    """
    class ExistsError(Exception):
        """ Exception thrown when a database being created already exists.
        """
        msg = "Database %r already exists."

        def __init__(self, name):
            """ @param name: Name of the database whose creation was attempted.
                @type name: str
            """
            super(Database.ExistsError, self).__init__(self.msg % name)

    #--------------------------------------------------------------------------
    # Define public member variable default values (access as obj.varName)

    files = None  #: List of file path names for this database.
    name = ""     #: Database name.

    #--------------------------------------------------------------------------
    # Define private member variable default values (access prohibited)

    _folders = []  #: List of directories this database spans.
    _sqlLine = ""  #: SQL to create this database.

    #--------------------------------------------------------------------------

    def __init__(self, name, volumes=SystemConstants.catServerVolumes(),
                 dbDir=None, primarySize="4 GB", logSize="1 GB",
                 filegroups=[], filegrowth="10%"):
        """
        Defines a database object.

        @param name: Name of database to create.
        @type name: str
        @param volumes: List of volumes to spread database files over.
        @type volumes: list(str)
        @param dbDir: Directory name on catalogue server to store the
                      database. This value is overridden for multi-file
                      databases, which use the database name as the
                      directory name. If not given, the non-survey folder
                      is assumed.
        @type dbDir: str
        @param primarySize: Total initial size of files in the primary file
                            group, as a string, e.g. "4 GB".
        @type primarySize: str
        @param logSize: Total initial size of files in the log file group,
                        as a string, e.g. "4 GB".
        @type logSize: str
        @param filegroups: List of additional file group names and sizes, e.g.
                           [("Detection", "4 GB")].
        @type filegroups: list(tuple(str, str))
        @param filegrowth: File growth, e.g. "10%".
        @type filegrowth: str

        """
        self.name = name
        self._sqlLine = "CREATE DATABASE %s ON PRIMARY " % self.name
        dbDir = (self.name if filegroups else
                 dbDir or SystemConstants(name).nonSurveyDir)

        dbDir += SystemConstants.catServerFileSep
        self._folders = [volume + dbDir for volume in volumes]

        fgLine = "(name=%s, filename='%s', size=%s, maxsize=UNLIMITED, "
        fgLine += "filegrowth=%s)," % filegrowth.replace('%', '%%')
        self.files = []

        # Primary file group
        for count, volume in enumerate(volumes):
            if len(volumes) == 1:
                fgName = "Primary_FG"
            else:
                fgName = "pfg_%s" % (count + 1)

            if filegroups:
                filePathName = volume + dbDir + \
                  "PrimaryFileGroup%s.mdf" % (count + 1)
            else:
                filePathName = volume + dbDir + self.name + ".mdf"

            self.files.append(filePathName)
            self._sqlLine += fgLine % (fgName, filePathName,
              self._splitSize(primarySize, len(volumes)))

        # Other file groups
        for filegroup, size in filegroups:
            self._sqlLine += " filegroup %s_FG " % filegroup
            if len(volumes) == 1:
                fgName = "%s_FG" % filegroup
            else:
                fgName = ''.join(takewhile(lambda c: c.islower(), filegroup))
                fgName += filegroup[len(fgName)].lower() + "fg_%s"

            for count, volume in enumerate(volumes):
                #@@BUG: This is creating Detection1.ndf etc. instead of the
                #original DetectionFileGroup1.ndf etc. Can't change as it will
                #break automatic deletion of existing auto-created databases
                filePathName = volume + dbDir + filegroup + "%s.ndf" % (count + 1)
                self.files.append(filePathName)
                fgVolName = \
                  (fgName % (count + 1) if len(volumes) > 1 else fgName)

                self._sqlLine += fgLine % (fgVolName, filePathName,
                  self._splitSize(size, len(volumes)))

        # Transaction log file group
        self._sqlLine = self._sqlLine.rstrip(',') + " LOG ON "
        for count, volume in enumerate(volumes):
            if len(volumes) == 1:
                fgName = "Log_FG"
            else:
                fgName = "lfg_%s" % (count + 1)

            if filegroups:
                filePathName = volume + dbDir + \
                  "LogFileGroup_data%s.ldf" % (count + 1)
            else:
                filePathName = volume + dbDir + self.name + ".ldf"

            self.files.append(filePathName)
            self._sqlLine += fgLine % (fgName, filePathName,
              self._splitSize(logSize, len(volumes)))

        self._sqlLine = self._sqlLine.rstrip(',')

    #--------------------------------------------------------------------------

    def __str__(self):
        """
        @return: Database name.
        @rtype: str

        """
        return self.name

    #--------------------------------------------------------------------------

    def attach(self, adminDb, asName=None):
        """
        Re-attach the offline database that was previously detached. The
        current implementation is tied to MS SQL Server via T-SQL.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server where the database will be
                        attached.
        @type adminDb: DbSession
        @param asName: Optional alternative name to attach the database as,
                       instead of its real name (used in the file names).
        @type asName: str

        """
        sql = self._sqlLine + " FOR ATTACH"
        name = self.name
        if asName:
            sql = sql.replace("CREATE DATABASE %s" % name,
                              "CREATE DATABASE %s" % asName)
            name = asName

        try:
            adminDb._executeScript(sql)
        except odbc.ProgrammingError as error:
            if Database.ExistsError.msg % name in str(error):
                raise Database.ExistsError(name)
            try:
                # As a fall back, use this simpler method that only cares about
                # the first file. Can't always use this method because it fails
                # with the mirrored databases.
                sql = "sp_attach_db '%s', '%s'" % (name, self.files[0])
                adminDb._executeScript(sql)
            except:
                raise error

        # Only change name on success in case caller catches exception.
        self.name = name
        self.setDefaultOptions(adminDb)

    #--------------------------------------------------------------------------

    def create(self, adminDb, overWrite=False):
        """
        Creates this database on a specified server.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server where the database will be
                        created.
        @type adminDb: DbSession
        @param overWrite: If True, overwrite any pre-existing database of this
                          name.
        @type overWrite: bool

        """
        self._makeFolders(adminDb)

        # Create the database
        try:
            adminDb._executeScript(self._sqlLine)
        except odbc.ProgrammingError as error:
            if overWrite and (
              "file names listed could not be created" in str(error)
              or Database.ExistsError.msg % self.name in str(error)):
                # Delete and try again...
                self.delete(adminDb)
                self._makeFolders(adminDb)
                # Create the database
                adminDb._executeScript(self._sqlLine)
            elif Database.ExistsError.msg % self.name in str(error):
                raise Database.ExistsError(self.name)
            else:
                raise

        self.setDefaultOptions(adminDb)

    #--------------------------------------------------------------------------

    def delete(self, adminDb):
        """
        Delete this database on a specified server.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server where the database will be
                        deleted.
        @type adminDb: DbSession

        """
        # Try to detach the database if it hasn't already been detached
        numAttempts = 20
        delay = 600
        for attempt in range(numAttempts):
            try:
                self.detach(adminDb)
            except odbc.ProgrammingError as error:
                if "does not exist" in str(error):
                    break
                else:
                    msg = "<Info> Cannot delete existing database because it "\
                      "is in use. Attempt %s of %s." % (attempt + 1, numAttempts)
                    if attempt + 1 == numAttempts:
                        msg += " Giving up..."
                        Logger.addMessage(msg)
                        raise
                    else:
                        msg += " Waiting for %d min..." % (delay / 60)
                        Logger.addMessage(msg)
                        time.sleep(delay)
            else:
                break

        # Remove the database files
        for filePathName in self.files:
            adminDb.runOnServer("del " + filePathName)
            adminDb._isLogReq[adminDb.database] = True

        # Remove the folders
        for folder in self._folders:
            adminDb.runOnServer("rmdir " + folder)

    #--------------------------------------------------------------------------

    def detach(self, adminDb):
        """
        Detach (take offline) the database. The current implementation is tied
        to MS SQL Server via T-SQL.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server where the database will be
                        detached.
        @type adminDb: DbSession

        """
        adminDb._executeScript("EXEC sp_detach_db %r" % self.name)

    #--------------------------------------------------------------------------

    def getMirrorServer(self, server):
        """
        Returns the corresponding mirror server for the given server.

        @param server: Name of server to find mirror of.
        @type server: str

        @return: Name of mirror server.
        @rtype: str

        """
        # Determine what the mirror server should be
        try:
            originalNum = int(server.replace(SystemConstants.clusterName, ''))
        except ValueError:
            raise Exception("Database.mirror() can only be performed on "
                            "cluster servers")

        # Servers are paired 1<->2, 3<->4, etc.: flip the lowest bit of the
        # zero-based server number.
        mirrorNum = 1 + ((originalNum - 1) ^ 1)
        return SystemConstants.clusterName + str(mirrorNum)
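The pairing arithmetic in getMirrorServer() is easy to check in isolation. The following is a rough sketch rather than module code: mirror_server_name is a hypothetical helper, and the cluster name "node" is made up for the example (the real value comes from SystemConstants.clusterName).

```python
def mirror_server_name(server, cluster_name):
    """Return the partner of `server` in a cluster numbered from 1."""
    number = int(server.replace(cluster_name, ''))
    # XOR with 1 flips the lowest bit of the zero-based index,
    # which swaps 0 with 1, 2 with 3, and so on.
    mirror = 1 + ((number - 1) ^ 1)
    return cluster_name + str(mirror)


pairs = [(n, mirror_server_name("node%d" % n, "node")) for n in range(1, 5)]
```

Because subtraction binds more tightly than `^` in Python, the unparenthesised form `originalNum - 1 ^ 1` evaluates the same way; the explicit parentheses only make the intent visible.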

    #--------------------------------------------------------------------------

    def mirror(self, adminDb):
        """
        Copy the database to the mirror server on the cluster.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server which has the database to be
                        copied.
        @type adminDb: DbSession

        @return: Mirror server name.
        @rtype: str

        """
        mirrorServer = self.getMirrorServer(adminDb.server)

        # Detach to enable mirror copy
        try:
            self.detach(adminDb)
        except odbc.DatabaseError as error:
            Logger.addExceptionWarning(error)
            Logger.addMessage("<WARNING> Cannot mirror database %s to %s as it"
                              " needs to be detached from %s first."
                              % (self.name, mirrorServer, adminDb.server))

            self.setReleaseOptions(adminDb)
        else:
            try:
                mirrorDb = DbSession(
                  database=mirrorServer + '.' + adminDb.database,
                  autoCommit=adminDb._autoCommit,
                  isTrialRun=adminDb.isTrialRun, userName=adminDb.userName,
                  isPersistent=adminDb._isPersistent)

                # Copy original database files to the mirror and attach
                self.delete(mirrorDb)
                self._makeFolders(mirrorDb)
                for filePathName in self.files:
                    mirrorFile = mirrorDb.uncPath(filePathName)
                    adminDb.runOnServer("copy %s %s"
                                        % (filePathName, mirrorFile))

                self.attach(mirrorDb)
                self.setReleaseOptions(mirrorDb)
            finally:
                self.attach(adminDb)
                self.setReleaseOptions(adminDb)

        return mirrorServer

    #--------------------------------------------------------------------------

    def release(self, adminDb, pubServer):
        """
        Copy the database to the given public server on the cluster.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server which has the database to be
                        copied.
        @type adminDb: DbSession
        @param pubServer: Name of server to copy to.
        @type pubServer: str

        @todo: Merge with mirror somehow.

        """
        pubVolumes = SystemConstants.catServerVolumes(pubServer)

        # Save object's original state
        origVolumes = SystemConstants.catServerVolumes(adminDb.server)
        origFolders = self._folders[:]
        origFiles = self.files[:]
        origSqlLine = self._sqlLine

        # Detach to enable copy to public server
        try:
            self.detach(adminDb)
        except odbc.DatabaseError as error:
            Logger.addExceptionWarning(error)
            Logger.addMessage("<WARNING> Cannot copy database %s to %s as it "
                              "needs to be detached from %s first."
                              % (self.name, pubServer, adminDb.server))
        else:
            try:
                # Update this database object for public server volumes
                for volume, pubVolume in zip(origVolumes, pubVolumes):
                    self._sqlLine = self._sqlLine.replace(volume, pubVolume)
                    self._folders = [path.replace(volume, pubVolume)
                                     for path in self._folders]

                    self.files = [path.replace(volume, pubVolume)
                                  for path in self.files]

                pubDb = DbSession(
                  database=pubServer + '.' + adminDb.database,
                  autoCommit=adminDb._autoCommit,
                  isTrialRun=adminDb.isTrialRun, userName=adminDb.userName,
                  isPersistent=adminDb._isPersistent)

                # Copy original database files to the public server and attach
                self.delete(pubDb)
                self._makeFolders(pubDb)
                for origFile, pubFile in zip(origFiles, self.files):
                    pubFile = pubDb.uncPath(pubFile)
                    adminDb.runOnServer("copy %s %s" % (origFile, pubFile))

                self.attach(pubDb)
                self.setReleaseOptions(pubDb)
            finally:
                # Restore this object to its original state
                self._folders = origFolders[:]
                self.files = origFiles[:]
                self._sqlLine = origSqlLine
                self.attach(adminDb)

    #--------------------------------------------------------------------------

    def setBulkLoadOptions(self, adminDb):
        """
        Set MS SQL Server database options appropriate for bulk loading, over
        the load database default options.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server where the database will be
                        altered.
        @type adminDb: DbSession

        """
        for option in ["AUTO_CREATE_STATISTICS OFF", "RECOVERY SIMPLE"]:
            adminDb._executeScript("ALTER DATABASE %s SET %s"
                                   % (self.name, option))

    #--------------------------------------------------------------------------

    def setDefaultOptions(self, adminDb):
        """
        Set the MS SQL Server database options that are appropriate for a
        load database over the MS SQL Server default options.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server where the database will be
                        altered.
        @type adminDb: DbSession

        """
        for option in ["AUTO_CREATE_STATISTICS ON",
                       "RECOVERY BULK_LOGGED",
                       "AUTO_SHRINK OFF",
                       "TORN_PAGE_DETECTION ON",
                       "AUTO_CLOSE OFF"]:
            adminDb._executeScript("ALTER DATABASE %s SET %s"
                                   % (self.name, option))

    #--------------------------------------------------------------------------

    def setReleaseOptions(self, adminDb):
        """
        Set MS SQL Server database options appropriate for a release database
        over the default load database options.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server where the database will be
                        altered.
        @type adminDb: DbSession

        """
        for option in ["AUTO_CREATE_STATISTICS ON", "RECOVERY SIMPLE"]:
            adminDb._executeScript("ALTER DATABASE %s SET %s"
                                   % (self.name, option))

    #--------------------------------------------------------------------------

    def _makeFolders(self, adminDb):
        """
        Creates the folders for the database files.

        @param adminDb: An autocommitting admin (i.e. master) database
                        connection to the server where the database will be
                        created.
        @type adminDb: DbSession

        """
        # Make the folders
        for folder in self._folders:
            adminDb.runOnServer("mkdir " + folder)

    #--------------------------------------------------------------------------

    def _splitSize(self, size, divisions):
        """
        Splits a size into a number of divisions, converting from GB to MB
        when necessary.

        @param size: Initial size in GB or MB, e.g. "10 GB".
        @type size: str
        @param divisions: Number of divisions the size must be divided
                          between.
        @type divisions: int

        @return: Divided integer size in either GB or MB.
        @rtype: str

        """
        units = size[-2:]
        newSize = float(size[:-2].strip()) / divisions
        if newSize < 1.0:
            if units != "MB":
                newSize *= 1000.0
                units = "MB"
            else:
                newSize = 1.0

        return "%s%s" % (int(newSize), units)
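A standalone copy of the size-splitting logic makes its edge cases easier to see. This is a sketch only: split_size is a hypothetical free function that follows the steps of _splitSize() as listed above, including its factor of 1000 between GB and MB.

```python
def split_size(size, divisions):
    """Divide a size string such as "4 GB" evenly between `divisions` files."""
    units = size[-2:]
    new_size = float(size[:-2].strip()) / divisions
    if new_size < 1.0:
        if units != "MB":
            # Less than one unit per file: drop down to MB
            new_size *= 1000.0
            units = "MB"
        else:
            # Already in MB: never go below 1 MB per file
            new_size = 1.0
    # Fractions are truncated, not rounded
    return "%s%s" % (int(new_size), units)


even = split_size("4 GB", 2)
small = split_size("1 GB", 4)
tiny = split_size("2 MB", 4)
truncated = split_size("10 GB", 4)
```

Note that truncation means the per-file sizes need not add up to the requested total, as in the last case where 2.5 GB per file becomes 2GB.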

#------------------------------------------------------------------------------

class Join(object):
    """
    Defines a database inner join between two or more tables. Use as the table
    argument value in L{DbSession.query()} type methods or when initialising
    an L{SelectSQL} object.

    """
    # Define public member variable default values (access as obj.varName)
    aliases = None  #: Dictionary of table names referenced by alias.
    joinCols = ''   #: Common table columns to join on.
    fromStr = ''    #: The SQL FROM clause for this join.
    whereStr = ''   #: Join predicates to append to an SQL WHERE clause.

    def __init__(self, tables, attributes, subJoin=None):
        """
        Defines an SQL inner join between given tables on given attributes.

        @param tables: Unique set of all tables to be joined on same
                       attribute set. Either just supply table names, or
                       a tuple containing table name and alias, e.g.
                       ["Multiframe", "ProgrammeFrame"] or
                       [("Multiframe", "M1"), ("Multiframe", "M2")].
        @type tables: sequence(str) or sequence(tuple)
        @param attributes: Unique attribute set on which to join. If a single
                           join column is required then just a string will do.
        @type attributes: sequence(str) or str
        @param subJoin: Join additional tables on just the given attribute.
        @type subJoin: tuple(list(str), str)

        """
        if len(tables) < 2:
            raise ValueError("Join class must be supplied with at least two "
                             "tables: tables = %s" % tables)

        firstTable = self._parseInput(tables, attributes)

        if subJoin:
            subTables, subCol = subJoin
            if isinstance(subTables, str):
                subTables = [subTables]

            self.fromStr += ", " + ", ".join(subTables)
            self.whereStr += " AND " + " AND ".join("%s.%s=%s.%s"
              % (firstTable, subCol, table.split('.')[-1], subCol)
              for table in subTables)

    #--------------------------------------------------------------------------

    def __str__(self):
        """
        @return: The SQL FROM clause for this join.
        @rtype: str

        """
        return self.fromStr

    #--------------------------------------------------------------------------

    def _parseInput(self, tables, attributes):
        """
        Parses input tables and attributes to get a useful list of predicates.

        """
        attributes = set(csv.values(attributes)
                         if isinstance(attributes, str) else attributes)
        try:
            self.aliases = dict((alias, table) for table, alias in tables)
        except ValueError:
            self.aliases = dict((table, table) for table in tables)

        predicates = []
        tableList = list(tables)
        firstTable = tableList[0]
        if type(firstTable) is tuple:
            firstTable = firstTable[1]

        self.joinCols = \
          ", ".join("%s.%s" % (firstTable, attr) for attr in attributes)

        tableSet = set(tableList[1:])
        for attr in attributes:
            for table in tableSet:
                if isinstance(table, tuple):
                    table = table[1]

                predicate = "%s.%s=%s.%s" % (firstTable, attr, table, attr)
                predicates.append(predicate)

        self._setSQL(tables, predicates)

        return firstTable

    #--------------------------------------------------------------------------

    def _setSQL(self, tables, predicates):
        """
        Sets the FROM and WHERE SQL statement member variables for this join.

        """
        self.whereStr = " AND ".join(predicates)
        self.fromStr = ", ".join((table if not isinstance(table, tuple) else
                                  "%s AS %s" % table) for table in tables)
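The FROM and WHERE strings that a Join produces can be previewed with a small standalone function. This is a simplified sketch, not the class itself: inner_join_sql is a hypothetical name, a plain comma split stands in for the module's csv.values() helper, and list order is preserved where the class uses sets.

```python
def inner_join_sql(tables, attributes):
    """Return (from_str, where_str) for an inner join on shared attributes."""
    if isinstance(attributes, str):
        attributes = [attr.strip() for attr in attributes.split(',')]

    # A table is either "Name" or ("Name", "Alias"); predicates use the alias.
    def ref(table):
        return table[1] if isinstance(table, tuple) else table

    first = ref(tables[0])
    # Every later table is tied to the first table on every attribute
    predicates = ["%s.%s=%s.%s" % (first, attr, ref(table), attr)
                  for attr in attributes for table in tables[1:]]

    from_str = ", ".join("%s AS %s" % table if isinstance(table, tuple)
                         else table for table in tables)
    return from_str, " AND ".join(predicates)


plain = inner_join_sql(["Multiframe", "ProgrammeFrame"], "multiframeID")
aliased = inner_join_sql([("Multiframe", "M1"), ("Multiframe", "M2")],
                         ["multiframeID"])
```

The join is expressed as a comma-separated FROM list plus equality predicates, so the predicates have to be appended to the WHERE clause of whatever query uses the join.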

#------------------------------------------------------------------------------

class LeftJoin(Join):
    """
    Defines a database left outer join between two tables. Use as the table
    argument value in L{DbSession.query()} type methods or when initialising
    an L{SelectSQL} object.

    """
    def __init__(self, tables, attributes):
        """
        Defines an SQL left outer join between given tables on given attributes.

        @param tables: Pair of tables to be left-joined on same attribute
                       set. Either just supply table names, or a tuple
                       containing table name and alias, e.g.
                       ["Multiframe", "ProgrammeFrame"] or
                       [("Multiframe", "M1"), ("Multiframe", "M2")].
        @type tables: sequence(str) or sequence(tuple)
        @param attributes: Unique attribute set on which to join. If a single
                           join column is required then just a string will do.
        @type attributes: sequence(str) or str

        """
        if len(tables) != 2:
            raise ValueError("LeftJoin class must be supplied with exactly two"
                             " tables: tables = %s" % tables)

        self._parseInput(tables, attributes)

    #--------------------------------------------------------------------------

    def _setSQL(self, tables, predicates):
        """
        Sets the FROM and WHERE SQL statement member variables for this join.

        """
        self.fromStr = " LEFT OUTER JOIN ".join(
          (table if not isinstance(table, tuple) else "%s AS %s" % table)
          for table in tables) + " ON " + " AND ".join(predicates)
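For comparison with the inner join, the FROM clause of a left outer join carries its predicates in an ON clause. A minimal sketch, assuming un-aliased table names only; left_join_from is a hypothetical helper and not part of the module.

```python
def left_join_from(left, right, attributes):
    """Return the FROM clause for a left outer join on shared attributes."""
    predicates = ["%s.%s=%s.%s" % (left, attr, right, attr)
                  for attr in attributes]
    return "%s LEFT OUTER JOIN %s ON %s" % (
        left, right, " AND ".join(predicates))


from_clause = left_join_from("Multiframe", "ProgrammeFrame", ["multiframeID"])
```

Since the predicates live in the FROM clause, nothing has to be added to the WHERE clause of the enclosing query.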

#------------------------------------------------------------------------------

class PkLeftJoin(LeftJoin):
    """
    Defines a database left outer join between two tables on the first table's
    primary key. Use as the table argument value in L{DbSession.query()} type
    methods or when initialising an L{SelectSQL} object.

    """
    def __init__(self, leftTable, rightTable):
        """
        Defines an SQL left outer join between given tables on the first
        table's primary key.

        @param leftTable: First table in the left join.
        @type leftTable: Schema.Table
        @param rightTable: Second table in the left join.
        @type rightTable: Schema.Table

        """
        tables = [leftTable.name, rightTable.name]
        self._parseInput(tables, leftTable.primaryKey())
3561
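# Illustrative sketch only, not part of the module: a standalone function
# that mirrors how LeftJoin._setSQL assembles its FROM string. The name
# left_join_from is hypothetical, and the "left.attr=right.attr" predicate
# form is an assumption, because _parseInput (which builds the predicates)
# is defined elsewhere in this module.

```python
def left_join_from(tables, attributes):
    """Return a 'A LEFT OUTER JOIN B ON ...' FROM-clause string."""
    if len(tables) != 2:
        raise ValueError("exactly two tables required: %r" % (tables,))

    # A single join column may be given as a plain string.
    if isinstance(attributes, str):
        attributes = [attributes]

    # Each entry is either a table name or a (name, alias) tuple; the
    # alias, when present, is what the join predicates refer to.
    names = [t[1] if isinstance(t, tuple) else t for t in tables]

    # ASSUMPTION: equality predicates on identically named columns.
    predicates = ["%s.%s=%s.%s" % (names[0], attr, names[1], attr)
                  for attr in attributes]

    return " LEFT OUTER JOIN ".join(
        ("%s AS %s" % t) if isinstance(t, tuple) else t
        for t in tables) + " ON " + " AND ".join(predicates)


if __name__ == "__main__":
    print(left_join_from(["Multiframe", "ProgrammeFrame"], "multiframeID"))
    print(left_join_from([("Multiframe", "M1"), ("Multiframe", "M2")],
                         ["multiframeID"]))
```

# The second call shows the aliased self-join case from the LeftJoin
# docstring, where the predicates must use the aliases M1 and M2 rather
# than the shared table name.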
#------------------------------------------------------------------------------

class SelectSQL(object):
    """
    Defines an SQL select statement. Initialise with components of an SQL query
    and then the SQL select line is returned when you convert the object to a
    string. Primarily used as part of the WHERE clause of another SQL
    statement.

    """
    # Define public member variable default values (access as obj.varName)
    tables = None    #: Dictionary of table names referenced by alias.
    selectStr = ''   #: SQL SELECT statement
    fromStr = ''     #: SQL FROM statement
    whereStr = ''    #: SQL WHERE clause statement
    orderByStr = ''  #: SQL ORDER BY statement
    groupByStr = ''  #: SQL GROUP BY statement

    # Define private member variable default values (access prohibited)
    _sql = ''  #: The full SQL select query statement line.

    def __init__(self, select, table, where='', orderBy='', groupBy=''):
        """
        Defines an SQL select statement from the given components.

        @param select: SQL SELECT string; comma-separated column names.
        @type select: str
        @param table: SQL FROM string; single table name or L{Join} object.
        @type table: str or L{Join}
        @param where: Optional SQL WHERE clause.
        @type where: str
        @param orderBy: Optional SQL ORDER BY statement.
        @type orderBy: str
        @param groupBy: Optional SQL GROUP BY statement.
        @type groupBy: str

        """
        self.selectStr = select
        self.fromStr = str(table)
        self.whereStr = where
        self.orderByStr = orderBy
        self.groupByStr = groupBy

        if isinstance(table, Join):
            self.tables = table.aliases
            if table.whereStr:
                self.whereStr += " AND " if self.whereStr else ""
                self.whereStr += table.whereStr
        else:
            self.tables = {}

            # Careful of any tables created as a sub-query
            derivTableDict = {}
            fromStr = self.fromStr
            if '(' in fromStr:
                derivTableDict, fromStr = self.replaceDerivTables(fromStr)

            for table in list(csv.values(fromStr)):
                if " AS " in table.upper():
                    table, alias = \
                        table.split(" AS " if " AS " in table else " as ")
                    if table.startswith('@') and table in derivTableDict:
                        table = derivTableDict[table]
                    self.tables[alias] = table
                else:
                    if table.startswith('@') and table in derivTableDict:
                        table = derivTableDict[table]
                    self.tables[table] = table

        self._sql = "SELECT %s FROM %s" % (self.selectStr, self.fromStr)
        if self.whereStr:
            self._sql += " WHERE " + self.whereStr
        if self.groupByStr:
            self._sql += " GROUP BY " + self.groupByStr
        if self.orderByStr:
            self._sql += " ORDER BY " + self.orderByStr

    #--------------------------------------------------------------------------

    def __str__(self):
        """ @return: The SQL select statement line.
            @rtype: str
        """
        return self._sql

    #--------------------------------------------------------------------------

    @staticmethod
    def replaceDerivTables(fromStr):
        """ Replaces tables derived from a sub-query in the SQL script's FROM
            string, so that the class can parse the information correctly.
        """
        nOpenBra = 0
        nCloseBra = 0
        isDerivedTable = False
        startPos = 0
        derivedTableDict = {}
        for chrNo in range(len(fromStr)):
            if fromStr[chrNo] == '(':
                nOpenBra += 1

            if fromStr[chrNo] == ')':
                nCloseBra += 1

            if nOpenBra > nCloseBra:
                if not isDerivedTable:
                    startPos = chrNo
                    isDerivedTable = True

            if nCloseBra == nOpenBra:
                if isDerivedTable:
                    derivedTableNo = len(derivedTableDict) + 1
                    derivedTableDict['@derivedTable%s' % derivedTableNo] = \
                        fromStr[startPos:chrNo + 1]

                isDerivedTable = False

        for dTable in derivedTableDict:
            fromStr = fromStr.replace(derivedTableDict[dTable], dTable)

        return derivedTableDict, fromStr

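# Illustrative sketch only, not part of the module: the bracket-matching
# idea behind SelectSQL.replaceDerivTables, restated with a single nesting
# depth counter in place of separate open/close counts. The function name
# replace_derived_tables is hypothetical, and the plain str.split(',') at
# the end merely stands in for the module's csv.values helper, whose exact
# behaviour is an assumption here.

```python
def replace_derived_tables(from_str):
    """Swap each top-level bracketed sub-query for an '@derivedTableN' tag.

    Returns (mapping of tag -> original sub-query text, simplified string).
    """
    depth = 0
    start = 0
    derived = {}
    for pos, char in enumerate(from_str):
        if char == '(':
            if depth == 0:
                start = pos  # Outermost bracket opens a derived table.
            depth += 1
        elif char == ')' and depth > 0:
            depth -= 1
            if depth == 0:
                # Outermost bracket closed: record the whole sub-query.
                tag = "@derivedTable%d" % (len(derived) + 1)
                derived[tag] = from_str[start:pos + 1]

    for tag, sub_query in derived.items():
        from_str = from_str.replace(sub_query, tag)

    return derived, from_str


if __name__ == "__main__":
    derived, simplified = replace_derived_tables(
        "(SELECT a FROM T1 WHERE b IN (1, 2)) AS D, Multiframe AS M")
    print(simplified)
    # Commas inside the sub-query no longer interfere with the split.
    print([entry.strip() for entry in simplified.split(',')])
```

# Replacing the sub-query first is what makes the later comma split safe:
# without it, the comma inside "IN (1, 2)" would be mistaken for a table
# separator.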
#------------------------------------------------------------------------------

def bulkCopy(query, tableSchema, fromDb, toDb, fileTag, drive='',
             isSmallTable=True):
    """
    Copies table data via a bulk outgest and ingest to another table, which
    can be in a different database and on a different server. Destination table
    will be automatically created if it doesn't already exist.

    @param query: Query of data selected to copy via outgest. Table names
                  must be given with full path. Selection must be ordered
                  by the ingest table's primary key in ascending order.
    @type query: SelectSQL
    @param tableSchema: Schema of the ingest table.
    @type tableSchema: schema.Table
    @param fromDb: Connection to the database where outgest will occur.
    @type fromDb: DbSession
    @param toDb: Connection to the database where ingest will occur.
    @type toDb: DbSession
    @param fileTag: Unique tag to attach to temporary outgest/ingest files.
    @type fileTag: str
    @param drive: Optionally outgest to a different catalogue server
                  drive, instead of the share directory, e.g. "G:\".
    @type drive: str
    @param isSmallTable: If False, then stdout is redirected to file to work
                         around a mystery bug in very large table outgests,
                         which is inefficient for small tables.
    @type isSmallTable: bool

    @return: Number of rows ingested.
    @rtype: int

    @warning: Make sure the ingest table schema has been checked first.

    @todo: Merge Outgester and Ingester classes into one BulkTransfer class
           with individual outgest, ingest and copy functions? Then we can
           handle the tableSchema initialisation a bit more neatly, and put
           fileTag into the initialiser. Then codes can choose whether or not
           to pre-initialise.
    """
    outPathName = \
      Outgester(fromDb, fileTag, honourTrialRun=True).outgestQuery(query,
        filePath=drive or toDb.sharePath(),
        redirectStdOut=not isSmallTable)

#    # @TODO: Remove?
#    # @@HACK: This needs to be integrated into Outgester - i.e. need to outgest
#    #         to ingest server's free-est volume. Tidy up changes to Outgester as well
#    #         as the Ingester time-delay hack when I implement this.
#    if drive and fromDb.server != toDb.server:
#        # Strictly speaking moving files between catalogue servers within the
#        # cluster is unnecessary. However, other servers on the cluster have a
#        # 10s delay before they see a newly created remote file, so it's faster
#        # for small files to move them to the local server prior to ingest.
#        newPathName = \
#          outPathName.replace(fromDb.server.upper(), toDb.server.upper())\
#                     .replace(drive.replace(':', ''),
#                              toDb.getBestVolume().replace(':', ''))
#
#        fromDb.runOnServer("move %s %s" % (outPathName, newPathName))
#        outPathName = newPathName

    ingester = Ingester(toDb, [tableSchema], tag=fileTag, skipSchemaCheck=True)
    return ingester.ingestTable(tableSchema.name, outPathName, isOrdered=True)

#------------------------------------------------------------------------------
# Change log:
#
# 18-Jan-2006, RSC: Original version.