1   
   2   
   3  """ 
   4     Database interface. The L{DbSession} class provides a complete database API 
   5     as well as connection management features. A database connection is 
   6     instantiated upon construction of a L{DbSession} object and then 
   7     automatically closed when the object falls out of scope. Existing 
   8     identical connections are reused, invisibly to the user, to reduce the 
   9     number of simultaneous open connections. All access to the database should 
  10     then be via the methods supplied by the L{DbSession} class. 
  11   
  12     Usage 
  13     ===== 
  14   
  15     Database API 
  16     ------------ 
  17   
  18     Import as:: 
  19         from wsatools.DbConnect.DbSession import DbSession 
  20   
  21     For a simple default database connection to the WSA:: 
  22   
  23         db = DbSession("WSA") 
  24         db.update("Multiframe", "deprecated=0", where="multiframeID") 
  25         print(db.queryNumRows("Multiframe")) 
  26         print(db.queryAttrMax("multiframeID", "Multiframe")) 
  27   
  28     For joined queries, e.g. a list of all LAS stacks multiframeIDs, use the 
  29     L{Join} class:: 
  30   
  31         from wsatools.DbConnect.DbSession import Join 
  32   
  33         mfIDs = db.query("Multiframe.multiframeID", 
  34                  table=Join(["Multiframe", "ProgrammeFrame"], ["multiframeID"]), 
  35                  where="frameType LIKE '%stack' AND programmeID=101") 
  36   
  37     For queries where a select statement is used within a where clause, use the 
  38     L{SelectSQL} class:: 
  39   
  40         from wsatools.DbConnect.DbSession import SelectSQL 
  41   
  42         progMfs = SelectSQL("multiframeID", table="ProgrammeFrame", 
  43                             where="programmeID=%s" % self.programmeID) 
  44         fileNames = db.query("fileName", table="Multiframe", 
  45                              where="multiframeID IN %s" % progMfs) 
  46   
   47     To perform a safe temporary disconnect from the database, invoke the 
   48     L{DbSession.goOffline()} method. When any L{DbSession} object that uses this 
   49     connection next attempts to access the database, e.g. with a query, the 
   50     connection is reactivated; if the database is down, it will persistently 
   51     attempt to reconnect. Hence, you can also use this method at sensitive 
   52     stages of a curation script to ensure that the database connection 
   53     remains alive. 
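
    For example, around a long non-database step in a curation script (a 
    sketch; the task function is hypothetical)::

        db.goOffline()
        runLongNonDbTask()                     # hypothetical external work
        print(db.queryNumRows("Multiframe"))   # reconnects automatically here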
  54   
  55     Please refer to the L{DbSession} class documentation for the full range of 
  56     API methods and options. 
  57   
  58     Ingester 
  59     -------- 
  60   
  61     Import as:: 
  62         from wsatools.DbConnect.DbSession import Ingester 
  63   
   64     Before ingesting it is vital to ensure that the schema of the ingest file 
   65     matches that of the table into which it will be ingested. This check occurs 
   66     when an L{Ingester} object is created, so create the object as early as 
   67     possible in your code, and not unnecessarily within a for-loop. To 
   68     instantiate an L{Ingester} object you must pass it a L{DbSession} database 
   69     connection to the database that hosts the ingest table, and a schema for the 
  70     table(s) that will be ingested into during the lifetime of this particular 
  71     L{Ingester} object:: 
  72   
  73         db = DbSession() 
  74         ingester = Ingester(db, tableSchema) 
  75   
  76     The tableSchema is just a list of L{Schema.Table} objects for every table 
  77     that will be ingested this session. Use the L{Schema.parseTables()} method 
   78     to create this schema, and then reduce or expand the list until it contains 
   79     exactly the set of tables to be ingested. To ingest a native binary file 
  80     located in the catalogue server's share path into the table:: 
  81   
  82         numRows = ingester.ingestTable("lasSource", "lasSource.nat", idxInfo) 
  83   
   84     Afterwards, the ingest file is deleted. The idxInfo argument is created by 
   85     L{Schema.parseIndices()} and is only required if you wish to drop indices 
   86     for the ingest and have them automatically recreated afterwards. If the 
   87     database table doesn't already exist, the ingester will create it. 
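
    For example, to prepare an ingest of just the lasSource table, dropping and 
    recreating its indices (a sketch: the schema file names are illustrative; 
    see L{Schema.parseTables()} and L{Schema.parseIndices()} for the exact 
    interfaces)::

        import wsatools.DbConnect.Schema as Schema

        tableSchema = [table for table in Schema.parseTables("WSA_Tables.sql")
                       if table.name == "lasSource"]
        idxInfo = Schema.parseIndices("WSA_Indices.sql")
        ingester = Ingester(db, tableSchema)
        numRows = ingester.ingestTable("lasSource", "lasSource.nat", idxInfo)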
  88   
  89     Please refer to the L{Ingester} class documentation for further options. 
  90   
  91     Outgester 
  92     --------- 
  93   
  94     Import as:: 
  95         from wsatools.DbConnect.DbSession import Outgester, SelectSQL 
  96   
  97     To initialise an L{Outgester} object you must supply it with a connection to 
  98     the database where the outgest will occur:: 
  99   
 100         db = DbSession() 
 101         sourceOutgester = Outgester(db) 
 102   
 103     Then to outgest the results of a particular table query to a binary file:: 
 104   
 105         filePathName = sourceOutgester.outgestQuery(SelectSQL( 
 106           select="*", table="WSA.dbo.Multiframe", where="deprecated = 0")) 
 107   
  108     By default the file is outgested to the load server share path, and there is 
  109     a delay following outgest to allow the NFS to see the shared file. If an 
  110     alternative path outside of the share path is given, then no such delay is 
  111     applied. 
 112   
 113     Please refer to the L{Outgester} class documentation for further options. 
 114   
 115     Creating and modifying databases 
 116     -------------------------------- 
 117   
 118     The L{Database} class provides methods to create and modify databases. 
 119   
 120     Import as:: 
 121         from   wsatools.DbConnect.DbSession import Database, DbSession 
 122         from   wsatools.SystemConstants     import WsaConstants 
 123   
 124     To recreate the WSA with standard options on server "pharaoh":: 
 125         WSA = Database(WsaConstants.loadDatabase, 
 126                        filegroups=[("Detection", "4 GB"), 
 127                                    ("Source", "4 GB"), 
 128                                    ("Curation", "4 GB"), 
 129                                    ("Indices", "4 GB")]) 
 130         WSA.create(DbSession("pharaoh."+WsaConstants.adminDatabase)) 
 131   
 132     Please refer to the L{Database} class documentation for further options. 
 133   
 134     @author: R.S. Collins 
 135     @org:    WFAU, IfA, University of Edinburgh 
 136   
 137     @todo: Make method keyword arguments consistent. The query() interface 
 138            should be attrs, table, where; and this naming convention should be 
 139            stuck to as much as possible in other methods. Should update() use 
 140            attrs or entry instead of entryList? 
 141     @todo: Cut down code duplication between updateEntries and deleteRows 
 142            + possibly between updateEntries + update, ditto for deleteRows 
 143     @todo: Retrospectively upgrade code using queryNumRows() to make use of new 
 144            features if necessary. 
 145     @todo: Upgrade code to make use of join feature, and upgrade documentation 
 146            of query*() methods to reflect this. 
 147     @todo: Upgrade code to make use of query() returning namedtuples. 
 148     @todo: addColumn / dropColumn should be merged with add/drop constraint, and 
 149            so be designed like createObject/dropObject. 
 150     @todo: Password option? 
 151  """ 
 152   
 153  from __future__      import division, print_function 
 154  from future_builtins import map, zip 
 155   
 156  from   collections  import defaultdict, namedtuple 
 157  from   itertools    import takewhile 
 158  from   keyword      import iskeyword 
 159  import mx.ODBC.unixODBC as odbc 
 160  from   operator     import itemgetter 
 161  import os 
 162  import shutil 
 163  import sys 
 164  import time 
 165   
 166  from   wsatools.CLI               import CLI 
 167  import wsatools.CSV                   as csv 
 168  import wsatools.DbConnect.DbConstants as dbc 
 169  import wsatools.DbConnect.Schema      as schema 
 170  from   wsatools.Logger            import Logger 
 171   
 172  from   wsatools.SystemConstants   import SystemConstants, WsaConstants 
 173  import wsatools.Utilities             as utils 
 177      """ 
 178      An interface to our database. Manages the mxODBC connection, and provides a 
 179      set of wrappers to common SQL statements. 
 180   
 181      @note: Test database connections get read-write user access by default. 
 182   
 183      @group Errors & Exceptions: CatalogueServerError, DisconnectError 
 184   
 185      """ 
 187          """ An error generated by a command run directly on the catalogue 
 188              server. 
 189          """ 
 190          pass 
  191   
 193          """ Exception thrown if a programmer tries to access a database 
 194              connection that is closed, to either attempt to close it or 
 195              commit/rollback transactions. This is a programming error rather 
 196              than an exception caused by an unexpected loss of database 
 197              connection. 
 198          """ 
 200              """ 
 201              @param db:  Name of database. 
 202              @type  db:  str 
 203              @param msg: Optional specific message for this exception. 
 204              @type  msg: str 
 205   
 206              """ 
 207              msg = msg.strip().rstrip('.') 
 208              super(DbSession.DisconnectError, self).__init__( 
 209                "%sAlready disconnected from database %s" % 
 210                (msg + '. ' if msg else msg, db)) 
   211   
 212       
 213       
 214   
 215      _dbConnections = {}     
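    """ Cache of open database connections shared between L{DbSession} objects,
        referenced by session key, so that identical connections can be reused.
    """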
 216      _isLogReq = {} 
 217      """ Dictionary of flags to indicate a database has been modified, 
 218          referenced by database name. 
 219      """ 
 220      _numSessions = {}       
 221      sysc = SystemConstants()   
 222   
 223       
 224       
 225   
 226       
 227      database = WsaConstants.loadDatabase   
 228       
 229      isLoadDb = True 
 230       
 231      isRealRun = True 
 232       
 233      isTrialRun = False 
 234       
 235      server = WsaConstants.loadServer   
 236   
 237       
 238      userName = (dbc.loadServerRoUsername() if os.getenv('USER') != 'scos' else 
 239                  dbc.loadServerRwUsername()) 
 240   
 241       
 242       
 243   
 244      _autoCommit = False 
 245      """ Automatically commit transactions, minimally logged so faster. 
 246      """ 
 247      _dbSessionKey = None 
 248      """ A string that defines this connection as a unique type. This is used to 
 249          determine if an identical connection already exists for reuse. 
 250      """ 
 251      _fixedCopyBug = False   
 252      _isDeadlocked = False   
 253      _isDirty = False        
 254      _isPersistent = True    
 255   
 256       
 257       
 258   
 259      CLI.progArgs.append(CLI.Argument("database", server + '.' + database, 
 260                                       isValOK=lambda x: x.count('.') <= 1)) 
 261      CLI.progOpts += [ 
 262        CLI.Option('t', "test", 
 263          "test run, don't alter database, just print statements to screen"), 
 264        CLI.Option('u', "user", 
 265          "database username", "NAME", userName)] 
 266   
 267       
 268   
 336   
 337       
 338   
 351   
 352       
 353   
 355          """ @return: Database path. 
 356              @rtype:  str 
 357          """ 
 358          return self.server + '.' + self.database 
  359   
 360       
 361   
 362 -    def addColumn(self, tableName, colName, dataType, defaultStr=None, 
 363                          notNull=True, constraint=None): 
  364          """ 
 365          Adds a column to a table in the database. 
 366   
 367          @param tableName:  Name of the table. 
 368          @type  tableName:  str 
 369          @param colName:    Name of the new column. 
 370          @type  colName:    str 
 371          @param dataType:   SQL data type of the new attribute. 
 372          @type  dataType:   str 
 373          @param defaultStr: Optional default value, in the form of an SQL string 
 374                             e.g. "'none'" or "0". 
 375          @type  defaultStr: str 
 376          @param notNull:    If True, column disallows null values. 
 377          @type  notNull:    bool 
 378          @param constraint: Optional constraint name. Defaults to 
 379                             tableName + colName + '_cons'. 
 380          @type  constraint: str 
 381   
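        Example (table, column and default are illustrative)::

            db.addColumn("Multiframe", "newFlag", "tinyint", defaultStr="0")
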
 382          """ 
 383          sql = "ALTER TABLE %s ADD %s %s" % (tableName, colName, dataType) 
 384          if notNull: 
 385              sql += " not null" 
 386          if defaultStr: 
 387              constraint = constraint or tableName + colName + '_cons' 
 388              sql += " CONSTRAINT %s DEFAULT %s" % (constraint, defaultStr) 
 389   
 390          try: 
 391              self._executeScript(sql) 
 392          except odbc.ProgrammingError as error: 
 393              if notNull: 
 394                  raise Exception("Cannot add a column with 'not null' specified" 
 395                      " without specifying default values if data already exists" 
 396                      " in the table. mxODBC says: (%s)" % error) 
  397   
 398       
 399   
 400 -    def addIndex(self, indexSchema, ignoreNS=False, releasedOnly=False, 
 401                         usingDefaultFG=False): 
  402          """ 
 403          Adds an index to a specified table. 
 404   
 405          @param indexSchema: Object describing the schema of the index to add. 
 406          @type  indexSchema: L{Schema.Index} 
 407          @param ignoreNS:    If True, upon a "no such" error message reporting 
 408                              non-existent tables, attributes etc. log the 
 409                              message and continue, with False return state (no 
 410                              object created). 
 411          @type  ignoreNS:    bool 
 412          @param releasedOnly:   If True, only create the object if it is to be 
 413                                 released (i.e. upper case CREATE statements). 
 414          @type  releasedOnly:   bool 
 415          @param usingDefaultFG: If False, use the file group "Indices_FG". 
 416          @type  usingDefaultFG: bool 
 417   
 418          @return: True, if index has been created. 
 419          @rtype:  bool 
 420   
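        Example (a sketch, where indexSchema is a L{Schema.Index} object, e.g.
        as parsed by L{Schema.parseIndices()})::

            isCreated = db.addIndex(indexSchema, ignoreNS=True)
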
 421          """ 
 422          if not releasedOnly or indexSchema.releasable: 
 423              Logger.addMessage("Creating index %s on table %s..." % 
 424                  (indexSchema.name, indexSchema.tableName)) 
 425          if usingDefaultFG: 
 426              indexSchema.fileGroup = None 
 427          try: 
 428              return self.createObjects([indexSchema], releasedOnly=releasedOnly) 
 429          except odbc.ProgrammingError as error: 
 430              ignoreCases = [" does not exist in the target table.", 
 431                             " table does not exist in database "] 
 432              if ignoreNS and any(case in str(error) for case in ignoreCases): 
 433                  Logger.addExceptionWarning(error, "Database message") 
 434                  return False 
 435              else: 
 436                  raise 
  437   
 438       
 439   
 440 -    def alterColumn(self, tableName, colName, dataType, defaultStr=None, 
 441                            notNull=True, constraint=None): 
  442          """ 
 443          Alter a column of a table in the database. 
 444   
 445          @param tableName:  Name of the table. 
 446          @type  tableName:  str 
 447          @param colName:    Name of the existing column. 
 448          @type  colName:    str 
 449          @param dataType:   Desired SQL data type for the column. 
 450          @type  dataType:   str 
 451          @param defaultStr: Optional default value, in the form of an SQL string 
 452                             e.g. "'none'" or "0". 
 453          @type  defaultStr: str 
 454          @param notNull:    If True, column disallows null values. 
 455          @type  notNull:    bool 
 456          @param constraint: Optional constraint name. Defaults to 
 457                             tableName + colName + '_cons'. 
 458          @type  constraint: str 
 459   
 460          @todo: Should become more schema driven together with L{addColumn()}. 
 461   
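        Example (illustrative names and values)::

            db.alterColumn("Multiframe", "fileName", "varchar(256)",
                           defaultStr="'none'")
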
 462          """ 
 463          sql = "ALTER TABLE %s ALTER COLUMN %s %s%s" \ 
 464            % (tableName, colName, dataType, " not null" if notNull else '') 
 465          if defaultStr: 
 466              constraint = constraint or tableName + colName + '_cons' 
 467              sql += " CONSTRAINT %s DEFAULT %s" % (constraint, defaultStr) 
 468   
 469          self._executeScript(sql) 
  470   
 471       
 472   
 474          """ 
 475          Checks the database constraints on specified table. Logs any constraint 
 476          violations and throws an exception if any are detected. 
 477   
 478          @param tableName: Name of the table. 
 479          @type  tableName: str 
 480   
 481          """ 
 482           
 483          lockPathName = os.path.join(self.sysc.sysPath, "conlock-" + self.server) 
 484          if os.path.exists(lockPathName): 
 485              Logger.addMessage("<Info> Another CU is checking constraints on %s" 
 486                ". Waiting for %s to disappear..." % (self.server, lockPathName)) 
 487              while os.path.exists(lockPathName): 
 488                  time.sleep(10) 
 489   
 490          Logger.addMessage( 
 491            "Checking constraints on table %s (may hang if locked out by " 
 492            "other CUs also accessing %s.tempdb)" % (tableName, self.server)) 
 493   
 494          # Write a lock file recording which host and process holds the check 
 495          with open(lockPathName, 'w') as lockFile: 
 496              lockFile.write("Locking PID = %s.%s" % (os.getenv('HOST'), os.getpid())) 
 497   
 498          try: 
 499              violations = self._executeScript( 
 500                "DBCC CHECKCONSTRAINTS (%r)" % tableName, wantResult=True) 
 501          finally: 
 502              os.remove(lockPathName) 
 503   
 504          if violations: 
 505               
 506              Logger.addMessage("<Warning> Constraint check failed.") 
 507              msg = "Constraint violations:" 
 508              violationData = defaultdict(list) 
 509              for table, constraint, case in violations: 
 510                  key = (table.strip().strip('[').strip(']'), 
 511                         constraint.strip().strip('[').strip(']')) 
 512                  violationData[key].append(case) 
 513              for violation in violationData: 
 514                  msg += "\nIn table %r, constraint %r fails in these cases:\n" \ 
 515                    % violation + '\n'.join(violationData[violation]) 
 516   
 517              raise odbc.IntegrityError(msg) 
  518   
 519       
 520   
 521 -    def checkSchema(self, tableSchema, releasedOnly=False): 
  522          """ 
 523          Compares the given table schema as described in the .sql files to the 
 524          actual database schema to spot inconsistencies. If there are any 
 525          problems, a L{Schema.MismatchError} exception is thrown. 
 526   
 527          @param tableSchema:  Schema of tables to check as provided by 
 528                               L{Schema.parseTables()}. 
 529          @type  tableSchema:  list(Schema.Table) 
 530          @param releasedOnly: If True, only check released database tables. 
 531          @type  releasedOnly: bool 
 532   
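        Example (a sketch, with tableSchema as returned by
        L{Schema.parseTables()})::

            db.checkSchema(tableSchema, releasedOnly=True)
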
 533          """ 
 534           
 535          errMsg = '' 
 536          errSchema = [] 
 537          for table in tableSchema: 
 538              if not releasedOnly or table.releasable: 
 539                   
 540                  dbAttrs = self.queryColumnNames(table.name) 
 541                   
 542                  dbTypes = self.queryDataTypes(table.name) 
 543                   
 544                  scTypes = dict((attr.name, attr.dataType) 
 545                                 for attr in table.columns) 
 546   
 547                   
 548                  exScAttrs = [attr.name for attr in table.columns 
 549                                          if attr.name not in dbAttrs] 
 550                  if exScAttrs: 
 551                      errMsg += "\n%s - " % table 
 552                      if len(exScAttrs) == len(table.columns): 
 553                          errMsg += "table does not exist in the database" 
 554                      else: 
 555                          errMsg += ( 
 556                            "the following attributes have no matching columns " 
 557                            "in the database: " + ', '.join(exScAttrs)) 
 558                      errSchema.append(table) 
 559   
 560                   
 561                  exDbAttrs = [attrName for attrName in dbAttrs 
 562                                         if attrName not in table.columns] 
 563                  if exDbAttrs: 
 564                      errMsg += ( 
 565                        "\n%s - the following database columns no longer exist " 
 566                        "in the current schema: " % table + ', '.join(exDbAttrs)) 
 567                      errSchema.append(table) 
 568   
 569                   
 570                   
 571                  reScAttrs = [attr.name for attr in table.columns 
 572                                          if attr not in exScAttrs] 
 573                  reDbAttrs = [attrName for attrName in dbAttrs 
 574                                         if attrName not in exDbAttrs] 
 575   
 576                   
 577                  colSwapList = [(scAttr, dbAttr) for scAttr, dbAttr 
 578                                                   in zip(reScAttrs, reDbAttrs) 
 579                                                   if scAttr != dbAttr] 
 580                  if colSwapList: 
 581                      errMsg += ( 
 582                        "\n%s - the order of these database columns and schema " 
 583                        "attributes are swapped: " % table + 
 584                        utils.joinNested(colSwapList, subJoinStr="<->")) 
 585                      errSchema.append(table) 
 586   
 587                   
 588                  typeDifList = \ 
 589                    [(attrName, scTypes.get(attrName), dbTypes.get(attrName)) 
 590                     for attrName in reDbAttrs 
 591                      if scTypes.get(attrName) != dbTypes.get(attrName)] 
 592   
 593                  if typeDifList: 
 594                      errMsg += ( 
 595                        "\n%s - these attributes have a different data type in " 
 596                        "the database than described by the current schema: " % 
 597                        table + utils.joinNested(typeDifList, subJoinStr=":")) 
 598                      errSchema.append(table) 
 599   
 600          if errMsg: 
 601              raise schema.MismatchError(errMsg, errSchema) 
  602   
 603       
 604   
 614   
 615       
 616   
 617 -    def copyIntoTable(self, destinationTable, sourceTable, columns='*', 
 618                              where=''): 
  619          """ 
 620          Copy a selection of rows from one table into another. 
 621   
 622          @param destinationTable: Table to copy rows into. 
 623          @type  destinationTable: str 
 624          @param sourceTable:      Table to copy from, or a L{Join} object of 
 625                                   several tables. 
 626          @type  sourceTable:      str or L{Join} 
 627          @param columns:          Optionally select a subset of columns, as a 
 628                                   comma-separated list. 
 629          @type  columns:          str 
 630          @param where:            Optional SQL where clause defining the 
 631                                   selection of rows to copy. 
 632          @type  where:            str 
 633   
 634          @return: Number of rows copied. 
 635          @rtype:  int 
 636   
 637          @note: This command is heavily transaction logged. 
 638   
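        Example (illustrative table names)::

            numRows = db.copyIntoTable("MultiframeCopy", "Multiframe",
                                       where="deprecated=0")
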
 639          """ 
 640          sql = "INSERT INTO %s %s" % \ 
 641            (destinationTable, SelectSQL(columns, sourceTable, where)) 
 642   
 643           
 644           
 645           
 646          return self._executeScript(sql, wantRowCount=True) 
  647   
 648       
 649   
 650 -    def copyTable(self, sourceTable, tableSchema, columns='*', where='', 
 651                          fileGroup='', attachForeignKeys=True): 
  652          """ 
 653          Create a new table from the selected contents of another table. 
 654   
 655          @param sourceTable:      Table to copy from, or a L{Join} object of 
 656                                   several tables. 
 657          @type  sourceTable:      str or L{Join} 
 658          @param tableSchema:      Schema of table to copy rows into that will be 
 659                                   created in this database. 
 660          @type  tableSchema:      L{Schema.Table} 
 661          @param columns:          Optionally select a subset of columns, as a 
 662                                   comma-separated list. 
 663          @type  columns:          str 
 664          @param where:            Optional SQL where clause defining the 
 665                                   selection of rows to copy. 
 666          @type  where:            str 
 667          @param fileGroup:        Optionally supply a different file group to 
 668                                   create the new table on other than the 
 669                                   default. 
 670          @type  fileGroup:        str 
 671          @param attachForeignKeys: If True, foreign key constraints are attached 
 672                                    to the new table. 
 673          @type  attachForeignKeys: bool 
 674   
 675          @return: Number of rows copied. 
 676          @rtype:  int 
 677   
 678          @note: This command should be minimally logged. 
 679          @note: This method relies on Microsoft T-SQL, rather than standard SQL. 
 680          @note: Changing file-group is an experimental untested feature so far. 
 681   
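        Example (a sketch, where newTableSchema is the L{Schema.Table} object
        describing the table to be created)::

            numRows = db.copyTable("Multiframe", newTableSchema,
                                   where="deprecated=0")
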
 682          """ 
 683          if not self._fixedCopyBug: 
 684               
 685               
 686               
 687              self.freeProcCache() 
 688              self._executeScript("DBCC DROPCLEANBUFFERS") 
 689              self._fixedCopyBug = True 
 690   
 691          if fileGroup: 
 692              adminDb = DbSession(self.server + '.' + self.sysc.adminDatabase, 
 693                                  autoCommit=True, isTrialRun=self.isTrialRun, 
 694                                  userName=self.userName) 
 695              try: 
 696                  adminDb._executeScript("ALTER DATABASE %s MODIFY FILEGROUP %s " 
 697                                         "DEFAULT" % (self.database, fileGroup)) 
 698              except odbc.ProgrammingError as error: 
 699                  if "filegroup already has the 'DEFAULT' property set" \ 
 700                    not in str(error): 
 701                      raise 
 702   
 703                  Logger.addMessage("%s already has the 'DEFAULT' property " 
 704                                    "set in %s" % (fileGroup, self.database)) 
 705   
 706          try: 
 707              rowsCopied = self._executeScript( 
 708                  str(SelectSQL(columns, sourceTable, where) 
 709                     ).replace(" FROM ", " INTO %s FROM " % tableSchema.name, 1), 
 710                  wantRowCount=True) 
 711              if fileGroup: 
 712                  self.commitTransaction() 
 713          finally: 
 714              if fileGroup: 
 715                  adminDb._executeScript("ALTER DATABASE %s MODIFY FILEGROUP" 
 716                                         " [PRIMARY] DEFAULT" % self.database) 
 717   
 718           
 719          self.createObjects(tableSchema.constraints, 
 720                             releasedOnly=not attachForeignKeys) 
 721   
 722          return rowsCopied 
  723   
 724       
 725   
 726 -    def createObjects(self, objSchema, overWrite=False, ignoreNS=False, 
 727                              releasedOnly=False, haltOnError=True): 
  728          """ 
 729          Create new database objects using the ordered list of object schema 
 730          definitions provided from L{Schema} methods. Tables are created with 
 731          primary key constraints but no foreign key constraints; use 
 732          L{createTable()} instead if they are required. 
 733   
 734          @param objSchema:    An ordered list of objects derived from 
 735                               L{Schema._Schema}. 
 736          @type  objSchema:    list(Schema._Schema) 
 737          @param overWrite:    If True, allow pre-existing database objects with 
 738                               the same name to be overwritten. 
 739          @type  overWrite:    bool 
 740          @param ignoreNS:     If True, do not throw an exception if an object 
 741                               could not be created because some other object in 
 742                               the database does not exist. The function will 
 743                               return False - object could not be created, but 
 744                               no warning message will be displayed. 
 745          @type  ignoreNS:     bool 
 746          @param releasedOnly: If True, only create the object if it is to be 
 747                               released (i.e. upper case CREATE statements). 
 748          @type  releasedOnly: bool 
 749          @param haltOnError:  If True, raise an exception as soon as it occurs 
 750                               otherwise continue with the next object and raise 
 751                               an error at the end. 
 752          @type  haltOnError:  bool 
 753   
 754          @return: True, if all objects created successfully. 
 755          @rtype:  bool 
 756   
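        Example (a sketch, with tableSchema from L{Schema.parseTables()})::

            isOK = db.createObjects(tableSchema, overWrite=True)
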
 757          """ 
 758          allCreated = True 
 759          lastError = None 
 760          for dbObj in objSchema: 
 761              if not releasedOnly or dbObj.releasable: 
 762                  if overWrite: 
 763                      self.dropObjects([dbObj]) 
 764                  try: 
 765                      self._executeScript(dbObj.createSQL()) 
 766   
 767                       
 768                      if releasedOnly and isinstance(dbObj, schema.Function): 
 769                          self._executeScript("GRANT EXEC ON %s.%s TO public" 
 770                                              % (dbc.loadServerDbsOwner(), dbObj)) 
 771   
 772                  except (odbc.ProgrammingError, odbc.InterfaceError) as error: 
 773                      if ignoreNS: 
 774                          Logger.addMessage("<WARNING> %s" % error, 
 775                                            alwaysLog=False) 
 776   
 777                      allCreated = False 
 778                      ignoreCases = ["Invalid object name", "does not exist"] 
 779                      if (not ignoreNS or 
 780                        not any(case in str(error) for case in ignoreCases)) \ 
 781                        and (overWrite or 
 782                        "There is already an" not in str(error) 
 783                        and "already exists" not in str(error)): 
 784   
 785                          if haltOnError: 
 786                              raise 
 787                          else: 
 788                              Logger.addExceptionWarning(error) 
 789                              Logger.addMessage("continuing...") 
 790                              lastError = error 
 791   
 792                  except odbc.IntegrityError as error: 
 793                       
 794                      fkMsgParts = ["ALTER TABLE statement conflicted", 
 795                                    "FOREIGN KEY constraint", dbObj.name] 
 796   
 797                      if not all(part in str(error) for part in fkMsgParts): 
 798                          if haltOnError: 
 799                              raise 
 800                          else: 
 801                              Logger.addExceptionWarning(error) 
 802                              Logger.addMessage("continuing...") 
 803                              lastError = error 
 804   
 805          if lastError: 
 806              raise lastError 
 807   
 808          return allCreated 
  809   
 810       
 811   
 813          """ 
 814          Creates database column statistics for given tables. 
 815   
 816          @param tables: List of table schema. 
 817          @type  tables: list(Schema.Table) 
 818   
 819          """ 
 820          sql = "CREATE STATISTICS stats_%s_%s ON %s (%s)" 
 821          for table in tables: 
 822              Logger.addMessage("Creating statistics on %s..." % table) 
 823              for column in table.columns: 
 824                  self._executeScript(sql % (table, column, table, column)) 
  825   
 826       
 827   
 828 -    def createTable(self, tableSchema, dbSchema=[], overWrite=False): 
  829          """ 
 830          Create a new database table using the schema provided from 
 831          L{Schema.parseTables()}, all constraints that reference this table 
 832          found in the given schema list will be applied. 
 833   
 834          @param tableSchema:  Parsed schema describing the table that is to be 
 835                               created. 
 836          @type  tableSchema:  Schema.Table 
 837          @param dbSchema:     Schemas for every table in the database that may 
 838                               have a foreign key constraint that references 
 839                               the table to be created. 
 840          @type  dbSchema:     list(Schema.Table) 
 841          @param overWrite:    If True, allow current db table to be overwritten. 
 842          @type  overWrite:    bool 
 843   
 844          @return: True, if table really was created. 
 845          @rtype:  bool 
 846   
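        Example (a sketch, with dbSchema being the full list of tables from
        L{Schema.parseTables()})::

            isCreated = db.createTable(dbSchema[0], dbSchema, overWrite=True)
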
 847          """ 
 848          if overWrite: 
 849              self.dropTable(tableSchema, dbSchema) 
 850   
 851           
 852          if self.createObjects([tableSchema]): 
 853               
 854              self.createObjects([con for con in tableSchema.constraints 
 855                                  if type(con) is schema.ForeignKeyConstraint]) 
 856   
 857               
 858              foreignKeys = [] 
 859              for table in dbSchema: 
 860                  foreignKeys += [con for con in table.constraints 
 861                                      if type(con) is schema.ForeignKeyConstraint 
 862                                      and con.referenceTable == tableSchema.name] 
 863   
 864              self.createObjects(foreignKeys, ignoreNS=True) 
 865              return True 
 866   
 867          return False 
  868   
 869       
 870   
 894   
 895       
 896   
 897 -    def delete(self, tableName, whereStr=''): 
  898          """ 
 899          Deletes rows from a database table (to remove every row from a table 
 900          use truncate() instead) based on a flexible SQL WHERE clause. 
 901   
 902          @param tableName: Name of the database table. 
 903          @type  tableName: str 
 904          @param whereStr:  SQL WHERE clause (without the "WHERE"). 
 905          @type  whereStr:  str 
 906   
 907          @return: Number of rows deleted. 
 908          @rtype:  int 
 909   
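        Example, using the tables from the module usage section::

            numRows = db.delete("ProgrammeFrame", "programmeID=101")
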
 910          """ 
 911          if not whereStr: 
 912              return self.truncate(tableName) 
 913          else: 
 914              return self._executeScript("DELETE FROM %s WHERE %s" % (tableName, 
 915                                         whereStr), wantRowCount=True) 
  916   
 917       
 918   
 919 -    def deleteRows(self, tableName, rowIndexList=[]): 
  920          """ 
 921          Deletes selected rows from a database table (to remove every row from 
 922          a table use truncate() instead). 
 923   
 924          @param tableName:    Name of the database table. 
 925          @type  tableName:    str 
 926          @param rowIndexList: List of specific rows to delete, in the form 
 927                               (attrName, PyValue), e.g. [("multiframeId", 101)], 
 928                               all rows are deleted otherwise. If PyValue is a 
 929                               list then executemany is invoked. 
 930          @type  rowIndexList: list(tuple(str, object)) 
 931   
 932          @return: Number of rows deleted. 
 933          @rtype:  int 
 934   
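        Example, following the form given above::

            numRows = db.deleteRows("ProgrammeFrame", [("multiframeID", 101)])
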
 935          """ 
 936          if not rowIndexList: 
 937              return self.truncate(tableName) 
 938   
 939          sql = "DELETE FROM " + tableName 
 940          manyList = [] 
 941          if rowIndexList: 
 942              sql += self._createWhereStr(rowIndexList) 
 943               
 944              for _key, index in rowIndexList: 
 945                  if type(index) is list: 
 946                      manyList += [tuple([value]) for value in index] 
 947   
 948          return self._executeScript(sql, manyList, wantRowCount=True) 
  949   
 950       
 951   
 953          """ 
 954          Removes a column from a table in the database. 
 955   
 956          @param tableName:  Name of the table. 
 957          @type  tableName:  str 
 958          @param colName:    Name of the column to remove. 
 959          @type  colName:    str 
 960   
 961          """ 
 962          constraints = dict(self.query( 
 963            selectStr="C.name, O.name", 
 964              fromStr="syscolumns AS C, sysobjects AS O " 
 965                      "LEFT JOIN sysobjects AS T ON O.parent_obj = T.id", 
 966             whereStr="O.id=C.cdefault AND O.name NOT LIKE '%%dtproper%%' AND " 
 967                      "O.name NOT LIKE 'dt[_]%%' AND T.name = %r " % tableName + 
 968                      "AND (O.name LIKE 'DF__%' OR O.name LIKE '%_cons')")) 
 969   
 970          if colName in constraints: 
 971              self._executeScript("ALTER TABLE %s DROP CONSTRAINT %s" % 
 972                                  (tableName, constraints[colName])) 
 973   
 974          self._executeScript("ALTER TABLE %s DROP COLUMN %s" % 
 975                              (tableName, colName)) 
  976   
 977       
 978   
 980          """ 
 981          Drop database objects defined in a list of database L{Schema} objects. 
 982          Tables cannot be dropped if referenced by foreign key constraints, in 
 983          this case use L{dropTable()}. 
 984   
 985          @param objSchema: An ordered list of objects derived from 
 986                            L{Schema._Schema}. 
 987          @type  objSchema: list(Schema._Schema) 
 988   
 989          """ 
 990          for dbObj in objSchema: 
 991              try: 
 992                  self._executeScript(dbObj.dropSQL()) 
 993              except odbc.ProgrammingError as error: 
 994                   
 995                  if "does not exist" not in str(error) and \ 
 996                     "Could not drop constraint" not in str(error): 
 997                      raise 
  998   
 999       
1000   
1001 -    def dropTable(self, tableSchema, dbSchema): 
 1002          """ 
1003          Drops a table from the database, first removing any foreign key 
1004          constraints that reference the table. 
1005   
1006          @param tableSchema:  Parsed schema describing the table that is to be 
 1007                               dropped. 
1008          @type  tableSchema:  Schema.Table 
1009          @param dbSchema:     Schemas for every table in the database that may 
1010                               have a foreign key constraint that references 
 1011                               the table to be dropped. 
1012          @type  dbSchema:     list(Schema.Table) 
1013   
1014          """ 
1015           
1016          foreignKeys = [] 
1017          for table in dbSchema: 
1018              foreignKeys += [con for con in table.constraints 
1019                                  if type(con) is schema.ForeignKeyConstraint 
1020                                  and con.referenceTable == tableSchema.name] 
1021          self.dropObjects(foreignKeys) 
1022           
1023          self.dropObjects([tableSchema]) 
 1024   
1025       
1026   
1028          """ 
1029          Allows you to read from tables that are being written to and hence are 
1030          presently locked. Warning, use at your own risk! 
1031   
1032          """ 
1033          self._executeScript("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED") 
1034          self._isDirty = True 
 1035   
1036       
1037   
1039          """ @return: True, if table with name given in tableName parameter 
1040                       exists in current database. 
1041              @rtype:  bool 
1042          """ 
1043          return self.queryColumnNames(tableName) != [] 
 1044   
1045       
1046   
1048          """ Calls DBCC FREEPROCCACHE. 
1049          """ 
1050          self._executeScript("DBCC FREEPROCCACHE") 
 1051   
1052       
1053   
1055          """ @return: The volume on the catalogue server with the most free 
1056                       space, e.g. G:\. 
1057              @rtype:  str 
1058          """ 
1059          mostSpace = 0 
1060          bestVolume = '' 
1061          for volume in self.sysc.catServerVolumes(self.server): 
1062              line = self.runOnServer("dir " + volume, isTrialRunSafe=True)[-1] 
1063              try: 
1064                  freeSpace = long(line.split()[-3].replace(',', '')) 
1065              except ValueError: 
1066                  raise DbSession.CatalogueServerError("Can't read volume %s. " 
1067                    "Check SystemConstants.catServerVolumes is correct for %s." 
1068                    % (volume, self.server)) 
1069   
1070              freeSpace /= self.sysc.one_gigabyte 
1071              if freeSpace > mostSpace: 
1072                  mostSpace = freeSpace 
1073                  bestVolume = volume 
1074   
1075          return bestVolume 
 1076   
1077       
1078   
1080          """ 
1081          Temporarily drop the database connection, whilst keeping the DbSession 
1082          object alive. When an attempt to access the database is next made, the 
 1083          DbSession will automatically try to reconnect to the database. 
1084   
1085          """ 
1086          self._closeConnection() 
1087          DbSession._dbConnections[self._dbSessionKey] = (None, None) 
 1088   
1089       
1090   
1092          """ 
 1093          Grants a given user access to the database. 
1094   
1095          @param userName: User name that will have permissions set. 
1096          @type  userName: str 
1097   
1098          """ 
1099          try: 
1100              self._executeScript( 
1101                "EXEC sp_grantdbaccess %s, %s" % tuple(2 * [userName])) 
1102              self._isLogReq[self.database] = True 
1103          except odbc.ProgrammingError as error: 
1104              if "already exists" not in str(error): 
1105                  raise 
1106   
1107          for role in ["db_datareader"]: 
1108              self._executeScript( 
1109                "EXEC sp_addrolemember %s, %s" % (role, userName)) 
1110              self._isLogReq[self.database] = True 
1111   
1112          self._executeScript("GRANT SHOWPLAN TO " + userName) 
 1113   
1114       
1115   
1116 -    def insertData(self, tableName, rowData, enforcePKC=False): 
 1117          """ 
1118          Insert some data into the database. 
1119   
1120          @param tableName:  Name of the database table. 
1121          @type  tableName:  str 
1122          @param rowData:    Sequence of unformatted data values for each column, 
1123                             e.g. ("astring", 0, 5.4). 
1124          @type  rowData:    sequence 
1125          @param enforcePKC: If True, throw an exception if row already exists, 
1126                             otherwise just log a warning. 
1127          @type  enforcePKC: bool 
1128   
1129          @return: Number of rows inserted (either 1 or 0). 
1130          @rtype:  int 
1131   
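        Example (the table name and values are illustrative)::

            numRows = db.insertData("MyTable", ("astring", 0, 5.4))
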
1132          """ 
1133          if rowData: 
1134              row = "(%s)" % ", ".join(map(self._sqlString, rowData)) 
1135   
1136          if not rowData or row == "()": 
1137              raise Exception( 
1138                "No data has been supplied to insert into table %s" % tableName) 
1139   
1140          try: 
1141              return self._executeScript( 
1142                "INSERT INTO %s VALUES %s" % (tableName, row), wantRowCount=True) 
1143          except odbc.IntegrityError as error: 
1144              if not enforcePKC and "Violation of PRIMARY KEY" in str(error): 
1145                  Logger.addMessage( 
1146                    "<Warning> data row %s already exists in table %s" % 
1147                    (row, tableName)) 
1148                  return 0 
1149              else: 
1150                  raise 
 1151   
1152       
1153   
1154 -    def query(self, selectStr, fromStr, whereStr='', groupBy='', orderBy='', 
1155                      firstOnly=False, default=None, ResultsTuple=None, 
1156                      forceOneProc=False, forcePrOrder=False): 
 1157          """ 
1158          Wrapper function to perform a general select-from-where query. 
1159   
1160          @param selectStr: SQL SELECT string; comma-separated column names. 
1161          @type  selectStr: str 
1162          @param fromStr:   SQL FROM string; single table name or L{Join} object. 
1163          @type  fromStr:   str or L{Join} 
1164          @param whereStr:  Optional SQL WHERE clause. 
1165          @type  whereStr:  str 
1166          @param groupBy:   Optional SQL GROUP BY string. 
1167          @type  groupBy:   str 
1168          @param orderBy:   Optional SQL ORDER BY string. 
1169          @type  orderBy:   str 
1170          @param firstOnly: If True, just return the first entry in result set. 
1171          @type  firstOnly: bool 
1172          @param default:   Value to return if querying first entry and no 
1173                            entries found. 
1174          @type  default:   object 
1175          @param ResultsTuple: Convert results to this specific namedtuple type. 
1176          @type  ResultsTuple: type 
1177          @param forceOneProc: Force query to execute only on a single-processor 
1178                               (disable parallelism). 
1179          @type  forceOneProc: bool 
1180          @param forcePrOrder: Force where clause predicates to be evaluated in 
1181                               the stated order. 
1182          @type  forcePrOrder: bool  
1183   
1184          @return: List of tuples (or scalars if only one attribute selected) 
1185                   containing values for every attribute of every entry. The 
1186                   tuples will be namedtuples of the database column names, if 
1187                   possible - i.e. no duplicate column names or functions without 
1188                   aliases. If firstOnly then a scalar or a tuple (or namedtuple) 
1189                   is returned. 
1190          @rtype:  list 
1191   
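        Example, selecting a column of values and then a single scalar::

            mfIDs = db.query("multiframeID", "Multiframe", "deprecated=0")
            maxID = db.query("MAX(multiframeID)", "Multiframe",
                             firstOnly=True, default=0)
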
1192          @todo: Either replace arguments with one SelectSQL argument or: 
1193                 replicate SelectSQL.__init__() interface: 
1194                 selectStr becomes select, fromStr becomes table, whereStr 
1195                 becomes where - then remove code duplication by creating an 
1196                 SelectSQL object here. 
1197          """ 
1198           
1199           
1200           
1201           
1202           
1203           
1204          if firstOnly: 
1205              selectStr = "TOP 1 %s" % selectStr 
1206   
1207          sql = "SELECT %s FROM %s" % (selectStr, fromStr) 
1208   
1209          if isinstance(fromStr, Join): 
1210              sql += " WHERE " 
1211              sql += fromStr.whereStr 
1212              if fromStr.whereStr and whereStr \ 
1213                and not whereStr.strip().startswith("AND"): 
1214                  sql += " AND " 
1215   
1216          elif whereStr.startswith(("GROUP BY", "ORDER BY")): 
1217               
1218              sql += " " 
1219   
1220          elif whereStr: 
1221              sql += " WHERE " 
1222   
1223          sql += whereStr 
1224          if groupBy: 
1225              sql += " GROUP BY " + groupBy 
1226   
1227          if orderBy: 
1228              sql += " ORDER BY " + orderBy 
1229   
1230          options = [] 
1231          if forceOneProc: 
1232              options.append("MAXDOP 1") 
1233   
1234          if forcePrOrder: 
1235              options.append("FORCE ORDER") 
1236   
1237          if options: 
1238              sql += " OPTION (%s)" % ', '.join(options) 
1239   
1240          result = \ 
1241            self._executeScript(sql, wantResult=True, wantAll=not firstOnly) 
1242   
1243          if firstOnly and (result is None or result[0] is None): 
1244               
1245              if default is None and ',' in selectStr: 
1246                  result = tuple([None] * len(selectStr.split(','))) 
1247              else: 
1248                  return default 
1249   
1250          if selectStr.endswith('*') or ',' in selectStr: 
1251               
1252              if not ResultsTuple: 
1253                  metadata = self._getCursor().description or [] 
1254                  columns = [entry[0] for entry in metadata] 
1255   
1256                   
1257                   
1258                  if columns and not any(not column or iskeyword(column) 
1259                                         for column in columns) \ 
1260                    and len(columns) == len(set(columns)): 
1261   
1262                      ResultsTuple = namedtuple("Query", ' '.join(columns)) 
1263   
1264              if ResultsTuple: 
1265                  if firstOnly: 
1266                      return ResultsTuple(*result) 
1267                  else: 
1268                      return [ResultsTuple(*row) for row in result] 
1269   
1270              return result 
1271   
1272          elif firstOnly: 
1273               
1274              return result[0] 
1275   
1276          else: 
1277               
1278              return [eachTuple[0] for eachTuple in result] 
 1279   
1280       
1281   
1283          """ 
 1284          Determine whether any of the given table columns are nullable. 
1285   
1286          @param queryTable: Name of table to query. 
1287          @type  queryTable: str 
1288          @param columns:    Sequence of columns to check. 
1289          @type  columns:    sequence(str) 
1290   
1291          @return: True if any of the columns are nullable. 
1292          @rtype:  bool 
1293   
1294          """ 
1295          cursor = self._getCursor() 
1296          cursor.columns(table=queryTable) 
1297          return any(col[10] is not odbc.SQL.NO_NULLS 
1298                     for col in cursor.fetchall() if col[3] in columns) 
 1299   
1300       
1301   
1303          """ 
1304          Wrapper to just query the maximum value of the given attributes. 
1305   
1306          @param attrs: Name of the attributes in the database (comma-separated). 
1307          @type  attrs: str 
1308          @param table: Table with the attributes. 
1309          @type  table: str or L{Join} 
1310          @param where: Optional SQL WHERE clause. 
1311          @type  where: str 
1312   
1313          @return: Maximum value of that attribute. Defaults to None. 
1314          @rtype:  scalar 
1315   
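        Example, as in the module usage section::

            maxID = db.queryAttrMax("multiframeID", "Multiframe")
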
1316          """ 
1317           
1318           
1319           
1320          maxAttrs = ", ".join("MAX(%s)" % attr for attr in csv.values(attrs)) 
1321          return self.query(maxAttrs, table, where)[0] 
 1322   
1323       
1324   
1326          """ 
1327          Queries database to produce a list of column names in the table. 
1328   
1329          @param tableName: Name of the database table. 
1330          @type  tableName: str 
1331   
1332          @return: A list of the column names, in order, from the given table. 
1333          @rtype:  list(str) 
1334   
1335          """ 
1336          return self.query( 
1337            selectStr="COLUMN_NAME", 
1338              fromStr="INFORMATION_SCHEMA.COLUMNS", 
1339             whereStr="TABLE_NAME=%r ORDER BY ORDINAL_POSITION" % tableName) 
 1340   
1341       
1342   
1344          """ 
1345          Queries database to produce a dictionary of data-types for each column. 
1346   
1347          @param table: Name of the database table, with optional full-path. 
1348          @type  table: str 
1349   
1350          @return: A dictionary where the keywords are the column names and the 
1351                   values are the data-types of those columns. 
1352          @rtype:  dict(str:str) 
1353   
1354          """ 
1355          server, database, tableName = self._splitTablePath(table) 
1356   
1357          if server == self.server: 
1358              attrs = self.query( 
1359                selectStr="COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH", 
1360                  fromStr=database + ".INFORMATION_SCHEMA.COLUMNS", 
1361                 whereStr="TABLE_NAME=" + self._sqlString(tableName)) 
1362   
1363              return dict((name, dataType + 
1364                ('(%s)' % dataSize if dataSize else '')) 
1365                for name, dataType, dataSize in attrs) 
1366          else: 
1367              db = '.'.join([server, database, dbc.loadServerDbsOwner()]) 
1368              attrs = self.query( 
1369                selectStr="sc.name, st.name, sc.length", 
1370                  fromStr="%s.sysobjects AS so, %s.syscolumns AS sc, " 
1371                          "%s.systypes AS st" % tuple([db] * 3), 
1372                 whereStr="so.id = sc.id AND st.xtype = sc.xtype AND so.name=" + 
1373                          self._sqlString(tableName)) 
1374   
1375              return dict((name, dataType + 
1376                (('(%s)' % dataSize) if dataType == 'varchar' else '')) 
1377                for name, dataType, dataSize in attrs) 
 1378   
1379       
1380   
1382          """ 
1383          Queries database to determine if any rows exist in the given table 
1384          that satisfy the WHERE clause. 
1385   
1386          @param table: Table to query. A single table name or a L{Join} object. 
1387          @type  table: str or L{Join} 
1388          @param where: Optional WHERE clause. 
1389          @type  where: str 
1390          @param pk:    Optionally supply primary key to speed up query. Not 
1391                        required if table is a Join. 
1392          @type  pk:    str 
1393   
1394          @return: True, if there are any rows in the table that satisfy the 
1395                   WHERE clause. 
1396          @rtype:  bool 
1397   
1398          """ 
1399          columns = pk or '*' 
1400          if isinstance(table, Join): 
1401              columns = table.joinCols 
1402   
1403           
1404          result = self.query(columns, table, where, firstOnly=True) 
1405          if not isinstance(result, tuple): 
1406              return result is not None 
1407          else: 
1408              return any(entry is not None for entry in result) 
 1409   
1410       
1411   
1412 -    def queryNumRows(self, tableName, whereStr='', groupBy='', 
1413                             distinctAttr=''): 
 1414          """ 
1415          Wrapper to just query the number of rows in a given table. Will also 
1416          return the total count for a "SELECT count(*) ... GROUP BY"-type query, 
1417          if the "GROUP BY" is specified in the whereStr keyword, rather than the 
1418          groupBy keyword. 
1419   
1420          @param tableName:    Table to query. 
1421          @type  tableName:    str 
1422          @param whereStr:     WHERE string, optional where clause. 
1423          @type  whereStr:     str 
1424          @param groupBy:      Specify a comma-separated list of columns in which 
1425                               to group counts by. 
1426          @type  groupBy:      str 
1427          @param distinctAttr: With distinct values of this attribute. 
1428          @type  distinctAttr: str 
1429   
1430          @return: Number of rows, if no groupBy keyword, otherwise a list of 
1431                   the groupBy attribute values in ascending order with counts. 
1432          @rtype:  int or list(tuple) 
1433   
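        Example::

            numFrames = db.queryNumRows("Multiframe", "deprecated=0")
            countsByType = db.queryNumRows("Multiframe", groupBy="frameType")
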
1434          """ 
1435          attr = ('DISTINCT ' + distinctAttr) if distinctAttr else '*' 
1436          cmd = "COUNT(%s) AS count" % attr 
1437          for _attempt in range(2): 
1438              try: 
1439                  if groupBy: 
1440                      return self.query(groupBy + ", " + cmd, tableName, whereStr, 
1441                                        groupBy=groupBy, orderBy=groupBy) 
1442                  else: 
1443                      results = self.query(cmd, tableName, whereStr) 
1444                      if len(results) == 1: 
1445                          return results[0] 
1446                      else:  
1447                          return len(results) 
1448   
1449              except odbc.ProgrammingError as error: 
1450                  if "Arithmetic overflow" in str(error): 
1451                      cmd = "COUNT_BIG(%s) AS count" % attr 
1452                  else: 
1453                      raise 
 1454   
1455       
1456   
1457      def queryRowSize(self, queryTable):
1458          """
1459          Determine the row size of the given table or the select query result. 
1460   
1461          @param queryTable: Either just a single table name or a select query. 
1462          @type  queryTable: str or SelectSQL 
1463   
1464          @return: Size in bytes of a row. 
1465          @rtype:  int 
1466   
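              For example, to estimate the disk space that a bulk outgest of a
              table would need (table name is illustrative)::

                  bytesPerRow = db.queryRowSize("Multiframe")
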
1467          """ 
1468          cursor = self._getCursor() 
1469          if isinstance(queryTable, SelectSQL): 
1470              self.query(queryTable.selectStr, queryTable.fromStr, 
1471                         firstOnly=True) 
1472              print(cursor.description) 
1473              raise Exception("This feature is deprecated due to ODBC " 
1474                              "inconsistencies. Check that the values printed " 
1475                              "above are consistent and correct between servers") 
1476               
1477               
1478              return sum(col[4] for col in cursor.description) 
1479          else: 
1480              cursor.columns(table=queryTable) 
1481              return sum(col[7] for col in cursor.fetchall()) 
 1482   
1483       
1484   
1496   
1497       
1498   
1512   
1513       
1514   
1515      def runOnServer(self, cmd, isTrialRunSafe=False):
1516          """
1517          Runs a command directly on the database server. 
1518   
1519          @param cmd: Command-line command to run on the database server. 
1520          @type  cmd: str 
1521          @param isTrialRunSafe: Can this command be run in trial-run mode? 
1522          @type  isTrialRunSafe: bool 
1523   
1524          @return: A result set. 
1525          @rtype:  list(str) 
1526   
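              For example, to list the contents of the outgest/ingest share
              directory on the catalogue server (a read-only command, so it can
              safely be flagged as trial-run safe)::

                  listing = db.runOnServer("dir " + db.sysc.catServerSharePath,
                                           isTrialRunSafe=True)
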
1527          """ 
1528           
1529          script = "EXEC %s..xp_cmdshell '%s'" % (self.sysc.adminDatabase, cmd) 
1530          isTrialRun = self.isTrialRun 
1531          try: 
1532              if isTrialRunSafe: 
1533                  self.isTrialRun = False 
1534   
1535              results = self._executeScript(script, wantResult=True) 
1536          finally: 
1537              self.isTrialRun = isTrialRun 
1538   
1539           
1540          results = [(result if result else '') 
1541                     for result in map(itemgetter(0), results)] 
1542   
1543           
1544          if results and not results[-1]: 
1545              results = results[:-1] 
1546   
1547           
1548          filesysCmds = \ 
1549            ("dir", "move", "copy", "mkdir", "rmdir", "del", "rename") 
1550   
1551          if results and cmd.lower().strip().startswith(filesysCmds) \ 
1552            and any("Access is denied" in result for result in results): 
1553   
1554              raise DbSession.CatalogueServerError("%s: %s\n%s" 
1555                % (self.server, cmd, '\n'.join(results))) 
1556   
1557          return results 
 1558   
1559       
1560   
1561      def sharePath(self, fileName=''):
1562          """
1563          Returns the path to the given file in the catalogue server's share 
1564          directory for outgests and ingests. 
1565   
1566          @param fileName: Name of the file on the share. 
1567          @type  fileName: str 
1568           
1569          @todo: Deprecate! 
1570   
1571          """ 
1572          return self.sysc.dbSharePath(fileName) + (os.sep if not fileName else '') 
 1573   
1574       
1575   
1577          """ Shrinks tempdb and its log files. 
1578          """ 
1579          self._executeScript("DBCC SHRINKDATABASE (tempdb)") 
 1580   
1581       
1582   
1583      def tablePath(self, tableName, ownerName='dbo'):
1584          """
1585          Full database path to the given table. 
1586   
1587          @param tableName: Name of table. 
1588          @type  tableName: str 
1589          @param ownerName: Optionally specify a different table owner name than 
1590                            the database owner. 
1591          @type  ownerName: str 
1592   
1593          @return: Full path to database table, e.g. ahmose.WSA.dbo.tableName. 
1594          @rtype:  str 
1595   
1596          """ 
1597          return '.'.join([str(self), ownerName, str(tableName)]) 
 1598   
1599       
1600   
1602          """ 
1603          Checks to see if the share path for this database is active. Raises a 
1604          SystemExit if there are problems. 
1605   
1606          """ 
1607          testDir = "outgest_test_%s_%s" % (os.getenv("HOST"), os.getpid()) 
1608          try: 
1609              os.mkdir(self.sharePath(testDir)) 
1610              os.rmdir(self.sharePath(testDir)) 
1611          except OSError as error: 
1612              raise SystemExit("<ERROR> " + str(error)) 
 1613   
1614       
1615   
1617          """ 
1618          Wipes a database table - use with care! 
1619   
1620          @param tableName: Name of database table to wipe. 
1621          @type  tableName: str 
1622   
1623          @return: Number of rows deleted from table. 
1624          @rtype:  int 
1625   
1626          @todo: Deprecate in favour of delete() without where clause? 
1627          @todo: Always call truncate over delete from if not where clause, with 
1628                 the addition of a drop and add constraints? 
1629          """ 
1630          try: 
1631               
1632              return self._executeScript("TRUNCATE TABLE %s" % tableName, 
1633                                         wantRowCount=True) 
1634          except odbc.IntegrityError: 
1635               
1636              return self._executeScript("DELETE %s" % tableName, 
1637                                         wantRowCount=True) 
 1638   
1639       
1640   
1641      def uncPath(self, filePathName):
1642          """
1643          Converts the given catalogue server local path into a UNC path. 
1644   
1645          @param filePathName: Catalogue server local path. 
1646          @type  filePathName: str 
1647   
1648          @return: Catalogue server UNC path. 
1649          @rtype:  str 
1650   
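              For example, assuming the catalogue server uses backslash file
              separators, a local path such as G:\ingest\file.dat on server
              ahmose becomes \\AHMOSE\G\ingest\file.dat::

                  catServerPath = db.uncPath(r"G:\ingest\file.dat")
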
1651          """ 
1652          return (2 * self.sysc.catServerFileSep + self.server.upper() 
1653                  + self.sysc.catServerFileSep + filePathName.replace(':', '')) 
 1654   
1655       
1656   
1657      def update(self, tableName, entryList, where='', fromTables=''):
 1658          """ 
1659          Update specific entries in database table rows specified with a 
1660          flexible SQL WHERE clause. 
1661   
1662          @param tableName:    Name of the database table. 
1663          @type  tableName:    str 
1664          @param entryList:    List entries to update, in the form (attrName, 
1665                               SQLvalue) with value as an SQL formatted string, 
1666                               as defined by L{DbSession._sqlString()}, e.g. 
1667                               [("skyID", -9999), ("raMoon", "1.01"), 
1668                               ("trackSys", "'J2000'")]. (NB: this may also be 
1669                               an explicit string for the SQL SET statement e.g. 
1670                               "trackSys='J2000'") 
1671          @type  entryList:    list(tuple(str, str)) or str 
1672          @param where:        SQL WHERE clause (without the "WHERE"). 
1673          @type  where:        str 
1674          @param fromTables:   Optional source to update from (SQL FROM syntax), 
1675                               single table name or L{Join} object. 
1676          @type  fromTables:   str or L{Join} 
1677   
1678          @return: Number of rows updated. 
1679          @rtype:  int 
1680   
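              A minimal usage sketch (attribute names, values and the WHERE
              clause are illustrative)::

                  numRows = db.update("Multiframe",
                                      [("deprecated", 128), ("versNum", "1.1")],
                                      where="utDate < '20070101'")
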
1681          """ 
1682           
1683          if not entryList: 
1684              raise Exception("No entries supplied to update table " + tableName) 
1685   
1686          sql = "UPDATE " + tableName 
1687          sql += " SET " + utils.joinNested(entryList, subJoinStr="=") 
1688          if fromTables: 
1689              sql += " FROM %s" % fromTables 
1690   
1691          if where or isinstance(fromTables, Join): 
1692              sql += " WHERE " 
1693              if isinstance(fromTables, Join): 
1694                  sql += fromTables.whereStr 
1695                  if fromTables.whereStr and where: 
1696                      sql += " AND " 
1697              sql += where 
1698   
1699          numRows = self._executeScript(sql, wantRowCount=True) 
1700          if self.isTrialRun:   
1701              if isinstance(fromTables, Join): 
1702                  fromStr = fromTables.fromStr 
1703                  if fromTables.whereStr and where: 
1704                      where += " AND " + fromTables.whereStr 
1705                  else: 
1706                      where = where or fromTables.whereStr 
1707              elif fromTables: 
1708                  fromStr = fromTables 
1709              else: 
1710                  fromStr = tableName 
1711   
1712              if tableName not in fromStr: 
1713                  fromStr += ', ' + tableName 
1714   
1715              numRows = self.queryNumRows(fromStr, where) 
1716   
1717          return numRows 
 1718   
1719       
1720   
1721      def updateEntries(self, tableName, entryList, rowIndexList=[]):
 1722          """ 
1723          Update specific entries in a database table row. 
1724   
1725          @param tableName:    Name of the database table. 
1726          @type  tableName:    str 
1727          @param entryList:    List entries to update, in the form (attrName, 
1728                               SQLvalue) with value as an SQL formatted string, 
1729                               as defined by L{DbSession._sqlString()}, e.g. 
1730                               [("skyID", -9999), ("raMoon", "1.01"), 
1731                               ("trackSys", "'J2000'")]. (NB: this may also be 
1732                               an explicit string for the SQL SET statement e.g. 
1733                               "trackSys='J2000'") 
1734          @type  entryList:    list(tuple(str, str)) or str 
1735          @param rowIndexList: List of specific rows to update, in the form 
1736                               (attrName, PyValue), e.g. [("multiframeID", 101)], 
1737                               (attrName, PyValue), e.g. [("multiframeID", 101)];
1738                               otherwise all rows are updated. If PyValue is a
1739          @type  rowIndexList: list(tuple(str, object)) 
1740   
1741          @return: Number of rows updated. 
1742          @rtype:  int 
1743   
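              A minimal usage sketch; supplying a list of row index values
              invokes executemany, updating one row per value (names and values
              are illustrative)::

                  db.updateEntries("ProgrammeFrame", [("programmeID", 200)],
                                   rowIndexList=[("multiframeID", [101, 102])])
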
1744          """ 
1745           
1746          if not tableName: 
1747              raise Exception("No table name has been supplied") 
1748          if not entryList: 
1749              raise Exception("No entries supplied to update table " + tableName) 
1750   
1751          sql = "UPDATE " + tableName 
1752          sql += " SET " + utils.joinNested(entryList, subJoinStr="=") 
1753          manyList = [] 
1754          if rowIndexList: 
1755              sql += self._createWhereStr(rowIndexList) 
1756               
1757              for _key, index in rowIndexList: 
1758                  if type(index) is list: 
1759                      manyList += [tuple([value]) for value in index] 
1760   
1761          return self._executeScript(sql, manyList, wantRowCount=True) 
 1762   
1763       
1764   
1766          """ Updates database index statistics. 
1767          """ 
1768          Logger.addMessage("Updating database index statistics...") 
1769          self._executeScript("EXEC sp_updatestats") 
 1770   
1771       
1772   
1794   
1795       
1796   
1797      def _createWhereStr(self, rowIndexList):
1798          """
1799          Converts a list of tuple pairs of the form (attribute name, value) into 
1800          a correctly formatted SQL WHERE clause. If the row index list contains 
1801          a list of indices for a given attribute, then executemany markers are
1802          used. 
1803   
1804          @param rowIndexList: List of specific rows to update, in the form 
1805                               (attrName, PyValue), e.g. [("multiframeId", 101)]. 
1806          @type  rowIndexList: list(tuple(str, object)) 
1807   
1808          @return: A SQL WHERE clause. 
1809          @rtype:  str 
1810   
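              For example, with the qmark parameter style (attribute names and
              values are illustrative)::

                  self._createWhereStr([("multiframeID", 101), ("extNum", [1, 2])])
                  # gives " WHERE multiframeID=101 AND extNum=?"
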
1811          """ 
1812          whereList = [(key, self._sqlString(index)) 
1813                       for key, index in rowIndexList 
1814                       if type(index) is not list] 
1815   
1816           
1817           
1818           
1819          paramStyle = {'qmark': '?', 'pyformat': '%(x)s'}[odbc.paramstyle] 
1820          whereList += [(key, paramStyle) for key, index in rowIndexList 
1821                        if type(index) is list] 
1822   
1823          return " WHERE " + utils.joinNested(whereList, subJoinStr="=", 
1824                                              joinStr=" AND ") 
 1825   
1826       
1827   
1828      def _executeScript(self, script, parameters=None, wantResult=False,
1829                               wantAll=True, wantRowCount=False): 
 1830          """ 
1831          Send any SQL script to the server to execute. Specific DBMS errors that 
1832          are just warnings are trapped to prevent processing from halting. 
1833   
1834          @param script:       The script which is to be processed. 
1835          @type  script:       str 
1836          @param parameters:   Optionally supply parameter set if script contains 
1837                               executemany wildcards. 
1838          @type  parameters:   list(tuple) 
1839          @param wantResult:   If True, return results. 
1840          @type  wantResult:   bool 
1841          @param wantAll:      If wantResult is True and wantAll is True then 
1842                               return all results otherwise just first result. 
1843          @type  wantAll:      bool 
1844          @param wantRowCount: If True, and results aren't returned, then a count
1845                               of affected rows is returned instead.
1846          @type  wantRowCount: bool
1847   
1848          @return: The result of the executed script (if wantResult), or number 
1849                   of rows affected (if wantRowCount). 
1850          @rtype:  list(tuple) or int 
1851   
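              A typical internal call might look like this (the SQL statement is
              illustrative)::

                  numRows = self._executeScript(
                      "DELETE Multiframe WHERE deprecated>0", wantRowCount=True)
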
1852          """ 
1853          if self.isTrialRun and not script.upper().startswith("SELECT"): 
1854              print(script) 
1855              if wantResult: 
1856                  return [] if wantAll else None 
1857              elif wantRowCount: 
1858                  return 0 
1859              else: 
1860                  return 
1861   
1862           
1863           
1864          cursor = self._getCursor() 
1865          try: 
1866              if parameters: 
1867                  cursor.executemany(script, parameters) 
1868              else: 
1869                  cursor.execute(script) 
1870   
1871          except odbc.Warning as error: 
1872              if "xpsql.cpp" in str(error) and "GetProxyAccount" in str(error): 
1873                  Logger.addMessage("<ERROR> Database user name %s does not have" 
1874                    " bcp outgest permissions set on database %s.%s. Please fix " 
1875                    "the database permissions." % (self.userName, self.server, 
1876                    self.database)) 
1877   
1878              if "DBCC execution completed" not in str(error): 
1879                  Logger.addExceptionWarning(error, "Database message") 
1880                  if "would be truncated" in str(error): 
1881                      Logger.addMessage("Script executed: " + script, 
1882                                        alwaysLog=False) 
1883   
1884          except (odbc.OperationalError, odbc.InternalError) as error: 
1885              isDeadlock = "deadlock victim. Rerun the transaction" in str(error) 
1886              brokenMsgs = ["Function sequence error",  
1887                            "Read from SQL server failed", 
1888                            "Read from the server failed", 
1889                            "Write to SQL Server failed", 
1890                            "Write to the server failed", 
1891                            "Communication link failure"] 
1892   
1893              persist = self._isPersistent and self._autoCommit 
1894              if persist and (any(message in str(error) for message in brokenMsgs) 
1895                or isDeadlock and not self._isDeadlocked): 
1896                  self._isDeadlocked = isDeadlock 
1897                  msg = ("Deadlock occurred in" if self._isDeadlocked else 
1898                         "Lost connection to") 
1899                  msg += " %s.%s" % (self.server, self.database) 
1900                  sys.stderr.write(msg + '. See log.\n') 
1901                  Logger.addMessage("<Warning> %s. Reconnecting and resubmitting" 
1902                                    " the last database statement..." % msg) 
1903                  self.goOffline() 
1904                  return self._executeScript(script, parameters, wantResult, 
1905                                             wantAll, wantRowCount) 
1906   
1907              if persist and self._isDeadlocked: 
1908                   
1909                   
1910                  time.sleep(1) 
1911                  return self._executeScript(script, parameters, wantResult, 
1912                                             wantAll, wantRowCount) 
1913   
1914              if isinstance(error, odbc.OperationalError): 
1915                  self._raiseWithScript(error, script) 
1916              else: 
1917                  raise 
1918   
1919          except (odbc.ProgrammingError, odbc.IntegrityError) as error: 
1920              self._raiseWithScript(error, script) 
1921          else: 
1922              self._isDeadlocked = False 
1923   
1924          numRows = max(cursor.rowcount, 0) 
1925          if wantRowCount and not numRows: 
1926               
1927              numRows = max(self._executeScript("SELECT ROWCOUNT_BIG()", 
1928                            wantResult=True, wantAll=False)[0], 0) 
1929   
1930          self._isLogReq[self.database] = self._isLogReq[self.database] or \ 
1931              self._autoCommit and (not wantRowCount or numRows) and \ 
1932              not script.upper().startswith(("SELECT", "DBCC", "EXEC", "SET")) 
1933   
1934          if wantResult: 
1935               
1936              try: 
1937                  if wantAll: 
1938                      return cursor.fetchall() 
1939                  else: 
1940                       
1941                       
1942                       
1943                       
1944                       
1945                       
1946                       
1947                       
1948                       
1949                       
1950                       
1951                       
1952                       
1953                      try: 
1954                          return cursor.fetchall()[0] 
1955                      except IndexError: 
1956                          return 
1957              except odbc.ProgrammingError as error: 
1958                  if "missing result set" in str(error): 
1959                       
1960                      return [] if wantAll else None 
1961                  else: 
1962                      raise 
1963   
1964          elif wantRowCount: 
1965              return numRows 
 1966   
1967       
1968   
1969      def _getCursor(self):
1970          """
1971          Method to obtain cursor for current DbSession. If database connection 
1972          is currently offline then an endless reconnect attempt is made. 
1973   
1974          @return: The cursor for the current DbSession. 
1975          @rtype:  Cursor 
1976   
1977          """ 
1978          connection, cursor = DbSession._dbConnections[self._dbSessionKey] 
1979           
1980           
1981          while not connection:   
1982              try: 
1983                  self._openConnection() 
1984              except SystemExit: 
1985                  Logger.addMessage("<Warning> Cannot connect to database. " 
1986                      "Attempting reconnect in 10 minutes (Ctrl-C to quit)...") 
1987                  time.sleep(600) 
1988              else: 
1989                  connection, cursor = \ 
1990                      DbSession._dbConnections[self._dbSessionKey] 
1991   
1992          return cursor 
 1993   
1994       
1995   
1996      def _openConnection(self):
1997          """ Opens a database connection and initiates the cursor. """
1998          try: 
1999               
2000              pwFilePath = os.path.join(os.getenv("HOME"), ".load-server-access") 
2001              password = file(pwFilePath).read().strip() 
2002              params = (self.server, self.database, self.userName, password) 
2003              connection = \ 
2004                odbc.DriverConnect("DSN=%s;Database=%s;UID=%s;PWD=%s" % params) 
2005   
2006              Logger.addMessage("Connected to %s.%s as %s" % params[:-1]) 
2007              if self._autoCommit: 
2008                  connection.setconnectoption(odbc.SQL.AUTOCOMMIT, 
2009                                              odbc.SQL.AUTOCOMMIT_ON) 
2010   
2011          except odbc.OperationalError: 
2012              raise SystemExit( 
2013                "Cannot access database %s.%s. Probably connection is down." 
2014                % (self.server, self.database)) 
2015          except odbc.ProgrammingError as error: 
2016              if "Login failed for user" in str(error): 
2017                  raise SystemExit( 
2018                    "Access denied for username %r to database %s.%s" % 
2019                    (self.userName, self.server, self.database)) 
2020              elif "Cannot open database" in str(error): 
2021                  raise SystemExit( 
2022                    "Database %s.%s does not exist or access denied for " 
2023                    "user %s" % (self.server, self.database, self.userName)) 
2024              else: 
2025                  raise SystemExit("Cannot connect: %s" % error) 
2026   
2027          cursor = connection.cursor() 
2028          if "mx.ODBC.unixODBC" in sys.modules: 
2029               
2030               
2031              cursor.setconverter(lambda _position, sqlType, sqlLen: 
2032                (sqlType, (sqlLen if sqlType != odbc.SQL.BIGINT else 0))) 
2033   
2034           
2035          DbSession._dbConnections[self._dbSessionKey] = (connection, cursor) 
 2036   
2037       
2038   
2039      def _raiseWithScript(self, err, script):
2040          """ Appends the SQL script to an mxODBC exception and raises it.
2041          """ 
2042          try: 
2043              message = err.args[2].replace('[FreeTDS][SQL Server]', '') 
2044          except (IndexError, AttributeError): 
2045              message = str(err) 
2046   
2047          raise err.__class__(message + "\nScript executed: " + script) 
 2048   
2049       
2050   
2052          """ 
2053          Decomposes a full table path to its individual components. 
2054   
2055          @param tablePath: Path to a table in the format "table" or 
2056                            "database.table" or "server.database..table" or 
2057                            "server.database.dbo.table" 
2058          @type  tablePath: str 
2059   
2060          @return: Server name, database name, table name from given path. 
2061          @rtype:  tuple(str, str, str) 
2062   
2063          """ 
2064          path = tablePath.split('.') 
2065          tableName = path[-1] 
2066          try: 
2067              server = path[-4] 
2068          except IndexError: 
2069              server = self.server 
2070          try: 
2071              database = path[-3] 
2072          except IndexError: 
2073              try: 
2074                  database = path[-2] 
2075              except IndexError: 
2076                  database = self.database 
2077   
2078          return server, database, tableName 
 2079   
2080       
2081   
2082      def _sqlString(self, value):
2083          """ @return: An SQL string of the correct format for the data type.
2084              @rtype:  str 
2085          """ 
2086          if value is None: 
2087              return "" 
2088          elif type(value) is str: 
2089               
2090               
2091              if "'" in value: 
2092                  return repr(value).replace("\'", "''").replace('"', "'") 
2093              else: 
2094                  return repr(value) 
2095          else: 
2096               
2097              return str(value) 
  2098   
2101  class Ingester(DbSession):
2102      """
2103      A special type of database session, where you wish to ingest data into a 
2104      database table from a native binary or CSV file. Upon initialisation this 
2105      class verifies that the schemas of the database tables are up-to-date. 
2106   
2107      @note: Prefer binary files as they ingest faster than CSV files. 
2108      @note: Prefer the ingest data to be pre-sorted by primary key and supply 
2109             isOrdered=True to ingestTable() to speed up ingest. Otherwise, if
2110             adding more data than already exists, e.g. overwriting the existing 
2111             table or creating a new one, it may be faster to load the data 
2112             without any constraints and then apply the constraints to the table 
2113             after loading. 
2114      @todo: Always parse indices on initialisation of Ingester. Can't drop 
2115             indices at this stage as it would occur too soon. Then give
2116             ingestTable() a default option of dropIndices=True. Ingester can 
2117             tell from the schema whether it should parse non-survey indices or 
2118             not. Ingester itself could have a dropIndices option at 
2119             initialisation with default of False, for IngIngester to overrule. 
2120      """ 
2121      class IngestError(Exception):
2122          """ Exception thrown when the data cannot be ingested.
2123          """ 
2124          pass 
 2125   
2126       
2127   
2128       
2129      fileOnShare = None 
2130       
2131      fileTag = 'CuID000000' 
2132   
2133       
2134       
2135   
2136       
2137      _schema = None 
2138   
2139       
2140   
2141      def __init__(self, dbCon, tableSchema, tag=fileTag, skipSchemaCheck=False,
2142                         checkReleasedOnly=False): 
 2143          """ 
2144          Makes connection to the requested database, and checks that the schemas 
2145          of the tables supplied in the table list are correct. 
2146   
2147          @param dbCon:       A database connection. 
2148          @type  dbCon:       DbSession 
2149          @param tableSchema: Schema for tables to be ingested this session, as 
2150                              provided by L{Schema.parseTables()}. 
2151          @type  tableSchema: list(Schema.Table) or dict(str; Schema.Table) 
2152          @param tag:         A tag string to append to the file name on the 
2153                              share directory, so file clean up can be rm *tag*. 
2154          @type  tag:         str 
2155          @param skipSchemaCheck:   If True, don't check the schema for 
2156                                    mismatches - use with extreme caution! 
2157          @type  skipSchemaCheck:   bool 
2158          @param checkReleasedOnly: If True, only check the schema of released 
2159                                    tables for mismatches. 
2160          @type  checkReleasedOnly: bool 
2161   
2162          """ 
2163          super(Ingester, self).__init__( 
2164                database=dbCon.server + '.' + dbCon.database, 
2165              autoCommit=dbCon._autoCommit, 
2166              isTrialRun=dbCon.isTrialRun, 
2167                userName=dbCon.userName, 
2168            isPersistent=dbCon._isPersistent) 
2169   
2170          self.fileTag = tag 
2171          self._schema = (dict((table.name, table) for table in tableSchema) 
2172                          if not isinstance(tableSchema, dict) else tableSchema) 
2173   
2174          if not skipSchemaCheck: 
2175              Logger.addMessage("Checking database schema for ingest tables...") 
2176              self.checkSchema(self._schema.values(), checkReleasedOnly) 
 2177   
2178       
2179   
2180      def ingestTable(self, tableName, filePathName, idxInfo=None,
2181                            overWrite=False, isCsv=False, deleteFile=True, 
2182                            isOrdered=False, checkConstraints=True): 
 2183          """ 
2184          Ingest a binary or CSV flat file table into the database. 
2185   
2186          @note: If the table doesn't already exist in the database it will be 
2187          created. However, it won't have foreign key constraints (this is a 
2188          feature intended for release databases made by CU19). If you need 
2189          foreign key constraints it is best to call createTable() prior to 
2190          ingestTable(overWrite=False). 
2191   
2192          @todo: If not isOrdered and overWrite=True then we don't apply primary 
2193          key until after ingest. OR even better - if not isOrdered try to create 
2194          table without any constraints; if that succeeds, i.e. there is no
2195          existing table or overWrite=True, then apply the primary key after
2196          ingest, otherwise don't, but do check foreign keys. May make parts of
2197          NeighbourTableIngester redundant. 
2198   
2199          @param tableName:    Name of table to create/update in the database. 
2200          @type  tableName:    str 
2201          @param filePathName: Name of the ingest file, with optional full 
2202                               catalogue server path or curation server path. If 
2203                               full path is omitted, then file is assumed to 
2204                               be located in the catalogue server's share 
2205                               directory. Otherwise if a different curation 
2206                               server path is supplied the file is automatically 
2207                               moved to the share (assuming the fileTag has been 
2208                               set upon Ingester() initialisation). 
2209          @type  filePathName: str 
2210          @param idxInfo:      If you wish to drop indices prior to ingest, then 
2211                               supply L{Schema.parseIndices()} information. 
2212          @type  idxInfo:      defaultdict(str: list(Schema.Index)) 
2213          @param overWrite:    If True, overwrite an existing table in the 
2214                               database. If False, update the table if it exists, 
2215                               otherwise create the table. NB: Whenever this 
2216                               method creates a table it does so without applying 
2217                               foreign key constraints. 
2218          @type  overWrite:    bool 
2219          @param isCsv:        If True, expects to ingest a CSV ascii file, else 
2220                               expects native binary format. 
2221          @type  isCsv:        bool 
2222          @param deleteFile:   If True, delete ingest file after ingest, 
2223                               regardless of outcome. 
2224          @type  deleteFile:   bool 
2225          @param isOrdered:    If True, the data is ordered by primary key. 
2226          @type  isOrdered:    bool 
2227          @param checkConstraints: If True, check foreign key constraints during 
2228                                   ingest if any exist, otherwise never check 
2229                                   foreign key constraints. 
2230          @type  checkConstraints: bool 
2231   
2232          @return: Number of rows ingested. 
2233          @rtype:  int 
2234   
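              A minimal usage sketch (table and file names are illustrative)::

                  numRows = ingester.ingestTable("ProgrammeFrame",
                                                 "programmeframe.dat",
                                                 isOrdered=True)
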
2235          """ 
2236          if not filePathName: 
2237              return 0 
2238   
2239          if any(prefix in filePathName 
2240                 for prefix in ["cu01id", "cu03id", "cu04id"]):
2241              fileName = self.fileOnShare = filePathName[:] 
2242              ingHost, sharePath = filePathName.partition("samba/")[::2] 
2243              filePathName = 2 * self.sysc.catServerFileSep \ 
2244                      + ingHost.replace(os.sep, '').upper() \ 
2245                      + self.sysc.catServerFileSep \ 
2246                      + sharePath.replace(os.sep, self.sysc.catServerFileSep) 
2247          else: 
2248              fileName, filePathName = self._normalisePath(filePathName) 
2249   
2250          if not fileName:   
2251              return 0 
2252   
2253          Logger.addMessage("Ingesting into %s..." % tableName) 
2254          try: 
2255              ingestSchema = self._schema[tableName] 
2256          except KeyError: 
2257              raise Exception( 
2258                "<ERROR> Cannot ingest %s: Ingester was not initially supplied" 
2259                " with a schema for table %s" % (fileName, tableName)) 
2260   
2261          isNewTable = overWrite or not self.existsTable(ingestSchema.name) 
2262          if isNewTable: 
2263              isNewTable = self.createObjects([ingestSchema], overWrite) 
2264   
2265           
2266          if not overWrite and idxInfo: 
2267              self.dropObjects(idxInfo[tableName]) 
2268              self.commitTransaction()   
2269   
2270          hints = "firstrow=1" 
2271          if isCsv: 
2272              hints += ", datafiletype='char', fieldterminator=','"\ 
2273                       ", rowterminator='0x0A'" 
2274          else: 
2275              hints += ", datafiletype='native', tablock" 
2276   
2277           
2278           
2279           
2280          checkConstraints = checkConstraints and not isNewTable \ 
2281             and any(type(constraint) is schema.ForeignKeyConstraint 
2282                     for constraint in ingestSchema.constraints) 
2283   
2284          if checkConstraints: 
2285              hints += ", check_constraints" 
2286   
2287          if isOrdered: 
2288              hints += ", order(%s)" % self._schema[tableName].primaryKey() 
2289   
2290          sql = "BULK INSERT %s FROM '%s' WITH (%s)" \ 
2291            % (tableName, filePathName, hints) 
2292   
2293          try: 
2294              try: 
2295                   
2296                  timeout = 0 
2297                  while True: 
2298                      try: 
2299                          numRowsIngested = \ 
2300                            self._executeScript(sql, wantRowCount=True) 
2301   
2302                      except odbc.DatabaseError as error: 
2303                          if "does not exist" not in str(error) or timeout == 600: 
2304                              raise 
2305   
2306                          time.sleep(1) 
2307                          timeout += 1 
2308                      else: 
2309                          break 
2310   
2311              except odbc.ProgrammingError as error: 
2312                  if not str(error).startswith('Cannot fetch a row from OLE DB ' 
2313                    'provider "BULK" for linked server "(null)".\nScript'): 
2314                      raise 
2315   
2316                  msg = "Ingest file data does not match the schema of " 
2317                  msg += tableName 
2318   
2319                  if not isCsv: 
2320                       
2321                      raise Ingester.IngestError(msg 
2322                         + ', ' + str(error).split("returned ")[-1] 
2323                         + ". Catalogue server file path: " + filePathName) 
2324                  else: 
2325                      Logger.addMessage("<WARNING> " + msg 
2326                                        + ". Attempting ODBC insert...")
2327   
2328                       
2329                      numRowsIngested = 0 
2330                      for row in csv.File(self.sharePath(fileName)): 
2331                          fRow = [column.parseValue(val) for val, column 
2332                                  in zip(row, self._schema[tableName].columns)] 
2333                          try: 
2334                              isIngested = self.insertData(tableName, fRow) 
2335                          except odbc.Error as error: 
2336                              raise Ingester.IngestError(error) 
2337   
2338                          if not isIngested: 
2339                              Logger.addMessage("Insert of this row failed: " 
2340                                                + ', '.join(map(str, fRow))) 
2341   
2342                          numRowsIngested += isIngested 
2343                          Logger.addMessage("Row number = %s" % numRowsIngested, 
2344                                            alwaysLog=False) 
2345              else: 
2346                   
2347                  numRowsIngested = \ 
2348                    max(self._executeScript("SELECT ROWCOUNT_BIG()", 
2349                        wantResult=True, wantAll=False)[0], 0) 
2350   
2351          except: 
2352              if self.sysc.catServerSharePath in filePathName: 
2353                  Logger.addMessage("<Info> Ingest file left on file share: " 
2354                                   + self.sharePath(fileName)) 
2355              else: 
2356                  Logger.addMessage("<Info> Ingest file left on catalogue " 
2357                      "server: " + filePathName) 
2358              raise 
2359   
2360          Logger.addMessage("done! Rows affected = %s" % numRowsIngested) 
2361   
2362          self._removeIngestFile(fileName, filePathName, deleteFile) 
2363   
2364          return numRowsIngested 
 2365   
2366       
2367   
2368      def _moveToShare(self, filePathName):
2369          """
2370          Moves a file to the catalogue load server's share directory as mounted 
2371          on the curation server. Works around the share violation problem under 
2372          NFS: a file written to the curation-client / load-server NFS file share 
2373          cannot be accessed immediately after being written unless it is first 
2374          renamed! 
2375   
2376          @param filePathName: Path of file to copy over to the share directory. 
2377                               Must be a full path to the file visible from the 
2378                               curation server O/S. 
2379          @type  filePathName: str 
2380   
2381          @return: New file name in share directory, including any 
2382                   sub-directories, or None if original file does not exist. 
2383          @rtype:  str 
2384   
2385          """ 
2386          if not os.path.exists(filePathName): 
2387              return 
2388   
2389          elif self.fileTag is Ingester.fileTag: 
2390               
2391               
2392               
2393               
2394               
2395              return filePathName.replace(self.sharePath(), '') 
2396   
2397          if not filePathName.startswith(self.sharePath()): 
2398              shareDisk = self.sharePath().split(os.sep)[1] 
2399              if filePathName.split(os.sep)[1] != shareDisk: 
2400                  Logger.addMessage( 
2401                    "Transferring ingest file to %s..." % shareDisk) 
2402   
2403              fileName = os.path.basename(filePathName) 
2404              shutil.move(filePathName, self.sharePath(fileName)) 
2405              filePathName = self.sharePath(fileName) 
2406   
2407           
2408          fileRoot, fileExt = os.path.splitext(filePathName) 
2409          taggedPathName = fileRoot + self.fileTag + fileExt 
2410          if self.fileTag not in fileRoot: 
2411              os.rename(filePathName, taggedPathName) 
2412          else: 
2413              os.rename(filePathName, taggedPathName) 
2414              os.rename(taggedPathName, filePathName) 
2415              taggedPathName = filePathName 
2416   
2417          return taggedPathName.replace(self.sharePath(), '') 
 2418   
2419       
2420   
2421      def _normalisePath(self, filePathName):
2422          """
2423          Move ingest file to a place visible to the catalogue server O/S and 
2424          normalise different filePathName inputs into the format: 
2425   
2426          filePathName = Full path to file from the catalogue server O/S e.g. 
2427          H:\dir\subdir\filename.ext 
2428   
2429          fileName = filename.ext or subdir/filename.ext if file is in a 
2430          subdirectory of the catalogue server share directory. 
2431   
2432          @param filePathName: Any acceptable file path definition allowed by 
2433                               the Ingester.ingestTable() method. 
2434          @type  filePathName: str 
2435   
2436          @return: fileName, filePathName (see description above). 
2437          @rtype:  tuple(str, str) 
2438   
2439          """ 
2440          if self.sharePath() in filePathName: 
2441              curPathName = filePathName 
2442   
2443          elif self.sysc.catServerFileSep not in filePathName: 
2444               
2445              curPathName = (filePathName if os.path.dirname(filePathName) else 
2446                             self.sharePath(filePathName)) 
2447   
2448          elif self.sysc.catServerSharePath in filePathName: 
2449               
2450               
2451              curPathName = self.sharePath( 
2452                filePathName.replace(self.sysc.catServerSharePath, '') 
2453                            .replace(self.sysc.catServerFileSep, os.sep)) 
2454   
2455          else: 
2456               
2457              fileName = filePathName   
2458              return fileName, filePathName 
2459   
2460          fileName = self._moveToShare(curPathName) 
2461          if not fileName: 
2462              return None, None 
2463          else: 
2464              filePathName = self.sysc.catServerSharePath \ 
2465                + fileName.replace(os.sep, self.sysc.catServerFileSep) 
2466   
2467              return fileName, filePathName 
 2468   
2469       
2470   
2471      def _removeIngestFile(self, fileName, filePathName, deleteFile):
2472          """
2473          Remove the ingest file. 
2474   
2475          @param fileName:     Name and sub-dir of file in the curation server 
2476                               share path. 
2477          @type  fileName:     str 
2478          @param filePathName: Full path to file on the catalogue server. 
2479          @type  filePathName: str 
2480          @param deleteFile:   If false, then just provide a warning. 
2481          @type  deleteFile:   bool 
2482   
2483          """ 
2484          if deleteFile and not self.isTrialRun: 
2485              try: 
2486                  self.runOnServer("del " + filePathName) 
2487              except DbSession.CatalogueServerError: 
2488                  if self.sysc.catServerSharePath not in filePathName: 
2489                      raise 
2490   
2491                   
2492                  os.remove(self.sharePath(fileName)) 
2493          elif "cu03id" in filePathName or "cu04id" in filePathName: 
2494              pass 
2495          elif self.sysc.catServerSharePath not in filePathName: 
2496              Logger.addMessage("<Info> Ingest file left on catalogue server: " 
2497                                + filePathName) 
2498          elif deleteFile: 
2499              Logger.addMessage("<Info> Ingest file left on file share: " 
2500                                + self.sharePath(fileName)) 
  2501   
2504  class Outgester(DbSession):
2505      """
2506      A special type of database session, where you wish to bulk outgest data 
2507      from the database to a file. This class ensures the connection has the 
2508      correct permissions and is designed to work around the delays in the 
2509      appearance of the data on the NFS mounted load server share directory. If 
2510      the schema for tables that data will be outgested from is supplied, then 
2511      upon initialisation this class verifies that the schemas of the database 
2512      tables are up-to-date. Otherwise, the class will query the database 
2513      schema directly, which is slower. 
2514   
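          For example (the query and tag are illustrative)::

              outgester = Outgester(db, tag="test_tag")
              filePath = outgester.outgestQuery(
                  SelectSQL("*", table="ProgrammeFrame", where="programmeID=101"))
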
2515      """ 
2516       
2517       
2518   
2519      fileTag = 'CuID000000' 
2520      """ Tag to append to outgest files so they can be cleaned-up later. 
2521      """ 
2522      sampleRate = 2 
2523      """ Delay in seconds between testing file size on NFS mounted share. 
2524      """ 
2525      tempViewName = "OutgestTempView" + str(os.getpid()) + os.getenv("HOST") 
2526      """ Name of temporary outgest view that's created if the SQL statement is 
2527          too long for the BCP statement. Host PID is a safer UID than fileTag. 
2528      """ 
2529      timeOut = 600 
2530      """ Time in seconds before assuming outgest to NFS has failed. 
2531      """ 
2532       
2533       
2534   
2535      _isBcpDeadlocked = False   
2536   
2537       
2538   
2539      def __init__(self, dbCon, tag=fileTag, honourTrialRun=False):
2540          """
2541          Makes connection to the requested database. Supplied database 
2542          connection must be under a username that has bcp rights, e.g.
2543          ldservrw/pubdbrw. Can't do a SETUSER to make sure we are always 
2544          ldservrw, because not all users have the permissions to perform a 
2545          SETUSER - which kinda defeats the object. If user is not ldservrw or 
2546          pubdbrw a new connection will be made as ldservrw (which is safe as 
2547          Outgester only reads from the database anyway). In this circumstance, 
2548          to prevent too many connections being made/dropped make sure to create 
2549          just a single Outgester object, which is continually used throughout. 
2550   
2551          @param dbCon: A database connection. 
2552          @type  dbCon: DbSession 
2553          @param tag:   A tag string to append to the file name on the 
2554                        share directory, so file clean up can be rm *tag*. 
2555          @type  tag:   str 
2556          @param honourTrialRun: If True, don't outgest if the database 
2557                                 connection is in trial run mode. 
2558          @type  honourTrialRun: bool 
2559   
2560          """ 
2561          super(Outgester, self).__init__( 
2562                database=dbCon.server + '.' + dbCon.database, 
2563              autoCommit=dbCon._autoCommit, 
2564              isTrialRun=dbCon.isTrialRun if honourTrialRun else False, 
2565                userName=(dbc.loadServerRwUsername() 
2566                          if dbCon.userName != "pubdbrw" else dbCon.userName), 
2567            isPersistent=dbCon._isPersistent) 
2568   
2569          self._isDirty = dbCon._isDirty 
2570          self.fileTag = tag 
 2571   
2572       
2573   
2574      def getFilePath(self, fileID):
2575          """ Returns the location of a file outgested with this fileID.
2576          """ 
2577          return self.sharePath(fileID + self.fileTag + ".dat") 
 2578   
2579       
2580   
2581      def outgestQuery(self, query, fileID='', filePath='', createDtdFile=False,
2582                             isCsv=False, redirectStdOut=False): 
 2583          """ 
2584          Bulk outgest the results from an SQL query to a file in the catalogue 
2585          server share directory. 
2586   
2587          @note: If data is being copied by bulk outgest/ingest it helps to order 
2588                 by the primary key. 
2589          @note: This function may hang for up to 10 minutes after outgest if the 
2590                 outgest failed silently (which can happen), whilst waiting for 
2591                 the NFS update to time out. I've chosen not to print a message 
2592                 saying the outgest is complete and that the function is waiting 
2593                 until time-out, because multiple outgests would display too 
2594                 much information, and there's no point making this message occur
2595                 in debug mode only, because there it would be obvious anyway.
2596   
2597          @param query:    SQL query to outgest. 
2598          @type  query:    SelectSQL 
2599          @param fileID:   Optionally request a specific unique identifier to be 
2600                           used in the name for the outgest file. The file will 
2601                           be named "[fileID][fileTag].dat". This is useful if 
2602                           multiple outgests from the same table need to be 
2603                           available at the same time. 
2604          @type  fileID:   str 
2605          @param filePath: Optionally outgest to a path on the catalogue server 
2606                           other than the share path (e.g. "G:\"), or transfer 
2607                           outgest file to a different path on the curation 
2608                           server. Either give full new path name for the file or 
2609                           else just provide the path to the new directory, but 
2610                           make sure that directory exists first. 
2611          @type  filePath: str 
2612          @param createDtdFile: If True and the outgest is to a binary file on 
2613                                the share path, a file containing the data type 
2614                                definition of the outgest file will be created 
2615                                with the same name as the outgest file, but with 
2616                                a ".dtd" extension rather than ".dat". 
2617          @type createDtdFile:  bool 
2618          @param isCsv:         If True, outputs to CSV ascii file, else defaults 
2619                                to native binary format. 
2620          @type  isCsv:         bool 
2621          @param redirectStdOut: If True, the results of the outgest will be 
2622                                 written to a file on the share path rather than 
2623                                 returned through mxODBC. This is required to 
2624                                 work around a mysterious bug in very large table 
2625                                 outgests only and is slightly less efficient. 
2626          @type  redirectStdOut: bool 
2627   
2628          @return: Full path to the outgest file (unless createDtdFile requested, 
2629                   which returns the row count outgested). The path will be a 
2630                   curation server path if the file is outgested to the share
2631                   directory, which is the default behaviour, otherwise a  
2632                   catalogue server path is returned. 
2633          @rtype:  str (or int) 
2634   
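              A minimal usage sketch; ordering by the primary key helps when the
              outgest file will later be bulk ingested (names are illustrative)::

                  query = SelectSQL("multiframeID, fileName", table="Multiframe",
                                    where="deprecated=0", orderBy="multiframeID")
                  outgestPath = outgester.outgestQuery(query, fileID="mfs")
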
2635          """ 
2636          if len(query.tables) == 1:
2637              table = query.tables.values()[0]
2638          elif query.selectStr.count('.') == 1:
2639              table = query.tables[query.selectStr.split('.')[0]] 
2640          else: 
2641              tableList = query.tables.values() 
2642              tableList.reverse() 
2643              table = ''.join(tableList) 
2644   
2645          drive = '' 
2646          if self.sysc.catServerFileSep in filePath: 
2647              drive, filePath = filePath, drive 
2648   
2649          fileName = \ 
2650            os.path.basename(self.getFilePath(fileID or table.split('.')[-1])) 
2651   
2652          filePathName = (drive or self.sysc.catServerSharePath) + fileName 
2653   
2654          queryFmt = '"%s OPTION (MAXDOP 1)"' 
2655          view = schema.View() 
2656          if len(str(query)) < 1000: 
2657              querySQL = (queryFmt % query).replace("'", "''") 
2658          else: 
2659              view.name = Outgester.tempViewName 
2660               
2661              view.definition = \ 
2662                str(SelectSQL(query.selectStr, query.fromStr, query.whereStr)) 
2663   
2664               
2665              order = ', '.join(col.split('.')[-1] 
2666                                for col in csv.values(query.orderByStr)) 
2667   
2668              tablePath = (self.tablePath(view.name) 
2669                           if self.userName not in ["pubdbrw", "ldservro"] else 
2670                           self.tablePath(view.name, ownerName=self.userName)) 
2671   
2672              querySQL = queryFmt % SelectSQL('*', tablePath, orderBy=order) 
2673              self.createObjects([view], overWrite=True) 
2674   
2675          if self._isDirty: 
2676              querySQL = querySQL.replace(" WHERE", " WITH (NOLOCK) WHERE") 
2677   
2678          try: 
2679              cmd = "bcp %s queryout %s %s -T" % \ 
2680                (querySQL, filePathName, '-a' if isCsv else '-n') 
2681   
2682              while not self._outgestFile(cmd, redirectStdOut, view): 
2683                   
2684                  time.sleep(60) 
2685   
2686          finally: 
2687              self._isBcpDeadlocked = False 
2688              if view.definition: 
2689                  self.dropObjects([view]) 
2690   
2691          if drive: 
2692              if ':' in filePathName: 
2693                  filePathName = self.uncPath(filePathName) 
2694          else: 
2695              filePathName = self.sharePath(fileName) 
2696   
2697              if not self.isTrialRun: 
2698                  self._waitForNFS(filePathName, query) 
2699   
2700              if filePath: 
2701                  filePathName = self._transferFile(filePathName, filePath) 
2702   
2703              if not isCsv and createDtdFile and not self.isTrialRun: 
2704                  dataTypes = self._getDataTypes(query) 
2705   
2706                   
2707                   
2708                  dtdPathName = filePathName.replace('.dat', '.dtd') 
2709                  if os.path.exists(dtdPathName): 
2710                      os.remove(dtdPathName) 
2711   
2712                  file(dtdPathName, 'w').writelines(dataType + '\n' 
2713                                                    for dataType in dataTypes) 
2714                  os.chmod(dtdPathName, 0444) 
2715   
2716                  rowSize = sum(self.sysc.sqlDataTypeSize[dataType] 
2717                                for dataType in dataTypes) 
2718   
2719                  return os.path.getsize(filePathName) // rowSize 
2720   
2721          return filePathName 
 2722   
2723       
2724   
2725      def _getDataTypes(self, query):
2726          """ Returns ordered list of data types returned by the given query,
2727              either via examining the schema, if available, or by direct 
2728              database table query. 
2729   
2730              @todo: Select * queries may not return correctly ordered data 
2731                     types. If so, and order matters, this can be achieved by
2732                     another database query - which of course will slow the code 
2733                     down a bit. 
2734              @todo: Simplify by just running a test query of one row and then 
2735                     inspecting the cursor like pySQL does? Similar to how 
2736                     DbSession.queryRowSize() works. This will render the above 
2737                     todo obsolete. 
2738          """ 
2739          columns = \ 
2740            [column.split('.')[-1] for column in csv.values(query.selectStr)] 
2741   
2742           
2743           
2744          attrs = {} 
2745          for table in query.tables.values(): 
2746              attrs.update(self.queryDataTypes(table)) 
2747   
2748          if columns == ['*']: 
2749              if '.' not in query.selectStr: 
2750                  columns = attrs.keys() 
2751              else: 
2752                  alias = query.selectStr.split('.')[0] 
2753                  table = query.tables[alias].split('.')[-1] 
2754                  columns = self.queryColumnNames(table) 
2755   
2756          return [attrs[column] for column in columns] 
 2757   
2758       
2759   
2760      def _outgestFile(self, cmd, redirectStdOut, view):
2761          """
2762          Performs outgest to file, ensuring it exists. 
2763   
2764          @return: True, if outgest file successfully created. 
2765          @rtype:  bool 
2766   
2767          """ 
2768          if redirectStdOut: 
2769               
2770               
2771              resultFile = "G:" + self.sysc.catServerFileSep + self.database \ 
2772                + self.fileTag + "_outgest.log" 
2773   
2774              cmd += " -o " + resultFile 
2775   
2776          resultSet = self.runOnServer(cmd) 
2777   
2778          if redirectStdOut and not self.isTrialRun: 
2779               
2780               
2781               
2782              resultSet = self.runOnServer("type " + resultFile) 
2783              self.runOnServer("del " + resultFile) 
2784   
2785          if not self.isTrialRun and (len(resultSet) < 3 
2786            or "rows copied" not in resultSet[-3]): 
2787   
2788              isDeadlock = \ 
2789                any("deadlock victim" in result for result in resultSet) 
2790   
2791              if not self._isBcpDeadlocked and isDeadlock: 
2792                  Logger.addMessage("Outgest delayed due to deadlock...") 
2793                  self._isBcpDeadlocked = isDeadlock 
2794   
2795              if not isDeadlock: 
2796                  msg = "BCP Error:\n" 
2797                  msg += ('\n'.join(resultSet) if resultSet else 
2798                          "Outgest failed. BCP gave no error.") 
2799   
2800                  msg += "\nCommand executed: " + cmd 
2801                  msg += ('' if not view.definition else 
2802                          "\n%s = %s" % (view.name, view.definition)) 
2803   
2804                  raise odbc.DatabaseError(msg) 
2805   
2806          return not self._isBcpDeadlocked 
 2807   
2808       
2809 
2810 -    def _transferFile(self, filePathName, filePath):
2811          """ Transfers given file from the share path to the given path on the 
2812              curation server. 
2813          """ 
2814          if not os.path.isdir(filePath): 
2815              newFileName = os.path.basename(filePath) 
2816              newFilePathName = filePath 
2817          else: 
2818              newFileName = os.path.basename(filePathName) 
2819              newFilePathName = os.path.join(filePath, newFileName) 
2820   
2821          if self.isTrialRun or newFilePathName == filePathName: 
2822              return newFilePathName 
2823   
2824          if filePath.split(os.sep)[1] != filePathName.split(os.sep)[1]: 
2825              Logger.addMessage("Transferring outgest file to %s..." 
2826                                % self.sysc.getServerName(filePath)) 
2827   
2828          if os.path.exists(newFilePathName): 
2829              Logger.addMessage("<Info> File %s already exists, will be " 
2830                                "overwritten." % newFileName) 
2831              os.remove(newFilePathName) 
2832   
2833          os.system('cp %s %s' % (filePathName, newFilePathName)) 
2834   
2835          if (os.path.getsize(filePathName) != os.path.getsize(newFilePathName)): 
2836              os.remove(filePathName) 
2837              raise Exception("Transfer from %s to %s failed" 
2838                              % (self.server, self.sysc.getServerName(filePath))) 
2839   
2840          os.remove(filePathName) 
2841   
2842          return newFilePathName 
 2843   
2844       
2845 
2846 -    def _waitForNFS(self, filePathName, query):
2847          """ Wait until the file on the NFS share is ready to be accessed. 
2848          """ 
2849          curTime = time.time() 
2850          fileName = os.path.basename(filePathName) 
2851          cmd = "dir " + self.sysc.catServerSharePath + fileName 
2852          dirResults = [result.split()[2] for result in self.runOnServer(cmd) 
2853                        if result and fileName in result] 
2854          try: 
2855              tableFileSize = int(dirResults[0].replace(',', '')) 
2856          except IndexError: 
2857              raise odbc.OperationalError( 
2858                "SQL outgest query to file %s has failed.\n" 
2859                "Query executed: %s" % (filePathName, query)) 
2860   
2861          while (not os.path.exists(filePathName) or 
2862                 os.path.getsize(filePathName) != tableFileSize): 
2863   
2864              time.sleep(Outgester.sampleRate) 
2865   
2866              if time.time() - curTime > Outgester.timeOut: 
2867                  sizeStr = ("Non-existent" if not os.path.exists(filePathName) 
2868                    else str(os.path.getsize(filePathName))) 
2869                  raise odbc.OperationalError( 
2870                    "SQL outgest query to file %s has failed.\n" 
2871                    "Expected file size = %s\n" 
2872                    "Actual size        = %s\n" 
2873                    "Query executed: %s" % 
2874                    (filePathName, tableFileSize, sizeStr, query)) 
2875 
2878 -class Database(object):
2879      """ 
2880      Defines a database object, and provides methods to create and modify
2881      databases through administrative database connections. First, define with 
2882      L{Database.__init__()}, then create the database with L{Database.create()}. 
2883   
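    Illustrative usage (the server name, volumes and sizes below are
    placeholders, not values taken from this module; the keyword values echo
    the examples given for L{Database.__init__()})::

        adminDb = DbSession("ramses1.master", autoCommit=True)
        loadDb = Database("WSA_TEST", volumes=["F:\\", "G:\\"], dbDir="WSA",
                          primarySize="4 GB", logSize="4 GB",
                          filegroups=[("Detection", "4 GB")],
                          filegrowth="10%")
        loadDb.create(adminDb, overWrite=True)
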
2884      @group Nested Error Exceptions: ExistsError 
2885   
2886      """ 
2888          """ Exception thrown when a database being created already exists. 
2889          """ 
2890          msg = "Database %r already exists." 
2891   
 2897   
2898       
2899       
2900   
2901      files = None   
2902      name = ""      
2903   
2904       
2905       
2906   
2907      _folders = []   
2908      _sqlLine = ""   
2909   
2910       
2911   
2915          """ 
2916          Defines a database object. 
2917   
2918          @param name:        Name of database to create. 
2919          @type  name:        str 
2920          @param volumes:     List of volumes to spread database files over. 
2921          @type  volumes:     list(str)
2922          @param dbDir:       Directory name on catalogue server to store 
2923                              database. This value is overridden for multi-file
2924                              databases, which use the database name as the
2925                              directory name; otherwise the non-survey folder is
2925                              assumed.
2926          @type  dbDir:       str 
2927          @param primarySize: Total initial size of files in the primary file 
2928                              group, as a string, e.g. "4 GB". 
2929          @type  primarySize: str 
2930          @param logSize:     Total initial size of files in the log file group, 
2931                              as a string, e.g. "4 GB". 
2932          @type  logSize:     str 
2933          @param filegroups:  List of additional file group names and sizes, e.g. 
2934                              ("Detection", "4 GB"). 
2935          @type  filegroups:  list(tuple(str, str)) 
2936          @param filegrowth:  File growth, e.g. "10%". 
2937          @type  filegrowth:  str 
2938   
2939          """ 
2940          self.name = name 
2941          self._sqlLine = "CREATE DATABASE %s ON PRIMARY " % self.name 
2942          dbDir = (self.name if filegroups else 
2943                   dbDir or SystemConstants(name).nonSurveyDir) 
2944   
2945          dbDir += SystemConstants.catServerFileSep 
2946          self._folders = [volume + dbDir for volume in volumes] 
2947   
2948          fgLine = "(name=%s, filename='%s', size=%s, maxsize=UNLIMITED, " 
2949          fgLine += "filegrowth=%s)," % filegrowth.replace('%', '%%') 
2950          self.files = [] 
2951   
2952           
2953          for count, volume in enumerate(volumes): 
2954              if len(volumes) == 1:
2955                  fgName = "Primary_FG" 
2956              else: 
2957                  fgName = "pfg_%s" % (count + 1) 
2958   
2959              if filegroups: 
2960                  filePathName = volume + dbDir + \ 
2961                                 "PrimaryFileGroup%s.mdf" % (count + 1) 
2962              else: 
2963                  filePathName = volume + dbDir + self.name + ".mdf" 
2964   
2965              self.files.append(filePathName) 
2966              self._sqlLine += fgLine % (fgName, filePathName, 
2967                               self._splitSize(primarySize, len(volumes))) 
2968   
2969           
2970          for filegroup, size in filegroups: 
2971              self._sqlLine += " filegroup %s_FG " % filegroup 
2972              if len(volumes) == 1:
2973                  fgName = "%s_FG" % filegroup 
2974              else: 
2975                  fgName = ''.join(takewhile(lambda c: c.islower(), filegroup)) 
2976                  fgName += filegroup[len(fgName)].lower() + "fg_%s" 
2977   
2978              for count, volume in enumerate(volumes): 
2979                   
2980                   
2981                   
2982                  filePathName = volume + dbDir + filegroup + "%s.ndf" % (count + 1) 
2983                  self.files.append(filePathName) 
2984                  fgVolName = \ 
2985                    (fgName % (count + 1) if len(volumes) > 1 else fgName) 
2986   
2987                  self._sqlLine += fgLine % (fgVolName, filePathName, 
2988                                             self._splitSize(size, len(volumes))) 
2989   
2990           
2991          self._sqlLine = self._sqlLine.rstrip(',') + " LOG ON " 
2992          for count, volume in enumerate(volumes): 
2993              if len(volumes) == 1:
2994                  fgName = "Log_FG" 
2995              else: 
2996                  fgName = "lfg_%s" % (count + 1) 
2997   
2998              if filegroups: 
2999                  filePathName = volume + dbDir + \ 
3000                                 "LogFileGroup_data%s.ldf" % (count + 1) 
3001              else: 
3002                  filePathName = volume + dbDir + self.name + ".ldf" 
3003   
3004              self.files.append(filePathName) 
3005              self._sqlLine += fgLine % (fgName, filePathName, 
3006                                         self._splitSize(logSize, len(volumes))) 
3007   
3008          self._sqlLine = self._sqlLine.rstrip(',') 
 3009   
3010       
3011 
3012 -    def __str__(self):
3013          """ 
3014          @return: Database name. 
3015          @rtype:  str 
3016   
3017          """ 
3018          return self.name 
 3019   
3020       
3021   
3022 -    def attach(self, adminDb, asName=None): 
 3023          """ 
3024          Re-attach the offline database that was previously detached. The 
3025          current implementation is tied to MS SQL Server via T-SQL. 
3026   
3027          @param adminDb: An autocommitting admin (i.e. master) database
3028                          connection to the server where the database will be 
3029                          attached. 
3030          @type  adminDb: DbSession 
3031          @param asName:  Optional alternative name to attach the database as, 
3032                          instead of its real name (used in the file names). 
3033          @type  asName:  str 
3034   
3035          """ 
3036          sql = self._sqlLine + " FOR ATTACH" 
3037          name = self.name 
3038          if asName: 
3039              sql = sql.replace("CREATE DATABASE %s" % name, 
3040                                "CREATE DATABASE %s" % asName) 
3041              name = asName 
3042   
3043          try: 
3044              adminDb._executeScript(sql) 
3045          except odbc.ProgrammingError as error: 
3046              if Database.ExistsError.msg % name in str(error): 
3047                  raise Database.ExistsError(name) 
3048              try: 
3049                   
3050                   
3051                   
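                # Fall back to the legacy sp_attach_db procedure, attaching
                # from the primary data file; if this also fails, re-raise the
                # original error.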
3052                  sql = "sp_attach_db '%s', '%s'" % (name, self.files[0]) 
3053                  adminDb._executeScript(sql) 
3054              except Exception:
3055                  raise error 
3056   
3057           
3058          self.name = name 
3059          self.setDefaultOptions(adminDb) 
 3060   
3061       
3062   
3063 -    def create(self, adminDb, overWrite=False): 
 3064          """ 
3065          Creates this database on a specified server. 
3066   
3067          @param adminDb:   An autocommitting admin (i.e. master) database
3068                            connection to the server where the database will be 
3069                            created. 
3070          @type  adminDb:   DbSession 
3071          @param overWrite: If True, overwrite any pre-existing database of this 
3072                            name. 
3073          @type  overWrite: bool 
3074   
3075          """ 
3076          self._makeFolders(adminDb) 
3077   
3078           
3079          try: 
3080              adminDb._executeScript(self._sqlLine) 
3081          except odbc.ProgrammingError as error: 
3082              if overWrite and ( 
3083                "file names listed could not be created" in str(error) 
3084                or Database.ExistsError.msg % self.name in str(error)): 
3085                   
3086                  self.delete(adminDb) 
3087                  self._makeFolders(adminDb) 
3088                   
3089                  adminDb._executeScript(self._sqlLine) 
3090              elif Database.ExistsError.msg % self.name in str(error): 
3091                  raise Database.ExistsError(self.name) 
3092              else: 
3093                  raise 
3094   
3095          self.setDefaultOptions(adminDb) 
 3096   
3097       
3098 
3099 -    def delete(self, adminDb):
3100          """ 
3101          Delete this database on a specified server. 
3102   
3103          @param adminDb: An autocommitting admin (i.e. master) database
3104                          connection to the server where the database will be 
3105                          deleted. 
3106          @type  adminDb: DbSession 
3107   
3108          """ 
3109           
3110          numAttempts = 20 
3111          delay = 600 
3112          for attempt in range(numAttempts): 
3113              try: 
3114                  self.detach(adminDb) 
3115              except odbc.ProgrammingError as error: 
3116                  if "does not exist" in str(error): 
3117                      break 
3118                  else: 
3119                      msg = "<Info> Cannot delete existing database because it "\ 
3120                        "is in use. Attempt %s of %s." % (attempt + 1, numAttempts) 
3121                      if attempt + 1 == numAttempts: 
3122                          msg += " Giving up..." 
3123                          Logger.addMessage(msg) 
3124                          raise 
3125                      else: 
3126                          msg += " Waiting for %d min..." % (delay / 60) 
3127                          Logger.addMessage(msg) 
3128                          time.sleep(delay) 
3129              else: 
3130                  break 
3131   
3132           
3133          for filePathName in self.files: 
3134              adminDb.runOnServer("del " + filePathName) 
3135              adminDb._isLogReq[adminDb.database] = True 
3136   
3137           
3138          for folder in self._folders: 
3139              adminDb.runOnServer("rmdir " + folder) 
 3140   
3141       
3142 
3143 -    def detach(self, adminDb):
3144          """ 
3145          Detach (take offline) the database. The current implementation is tied 
3146          to MS SQL Server via T-SQL. 
3147   
3148          @param adminDb: An autocommitting admin (i.e. master) database
3149                          connection to the server where the database will be 
3150                          detached. 
3151          @type  adminDb: DbSession 
3152   
3153          """ 
3154          adminDb._executeScript("EXEC sp_detach_db %r" % self.name) 
 3155   
3156       
3157 
3158 -    def getMirrorServer(self, server):
3159          """ 
3160          Returns the corresponding mirror server for the given server. 
3161   
3162          @param server: Name of server to find mirror of. 
3163          @type  server: str 
3164   
3165          @return: Name of mirror server. 
3166          @rtype:  str 
3167   
3168          """ 
3169           
3170          try: 
3171              originalNum = int(server.replace(SystemConstants.clusterName, '')) 
3172          except ValueError: 
3173              raise Exception("Database.mirror() can only be performed on " 
3174                              "cluster servers") 
3175   
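        # Cluster servers are paired (1,2), (3,4), ...; XOR-ing the zero-based
        # server number with 1 swaps within a pair, e.g. 3 -> 4 and 4 -> 3.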
3176          mirrorNum = 1 + ((originalNum - 1) ^ 1)
3177          return SystemConstants.clusterName + str(mirrorNum) 
 3178   
3179       
3180 
3181 -    def mirror(self, adminDb):
3182          """ 
3183          Copy the database to the mirror server on the cluster. 
3184   
3185          @param adminDb: An autocommitting admin (i.e. master) database
3186                          connection to the server which has the database to be 
3187                          copied. 
3188          @type  adminDb: DbSession 
3189   
3190          @return: Mirror server name. 
3191          @rtype:  str 
3192   
3193          """ 
3194          mirrorServer = self.getMirrorServer(adminDb.server) 
3195   
3196           
3197          try: 
3198              self.detach(adminDb) 
3199          except odbc.DatabaseError as error: 
3200              Logger.addExceptionWarning(error) 
3201              Logger.addMessage("<WARNING> Cannot mirror database %s to %s as it" 
3202                " needs to be detached from %s first." 
3203                % (self.name, mirrorServer, adminDb.server)) 
3204   
3205              self.setReleaseOptions(adminDb) 
3206          else: 
3207              try: 
3208                  mirrorDb = DbSession( 
3209                    database=mirrorServer + '.' + adminDb.database, 
3210                    autoCommit=adminDb._autoCommit, 
3211                    isTrialRun=adminDb.isTrialRun, userName=adminDb.userName, 
3212                    isPersistent=adminDb._isPersistent) 
3213   
3214                   
3215                  self.delete(mirrorDb) 
3216                  self._makeFolders(mirrorDb) 
3217                  for filePathName in self.files: 
3218                      mirrorFile = mirrorDb.uncPath(filePathName) 
3219                      adminDb.runOnServer("copy %s %s" 
3220                                          % (filePathName, mirrorFile)) 
3221   
3222                  self.attach(mirrorDb) 
3223                  self.setReleaseOptions(mirrorDb) 
3224              finally: 
3225                  self.attach(adminDb) 
3226                  self.setReleaseOptions(adminDb) 
3227   
3228          return mirrorServer 
 3229   
3230       
3231   
3232 -    def release(self, adminDb, pubServer): 
 3233          """ 
3234          Copy the database to the given public server on the cluster. 
3235   
3236          @param adminDb:   An autocommitting admin (i.e. master) database
3237                            connection to the server which has the database to be 
3238                            copied. 
3239          @type  adminDb:   DbSession 
3240          @param pubServer: Name of server to copy to. 
3241          @type  pubServer: str 
3242   
3243          @todo: Merge with mirror somehow. 
3244   
3245          """ 
3246          pubVolumes = SystemConstants.catServerVolumes(pubServer) 
3247   
3248           
3249          origVolumes = SystemConstants.catServerVolumes(adminDb.server) 
3250          origFolders = self._folders[:] 
3251          origFiles = self.files[:] 
3252          origSqlLine = self._sqlLine 
3253   
3254           
3255          try: 
3256              self.detach(adminDb) 
3257          except odbc.DatabaseError as error: 
3258              Logger.addExceptionWarning(error) 
3259              Logger.addMessage("<WARNING> Cannot copy database %s to %s as it " 
3260                "needs to be detached from %s first." 
3261                % (self.name, pubServer, adminDb.server)) 
3262          else: 
3263              try: 
3264                   
3265                  for volume, pubVolume in zip(origVolumes, pubVolumes): 
3266                      self._sqlLine = self._sqlLine.replace(volume, pubVolume) 
3267                      self._folders = [path.replace(volume, pubVolume) 
3268                                   for path in self._folders] 
3269   
3270                      self.files = [path.replace(volume, pubVolume) 
3271                                   for path in self.files] 
3272   
3273                  pubDb = DbSession( 
3274                    database=pubServer + '.' + adminDb.database, 
3275                    autoCommit=adminDb._autoCommit, 
3276                    isTrialRun=adminDb.isTrialRun, userName=adminDb.userName, 
3277                    isPersistent=adminDb._isPersistent) 
3278   
3279                   
3280                  self.delete(pubDb) 
3281                  self._makeFolders(pubDb) 
3282                  for origFile, pubFile in zip(origFiles, self.files): 
3283                      pubFile = pubDb.uncPath(pubFile) 
3284                      adminDb.runOnServer("copy %s %s" % (origFile, pubFile)) 
3285   
3286                  self.attach(pubDb) 
3287                  self.setReleaseOptions(pubDb) 
3288              finally: 
3289                   
3290                  self._folders = origFolders[:] 
3291                  self.files = origFiles[:] 
3292                  self._sqlLine = origSqlLine 
3293                  self.attach(adminDb) 
 3294   
3295       
3296   
3298          """ 
3299          Set MS SQL Server database options appropriate for bulk loading, over 
3300          the load database default options. 
3301   
3302          @param adminDb: An autocommitting admin (i.e. master) database
3303                          connection to the server where the database will be 
3304                          altered. 
3305          @type  adminDb: DbSession 
3306   
3307          """ 
3308          for option in ["AUTO_CREATE_STATISTICS OFF", "RECOVERY SIMPLE"]: 
3309              adminDb._executeScript("ALTER DATABASE %s SET %s" 
3310                                     % (self.name, option)) 
 3311   
3312       
3313 
3314 -    def setDefaultOptions(self, adminDb):
3315          """ 
3316          Set the MS SQL Server database options that are appropriate for a 
3317          load database over the MS SQL server default options. 
3318   
3319          @param adminDb: An autocommitting admin (i.e. master) database
3320                          connection to the server where the database will be 
3321                          altered. 
3322          @type  adminDb: DbSession 
3323   
3324          """ 
3325          for option in ["AUTO_CREATE_STATISTICS ON", 
3326                         "RECOVERY BULK_LOGGED", 
3327                         "AUTO_SHRINK OFF", 
3328                         "TORN_PAGE_DETECTION ON", 
3329                         "AUTO_CLOSE OFF"]: 
3330              adminDb._executeScript("ALTER DATABASE %s SET %s" 
3331                                     % (self.name, option)) 
 3332   
3333       
3334 
3335 -    def setReleaseOptions(self, adminDb):
3336          """ 
3337          Set MS SQL Server database options appropriate for a release db over 
3338          the default load database options. 
3339   
3340          @param adminDb: An autocommitting admin (i.e. master) database
3341                          connection to the server where the database will be 
3342                          altered. 
3343          @type  adminDb: DbSession 
3344   
3345          """ 
3346          for option in ["AUTO_CREATE_STATISTICS ON", "RECOVERY SIMPLE"]: 
3347              adminDb._executeScript("ALTER DATABASE %s SET %s" 
3348                                     % (self.name, option)) 
 3349   
3350       
3351 
3352 -    def _makeFolders(self, adminDb):
3353          """ 
3354          Creates the folders for the database files. 
3355   
3356          @param adminDb: An autocommitting admin (i.e. master) database
3357                          connection to the server where the database will be 
3358                          created. 
3359          @type  adminDb: DbSession 
3360   
3361          """ 
3362           
3363          for folder in self._folders: 
3364              adminDb.runOnServer("mkdir " + folder) 
 3365   
3366       
3367 
3368 -    def _splitSize(self, size, divisions):
3369          """ 
3370          Splits a size into a number of divisions, converting between GB and 
3371          MB when necessary. 
3372   
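        For example, _splitSize("10 GB", 4) gives "2GB", while
        _splitSize("1 GB", 2) drops to MB units and gives "500MB".
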
3373          @param size:      Initial size in GB or MB, e.g. 10 GB. 
3374          @type  size:      str 
3375          @param divisions: Number of divisions the size must be divided between.
3376          @type  divisions: int 
3377   
3378          @return: Divided integer size in either GB or MB. 
3379          @rtype:  str 
3380   
3381          """ 
3382          units = size[-2:] 
3383          newSize = float(size[:-2].strip()) / divisions 
3384          if newSize < 1.0: 
3385              if units != "MB": 
3386                  newSize *= 1000.0 
3387                  units = "MB" 
3388              else: 
3389                  newSize = 1.0 
3390   
3391          return "%s%s" % (int(newSize), units) 
  3392   
3393   
3394   
3395 -class Join(object): 
 3396      """ 
3397      Defines a database inner join between two or more tables. Use as the table 
3398      argument value in L{DbSession.query()} type methods or when initialising 
3399      an L{SelectSQL} object. 
3400   
3401      """ 
3402       
3403      aliases = None   
3404      joinCols = ''    
3405      fromStr = ''     
3406      whereStr = ''    
3407   
3408 -    def __init__(self, tables, attributes, subJoin=None): 
 3409          """ 
3410          Defines an SQL inner join between given tables on given attributes. 
3411   
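        Example with a sub-join; the table and column names here are purely
        illustrative::

            table = Join(["MultiframeDetector", "Multiframe"],
                         ["multiframeID"],
                         subJoin=(["ProgrammeFrame"], "multiframeID"))
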
3412          @param tables:     Unique set of all tables to be joined on same 
3413                             attribute set. Either just supply table names, or 
3414                             a tuple containing table name and alias, e.g. 
3415                             ["Multiframe", "ProgrammeFrame"] or 
3416                             [("Multiframe", "M1"), ("Multiframe", "M2")]. 
3417          @type  tables:     sequence(str) or sequence(tuple) 
3418          @param attributes: Unique attribute set on which to join. If a single 
3419                             join column is required then just a string will do. 
3420          @type  attributes: sequence(str) or str 
3421          @param subJoin:    Join additional tables on just the given attribute. 
3422          @type  subJoin:    tuple(list(str), str) 
3423   
3424          """ 
3425          if len(tables) < 2: 
3426              raise ValueError("Join class must be supplied with at least two " 
3427                               "tables: tables = %s" % tables) 
3428   
3429          firstTable = self._parseInput(tables, attributes) 
3430   
3431          if subJoin: 
3432              subTables, subCol = subJoin 
3433              if isinstance(subTables, str): 
3434                  subTables = [subTables] 
3435   
3436              self.fromStr += ", " + ", ".join(subTables) 
3437              self.whereStr += " AND " + " AND ".join("%s.%s=%s.%s" 
3438                % (firstTable, subCol, table.split('.')[-1], subCol) 
3439                for table in subTables) 
 3440   
3441       
3442   
3444          """ 
3445          @return: The SQL FROM clause for this join. 
3446          @rtype:  str 
3447   
3448          """ 
3449          return self.fromStr 
 3450   
3451       
3452   
3486   
3487       
3488   
3489 -    def _setSQL(self, tables, predicates): 
 3490          """ 
3491          Sets the FROM and WHERE SQL statement member variables for this join. 
3492   
3493          """ 
3494          self.whereStr = " AND ".join(predicates) 
3495          self.fromStr = ", ".join((table if not isinstance(table, tuple) else 
3496                                    "%s AS %s" % table) for table in tables) 
3497 
3500 -class LeftJoin(Join):
3501      """ 
3502      Defines a database left outer join between two tables. Use as the table 
3503      argument value in L{DbSession.query()} type methods or when initialising 
3504      an L{SelectSQL} object. 
3505   
3506      """ 
3507 -    def __init__(self, tables, attributes): 
 3508          """ 
3509          Defines an SQL left outer join between given tables on given attributes. 
3510   
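        Example; the table and column names here are purely illustrative::

            table = LeftJoin(["Multiframe", "MultiframeEsoKeys"],
                             ["multiframeID"])
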
3511          @param tables:     Pair of tables to be left-joined on same attribute 
3512                             set. Either just supply table names, or a tuple 
3513                             containing table name and alias, e.g. 
3514                             ["Multiframe", "ProgrammeFrame"] or 
3515                             [("Multiframe", "M1"), ("Multiframe", "M2")]. 
3516          @type  tables:     sequence(str) or sequence(tuple) 
3517          @param attributes: Unique attribute set on which to join. If a single 
3518                             join column is required then just a string will do. 
3519          @type  attributes: sequence(str) or str 
3520   
3521          """ 
3522          if len(tables) != 2: 
3523              raise ValueError("LeftJoin class must be supplied with exactly two" 
3524                               " tables: tables = %s" % tables) 
3525   
3526          self._parseInput(tables, attributes) 
 3527   
3528       
3529   
3530 -    def _setSQL(self, tables, predicates): 
 3531          """ 
3532          Sets the FROM and WHERE SQL statement member variables for this join. 
3533   
3534          """ 
3535          self.fromStr = " LEFT OUTER JOIN ".join( 
3536            (table if not isinstance(table, tuple) else "%s AS %s" % table) 
3537            for table in tables) + " ON " + " AND ".join(predicates) 
  3538   
3542      """ 
3543      Defines a database left outer join between two tables on the first table's 
3544      primary key. Use as the table argument value in L{DbSession.query()} type 
3545      methods or when initialising an L{SelectSQL} object. 
3546   
3547      """ 
3548 -    def __init__(self, leftTable, rightTable): 
 3549          """ 
3550          Defines an SQL left outer join between given tables on the first 
3551          table's primary key. 
3552   
3553          @param leftTable:  First table in the left join. 
3554          @type  leftTable:  Schema.Table 
3555          @param rightTable: Second table in the left join. 
3556          @type  rightTable: Schema.Table 
3557   
3558          """ 
3559          tables = [leftTable.name, rightTable.name] 
3560          self._parseInput(tables, leftTable.primaryKey()) 
3561 
3564 -class SelectSQL(object):
3565      """ 
3566      Defines an SQL select statement. Initialise with components of an SQL query 
3567      and then the SQL select line is returned when you convert the object to a 
3568      string. Primarily used as part of the WHERE clause of another SQL 
3569      statement. 
3570   
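    Example with grouping and ordering; the table and column names here are
    purely illustrative::

        sql = SelectSQL("programmeID, COUNT(*)", table="ProgrammeFrame",
                        groupBy="programmeID", orderBy="programmeID")
        print(sql)
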
3571      """ 
3572       
3573      tables = None     
3574      selectStr = ''    
3575      fromStr = ''      
3576      whereStr = ''     
3577      orderByStr = ''   
3578      groupByStr = ''   
3579   
3580       
3581      _sql = ''         
3582   
3583 -    def __init__(self, select, table, where='', orderBy='', groupBy=''): 
 3638   
3639       
3640 
3641 -    def __str__(self):
3642          """ @return: The SQL select statement line. 
3643              @rtype:  str 
3644          """ 
3645          return self._sql 
 3646   
3647       
3648   
3649      @staticmethod 
3651          """ Replaces tables derived from a sub-query in the SQL script's FROM 
3652              string, so that the class can parse the information correctly.  
3653          """ 
3654          nOpenBra = 0 
3655          nCloseBra = 0 
3656          isDerivedTable = False 
3657          startPos = 0 
3658          derivedTableDict = {} 
3659          for chrNo in range(len(fromStr)): 
3660              if fromStr[chrNo] == '(': 
3661                  nOpenBra += 1 
3662   
3663              if fromStr[chrNo] == ')': 
3664                  nCloseBra += 1 
3665   
3666              if nOpenBra > nCloseBra: 
3667                  if not isDerivedTable: 
3668                      startPos = chrNo 
3669                  isDerivedTable = True 
3670   
3671              if nCloseBra == nOpenBra: 
3672                  if isDerivedTable: 
3673                      derivedTableNo = len(derivedTableDict) + 1 
3674                      derivedTableDict['@derivedTable%s' % derivedTableNo] = \ 
3675                        fromStr[startPos:chrNo + 1] 
3676   
3677                  isDerivedTable = False 
3678   
3679          for dTable in derivedTableDict: 
3680              fromStr = fromStr.replace(derivedTableDict[dTable], dTable) 
3681   
3682          return derivedTableDict, fromStr 
  3683   
3684   
3685   
3686 -def bulkCopy(query, tableSchema, fromDb, toDb, fileTag, drive='', 
3687               isSmallTable=True): 
 3688      """ 
3689      Copies table data via a bulk outgest and ingest to another table, which 
3690      can be in a different database and on a different server. Destination table 
3691      will be automatically created if it doesn't already exist. 
3692   
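    Example; the database, table and schema names here are purely illustrative
    and the fileTag is just a placeholder::

        query = SelectSQL("*", table="WSA.dbo.Multiframe",
                          orderBy="multiframeID")
        numRows = bulkCopy(query, mfTableSchema, fromDb, toDb,
                           fileTag="_mfcopy")
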
3693      @param query:       Query of data selected to copy via outgest. Table names 
3694                          must be given with full path. Selection must be ordered 
3695                          by the ingest table's primary key in ascending order. 
3696      @type  query:       SelectSQL 
3697      @param tableSchema: Schema of the ingest table. 
3698      @type  tableSchema: schema.Table 
3699      @param fromDb:      Connection to the database where outgest will occur. 
3700      @type  fromDb:      DbSession 
3701      @param toDb:        Connection to the database where ingest will occur. 
3702      @type  toDb:        DbSession 
3703      @param fileTag:     Unique tag to attach to temporary outgest/ingest files. 
3704      @type  fileTag:     str 
3705      @param drive:       Optionally outgest to a different catalogue server 
3706                          drive, instead of the share directory, e.g. "G:\". 
3707      @type  drive:       str 
3708      @param isSmallTable: If False, then stdout is redirected to file to work 
3709                           around a mystery bug in very large table outgests, 
3710                           which is inefficient for small tables. 
3711      @type  isSmallTable: bool 
3712   
3713      @return: Number of rows ingested. 
3714      @rtype:  int 
3715   
3716      @warning: Make sure the ingest table schema has been checked first. 
3717   
3718      @todo: Merge Outgester and Ingester classes into one BulkTransfer class 
3719             with individual outgest, ingest and copy functions? Then we can 
3720             handle the tableSchema initialisation a bit more neatly, and put 
3721             fileTag into the initialiser. Then codes can choose whether or not 
3722             to pre-initialise. 
3723      """ 
3724      outPathName = \ 
3725        Outgester(fromDb, fileTag, honourTrialRun=True).outgestQuery(query, 
3726          filePath=drive or toDb.sharePath(), 
3727          redirectStdOut=not isSmallTable) 
3728   
3729   
3730   
3731   
3732   
3733   
3734   
3735   
3736   
3737   
3738   
3739   
3740   
3741   
3742   
3743   
3744   
3745   
3746      ingester = Ingester(toDb, [tableSchema], tag=fileTag, skipSchemaCheck=True) 
3747      return ingester.ingestTable(tableSchema.name, outPathName, isOrdered=True) 
 3748   
3749   
3750   
3751   
3752   
3753