"""
Database interface. The L{DbSession} class provides a complete database API
as well as connection management features. A database connection is
instantiated upon construction of a L{DbSession} object and then
automatically closed when the object falls out of scope. Existing
identical connections are reused, invisibly to the user, to reduce the
number of simultaneous open connections. All access to the database should
then be via the methods supplied by the L{DbSession} class.

Usage
=====

Database API
------------

Import as::

    from wsatools.DbConnect.DbSession import DbSession

For a simple default database connection to the WSA::

    db = DbSession("WSA")
    db.update("Multiframe", "deprecated=0", where="multiframeID=123")
    print(db.queryNumRows("Multiframe"))
    print(db.queryAttrMax("multiframeID", "Multiframe"))

For joined queries, e.g. a list of all LAS stack multiframeIDs, use the
L{Join} class::

    from wsatools.DbConnect.DbSession import Join

    mfIDs = db.query("Multiframe.multiframeID",
                     table=Join(["Multiframe", "ProgrammeFrame"],
                                ["multiframeID"]),
                     where="frameType LIKE '%stack' AND programmeID=101")

For queries where a select statement is used within a where clause, use the
L{SelectSQL} class::

    from wsatools.DbConnect.DbSession import SelectSQL

    progMfs = SelectSQL("multiframeID", table="ProgrammeFrame",
                        where="programmeID=101")
    fileNames = db.query("fileName", table="Multiframe",
                         where="multiframeID IN %s" % progMfs)
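
The substitution works because a L{SelectSQL} object renders as a SQL select
statement when converted to a string. A minimal self-contained sketch of this
composition pattern (MiniSelect is a hypothetical stand-in, not the real
SelectSQL class):

```python
class MiniSelect(object):
    """Hypothetical stand-in for SelectSQL: renders as a bracketed SELECT."""
    def __init__(self, select, table, where):
        self.select, self.table, self.where = select, table, where

    def __str__(self):
        return "(SELECT %s FROM %s WHERE %s)" % (self.select, self.table,
                                                 self.where)

progMfs = MiniSelect("multiframeID", "ProgrammeFrame", "programmeID=101")
whereStr = "multiframeID IN %s" % progMfs
print(whereStr)
```

Since the sub-query only renders when the outer string is formatted, it can be
passed around and reused in several outer queries.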

To perform a safe temporary disconnect from the database invoke the
L{DbSession.goOffline()} method. When any L{DbSession} object that uses this
connection next attempts to access the database, e.g. with a query, the
connection will be reactivated, and if the database is down it will
persistently attempt to reconnect. Hence, you can also use this method at
sensitive stages of a curation script to ensure the database connection
remains alive.
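
The reconnect behaviour can be sketched as a simple retry loop
(connectWithRetry, its arguments and the stub connection are illustrative;
the real class handles this internally and retries indefinitely):

```python
import time

def connectWithRetry(connect, delay=0.01, maxAttempts=5):
    """Keep trying to open a connection, as DbSession does on reactivation."""
    for attempt in range(maxAttempts):
        try:
            return connect()
        except IOError:
            time.sleep(delay)
    raise IOError("database still unreachable")

attempts = []

def flakyConnect():
    # Stub connection that fails twice before succeeding
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError("connection refused")
    return "connection"

conn = connectWithRetry(flakyConnect)
print(conn)
```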

Please refer to the L{DbSession} class documentation for the full range of
API methods and options.

Ingester
--------

Import as::

    from wsatools.DbConnect.DbSession import Ingester

Before ingesting, it is vital to ensure that the schema of the ingest file
matches the table into which it will be ingested. This check occurs when an
L{Ingester} object is created, so this should be done as early as possible
in your code, and not unnecessarily within a for-loop. To instantiate an
L{Ingester} object you must pass it a L{DbSession} database connection to the
database that hosts the table into which the ingest will occur, and a schema
for the table(s) that will be ingested into during the lifetime of this
particular L{Ingester} object::

    db = DbSession()
    ingester = Ingester(db, tableSchema)

The tableSchema is just a list of L{Schema.Table} objects for every table
that will be ingested this session. Use the L{Schema.parseTables()} method
to create this schema, and then reduce or expand the list until just the
exact set of tables to be ingested are listed. To ingest a native binary file
located in the catalogue server's share path into the table::

    numRows = ingester.ingestTable("lasSource", "lasSource.nat", idxInfo)

After the ingest, the file is deleted. The idxInfo is created by
L{Schema.parseIndices()} and is required if you wish to drop indices for
ingest and automatically recreate them afterward. If the database table
doesn't already exist the ingester will create it.

Please refer to the L{Ingester} class documentation for further options.

Outgester
---------

Import as::

    from wsatools.DbConnect.DbSession import Outgester, SelectSQL

To initialise an L{Outgester} object you must supply it with a connection to
the database where the outgest will occur::

    db = DbSession()
    sourceOutgester = Outgester(db)

Then to outgest the results of a particular table query to a binary file::

    filePathName = sourceOutgester.outgestQuery(SelectSQL(
        select="*", table="WSA.dbo.Multiframe", where="deprecated = 0"))

By default the file is outgested to the load server share path, and there is
a delay following outgest to allow the NFS to see the shared file. If an
alternative path outside of the share path is given, then there is no delay
to wait for the NFS to see the file.

Please refer to the L{Outgester} class documentation for further options.

Creating and modifying databases
--------------------------------

The L{Database} class provides methods to create and modify databases.

Import as::

    from wsatools.DbConnect.DbSession import Database, DbSession
    from wsatools.SystemConstants import WsaConstants

To recreate the WSA with standard options on server "pharaoh"::

    WSA = Database(WsaConstants.loadDatabase,
                   filegroups=[("Detection", "4 GB"),
                               ("Source", "4 GB"),
                               ("Curation", "4 GB"),
                               ("Indices", "4 GB")])
    WSA.create(DbSession("pharaoh." + WsaConstants.adminDatabase))

Please refer to the L{Database} class documentation for further options.

@author: R.S. Collins
@org: WFAU, IfA, University of Edinburgh

@todo: Make method keyword arguments consistent. The query() interface
       should be attrs, table, where; and this naming convention should be
       adhered to as much as possible in other methods. Should update() use
       attrs or entry instead of entryList?
@todo: Cut down code duplication between updateEntries and deleteRows,
       and possibly between updateEntries and update; ditto for deleteRows.
@todo: Retrospectively upgrade code using queryNumRows() to make use of new
       features if necessary.
@todo: Upgrade code to make use of the join feature, and upgrade the
       documentation of query*() methods to reflect this.
@todo: Upgrade code to make use of query() returning namedtuples.
@todo: addColumn / dropColumn should be merged with add/drop constraint, and
       so be designed like createObject/dropObject.
@todo: Password option?
"""

from __future__ import division, print_function
from future_builtins import map, zip

from collections import defaultdict, namedtuple
from itertools import takewhile
from keyword import iskeyword
import mx.ODBC.unixODBC as odbc
from operator import itemgetter
import os
import shutil
import sys
import time

from wsatools.CLI import CLI
import wsatools.CSV as csv
import wsatools.DbConnect.DbConstants as dbc
import wsatools.DbConnect.Schema as schema
from wsatools.Logger import Logger

from wsatools.SystemConstants import SystemConstants, WsaConstants
import wsatools.Utilities as utils


class DbSession(object):
    """
    An interface to our database. Manages the mxODBC connection, and provides
    a set of wrappers to common SQL statements.

    @note: Test database connections get read-write user access by default.

    @group Errors & Exceptions: CatalogueServerError, DisconnectError

    """
    class CatalogueServerError(Exception):
        """ An error generated by a command run directly on the catalogue
            server.
        """
        pass

    class DisconnectError(Exception):
        """ Exception thrown if a programmer tries to access a database
            connection that is closed, to either attempt to close it or to
            commit/rollback transactions. This is a programming error rather
            than an exception caused by an unexpected loss of database
            connection.
        """
        def __init__(self, db, msg=''):
            """
            @param db: Name of database.
            @type db: str
            @param msg: Optional specific message for this exception.
            @type msg: str

            """
            msg = msg.strip().rstrip('.')
            super(DbSession.DisconnectError, self).__init__(
                "%sAlready disconnected from database %s" %
                (msg + '. ' if msg else msg, db))



    _dbConnections = {}
    _isLogReq = {}
    """ Dictionary of flags to indicate a database has been modified,
        referenced by database name.
    """
    _numSessions = {}
    sysc = SystemConstants()


    database = WsaConstants.loadDatabase

    isLoadDb = True

    isRealRun = True

    isTrialRun = False

    server = WsaConstants.loadServer


    userName = (dbc.loadServerRoUsername() if os.getenv('USER') != 'scos' else
                dbc.loadServerRwUsername())


    _autoCommit = False
    """ Automatically commit transactions, minimally logged so faster.
    """
    _dbSessionKey = None
    """ A string that defines this connection as a unique type. This is used
        to determine if an identical connection already exists for reuse.
    """
    _fixedCopyBug = False
    _isDeadlocked = False
    _isDirty = False
    _isPersistent = True


    CLI.progArgs.append(CLI.Argument("database", server + '.' + database,
                                     isValOK=lambda x: x.count('.') <= 1))
    CLI.progOpts += [
        CLI.Option('t', "test",
                   "test run, don't alter database, just print statements"
                   " to screen"),
        CLI.Option('u', "user",
                   "database username", "NAME", userName)]




    """ @return: Database path.
        @rtype: str
    """
    return self.server + '.' + self.database


    def addColumn(self, tableName, colName, dataType, defaultStr=None,
                  notNull=True, constraint=None):
        """
        Adds a column to a table in the database.

        @param tableName: Name of the table.
        @type tableName: str
        @param colName: Name of the new column.
        @type colName: str
        @param dataType: SQL data type of the new attribute.
        @type dataType: str
        @param defaultStr: Optional default value, in the form of an SQL
                           string, e.g. "'none'" or "0".
        @type defaultStr: str
        @param notNull: If True, column disallows null values.
        @type notNull: bool
        @param constraint: Optional constraint name. Defaults to
                           tableName + colName + '_cons'.
        @type constraint: str

        """
        sql = "ALTER TABLE %s ADD %s %s" % (tableName, colName, dataType)
        if notNull:
            sql += " not null"
        if defaultStr:
            constraint = constraint or tableName + colName + '_cons'
            sql += " CONSTRAINT %s DEFAULT %s" % (constraint, defaultStr)

        try:
            self._executeScript(sql)
        except odbc.ProgrammingError as error:
            if notNull:
                raise Exception("Cannot add a column with 'not null'"
                    " specified without specifying default values if data"
                    " already exists in the table. mxODBC says: (%s)" % error)
            raise


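For illustration, the string assembly above can be exercised in isolation
(a sketch replicating only the SQL that addColumn() builds, with the table
and column names being arbitrary examples):

```python
def addColumnSQL(tableName, colName, dataType, defaultStr=None,
                 notNull=True, constraint=None):
    # Mirrors the ALTER TABLE statement assembled by DbSession.addColumn()
    sql = "ALTER TABLE %s ADD %s %s" % (tableName, colName, dataType)
    if notNull:
        sql += " not null"
    if defaultStr:
        constraint = constraint or tableName + colName + '_cons'
        sql += " CONSTRAINT %s DEFAULT %s" % (constraint, defaultStr)
    return sql

sql = addColumnSQL("Multiframe", "newFlag", "int", defaultStr="0")
print(sql)
```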
    def addIndex(self, indexSchema, ignoreNS=False, releasedOnly=False,
                 usingDefaultFG=False):
        """
        Adds an index to a specified table.

        @param indexSchema: Object describing the schema of the index to add.
        @type indexSchema: L{Schema.Index}
        @param ignoreNS: If True, upon a "no such" error message reporting
                         non-existent tables, attributes etc. log the
                         message and continue, with False return state (no
                         object created).
        @type ignoreNS: bool
        @param releasedOnly: If True, only create the object if it is to be
                             released (i.e. upper case CREATE statements).
        @type releasedOnly: bool
        @param usingDefaultFG: If False, use the file group "Indices_FG".
        @type usingDefaultFG: bool

        @return: True, if index has been created.
        @rtype: bool

        """
        if not releasedOnly or indexSchema.releasable:
            Logger.addMessage("Creating index %s on table %s..." %
                              (indexSchema.name, indexSchema.tableName))
            if usingDefaultFG:
                indexSchema.fileGroup = None
            try:
                return self.createObjects([indexSchema],
                                          releasedOnly=releasedOnly)
            except odbc.ProgrammingError as error:
                ignoreCases = [" does not exist in the target table.",
                               " table does not exist in database "]
                if ignoreNS and any(case in str(error)
                                    for case in ignoreCases):
                    Logger.addExceptionWarning(error, "Database message")
                    return False
                else:
                    raise


    def alterColumn(self, tableName, colName, dataType, defaultStr=None,
                    notNull=True, constraint=None):
        """
        Alter a column of a table in the database.

        @param tableName: Name of the table.
        @type tableName: str
        @param colName: Name of the existing column.
        @type colName: str
        @param dataType: Desired SQL data type for the column.
        @type dataType: str
        @param defaultStr: Optional default value, in the form of an SQL
                           string, e.g. "'none'" or "0".
        @type defaultStr: str
        @param notNull: If True, column disallows null values.
        @type notNull: bool
        @param constraint: Optional constraint name. Defaults to
                           tableName + colName + '_cons'.
        @type constraint: str

        @todo: Should become more schema driven together with L{addColumn()}.

        """
        sql = "ALTER TABLE %s ALTER COLUMN %s %s%s" \
              % (tableName, colName, dataType, " not null" if notNull else '')
        if defaultStr:
            constraint = constraint or tableName + colName + '_cons'
            sql += " CONSTRAINT %s DEFAULT %s" % (constraint, defaultStr)

        self._executeScript(sql)



    def checkConstraints(self, tableName):
        """
        Checks the database constraints on a specified table. Logs any
        constraint violations and throws an exception if any are detected.

        @param tableName: Name of the table.
        @type tableName: str

        """
        lockPathName = os.path.join(self.sysc.sysPath,
                                    "conlock-" + self.server)
        if os.path.exists(lockPathName):
            Logger.addMessage("<Info> Another CU is checking constraints on"
                " %s. Waiting for %s to disappear..."
                % (self.server, lockPathName))
            while os.path.exists(lockPathName):
                time.sleep(10)

        Logger.addMessage(
            "Checking constraints on table %s (may hang if locked out by "
            "other CUs also accessing %s.tempdb)" % (tableName, self.server))

        with open(lockPathName, 'w') as lockFile:
            lockFile.write("Locking PID = %s.%s" %
                           (os.getenv('HOST'), os.getpid()))

        try:
            violations = self._executeScript(
                "DBCC CHECKCONSTRAINTS (%r)" % tableName, wantResult=True)
        finally:
            os.remove(lockPathName)

        if violations:
            Logger.addMessage("<Warning> Constraint check failed.")
            msg = "Constraint violations:"
            violationData = defaultdict(list)
            for table, constraint, case in violations:
                key = (table.strip().strip('[').strip(']'),
                       constraint.strip().strip('[').strip(']'))
                violationData[key].append(case)
            for violation in violationData:
                msg += "\nIn table %r, constraint %r fails in these cases:" \
                       "\n" % violation + '\n'.join(violationData[violation])

            raise odbc.IntegrityError(msg)


    def checkSchema(self, tableSchema, releasedOnly=False):
        """
        Compares the given table schema, as described in the .sql files, to
        the actual database schema to spot inconsistencies. If there are any
        problems a L{Schema.MismatchError} exception is thrown.

        @param tableSchema: Schema of tables to check as provided by
                            L{Schema.parseTables()}.
        @type tableSchema: list(Schema.Table)
        @param releasedOnly: If True, only check released database tables.
        @type releasedOnly: bool

        """
        errMsg = ''
        errSchema = []
        for table in tableSchema:
            if not releasedOnly or table.releasable:
                dbAttrs = self.queryColumnNames(table.name)
                dbTypes = self.queryDataTypes(table.name)
                scTypes = dict((attr.name, attr.dataType)
                               for attr in table.columns)

                # Schema attributes missing from the database
                exScAttrs = [attr.name for attr in table.columns
                             if attr.name not in dbAttrs]
                if exScAttrs:
                    errMsg += "\n%s - " % table
                    if len(exScAttrs) == len(table.columns):
                        errMsg += "table does not exist in the database"
                    else:
                        errMsg += (
                            "the following attributes have no matching "
                            "columns in the database: " + ', '.join(exScAttrs))
                    errSchema.append(table)

                # Database columns missing from the schema
                exDbAttrs = [attrName for attrName in dbAttrs
                             if attrName not in table.columns]
                if exDbAttrs:
                    errMsg += (
                        "\n%s - the following database columns no longer "
                        "exist in the current schema: " % table +
                        ', '.join(exDbAttrs))
                    errSchema.append(table)

                # Remaining attributes common to both schema and database
                reScAttrs = [attr.name for attr in table.columns
                             if attr.name not in exScAttrs]
                reDbAttrs = [attrName for attrName in dbAttrs
                             if attrName not in exDbAttrs]

                # Common columns that appear in a different order
                colSwapList = [(scAttr, dbAttr) for scAttr, dbAttr
                               in zip(reScAttrs, reDbAttrs)
                               if scAttr != dbAttr]
                if colSwapList:
                    errMsg += (
                        "\n%s - the order of these database columns and "
                        "schema attributes are swapped: " % table +
                        utils.joinNested(colSwapList, subJoinStr="<->"))
                    errSchema.append(table)

                # Common columns with differing data types
                typeDifList = \
                    [(attrName, scTypes.get(attrName), dbTypes.get(attrName))
                     for attrName in reDbAttrs
                     if scTypes.get(attrName) != dbTypes.get(attrName)]

                if typeDifList:
                    errMsg += (
                        "\n%s - these attributes have a different data type "
                        "in the database than described by the current "
                        "schema: " % table +
                        utils.joinNested(typeDifList, subJoinStr=":"))
                    errSchema.append(table)

        if errMsg:
            raise schema.MismatchError(errMsg, errSchema)
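The column-order comparison above can be seen in isolation; zipping the two
remaining column lists pairs them position by position, so any pair that
differs flags a swap (example column names are illustrative):

```python
scAttrs = ["ra", "dec", "flux"]   # order given by the .sql schema files
dbAttrs = ["ra", "flux", "dec"]   # order reported by the database
colSwapList = [(scAttr, dbAttr) for scAttr, dbAttr in zip(scAttrs, dbAttrs)
               if scAttr != dbAttr]
print(colSwapList)
```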



    def copyIntoTable(self, destinationTable, sourceTable, columns='*',
                      where=''):
        """
        Copy a selection of rows from one table into another.

        @param destinationTable: Table to copy rows into.
        @type destinationTable: str
        @param sourceTable: Table to copy from, or a L{Join} object of
                            several tables.
        @type sourceTable: str or L{Join}
        @param columns: Optionally select a subset of columns, as a
                        comma-separated list.
        @type columns: str
        @param where: Optional SQL where clause defining the
                      selection of rows to copy.
        @type where: str

        @return: Number of rows copied.
        @rtype: int

        @note: This command is heavily transaction logged.

        """
        sql = "INSERT INTO %s %s" % \
              (destinationTable, SelectSQL(columns, sourceTable, where))

        return self._executeScript(sql, wantRowCount=True)


    def copyTable(self, sourceTable, tableSchema, columns='*', where='',
                  fileGroup='', attachForeignKeys=True):
        """
        Create a new table from the selected contents of another table.

        @param sourceTable: Table to copy from, or a L{Join} object of
                            several tables.
        @type sourceTable: str or L{Join}
        @param tableSchema: Schema of table to copy rows into that will be
                            created in this database.
        @type tableSchema: L{Schema.Table}
        @param columns: Optionally select a subset of columns, as a
                        comma-separated list.
        @type columns: str
        @param where: Optional SQL where clause defining the
                      selection of rows to copy.
        @type where: str
        @param fileGroup: Optionally supply a different file group to
                          create the new table on other than the
                          default.
        @type fileGroup: str
        @param attachForeignKeys: If True, foreign key constraints are
                                  attached to the new table.
        @type attachForeignKeys: bool

        @return: Number of rows copied.
        @rtype: int

        @note: This command should be minimally logged.
        @note: This method relies on Microsoft T-SQL, rather than standard
               SQL.
        @note: Changing file-group is an experimental untested feature so
               far.

        """
        if not self._fixedCopyBug:
            self.freeProcCache()
            self._executeScript("DBCC DROPCLEANBUFFERS")
            self._fixedCopyBug = True

        if fileGroup:
            adminDb = DbSession(self.server + '.' + self.sysc.adminDatabase,
                                autoCommit=True, isTrialRun=self.isTrialRun,
                                userName=self.userName)
            try:
                adminDb._executeScript("ALTER DATABASE %s MODIFY FILEGROUP"
                    " %s DEFAULT" % (self.database, fileGroup))
            except odbc.ProgrammingError as error:
                if "filegroup already has the 'DEFAULT' property set" \
                        not in str(error):
                    raise

                Logger.addMessage("%s already has the 'DEFAULT' property "
                                  "set in %s" % (fileGroup, self.database))

        try:
            rowsCopied = self._executeScript(
                str(SelectSQL(columns, sourceTable, where)
                    ).replace(" FROM ", " INTO %s FROM " % tableSchema.name,
                              1),
                wantRowCount=True)
            if fileGroup:
                self.commitTransaction()
        finally:
            if fileGroup:
                adminDb._executeScript("ALTER DATABASE %s MODIFY FILEGROUP"
                                       " [PRIMARY] DEFAULT" % self.database)

        # Attach key constraints to the new table
        self.createObjects(tableSchema.constraints,
                           releasedOnly=not attachForeignKeys)

        return rowsCopied


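The T-SQL C{SELECT ... INTO} statement is produced by a single string
substitution on the rendered select query. In isolation, with a plain string
standing in for the rendered SelectSQL and a hypothetical destination table
name:

```python
select = "SELECT * FROM WSA.dbo.Multiframe WHERE deprecated = 0"
# Insert the INTO clause before the first FROM, as copyTable() does
sql = select.replace(" FROM ", " INTO MultiframeCopy FROM ", 1)
print(sql)
```

The third argument of C{str.replace} limits the substitution to the first
C{ FROM }, so any further FROM keywords in sub-queries are left untouched.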


    def createObjects(self, objSchema, overWrite=False, ignoreNS=False,
                      releasedOnly=False, haltOnError=True):
        """
        Create new database objects using the ordered list of object schema
        definitions provided from L{Schema} methods. Tables are created with
        primary key constraints but no foreign key constraints; use
        L{createTable()} instead if they are required.

        @param objSchema: An ordered list of objects derived from
                          L{Schema._Schema}.
        @type objSchema: list(Schema._Schema)
        @param overWrite: If True, allow pre-existing database objects with
                          the same name to be overwritten.
        @type overWrite: bool
        @param ignoreNS: If True, do not throw an exception if an object
                         could not be created because some other object in
                         the database does not exist. The function will
                         return False - object could not be created, but
                         no warning message will be displayed.
        @type ignoreNS: bool
        @param releasedOnly: If True, only create the object if it is to be
                             released (i.e. upper case CREATE statements).
        @type releasedOnly: bool
        @param haltOnError: If True, raise an exception as soon as it occurs,
                            otherwise continue with the next object and raise
                            an error at the end.
        @type haltOnError: bool

        @return: True, if all objects created successfully.
        @rtype: bool

        """
        allCreated = True
        lastError = None
        for dbObj in objSchema:
            if not releasedOnly or dbObj.releasable:
                if overWrite:
                    self.dropObjects([dbObj])
                try:
                    self._executeScript(dbObj.createSQL())

                    if releasedOnly and isinstance(dbObj, schema.Function):
                        self._executeScript("GRANT EXEC ON %s.%s TO public"
                            % (dbc.loadServerDbsOwner(), dbObj))

                except (odbc.ProgrammingError,
                        odbc.InterfaceError) as error:
                    if ignoreNS:
                        Logger.addMessage("<WARNING> %s" % error,
                                          alwaysLog=False)

                    allCreated = False
                    ignoreCases = ["Invalid object name", "does not exist"]
                    if (not ignoreNS or
                        not any(case in str(error) for case in ignoreCases)) \
                        and (overWrite or
                             "There is already an" not in str(error)
                             and "already exists" not in str(error)):

                        if haltOnError:
                            raise
                        else:
                            Logger.addExceptionWarning(error)
                            Logger.addMessage("continuing...")
                            lastError = error

                except odbc.IntegrityError as error:
                    # Ignore expected foreign-key conflicts for this object
                    fkMsgParts = ["ALTER TABLE statement conflicted",
                                  "FOREIGN KEY constraint", dbObj.name]

                    if not all(part in str(error) for part in fkMsgParts):
                        if haltOnError:
                            raise
                        else:
                            Logger.addExceptionWarning(error)
                            Logger.addMessage("continuing...")
                            lastError = error

        if lastError:
            raise lastError

        return allCreated


    def createStatistics(self, tables):
        """
        Creates database column statistics for given tables.

        @param tables: List of table schema.
        @type tables: list(Schema.Table)

        """
        sql = "CREATE STATISTICS stats_%s_%s ON %s (%s)"
        for table in tables:
            Logger.addMessage("Creating statistics on %s..." % table)
            for column in table.columns:
                self._executeScript(sql % (table, column, table, column))


    def createTable(self, tableSchema, dbSchema=[], overWrite=False):
        """
        Create a new database table using the schema provided from
        L{Schema.parseTables()}; all constraints that reference this table
        found in the given schema list will be applied.

        @param tableSchema: Parsed schema describing the table that is to be
                            created.
        @type tableSchema: Schema.Table
        @param dbSchema: Schemas for every table in the database that may
                         have a foreign key constraint that references
                         the table to be created.
        @type dbSchema: list(Schema.Table)
        @param overWrite: If True, allow current db table to be overwritten.
        @type overWrite: bool

        @return: True, if table really was created.
        @rtype: bool

        """
        if overWrite:
            self.dropTable(tableSchema, dbSchema)

        if self.createObjects([tableSchema]):
            # Attach this table's own foreign key constraints
            self.createObjects([con for con in tableSchema.constraints
                                if type(con) is schema.ForeignKeyConstraint])

            # Re-attach foreign keys in other tables referencing this table
            foreignKeys = []
            for table in dbSchema:
                foreignKeys += [con for con in table.constraints
                                if type(con) is schema.ForeignKeyConstraint
                                and con.referenceTable == tableSchema.name]

            self.createObjects(foreignKeys, ignoreNS=True)
            return True

        return False
870
894
895
896
897 - def delete(self, tableName, whereStr=''):
898 """
899 Deletes rows from a database table (to delete an entire table
900 use truncate() instead) based on a flexible SQL WHERE clause.
901
902 @param tableName: Name of the database table.
903 @type tableName: str
904 @param whereStr: SQL WHERE clause (without the "WHERE").
905 @type whereStr: str
906
907 @return: Number of rows deleted.
908 @rtype: int
909
910 """
911 if not whereStr:
912 return self.truncate(tableName)
913 else:
914 return self._executeScript("DELETE FROM %s WHERE %s" % (tableName,
915 whereStr), wantRowCount=True)
916
917
918
    def deleteRows(self, tableName, rowIndexList=[]):
        """
        Deletes selected rows from a database table (to delete an entire
        table use truncate() instead).

        @param tableName: Name of the database table.
        @type tableName: str
        @param rowIndexList: List of specific rows to delete, in the form
                             (attrName, PyValue), e.g.
                             [("multiframeID", 101)]; all rows are deleted
                             otherwise. If PyValue is a list then executemany
                             is invoked.
        @type rowIndexList: list(tuple(str, object))

        @return: Number of rows deleted.
        @rtype: int

        """
        if not rowIndexList:
            return self.truncate(tableName)

        sql = "DELETE FROM " + tableName + self._createWhereStr(rowIndexList)

        manyList = []
        for _key, index in rowIndexList:
            if type(index) is list:
                manyList += [tuple([value]) for value in index]

        return self._executeScript(sql, manyList, wantRowCount=True)

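When an index value is a list, each value becomes a one-element parameter
tuple suitable for an executemany call; the conversion can be sketched in
isolation (the example IDs are arbitrary):

```python
rowIndexList = [("multiframeID", [101, 102, 103])]
manyList = []
for _key, index in rowIndexList:
    if type(index) is list:
        # One parameter tuple per value, for cursor.executemany()
        manyList += [tuple([value]) for value in index]
print(manyList)
```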
    def dropColumn(self, tableName, colName):
        """
        Removes a column from a table in the database.

        @param tableName: Name of the table.
        @type tableName: str
        @param colName: Name of the column to remove.
        @type colName: str

        """
        # Find any default constraints attached to this table's columns
        constraints = dict(self.query(
            selectStr="C.name, O.name",
            fromStr="syscolumns AS C, sysobjects AS O "
                    "LEFT JOIN sysobjects AS T ON O.parent_obj = T.id",
            whereStr="O.id=C.cdefault AND O.name NOT LIKE '%%dtproper%%' AND "
                     "O.name NOT LIKE 'dt[_]%%' AND T.name = %r " % tableName
                     + "AND (O.name LIKE 'DF__%' OR O.name LIKE '%_cons')"))

        if colName in constraints:
            self._executeScript("ALTER TABLE %s DROP CONSTRAINT %s" %
                                (tableName, constraints[colName]))

        self._executeScript("ALTER TABLE %s DROP COLUMN %s" %
                            (tableName, colName))


    def dropObjects(self, objSchema):
        """
        Drop database objects defined in a list of database L{Schema}
        objects. Tables cannot be dropped if referenced by foreign key
        constraints; in that case use L{dropTable()}.

        @param objSchema: An ordered list of objects derived from
                          L{Schema._Schema}.
        @type objSchema: list(Schema._Schema)

        """
        for dbObj in objSchema:
            try:
                self._executeScript(dbObj.dropSQL())
            except odbc.ProgrammingError as error:
                if "does not exist" not in str(error) and \
                        "Could not drop constraint" not in str(error):
                    raise


    def dropTable(self, tableSchema, dbSchema):
        """
        Drops a table from the database, first removing any foreign key
        constraints that reference the table.

        @param tableSchema: Parsed schema describing the table that is to be
                            dropped.
        @type tableSchema: Schema.Table
        @param dbSchema: Schemas for every table in the database that may
                         have a foreign key constraint that references
                         the table to be dropped.
        @type dbSchema: list(Schema.Table)

        """
        # First drop foreign keys in other tables that reference this table
        foreignKeys = []
        for table in dbSchema:
            foreignKeys += [con for con in table.constraints
                            if type(con) is schema.ForeignKeyConstraint
                            and con.referenceTable == tableSchema.name]
        self.dropObjects(foreignKeys)

        self.dropObjects([tableSchema])


    def enableDirtyRead(self):
        """
        Allows you to read from tables that are being written to and hence
        are presently locked. Warning: use at your own risk!

        """
        self._executeScript(
            "SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
        self._isDirty = True


    def existsTable(self, tableName):
        """ @return: True, if a table with the name given in the tableName
                     parameter exists in the current database.
            @rtype: bool
        """
        return self.queryColumnNames(tableName) != []


    def freeProcCache(self):
        """ Calls DBCC FREEPROCCACHE.
        """
        self._executeScript("DBCC FREEPROCCACHE")


    def getBestVolume(self):
        """ @return: The volume on the catalogue server with the most free
                     space, e.g. G:\.
            @rtype: str
        """
        mostSpace = 0
        bestVolume = ''
        for volume in self.sysc.catServerVolumes(self.server):
            line = self.runOnServer("dir " + volume, isTrialRunSafe=True)[-1]
            try:
                freeSpace = long(line.split()[-3].replace(',', ''))
            except ValueError:
                raise DbSession.CatalogueServerError("Can't read volume %s."
                    " Check SystemConstants.catServerVolumes is correct for"
                    " %s." % (volume, self.server))

            freeSpace /= self.sysc.one_gigabyte
            if freeSpace > mostSpace:
                mostSpace = freeSpace
                bestVolume = volume

        return bestVolume

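The free-space figure is parsed from the final line of a Windows C{dir}
listing, where the third-from-last token is the free byte count. For example
(the sample output line is illustrative, and a plain int stands in for the
Python 2 C{long}):

```python
# Final line of a typical Windows "dir" listing
line = "              12 Dir(s)  123,456,789,012 bytes free"
# Third-from-last token is the byte count; strip thousands separators
freeSpace = int(line.split()[-3].replace(',', ''))
print(freeSpace // 2 ** 30)  # whole gigabytes free
```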
    def goOffline(self):
        """
        Temporarily drop the database connection, whilst keeping the
        DbSession object alive. When an attempt to access the database is
        next made, the DbSession will automatically try to reconnect to the
        database.

        """
        self._closeConnection()
        DbSession._dbConnections[self._dbSessionKey] = (None, None)


    def grantAccess(self, userName):
        """
        Grants a given user access to the database.

        @param userName: User name that will have permissions set.
        @type userName: str

        """
        try:
            self._executeScript(
                "EXEC sp_grantdbaccess %s, %s" % tuple(2 * [userName]))
            self._isLogReq[self.database] = True
        except odbc.ProgrammingError as error:
            if "already exists" not in str(error):
                raise

        for role in ["db_datareader"]:
            self._executeScript(
                "EXEC sp_addrolemember %s, %s" % (role, userName))
            self._isLogReq[self.database] = True

        self._executeScript("GRANT SHOWPLAN TO " + userName)

1116 - def insertData(self, tableName, rowData, enforcePKC=False):
1117 """
1118 Insert some data into the database.
1119
1120 @param tableName: Name of the database table.
1121 @type tableName: str
1122 @param rowData: Sequence of unformatted data values for each column,
1123 e.g. ("astring", 0, 5.4).
1124 @type rowData: sequence
1125 @param enforcePKC: If True, throw an exception if row already exists,
1126 otherwise just log a warning.
1127 @type enforcePKC: bool
1128
1129 @return: Number of rows inserted (either 1 or 0).
1130 @rtype: int
1131
1132 """
1133 if rowData:
1134 row = "(%s)" % ", ".join(map(self._sqlString, rowData))
1135
1136 if not rowData or row == "()":
1137 raise Exception(
1138 "No data has been supplied to insert into table %s" % tableName)
1139
1140 try:
1141 return self._executeScript(
1142 "INSERT INTO %s VALUES %s" % (tableName, row), wantRowCount=True)
1143 except odbc.IntegrityError as error:
1144 if not enforcePKC and "Violation of PRIMARY KEY" in str(error):
1145 Logger.addMessage(
1146 "<Warning> data row %s already exists in table %s" %
1147 (row, tableName))
1148 return 0
1149 else:
1150 raise
1151
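The statement assembly in insertData() above can be sketched standalone. This is a simplified illustration, not the class's actual code path: the real value formatting is done by DbSession._sqlString(), replaced here by a trivial stand-in.

```python
# Simplified sketch of how insertData() assembles its INSERT statement.
# The fmt helper is a trivial stand-in for DbSession._sqlString().
def make_insert(table_name, row_data):
    fmt = lambda v: "'%s'" % v.replace("'", "''") if isinstance(v, str) else str(v)
    row = "(%s)" % ", ".join(map(fmt, row_data))
    if not row_data or row == "()":
        raise ValueError("No data supplied to insert into table " + table_name)
    return "INSERT INTO %s VALUES %s" % (table_name, row)

print(make_insert("Multiframe", ("astring", 0, 5.4)))
# -> INSERT INTO Multiframe VALUES ('astring', 0, 5.4)
```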
1152
1153
1154 - def query(self, selectStr, fromStr, whereStr='', groupBy='', orderBy='',
1155 firstOnly=False, default=None, ResultsTuple=None,
1156 forceOneProc=False, forcePrOrder=False):
1157 """
1158 Wrapper function to perform a general select-from-where query.
1159
1160 @param selectStr: SQL SELECT string; comma-separated column names.
1161 @type selectStr: str
1162 @param fromStr: SQL FROM string; single table name or L{Join} object.
1163 @type fromStr: str or L{Join}
1164 @param whereStr: Optional SQL WHERE clause.
1165 @type whereStr: str
1166 @param groupBy: Optional SQL GROUP BY string.
1167 @type groupBy: str
1168 @param orderBy: Optional SQL ORDER BY string.
1169 @type orderBy: str
1170 @param firstOnly: If True, just return the first entry in result set.
1171 @type firstOnly: bool
1172 @param default: Value to return if querying first entry and no
1173 entries found.
1174 @type default: object
1175 @param ResultsTuple: Convert results to this specific namedtuple type.
1176 @type ResultsTuple: type
1177 @param forceOneProc: Force query to execute only on a single-processor
1178 (disable parallelism).
1179 @type forceOneProc: bool
1180 @param forcePrOrder: Force where clause predicates to be evaluated in
1181 the stated order.
1182 @type forcePrOrder: bool
1183
1184 @return: List of tuples (or scalars if only one attribute selected)
1185 containing values for every attribute of every entry. The
1186 tuples will be namedtuples of the database column names, if
1187 possible - i.e. no duplicate column names or functions without
1188 aliases. If firstOnly then a scalar or a tuple (or namedtuple)
1189 is returned.
1190 @rtype: list
1191
1192 @todo: Either replace the arguments with a single SelectSQL argument, or
1193 replicate the SelectSQL.__init__() interface (selectStr becomes
1194 select, fromStr becomes table, whereStr becomes where), then
1195 remove code duplication by creating a SelectSQL object here.
1197 """
1198
1199
1200
1201
1202
1203
1204 if firstOnly:
1205 selectStr = "TOP 1 %s" % selectStr
1206
1207 sql = "SELECT %s FROM %s" % (selectStr, fromStr)
1208
1209 if isinstance(fromStr, Join):
1210 sql += " WHERE "
1211 sql += fromStr.whereStr
1212 if fromStr.whereStr and whereStr \
1213 and not whereStr.strip().startswith("AND"):
1214 sql += " AND "
1215
1216 elif whereStr.startswith(("GROUP BY", "ORDER BY")):
1217
1218 sql += " "
1219
1220 elif whereStr:
1221 sql += " WHERE "
1222
1223 sql += whereStr
1224 if groupBy:
1225 sql += " GROUP BY " + groupBy
1226
1227 if orderBy:
1228 sql += " ORDER BY " + orderBy
1229
1230 options = []
1231 if forceOneProc:
1232 options.append("MAXDOP 1")
1233
1234 if forcePrOrder:
1235 options.append("FORCE ORDER")
1236
1237 if options:
1238 sql += " OPTION (%s)" % ', '.join(options)
1239
1240 result = \
1241 self._executeScript(sql, wantResult=True, wantAll=not firstOnly)
1242
1243 if firstOnly and (result is None or result[0] is None):
1244
1245 if default is None and ',' in selectStr:
1246 result = tuple([None] * len(selectStr.split(',')))
1247 else:
1248 return default
1249
1250 if selectStr.endswith('*') or ',' in selectStr:
1251
1252 if not ResultsTuple:
1253 metadata = self._getCursor().description or []
1254 columns = [entry[0] for entry in metadata]
1255
1256
1257
1258 if columns and not any(not column or iskeyword(column)
1259 for column in columns) \
1260 and len(columns) == len(set(columns)):
1261
1262 ResultsTuple = namedtuple("Query", ' '.join(columns))
1263
1264 if ResultsTuple:
1265 if firstOnly:
1266 return ResultsTuple(*result)
1267 else:
1268 return [ResultsTuple(*row) for row in result]
1269
1270 return result
1271
1272 elif firstOnly:
1273
1274 return result[0]
1275
1276 else:
1277
1278 return [eachTuple[0] for eachTuple in result]
1279
1280
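The clause assembly performed by query() for the common case can be sketched as below; this is a minimal sketch that omits the Join/SelectSQL handling and the OPTION hints.

```python
# Sketch of the SQL assembly in query() for a plain table name.
def build_query(select_str, from_str, where_str='', group_by='', order_by='',
                first_only=False):
    if first_only:
        select_str = "TOP 1 %s" % select_str
    sql = "SELECT %s FROM %s" % (select_str, from_str)
    if where_str:
        sql += " WHERE " + where_str
    if group_by:
        sql += " GROUP BY " + group_by
    if order_by:
        sql += " ORDER BY " + order_by
    return sql

print(build_query("fileName", "Multiframe", "deprecated=0",
                  order_by="multiframeID", first_only=True))
# -> SELECT TOP 1 fileName FROM Multiframe WHERE deprecated=0 ORDER BY multiframeID
```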
1281
1283 """
1284 Determine whether the table contains nullable columns.
1285
1286 @param queryTable: Name of table to query.
1287 @type queryTable: str
1288 @param columns: Sequence of columns to check.
1289 @type columns: sequence(str)
1290
1291 @return: True if any of the columns are nullable.
1292 @rtype: bool
1293
1294 """
1295 cursor = self._getCursor()
1296 cursor.columns(table=queryTable)
1297 return any(col[10] != odbc.SQL.NO_NULLS
1298 for col in cursor.fetchall() if col[3] in columns)
1299
1300
1301
1302 - def queryAttrMax(self, attrs, table, where=''):
1303 """
1304 Wrapper to just query the maximum value of the given attributes.
1305
1306 @param attrs: Name of the attributes in the database (comma-separated).
1307 @type attrs: str
1308 @param table: Table with the attributes.
1309 @type table: str or L{Join}
1310 @param where: Optional SQL WHERE clause.
1311 @type where: str
1312
1313 @return: Maximum value of that attribute. Defaults to None.
1314 @rtype: scalar
1315
1316 """
1317
1318
1319
1320 maxAttrs = ", ".join("MAX(%s)" % attr for attr in csv.values(attrs))
1321 return self.query(maxAttrs, table, where)[0]
1322
1323
1324
1326 """
1327 Queries database to produce a list of column names in the table.
1328
1329 @param tableName: Name of the database table.
1330 @type tableName: str
1331
1332 @return: A list of the column names, in order, from the given table.
1333 @rtype: list(str)
1334
1335 """
1336 return self.query(
1337 selectStr="COLUMN_NAME",
1338 fromStr="INFORMATION_SCHEMA.COLUMNS",
1339 whereStr="TABLE_NAME=%r ORDER BY ORDINAL_POSITION" % tableName)
1340
1341
1342
1344 """
1345 Queries database to produce a dictionary of data-types for each column.
1346
1347 @param table: Name of the database table, with optional full-path.
1348 @type table: str
1349
1350 @return: A dictionary where the keywords are the column names and the
1351 values are the data-types of those columns.
1352 @rtype: dict(str:str)
1353
1354 """
1355 server, database, tableName = self._splitTablePath(table)
1356
1357 if server == self.server:
1358 attrs = self.query(
1359 selectStr="COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH",
1360 fromStr=database + ".INFORMATION_SCHEMA.COLUMNS",
1361 whereStr="TABLE_NAME=" + self._sqlString(tableName))
1362
1363 return dict((name, dataType +
1364 ('(%s)' % dataSize if dataSize else ''))
1365 for name, dataType, dataSize in attrs)
1366 else:
1367 db = '.'.join([server, database, dbc.loadServerDbsOwner()])
1368 attrs = self.query(
1369 selectStr="sc.name, st.name, sc.length",
1370 fromStr="%s.sysobjects AS so, %s.syscolumns AS sc, "
1371 "%s.systypes AS st" % tuple([db] * 3),
1372 whereStr="so.id = sc.id AND st.xtype = sc.xtype AND so.name=" +
1373 self._sqlString(tableName))
1374
1375 return dict((name, dataType +
1376 (('(%s)' % dataSize) if dataType == 'varchar' else ''))
1377 for name, dataType, dataSize in attrs)
1378
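The dictionary construction in queryDataTypes() above can be exercised in isolation with hypothetical INFORMATION_SCHEMA rows:

```python
# Sketch of how queryDataTypes() folds (name, dataType, size) rows into a
# column->type dictionary, appending the length only for sized types.
def data_type_dict(attrs):
    return dict((name, dataType + ('(%s)' % dataSize if dataSize else ''))
                for name, dataType, dataSize in attrs)

# Hypothetical example rows: CHARACTER_MAXIMUM_LENGTH is NULL for bigint.
rows = [("fileName", "varchar", 256), ("multiframeID", "bigint", None)]
print(data_type_dict(rows))
# -> {'fileName': 'varchar(256)', 'multiframeID': 'bigint'}
```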
1379
1380
1382 """
1383 Queries database to determine if any rows exist in the given table
1384 that satisfy the WHERE clause.
1385
1386 @param table: Table to query. A single table name or a L{Join} object.
1387 @type table: str or L{Join}
1388 @param where: Optional WHERE clause.
1389 @type where: str
1390 @param pk: Optionally supply primary key to speed up query. Not
1391 required if table is a Join.
1392 @type pk: str
1393
1394 @return: True, if there are any rows in the table that satisfy the
1395 WHERE clause.
1396 @rtype: bool
1397
1398 """
1399 columns = pk or '*'
1400 if isinstance(table, Join):
1401 columns = table.joinCols
1402
1403
1404 result = self.query(columns, table, where, firstOnly=True)
1405 if not isinstance(result, tuple):
1406 return result is not None
1407 else:
1408 return any(entry is not None for entry in result)
1409
1410
1411
1412 - def queryNumRows(self, tableName, whereStr='', groupBy='',
1413 distinctAttr=''):
1414 """
1415 Wrapper to just query the number of rows in a given table. Will also
1416 return the total count for a "SELECT count(*) ... GROUP BY"-type query,
1417 if the "GROUP BY" is specified in the whereStr keyword, rather than the
1418 groupBy keyword.
1419
1420 @param tableName: Table to query.
1421 @type tableName: str
1422 @param whereStr: WHERE string, optional where clause.
1423 @type whereStr: str
1424 @param groupBy: Specify a comma-separated list of columns in which
1425 to group counts by.
1426 @type groupBy: str
1427 @param distinctAttr: With distinct values of this attribute.
1428 @type distinctAttr: str
1429
1430 @return: Number of rows, if no groupBy keyword, otherwise a list of
1431 the groupBy attribute values in ascending order with counts.
1432 @rtype: int or list(tuple)
1433
1434 """
1435 attr = ('DISTINCT ' + distinctAttr) if distinctAttr else '*'
1436 cmd = "COUNT(%s) AS count" % attr
1437 for _attempt in range(2):
1438 try:
1439 if groupBy:
1440 return self.query(groupBy + ", " + cmd, tableName, whereStr,
1441 groupBy=groupBy, orderBy=groupBy)
1442 else:
1443 results = self.query(cmd, tableName, whereStr)
1444 if len(results) == 1:
1445 return results[0]
1446 else:
1447 return len(results)
1448
1449 except odbc.ProgrammingError as error:
1450 if "Arithmetic overflow" in str(error):
1451 cmd = "COUNT_BIG(%s) AS count" % attr
1452 else:
1453 raise
1454
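The COUNT-to-COUNT_BIG fallback in queryNumRows() above can be sketched with a fake executor; OverflowError stands in for the odbc.ProgrammingError raised on arithmetic overflow, and the executor itself is hypothetical.

```python
# Sketch of the two-attempt COUNT -> COUNT_BIG fallback in queryNumRows().
def count_rows(execute, table, where=''):
    cmd = "COUNT(*)"
    for _attempt in range(2):
        try:
            return execute("SELECT %s AS count FROM %s%s"
                           % (cmd, table, " WHERE " + where if where else ""))
        except OverflowError:  # stands in for the odbc arithmetic overflow
            cmd = "COUNT_BIG(*)"
    raise RuntimeError("COUNT_BIG also overflowed")

def fake_execute(sql):
    """Hypothetical executor: overflows unless COUNT_BIG is used."""
    if "COUNT_BIG" not in sql:
        raise OverflowError("Arithmetic overflow")
    return 5000000000

print(count_rows(fake_execute, "Multiframe"))
# -> 5000000000
```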
1455
1456
1458 """
1459 Determine the row size of the given table or the select query result.
1460
1461 @param queryTable: Either just a single table name or a select query.
1462 @type queryTable: str or SelectSQL
1463
1464 @return: Size in bytes of a row.
1465 @rtype: int
1466
1467 """
1468 cursor = self._getCursor()
1469 if isinstance(queryTable, SelectSQL):
1470 self.query(queryTable.selectStr, queryTable.fromStr,
1471 firstOnly=True)
1472 print(cursor.description)
1473 raise Exception("This feature is deprecated due to ODBC "
1474 "inconsistencies. Check that the values printed "
1475 "above are consistent and correct between servers")
1476
1477
1478 return sum(col[4] for col in cursor.description)
1479 else:
1480 cursor.columns(table=queryTable)
1481 return sum(col[7] for col in cursor.fetchall())
1482
1483
1484
1496
1497
1498
1512
1513
1514
1516 """
1517 Runs a command directly on the database server.
1518
1519 @param cmd: Command-line command to run on the database server.
1520 @type cmd: str
1521 @param isTrialRunSafe: Can this command be run in trial-run mode?
1522 @type isTrialRunSafe: bool
1523
1524 @return: A result set.
1525 @rtype: list(str)
1526
1527 """
1528
1529 script = "EXEC %s..xp_cmdshell '%s'" % (self.sysc.adminDatabase, cmd)
1530 isTrialRun = self.isTrialRun
1531 try:
1532 if isTrialRunSafe:
1533 self.isTrialRun = False
1534
1535 results = self._executeScript(script, wantResult=True)
1536 finally:
1537 self.isTrialRun = isTrialRun
1538
1539
1540 results = [(result if result else '')
1541 for result in map(itemgetter(0), results)]
1542
1543
1544 if results and not results[-1]:
1545 results = results[:-1]
1546
1547
1548 filesysCmds = \
1549 ("dir", "move", "copy", "mkdir", "rmdir", "del", "rename")
1550
1551 if results and cmd.lower().strip().startswith(filesysCmds) \
1552 and any("Access is denied" in result for result in results):
1553
1554 raise DbSession.CatalogueServerError("%s: %s\n%s"
1555 % (self.server, cmd, '\n'.join(results)))
1556
1557 return results
1558
1559
1560
1561 - def sharePath(self, fileName=''):
1562 """
1563 Returns the path to the given file in the catalogue server's share
1564 directory for outgests and ingests.
1565
1566 @param fileName: Name of the file on the share.
1567 @type fileName: str
1568
1569 @todo: Deprecate!
1570
1571 """
1572 return self.sysc.dbSharePath(fileName) + (os.sep if not fileName else '')
1573
1574
1575
1577 """ Shrinks tempdb and its log files.
1578 """
1579 self._executeScript("DBCC SHRINKDATABASE (tempdb)")
1580
1581
1582
1584 """
1585 Full database path to the given table.
1586
1587 @param tableName: Name of table.
1588 @type tableName: str
1589 @param ownerName: Optionally specify a different table owner name than
1590 the database owner.
1591 @type ownerName: str
1592
1593 @return: Full path to database table, e.g. ahmose.WSA.dbo.tableName.
1594 @rtype: str
1595
1596 """
1597 return '.'.join([str(self), ownerName, str(tableName)])
1598
1599
1600
1602 """
1603 Checks to see if the share path for this database is active. Raises a
1604 SystemExit if there are problems.
1605
1606 """
1607 testDir = "outgest_test_%s_%s" % (os.getenv("HOST"), os.getpid())
1608 try:
1609 os.mkdir(self.sharePath(testDir))
1610 os.rmdir(self.sharePath(testDir))
1611 except OSError as error:
1612 raise SystemExit("<ERROR> " + str(error))
1613
1614
1615
1617 """
1618 Wipes a database table - use with care!
1619
1620 @param tableName: Name of database table to wipe.
1621 @type tableName: str
1622
1623 @return: Number of rows deleted from table.
1624 @rtype: int
1625
1626 @todo: Deprecate in favour of delete() without where clause?
1627 @todo: Always call truncate over delete from if not where clause, with
1628 the addition of a drop and add constraints?
1629 """
1630 try:
1631
1632 return self._executeScript("TRUNCATE TABLE %s" % tableName,
1633 wantRowCount=True)
1634 except odbc.IntegrityError:
1635
1636 return self._executeScript("DELETE %s" % tableName,
1637 wantRowCount=True)
1638
1639
1640
1642 """
1643 Converts the given catalogue server local path into a UNC path.
1644
1645 @param filePathName: Catalogue server local path.
1646 @type filePathName: str
1647
1648 @return: Catalogue server UNC path.
1649 @rtype: str
1650
1651 """
1652 return (2 * self.sysc.catServerFileSep + self.server.upper()
1653 + self.sysc.catServerFileSep + filePathName.replace(':', ''))
1654
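The path conversion in uncPath() above can be sketched standalone, assuming the catalogue server file separator is the Windows backslash:

```python
# Sketch of uncPath(): double separator, upper-cased server name, and the
# drive colon dropped from the local path.
def unc_path(server, local_path, sep='\\'):
    return 2 * sep + server.upper() + sep + local_path.replace(':', '')

print(unc_path("ahmose", "E:\\ingest\\file.dat"))
# -> \\AHMOSE\E\ingest\file.dat
```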
1655
1656
1657 - def update(self, tableName, entryList, where='', fromTables=''):
1658 """
1659 Update specific entries in database table rows specified with a
1660 flexible SQL WHERE clause.
1661
1662 @param tableName: Name of the database table.
1663 @type tableName: str
1664 @param entryList: List entries to update, in the form (attrName,
1665 SQLvalue) with value as an SQL formatted string,
1666 as defined by L{DbSession._sqlString()}, e.g.
1667 [("skyID", -9999), ("raMoon", "1.01"),
1668 ("trackSys", "'J2000'")]. (NB: this may also be
1669 an explicit string for the SQL SET statement e.g.
1670 "trackSys='J2000'")
1671 @type entryList: list(tuple(str, str)) or str
1672 @param where: SQL WHERE clause (without the "WHERE").
1673 @type where: str
1674 @param fromTables: Optional source to update from (SQL FROM syntax),
1675 single table name or L{Join} object.
1676 @type fromTables: str or L{Join}
1677
1678 @return: Number of rows updated.
1679 @rtype: int
1680
1681 """
1682
1683 if not entryList:
1684 raise Exception("No entries supplied to update table " + tableName)
1685
1686 sql = "UPDATE " + tableName
1687 sql += " SET " + utils.joinNested(entryList, subJoinStr="=")
1688 if fromTables:
1689 sql += " FROM %s" % fromTables
1690
1691 if where or isinstance(fromTables, Join):
1692 sql += " WHERE "
1693 if isinstance(fromTables, Join):
1694 sql += fromTables.whereStr
1695 if fromTables.whereStr and where:
1696 sql += " AND "
1697 sql += where
1698
1699 numRows = self._executeScript(sql, wantRowCount=True)
1700 if self.isTrialRun:
1701 if isinstance(fromTables, Join):
1702 fromStr = fromTables.fromStr
1703 if fromTables.whereStr and where:
1704 where += " AND " + fromTables.whereStr
1705 else:
1706 where = where or fromTables.whereStr
1707 elif fromTables:
1708 fromStr = fromTables
1709 else:
1710 fromStr = tableName
1711
1712 if tableName not in fromStr:
1713 fromStr += ', ' + tableName
1714
1715 numRows = self.queryNumRows(fromStr, where)
1716
1717 return numRows
1718
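The statement built by update() for the simple case (no FROM clause or Join) can be sketched as follows; entryList may be attribute/value pairs or an explicit SET string, as in the method above.

```python
# Sketch of the UPDATE statement assembly in update(), simple case only.
def build_update(table_name, entry_list, where=''):
    if not entry_list:
        raise ValueError("No entries supplied to update table " + table_name)
    set_str = (entry_list if isinstance(entry_list, str) else
               ", ".join("%s=%s" % pair for pair in entry_list))
    sql = "UPDATE %s SET %s" % (table_name, set_str)
    if where:
        sql += " WHERE " + where
    return sql

print(build_update("Multiframe", [("deprecated", 0)], where="multiframeID=101"))
# -> UPDATE Multiframe SET deprecated=0 WHERE multiframeID=101
```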
1719
1720
1721 - def updateEntries(self, tableName, entryList, rowIndexList=[]):
1722 """
1723 Update specific entries in a database table row.
1724
1725 @param tableName: Name of the database table.
1726 @type tableName: str
1727 @param entryList: List entries to update, in the form (attrName,
1728 SQLvalue) with value as an SQL formatted string,
1729 as defined by L{DbSession._sqlString()}, e.g.
1730 [("skyID", -9999), ("raMoon", "1.01"),
1731 ("trackSys", "'J2000'")]. (NB: this may also be
1732 an explicit string for the SQL SET statement e.g.
1733 "trackSys='J2000'")
1734 @type entryList: list(tuple(str, str)) or str
1735 @param rowIndexList: List of specific rows to update, in the form
1736 (attrName, PyValue), e.g. [("multiframeID", 101)],
1737 all rows are updated otherwise. If PyValue is a
1738 list then executemany is invoked.
1739 @type rowIndexList: list(tuple(str, object))
1740
1741 @return: Number of rows updated.
1742 @rtype: int
1743
1744 """
1745
1746 if not tableName:
1747 raise Exception("No table name has been supplied")
1748 if not entryList:
1749 raise Exception("No entries supplied to update table " + tableName)
1750
1751 sql = "UPDATE " + tableName
1752 sql += " SET " + utils.joinNested(entryList, subJoinStr="=")
1753 manyList = []
1754 if rowIndexList:
1755 sql += self._createWhereStr(rowIndexList)
1756
1757 for _key, index in rowIndexList:
1758 if type(index) is list:
1759 manyList += [tuple([value]) for value in index]
1760
1761 return self._executeScript(sql, manyList, wantRowCount=True)
1762
1763
1764
1766 """ Updates database index statistics.
1767 """
1768 Logger.addMessage("Updating database index statistics...")
1769 self._executeScript("EXEC sp_updatestats")
1770
1771
1772
1794
1795
1796
1797 - def _createWhereStr(self, rowIndexList):
1798 """
1799 Converts a list of tuple pairs of the form (attribute name, value) into
1800 a correctly formatted SQL WHERE clause. If the row index list contains
1801 a list of indices for a given attribute, then executemany markers are
1802 used.
1803
1804 @param rowIndexList: List of specific rows to update, in the form
1805 (attrName, PyValue), e.g. [("multiframeID", 101)].
1806 @type rowIndexList: list(tuple(str, object))
1807
1808 @return: A SQL WHERE clause.
1809 @rtype: str
1810
1811 """
1812 whereList = [(key, self._sqlString(index))
1813 for key, index in rowIndexList
1814 if type(index) is not list]
1815
1816
1817
1818
1819 paramStyle = {'qmark': '?', 'pyformat': '%(x)s'}[odbc.paramstyle]
1820 whereList += [(key, paramStyle) for key, index in rowIndexList
1821 if type(index) is list]
1822
1823 return " WHERE " + utils.joinNested(whereList, subJoinStr="=",
1824 joinStr=" AND ")
1825
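The clause construction in _createWhereStr() above can be sketched standalone. This is a simplified illustration: value formatting uses repr() instead of _sqlString(), and '?' is the qmark param-style marker.

```python
# Sketch of _createWhereStr(): scalar values are inlined, list values become
# executemany parameter markers; scalars are emitted before list markers.
def make_where(row_index_list, param_marker='?'):
    where_list = [(key, repr(index)) for key, index in row_index_list
                  if not isinstance(index, list)]
    where_list += [(key, param_marker) for key, index in row_index_list
                   if isinstance(index, list)]
    return " WHERE " + " AND ".join("%s=%s" % pair for pair in where_list)

print(make_where([("multiframeID", 101)]))    # ->  WHERE multiframeID=101
print(make_where([("multiframeID", [1, 2])]))  # ->  WHERE multiframeID=?
```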
1826
1827
1828 - def _executeScript(self, script, parameters=None, wantResult=False,
1829 wantAll=True, wantRowCount=False):
1830 """
1831 Send any SQL script to the server to execute. Specific DBMS errors that
1832 are just warnings are trapped to prevent processing from halting.
1833
1834 @param script: The script which is to be processed.
1835 @type script: str
1836 @param parameters: Optionally supply parameter set if script contains
1837 executemany wildcards.
1838 @type parameters: list(tuple)
1839 @param wantResult: If True, return results.
1840 @type wantResult: bool
1841 @param wantAll: If wantResult is True and wantAll is True then
1842 return all results otherwise just first result.
1843 @type wantAll: bool
1844 @param wantRowCount: If True, and results aren't returned, then a count
1845 of affected rows is returned instead.
1846 @type wantRowCount: bool
1847
1848 @return: The result of the executed script (if wantResult), or number
1849 of rows affected (if wantRowCount).
1850 @rtype: list(tuple) or int
1851
1852 """
1853 if self.isTrialRun and not script.upper().startswith("SELECT"):
1854 print(script)
1855 if wantResult:
1856 return [] if wantAll else None
1857 elif wantRowCount:
1858 return 0
1859 else:
1860 return
1861
1862
1863
1864 cursor = self._getCursor()
1865 try:
1866 if parameters:
1867 cursor.executemany(script, parameters)
1868 else:
1869 cursor.execute(script)
1870
1871 except odbc.Warning as error:
1872 if "xpsql.cpp" in str(error) and "GetProxyAccount" in str(error):
1873 Logger.addMessage("<ERROR> Database user name %s does not have"
1874 " bcp outgest permissions set on database %s.%s. Please fix "
1875 "the database permissions." % (self.userName, self.server,
1876 self.database))
1877
1878 if "DBCC execution completed" not in str(error):
1879 Logger.addExceptionWarning(error, "Database message")
1880 if "would be truncated" in str(error):
1881 Logger.addMessage("Script executed: " + script,
1882 alwaysLog=False)
1883
1884 except (odbc.OperationalError, odbc.InternalError) as error:
1885 isDeadlock = "deadlock victim. Rerun the transaction" in str(error)
1886 brokenMsgs = ["Function sequence error",
1887 "Read from SQL server failed",
1888 "Read from the server failed",
1889 "Write to SQL Server failed",
1890 "Write to the server failed",
1891 "Communication link failure"]
1892
1893 persist = self._isPersistent and self._autoCommit
1894 if persist and (any(message in str(error) for message in brokenMsgs)
1895 or isDeadlock and not self._isDeadlocked):
1896 self._isDeadlocked = isDeadlock
1897 msg = ("Deadlock occurred in" if self._isDeadlocked else
1898 "Lost connection to")
1899 msg += " %s.%s" % (self.server, self.database)
1900 sys.stderr.write(msg + '. See log.\n')
1901 Logger.addMessage("<Warning> %s. Reconnecting and resubmitting"
1902 " the last database statement..." % msg)
1903 self.goOffline()
1904 return self._executeScript(script, parameters, wantResult,
1905 wantAll, wantRowCount)
1906
1907 if persist and self._isDeadlocked:
1908
1909
1910 time.sleep(1)
1911 return self._executeScript(script, parameters, wantResult,
1912 wantAll, wantRowCount)
1913
1914 if isinstance(error, odbc.OperationalError):
1915 self._raiseWithScript(error, script)
1916 else:
1917 raise
1918
1919 except (odbc.ProgrammingError, odbc.IntegrityError) as error:
1920 self._raiseWithScript(error, script)
1921 else:
1922 self._isDeadlocked = False
1923
1924 numRows = max(cursor.rowcount, 0)
1925 if wantRowCount and not numRows:
1926
1927 numRows = max(self._executeScript("SELECT ROWCOUNT_BIG()",
1928 wantResult=True, wantAll=False)[0], 0)
1929
1930 self._isLogReq[self.database] = self._isLogReq[self.database] or \
1931 self._autoCommit and (not wantRowCount or numRows) and \
1932 not script.upper().startswith(("SELECT", "DBCC", "EXEC", "SET"))
1933
1934 if wantResult:
1935
1936 try:
1937 if wantAll:
1938 return cursor.fetchall()
1939 else:
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953 try:
1954 return cursor.fetchall()[0]
1955 except IndexError:
1956 return
1957 except odbc.ProgrammingError as error:
1958 if "missing result set" in str(error):
1959
1960 return [] if wantAll else None
1961 else:
1962 raise
1963
1964 elif wantRowCount:
1965 return numRows
1966
1967
1968
1969 - def _getCursor(self):
1970 """
1971 Method to obtain cursor for current DbSession. If database connection
1972 is currently offline then an endless reconnect attempt is made.
1973
1974 @return: The cursor for the current DbSession.
1975 @rtype: Cursor
1976
1977 """
1978 connection, cursor = DbSession._dbConnections[self._dbSessionKey]
1979
1980
1981 while not connection:
1982 try:
1983 self._openConnection()
1984 except SystemExit:
1985 Logger.addMessage("<Warning> Cannot connect to database. "
1986 "Attempting reconnect in 10 minutes (Ctrl-C to quit)...")
1987 time.sleep(600)
1988 else:
1989 connection, cursor = \
1990 DbSession._dbConnections[self._dbSessionKey]
1991
1992 return cursor
1993
1994
1995
1996 - def _openConnection(self):
1997 """ Opens a database connection and initiates the cursor. """
1998 try:
1999
2000 pwFilePath = os.path.join(os.getenv("HOME"), ".load-server-access")
2001 password = open(pwFilePath).read().strip()
2002 params = (self.server, self.database, self.userName, password)
2003 connection = \
2004 odbc.DriverConnect("DSN=%s;Database=%s;UID=%s;PWD=%s" % params)
2005
2006 Logger.addMessage("Connected to %s.%s as %s" % params[:-1])
2007 if self._autoCommit:
2008 connection.setconnectoption(odbc.SQL.AUTOCOMMIT,
2009 odbc.SQL.AUTOCOMMIT_ON)
2010
2011 except odbc.OperationalError:
2012 raise SystemExit(
2013 "Cannot access database %s.%s. Probably connection is down."
2014 % (self.server, self.database))
2015 except odbc.ProgrammingError as error:
2016 if "Login failed for user" in str(error):
2017 raise SystemExit(
2018 "Access denied for username %r to database %s.%s" %
2019 (self.userName, self.server, self.database))
2020 elif "Cannot open database" in str(error):
2021 raise SystemExit(
2022 "Database %s.%s does not exist or access denied for "
2023 "user %s" % (self.server, self.database, self.userName))
2024 else:
2025 raise SystemExit("Cannot connect: %s" % error)
2026
2027 cursor = connection.cursor()
2028 if "mx.ODBC.unixODBC" in sys.modules:
2029
2030
2031 cursor.setconverter(lambda _position, sqlType, sqlLen:
2032 (sqlType, (sqlLen if sqlType != odbc.SQL.BIGINT else 0)))
2033
2034
2035 DbSession._dbConnections[self._dbSessionKey] = (connection, cursor)
2036
2037
2038
2039 - def _raiseWithScript(self, err, script):
2040 """ Appends the SQL script to an mxODBC exception and raises it.
2041 """
2042 try:
2043 message = err.args[2].replace('[FreeTDS][SQL Server]', '')
2044 except (IndexError, AttributeError):
2045 message = str(err)
2046
2047 raise err.__class__(message + "\nScript executed: " + script)
2048
2049
2050
2051 - def _splitTablePath(self, tablePath):
2052 """
2053 Decomposes a full table path to its individual components.
2054
2055 @param tablePath: Path to a table in the format "table" or
2056 "database.table" or "server.database..table" or
2057 "server.database.dbo.table"
2058 @type tablePath: str
2059
2060 @return: Server name, database name, table name from given path.
2061 @rtype: tuple(str, str, str)
2062
2063 """
2064 path = tablePath.split('.')
2065 tableName = path[-1]
2066 try:
2067 server = path[-4]
2068 except IndexError:
2069 server = self.server
2070 try:
2071 database = path[-3]
2072 except IndexError:
2073 try:
2074 database = path[-2]
2075 except IndexError:
2076 database = self.database
2077
2078 return server, database, tableName
2079
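The decomposition above can be sketched as a standalone function, assuming hypothetical defaults for the session's current server and database:

```python
# Standalone sketch of _splitTablePath(): missing path components fall back
# to the (hypothetical) current server and database defaults.
def split_table_path(table_path, default_server="myserver", default_db="WSA"):
    path = table_path.split('.')
    table_name = path[-1]
    server = path[-4] if len(path) >= 4 else default_server
    if len(path) >= 3:
        database = path[-3]
    elif len(path) == 2:
        database = path[0]
    else:
        database = default_db
    return server, database, table_name

print(split_table_path("ahmose.WSA..Multiframe"))
# -> ('ahmose', 'WSA', 'Multiframe')
```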
2080
2081
2082 - def _sqlString(self, value):
2083 """ @return: An SQL string of the correct format for the data type.
2084 @rtype: str
2085 """
2086 if value is None:
2087 return ""
2088 elif type(value) is str:
2089
2090
2091 if "'" in value:
2092 return repr(value).replace("\'", "''").replace('"', "'")
2093 else:
2094 return repr(value)
2095 else:
2096
2097 return str(value)
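The quote handling above is worth seeing in action; this sketch reproduces the string branch standalone (repr() supplies the outer quotes, embedded single quotes are doubled for SQL, and any double quotes repr() chose are converted back to single quotes):

```python
# Standalone sketch of the _sqlString() formatting rules.
def sql_string(value):
    if value is None:
        return ""
    elif isinstance(value, str):
        if "'" in value:
            return repr(value).replace("'", "''").replace('"', "'")
        return repr(value)
    return str(value)

print(sql_string("J2000"))  # -> 'J2000'
print(sql_string("it's"))   # -> 'it''s'
```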
2098
2101 class Ingester(DbSession):
2102 """
2103 A special type of database session, where you wish to ingest data into a
2104 database table from a native binary or CSV file. Upon initialisation this
2105 class verifies that the schemas of the database tables are up-to-date.
2106
2107 @note: Prefer binary files as they ingest faster than CSV files.
2108 @note: Prefer the ingest data to be pre-sorted by primary key and supply
2109 isOrdered=True to ingestTable() to speed up the ingest. Otherwise, if
2110 adding more data than already exists, e.g. overwriting the existing
2111 table or creating a new one, it may be faster to load the data
2112 without any constraints and then apply the constraints to the table
2113 after loading.
2114 @todo: Always parse indices on initialisation of Ingester. Can't drop
2115 indices at this stage as it should occur too soon. Then give
2116 ingestTable() a default option of dropIndices=True. Ingester can
2117 tell from the schema whether it should parse non-survey indices or
2118 not. Ingester itself could have a dropIndices option at
2119 initialisation with default of False, for IngIngester to overrule.
2120 """
2122 """ Exception thrown when the data cannot be ingested.
2123 """
2124 pass
2125
2126
2127
2128
2129 fileOnShare = None
2130
2131 fileTag = 'CuID000000'
2132
2133
2134
2135
2136
2137 _schema = None
2138
2139
2140
2141 - def __init__(self, dbCon, tableSchema, tag=fileTag, skipSchemaCheck=False,
2142 checkReleasedOnly=False):
2143 """
2144 Makes connection to the requested database, and checks that the schemas
2145 of the tables supplied in the table list are correct.
2146
2147 @param dbCon: A database connection.
2148 @type dbCon: DbSession
2149 @param tableSchema: Schema for tables to be ingested this session, as
2150 provided by L{Schema.parseTables()}.
2151 @type tableSchema: list(Schema.Table) or dict(str: Schema.Table)
2152 @param tag: A tag string to append to the file name on the
2153 share directory, so file clean-up can be done with rm *tag*.
2154 @type tag: str
2155 @param skipSchemaCheck: If True, don't check the schema for
2156 mismatches - use with extreme caution!
2157 @type skipSchemaCheck: bool
2158 @param checkReleasedOnly: If True, only check the schema of released
2159 tables for mismatches.
2160 @type checkReleasedOnly: bool
2161
2162 """
2163 super(Ingester, self).__init__(
2164 database=dbCon.server + '.' + dbCon.database,
2165 autoCommit=dbCon._autoCommit,
2166 isTrialRun=dbCon.isTrialRun,
2167 userName=dbCon.userName,
2168 isPersistent=dbCon._isPersistent)
2169
2170 self.fileTag = tag
2171 self._schema = (dict((table.name, table) for table in tableSchema)
2172 if not isinstance(tableSchema, dict) else tableSchema)
2173
2174 if not skipSchemaCheck:
2175 Logger.addMessage("Checking database schema for ingest tables...")
2176 self.checkSchema(self._schema.values(), checkReleasedOnly)
2177
2178
2179
2180 - def ingestTable(self, tableName, filePathName, idxInfo=None,
2181 overWrite=False, isCsv=False, deleteFile=True,
2182 isOrdered=False, checkConstraints=True):
2183 """
2184 Ingest a binary or CSV flat file table into the database.
2185
2186 @note: If the table doesn't already exist in the database it will be
2187 created. However, it won't have foreign key constraints (this is a
2188 feature intended for release databases made by CU19). If you need
2189 foreign key constraints it is best to call createTable() prior to
2190 ingestTable(overWrite=False).
2191
2192 @todo: If not isOrdered and overWrite=True then we don't apply primary
2193 key until after ingest. OR even better - if not isOrdered try to create
2194 table without any constraints, if succeed, i.e. there is no existing
1995 table or overWrite=True, then apply the primary key after ingest;
1996 otherwise don't, but do check foreign keys. May make parts of
2197 NeighbourTableIngester redundant.
2198
2199 @param tableName: Name of table to create/update in the database.
2200 @type tableName: str
2201 @param filePathName: Name of the ingest file, with optional full
2202 catalogue server path or curation server path. If
2203 full path is omitted, then file is assumed to
2204 be located in the catalogue server's share
2205 directory. Otherwise if a different curation
2206 server path is supplied the file is automatically
2207 moved to the share (assuming the fileTag has been
2208 set upon Ingester() initialisation).
2209 @type filePathName: str
2210 @param idxInfo: If you wish to drop indices prior to ingest, then
2211 supply L{Schema.parseIndices()} information.
2212 @type idxInfo: defaultdict(str: list(Schema.Index))
2213 @param overWrite: If True, overwrite an existing table in the
2214 database. If False, update the table if it exists,
2215 otherwise create the table. NB: Whenever this
2216 method creates a table it does so without applying
2217 foreign key constraints.
2218 @type overWrite: bool
2219 @param isCsv: If True, expects to ingest a CSV ascii file, else
2220 expects native binary format.
2221 @type isCsv: bool
2222 @param deleteFile: If True, delete ingest file after ingest,
2223 regardless of outcome.
2224 @type deleteFile: bool
2225 @param isOrdered: If True, the data is ordered by primary key.
2226 @type isOrdered: bool
2227 @param checkConstraints: If True, check foreign key constraints during
2228 ingest if any exist, otherwise never check
2229 foreign key constraints.
2230 @type checkConstraints: bool
2231
2232 @return: Number of rows ingested.
2233 @rtype: int
2234
2235 """
2236 if not filePathName:
2237 return 0
2238
2239 if any(prefix in filePathName
2240 for prefix in ["cu01id", "cu03id", "cu04id"]):
2241 fileName = self.fileOnShare = filePathName[:]
2242 ingHost, sharePath = filePathName.partition("samba/")[::2]
2243 filePathName = 2 * self.sysc.catServerFileSep \
2244 + ingHost.replace(os.sep, '').upper() \
2245 + self.sysc.catServerFileSep \
2246 + sharePath.replace(os.sep, self.sysc.catServerFileSep)
2247 else:
2248 fileName, filePathName = self._normalisePath(filePathName)
2249
2250 if not fileName:
2251 return 0
2252
2253 Logger.addMessage("Ingesting into %s..." % tableName)
2254 try:
2255 ingestSchema = self._schema[tableName]
2256 except KeyError:
2257 raise Exception(
2258 "<ERROR> Cannot ingest %s: Ingester was not initially supplied"
2259 " with a schema for table %s" % (fileName, tableName))
2260
2261 isNewTable = overWrite or not self.existsTable(ingestSchema.name)
2262 if isNewTable:
2263 isNewTable = self.createObjects([ingestSchema], overWrite)
2264
2265
2266 if not overWrite and idxInfo:
2267 self.dropObjects(idxInfo[tableName])
2268 self.commitTransaction()
2269
2270 hints = "firstrow=1"
2271 if isCsv:
2272 hints += ", datafiletype='char', fieldterminator=','"\
2273 ", rowterminator='0x0A'"
2274 else:
2275 hints += ", datafiletype='native', tablock"
2276
2277
2278
2279
2280 checkConstraints = checkConstraints and not isNewTable \
2281 and any(type(constraint) is schema.ForeignKeyConstraint
2282 for constraint in ingestSchema.constraints)
2283
2284 if checkConstraints:
2285 hints += ", check_constraints"
2286
2287 if isOrdered:
2288 hints += ", order(%s)" % self._schema[tableName].primaryKey()
2289
2290 sql = "BULK INSERT %s FROM '%s' WITH (%s)" \
2291 % (tableName, filePathName, hints)
2292
2293 try:
2294 try:
2295
2296 timeout = 0
2297 while True:
2298 try:
2299 numRowsIngested = \
2300 self._executeScript(sql, wantRowCount=True)
2301
2302 except odbc.DatabaseError as error:
2303 if "does not exist" not in str(error) or timeout == 600:
2304 raise
2305
2306 time.sleep(1)
2307 timeout += 1
2308 else:
2309 break
2310
2311 except odbc.ProgrammingError as error:
2312 if not str(error).startswith('Cannot fetch a row from OLE DB '
2313 'provider "BULK" for linked server "(null)".\nScript'):
2314 raise
2315
2316 msg = "Ingest file data does not match the schema of "
2317 msg += tableName
2318
2319 if not isCsv:
2320
2321 raise Ingester.IngestError(msg
2322 + ', ' + str(error).split("returned ")[-1]
2323 + ". Catalogue server file path: " + filePathName)
2324 else:
2325 Logger.addMessage("<WARNING> " + msg
2326 + ". Attempting ODBC insert...")
2327
2328
2329 numRowsIngested = 0
2330 for row in csv.File(self.sharePath(fileName)):
2331 fRow = [column.parseValue(val) for val, column
2332 in zip(row, self._schema[tableName].columns)]
2333 try:
2334 isIngested = self.insertData(tableName, fRow)
2335 except odbc.Error as error:
2336 raise Ingester.IngestError(error)
2337
2338 if not isIngested:
2339 Logger.addMessage("Insert of this row failed: "
2340 + ', '.join(map(str, fRow)))
2341
2342 numRowsIngested += isIngested
2343 Logger.addMessage("Row number = %s" % numRowsIngested,
2344 alwaysLog=False)
2345 else:
2346
2347 numRowsIngested = \
2348 max(self._executeScript("SELECT ROWCOUNT_BIG()",
2349 wantResult=True, wantAll=False)[0], 0)
2350
2351 except:
2352 if self.sysc.catServerSharePath in filePathName:
2353 Logger.addMessage("<Info> Ingest file left on file share: "
2354 + self.sharePath(fileName))
2355 else:
2356 Logger.addMessage("<Info> Ingest file left on catalogue "
2357 "server: " + filePathName)
2358 raise
2359
2360 Logger.addMessage("done! Rows affected = %s" % numRowsIngested)
2361
2362 self._removeIngestFile(fileName, filePathName, deleteFile)
2363
2364 return numRowsIngested
2365
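The hint assembly performed above can be sketched as a standalone helper. `build_bulk_insert_sql` below is a hypothetical name, not part of the Ingester API; it simply mirrors the flag logic used to build the T-SQL BULK INSERT statement:

```python
def build_bulk_insert_sql(tableName, filePathName, isCsv=False,
                          checkConstraints=False, orderBy=None):
    # Mirror the hint assembly above: CSV files are character data with
    # comma field and newline row terminators; native files take a
    # table lock for a faster bulk load.
    hints = "firstrow=1"
    if isCsv:
        hints += ", datafiletype='char', fieldterminator=','" \
                 ", rowterminator='0x0A'"
    else:
        hints += ", datafiletype='native', tablock"

    if checkConstraints:
        hints += ", check_constraints"

    if orderBy:
        # Declaring the file's sort order lets the server skip a sort
        # when the target table is clustered on the same key.
        hints += ", order(%s)" % orderBy

    return "BULK INSERT %s FROM '%s' WITH (%s)" \
        % (tableName, filePathName, hints)
```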
2366
2367
2368 - def _moveToShare(self, filePathName):
2369 """
2370 Moves a file to the catalogue load server's share directory as mounted
2371 on the curation server. Works around the share violation problem under
2372 NFS: a file written to the curation-client / load-server NFS file share
2373 cannot be accessed immediately after being written unless it is first
2374 renamed!
2375
2376 @param filePathName: Path of file to copy over to the share directory.
2377 Must be a full path to the file visible from the
2378 curation server O/S.
2379 @type filePathName: str
2380
2381 @return: New file name in share directory, including any
2382 sub-directories, or None if original file does not exist.
2383 @rtype: str
2384
2385 """
2386 if not os.path.exists(filePathName):
2387 return
2388
2389 elif self.fileTag is Ingester.fileTag:
2390
2391
2392
2393
2394
2395 return filePathName.replace(self.sharePath(), '')
2396
2397 if not filePathName.startswith(self.sharePath()):
2398 shareDisk = self.sharePath().split(os.sep)[1]
2399 if filePathName.split(os.sep)[1] != shareDisk:
2400 Logger.addMessage(
2401 "Transferring ingest file to %s..." % shareDisk)
2402
2403 fileName = os.path.basename(filePathName)
2404 shutil.move(filePathName, self.sharePath(fileName))
2405 filePathName = self.sharePath(fileName)
2406
2407
2408 fileRoot, fileExt = os.path.splitext(filePathName)
2409 taggedPathName = fileRoot + self.fileTag + fileExt
2410 if self.fileTag not in fileRoot:
2411 os.rename(filePathName, taggedPathName)
2412 else:
2413 os.rename(filePathName, taggedPathName)
2414 os.rename(taggedPathName, filePathName)
2415 taggedPathName = filePathName
2416
2417 return taggedPathName.replace(self.sharePath(), '')
2418
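The tag-rename workaround above can be demonstrated in isolation. `tag_for_share` is a hypothetical helper reproducing the rename logic; it assumes only standard-library behaviour:

```python
import os

def tag_for_share(filePathName, fileTag):
    # Rename a freshly written file to a tagged name - the workaround
    # for the NFS share violation, where a new file on the share cannot
    # be read back until it has been renamed.
    fileRoot, fileExt = os.path.splitext(filePathName)
    taggedPathName = fileRoot + fileTag + fileExt
    if fileTag not in fileRoot:
        os.rename(filePathName, taggedPathName)
    else:
        # Already tagged: rename away and back again to force the
        # NFS refresh without double-tagging the name.
        os.rename(filePathName, taggedPathName)
        os.rename(taggedPathName, filePathName)
        taggedPathName = filePathName
    return taggedPathName
```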
2419
2420
2421 - def _normalisePath(self, filePathName):
2422 """
2423 Move ingest file to a place visible to the catalogue server O/S and
2424 normalise different filePathName inputs into the format:
2425
2426 filePathName = Full path to file from the catalogue server O/S e.g.
2427 H:\dir\subdir\filename.ext
2428
2429 fileName = filename.ext or subdir/filename.ext if file is in a
2430 subdirectory of the catalogue server share directory.
2431
2432 @param filePathName: Any acceptable file path definition allowed by
2433 the Ingester.ingestTable() method.
2434 @type filePathName: str
2435
2436 @return: fileName, filePathName (see description above).
2437 @rtype: tuple(str, str)
2438
2439 """
2440 if self.sharePath() in filePathName:
2441 curPathName = filePathName
2442
2443 elif self.sysc.catServerFileSep not in filePathName:
2444
2445 curPathName = (filePathName if os.path.dirname(filePathName) else
2446 self.sharePath(filePathName))
2447
2448 elif self.sysc.catServerSharePath in filePathName:
2449
2450
2451 curPathName = self.sharePath(
2452 filePathName.replace(self.sysc.catServerSharePath, '')
2453 .replace(self.sysc.catServerFileSep, os.sep))
2454
2455 else:
2456
2457 fileName = filePathName
2458 return fileName, filePathName
2459
2460 fileName = self._moveToShare(curPathName)
2461 if not fileName:
2462 return None, None
2463 else:
2464 filePathName = self.sysc.catServerSharePath \
2465 + fileName.replace(os.sep, self.sysc.catServerFileSep)
2466
2467 return fileName, filePathName
2468
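The path translation above - mapping a curation-server POSIX path under the share mount onto the catalogue server's Windows-style path - can be sketched as follows. The constants are hypothetical stand-ins for the `SystemConstants` values:

```python
# Hypothetical stand-ins for SystemConstants values.
SHARE_PATH_POSIX = "/mnt/catshare/"   # share as mounted on curation server
CAT_SERVER_SHARE = "H:\\share\\"      # same directory on catalogue server
CAT_SERVER_SEP = "\\"

def to_catalogue_path(curPathName):
    # Strip the curation-side mount prefix and swap to Windows
    # separators, as _normalisePath() does for files already on
    # the share.
    fileName = curPathName.replace(SHARE_PATH_POSIX, '')
    filePathName = CAT_SERVER_SHARE \
        + fileName.replace('/', CAT_SERVER_SEP)
    return fileName, filePathName
```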
2469
2470
2471 - def _removeIngestFile(self, fileName, filePathName, deleteFile):
2472 """
2473 Remove the ingest file.
2474
2475 @param fileName: Name and sub-dir of file in the curation server
2476 share path.
2477 @type fileName: str
2478 @param filePathName: Full path to file on the catalogue server.
2479 @type filePathName: str
2480 @param deleteFile: If false, then just provide a warning.
2481 @type deleteFile: bool
2482
2483 """
2484 if deleteFile and not self.isTrialRun:
2485 try:
2486 self.runOnServer("del " + filePathName)
2487 except DbSession.CatalogueServerError:
2488 if self.sysc.catServerSharePath not in filePathName:
2489 raise
2490
2491
2492 os.remove(self.sharePath(fileName))
2493 elif "cu03id" in filePathName or "cu04id" in filePathName:
2494 pass
2495 elif self.sysc.catServerSharePath not in filePathName:
2496 Logger.addMessage("<Info> Ingest file left on catalogue server: "
2497 + filePathName)
2498 elif deleteFile:
2499 Logger.addMessage("<Info> Ingest file left on file share: "
2500 + self.sharePath(fileName))
2501
2504 -class Outgester(DbSession):
2505 """
2506 A special type of database session, where you wish to bulk outgest data
2507 from the database to a file. This class ensures the connection has the
2508 correct permissions and is designed to work around the delays in the
2509 appearance of the data on the NFS mounted load server share directory. If
2510 the schema for tables that data will be outgested from is supplied, then
2511 upon initialisation this class verifies that the schemas of the database
2512 tables are up-to-date. Otherwise, the class will query the database
2513 schema directly, which is slower.
2514
2515 """
2516
2517
2518
2519 fileTag = 'CuID000000'
2520 """ Tag to append to outgest files so they can be cleaned-up later.
2521 """
2522 sampleRate = 2
2523 """ Delay in seconds between testing file size on NFS mounted share.
2524 """
2525 tempViewName = "OutgestTempView" + str(os.getpid()) + (os.getenv("HOST") or '')
2526 """ Name of temporary outgest view that's created if the SQL statement is
2527 too long for the BCP statement. Host PID is a safer UID than fileTag.
2528 """
2529 timeOut = 600
2530 """ Time in seconds before assuming outgest to NFS has failed.
2531 """
2532
2533
2534
2535 _isBcpDeadlocked = False
2536
2537
2538
2540 """
2541 Makes connection to the requested database. Supplied database
2542 connection must be under a username that has bcp rights, e.g.
2543 ldservrw/pubdbrw. Can't do a SETUSER to make sure we are always
2544 ldservrw, because not all users have the permission to perform a
2545 SETUSER - which rather defeats the purpose. If the user is not
2546 ldservrw or pubdbrw a new connection will be made as ldservrw (which
2547 is safe as Outgester only reads from the database anyway). In this
2548 circumstance, to prevent too many connections being made/dropped,
2549 create just a single Outgester object and reuse it throughout.
2550
2551 @param dbCon: A database connection.
2552 @type dbCon: DbSession
2553 @param tag: A tag string to append to the file name on the
2554 share directory, so file clean up can be rm *tag*.
2555 @type tag: str
2556 @param honourTrialRun: If True, don't outgest if the database
2557 connection is in trial run mode.
2558 @type honourTrialRun: bool
2559
2560 """
2561 super(Outgester, self).__init__(
2562 database=dbCon.server + '.' + dbCon.database,
2563 autoCommit=dbCon._autoCommit,
2564 isTrialRun=dbCon.isTrialRun if honourTrialRun else False,
2565 userName=(dbc.loadServerRwUsername()
2566 if dbCon.userName != "pubdbrw" else dbCon.userName),
2567 isPersistent=dbCon._isPersistent)
2568
2569 self._isDirty = dbCon._isDirty
2570 self.fileTag = tag
2571
2572
2573
2574 - def getFilePath(self, fileID):
2575 """ Returns the location of the outgest file with this fileID.
2576 """
2577 return self.sharePath(fileID + self.fileTag + ".dat")
2578
2579
2580
2581 - def outgestQuery(self, query, fileID='', filePath='', createDtdFile=False,
2582 isCsv=False, redirectStdOut=False):
2583 """
2584 Bulk outgest the results from an SQL query to a file in the catalogue
2585 server share directory.
2586
2587 @note: If data is being copied by bulk outgest/ingest it helps to order
2588 by the primary key.
2589 @note: This function may hang for up to 10 minutes after outgest if the
2590 outgest failed silently (which can happen), whilst waiting for
2591 the NFS update to time out. I've chosen not to print a message
2592 saying the outgest is complete and that the function is waiting
2593 for the time-out, because multiple outgests would display too
2594 much information; nor to restrict such a message to debug mode,
2595 because there the delay would be obvious anyway.
2596
2597 @param query: SQL query to outgest.
2598 @type query: SelectSQL
2599 @param fileID: Optionally request a specific unique identifier to be
2600 used in the name for the outgest file. The file will
2601 be named "[fileID][fileTag].dat". This is useful if
2602 multiple outgests from the same table need to be
2603 available at the same time.
2604 @type fileID: str
2605 @param filePath: Optionally outgest to a path on the catalogue server
2606 other than the share path (e.g. "G:\"), or transfer
2607 outgest file to a different path on the curation
2608 server. Either give full new path name for the file or
2609 else just provide the path to the new directory, but
2610 make sure that directory exists first.
2611 @type filePath: str
2612 @param createDtdFile: If True and the outgest is to a binary file on
2613 the share path, a file containing the data type
2614 definition of the outgest file will be created
2615 with the same name as the outgest file, but with
2616 a ".dtd" extension rather than ".dat".
2617 @type createDtdFile: bool
2618 @param isCsv: If True, outputs to CSV ascii file, else defaults
2619 to native binary format.
2620 @type isCsv: bool
2621 @param redirectStdOut: If True, the results of the outgest will be
2622 written to a file on the share path rather than
2623 returned through mxODBC. This is required to
2624 work around a mysterious bug in very large table
2625 outgests only and is slightly less efficient.
2626 @type redirectStdOut: bool
2627
2628 @return: Full path to the outgest file (unless createDtdFile requested,
2629 which returns the row count outgested). The path will be a
2630 curation server path if the file is outgested to the share
2631 directory, which is the default behaviour, otherwise a
2632 catalogue server path is returned.
2633 @rtype: str (or int)
2634
2635 """
2636 if len(query.tables) == 1:
2637 table = query.tables.values()[0]
2638 elif query.selectStr.count('.') == 1:
2639 table = query.tables[query.selectStr.split('.')[0]]
2640 else:
2641 tableList = query.tables.values()
2642 tableList.reverse()
2643 table = ''.join(tableList)
2644
2645 drive = ''
2646 if self.sysc.catServerFileSep in filePath:
2647 drive, filePath = filePath, drive
2648
2649 fileName = \
2650 os.path.basename(self.getFilePath(fileID or table.split('.')[-1]))
2651
2652 filePathName = (drive or self.sysc.catServerSharePath) + fileName
2653
2654 queryFmt = '"%s OPTION (MAXDOP 1)"'
2655 view = schema.View()
2656 if len(str(query)) < 1000:
2657 querySQL = (queryFmt % query).replace("'", "''")
2658 else:
2659 view.name = Outgester.tempViewName
2660
2661 view.definition = \
2662 str(SelectSQL(query.selectStr, query.fromStr, query.whereStr))
2663
2664
2665 order = ', '.join(col.split('.')[-1]
2666 for col in csv.values(query.orderByStr))
2667
2668 tablePath = (self.tablePath(view.name)
2669 if self.userName not in ["pubdbrw", "ldservro"] else
2670 self.tablePath(view.name, ownerName=self.userName))
2671
2672 querySQL = queryFmt % SelectSQL('*', tablePath, orderBy=order)
2673 self.createObjects([view], overWrite=True)
2674
2675 if self._isDirty:
2676 querySQL = querySQL.replace(" WHERE", " WITH (NOLOCK) WHERE")
2677
2678 try:
2679 cmd = "bcp %s queryout %s %s -T" % \
2680 (querySQL, filePathName, '-a' if isCsv else '-n')
2681
2682 while not self._outgestFile(cmd, redirectStdOut, view):
2683
2684 time.sleep(60)
2685
2686 finally:
2687 self._isBcpDeadlocked = False
2688 if view.definition:
2689 self.dropObjects([view])
2690
2691 if drive:
2692 if ':' in filePathName:
2693 filePathName = self.uncPath(filePathName)
2694 else:
2695 filePathName = self.sharePath(fileName)
2696
2697 if not self.isTrialRun:
2698 self._waitForNFS(filePathName, query)
2699
2700 if filePath:
2701 filePathName = self._transferFile(filePathName, filePath)
2702
2703 if not isCsv and createDtdFile and not self.isTrialRun:
2704 dataTypes = self._getDataTypes(query)
2705
2706
2707
2708 dtdPathName = filePathName.replace('.dat', '.dtd')
2709 if os.path.exists(dtdPathName):
2710 os.remove(dtdPathName)
2711
2712 file(dtdPathName, 'w').writelines(dataType + '\n'
2713 for dataType in dataTypes)
2714 os.chmod(dtdPathName, 0444)
2715
2716 rowSize = sum(self.sysc.sqlDataTypeSize[dataType]
2717 for dataType in dataTypes)
2718
2719 return os.path.getsize(filePathName) // rowSize
2720
2721 return filePathName
2722
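The bcp command assembled above can be sketched on its own. `build_bcp_command` is a hypothetical helper reproducing the flags as used in this module (`-a` for CSV, `-n` for native binary, `-T` trusted connection, `-o` for the redirected-output workaround):

```python
def build_bcp_command(querySQL, filePathName, isCsv=False, outFile=None):
    # Assemble the bcp "queryout" command used above: -a selects the
    # CSV output mode used in this module, -n native binary; -T
    # requests a trusted connection.
    cmd = "bcp %s queryout %s %s -T" % (querySQL, filePathName,
                                        '-a' if isCsv else '-n')
    if outFile:
        # Redirect bcp's output listing to a file on the server (the
        # redirectStdOut workaround for very large table outgests).
        cmd += " -o " + outFile
    return cmd
```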
2723
2724
2725 - def _getDataTypes(self, query):
2726 """ Returns ordered list of data types returned by the given query,
2727 either via examining the schema, if available, or by direct
2728 database table query.
2729
2730 @todo: Select * queries may not return correctly ordered data
2731 types. If so, and order matters, this can be achieved by
2732 another database query - which of course will slow the code
2733 down a bit.
2734 @todo: Simplify by just running a test query of one row and then
2735 inspecting the cursor like pySQL does? Similar to how
2736 DbSession.queryRowSize() works. This will render the above
2737 todo obsolete.
2738 """
2739 columns = \
2740 [column.split('.')[-1] for column in csv.values(query.selectStr)]
2741
2742
2743
2744 attrs = {}
2745 for table in query.tables.values():
2746 attrs.update(self.queryDataTypes(table))
2747
2748 if columns == ['*']:
2749 if '.' not in query.selectStr:
2750 columns = attrs.keys()
2751 else:
2752 alias = query.selectStr.split('.')[0]
2753 table = query.tables[alias].split('.')[-1]
2754 columns = self.queryColumnNames(table)
2755
2756 return [attrs[column] for column in columns]
2757
2758
2759
2760 - def _outgestFile(self, cmd, redirectStdOut, view):
2761 """
2762 Performs outgest to file, ensuring it exists.
2763
2764 @return: True, if outgest file successfully created.
2765 @rtype: bool
2766
2767 """
2768 if redirectStdOut:
2769
2770
2771 resultFile = "G:" + self.sysc.catServerFileSep + self.database \
2772 + self.fileTag + "_outgest.log"
2773
2774 cmd += " -o " + resultFile
2775
2776 resultSet = self.runOnServer(cmd)
2777
2778 if redirectStdOut and not self.isTrialRun:
2779
2780
2781
2782 resultSet = self.runOnServer("type " + resultFile)
2783 self.runOnServer("del " + resultFile)
2784
2785 if not self.isTrialRun and (len(resultSet) < 3
2786 or "rows copied" not in resultSet[-3]):
2787
2788 isDeadlock = \
2789 any("deadlock victim" in result for result in resultSet)
2790
2791 if not self._isBcpDeadlocked and isDeadlock:
2792 Logger.addMessage("Outgest delayed due to deadlock...")
2793 self._isBcpDeadlocked = isDeadlock
2794
2795 if not isDeadlock:
2796 msg = "BCP Error:\n"
2797 msg += ('\n'.join(resultSet) if resultSet else
2798 "Outgest failed. BCP gave no error.")
2799
2800 msg += "\nCommand executed: " + cmd
2801 msg += ('' if not view.definition else
2802 "\n%s = %s" % (view.name, view.definition))
2803
2804 raise odbc.DatabaseError(msg)
2805
2806 return not self._isBcpDeadlocked
2807
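The result-set checks above reduce to a small classifier. `check_bcp_result` is a hypothetical helper following the same tests: success is signalled by a "rows copied" line third from the end of bcp's output, and a deadlock is retried rather than raised:

```python
def check_bcp_result(resultSet):
    # Classify a bcp output listing as ok / deadlock / error,
    # following the checks in _outgestFile() above.
    if len(resultSet) >= 3 and "rows copied" in resultSet[-3]:
        return "ok"
    if any("deadlock victim" in line for line in resultSet):
        # A deadlocked bcp is retried after a delay rather than
        # treated as a failure.
        return "deadlock"
    return "error"
```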
2808
2809
2810 - def _transferFile(self, filePathName, filePath):
2811 """ Transfers given file from the share path to the given path on the
2812 curation server.
2813 """
2814 if not os.path.isdir(filePath):
2815 newFileName = os.path.basename(filePath)
2816 newFilePathName = filePath
2817 else:
2818 newFileName = os.path.basename(filePathName)
2819 newFilePathName = os.path.join(filePath, newFileName)
2820
2821 if self.isTrialRun or newFilePathName == filePathName:
2822 return newFilePathName
2823
2824 if filePath.split(os.sep)[1] != filePathName.split(os.sep)[1]:
2825 Logger.addMessage("Transferring outgest file to %s..."
2826 % self.sysc.getServerName(filePath))
2827
2828 if os.path.exists(newFilePathName):
2829 Logger.addMessage("<Info> File %s already exists, will be "
2830 "overwritten." % newFileName)
2831 os.remove(newFilePathName)
2832
2833 os.system('cp %s %s' % (filePathName, newFilePathName))
2834
2835 if (os.path.getsize(filePathName) != os.path.getsize(newFilePathName)):
2836 os.remove(filePathName)
2837 raise Exception("Transfer from %s to %s failed"
2838 % (self.server, self.sysc.getServerName(filePath)))
2839
2840 os.remove(filePathName)
2841
2842 return newFilePathName
2843
2844
2845
2846 - def _waitForNFS(self, filePathName, query):
2847 """ Wait until the file on the NFS share is ready to be accessed.
2848 """
2849 curTime = time.time()
2850 fileName = os.path.basename(filePathName)
2851 cmd = "dir " + self.sysc.catServerSharePath + fileName
2852 dirResults = [result.split()[2] for result in self.runOnServer(cmd)
2853 if result and fileName in result]
2854 try:
2855 tableFileSize = int(dirResults[0].replace(',', ''))
2856 except IndexError:
2857 raise odbc.OperationalError(
2858 "SQL outgest query to file %s has failed.\n"
2859 "Query executed: %s" % (filePathName, query))
2860
2861 while (not os.path.exists(filePathName) or
2862 os.path.getsize(filePathName) != tableFileSize):
2863
2864 time.sleep(Outgester.sampleRate)
2865
2866 if time.time() - curTime > Outgester.timeOut:
2867 sizeStr = ("Non-existent" if not os.path.exists(filePathName)
2868 else str(os.path.getsize(filePathName)))
2869 raise odbc.OperationalError(
2870 "SQL outgest query to file %s has failed.\n"
2871 "Expected file size = %s\n"
2872 "Actual size = %s\n"
2873 "Query executed: %s" %
2874 (filePathName, tableFileSize, sizeStr, query))
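The polling loop above can be sketched as a self-contained helper. `wait_for_file` is hypothetical; it waits for a file on the mounted share to reach the size reported by the server, giving up after a timeout:

```python
import os
import time

def wait_for_file(filePathName, expectedSize, sampleRate=2, timeOut=600):
    # Poll until the file on the NFS mount exists and has reached the
    # expected size, as _waitForNFS() does; return False on time-out
    # instead of raising.
    start = time.time()
    while (not os.path.exists(filePathName)
           or os.path.getsize(filePathName) != expectedSize):
        if time.time() - start > timeOut:
            return False
        time.sleep(sampleRate)
    return True
```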
2875
2878 -class Database(object):
2879 """
2880 Defines a database object, and provides methods to create and modify
2881 databases through administrative database connections. First, define with
2882 L{Database.__init__()}, then create the database with L{Database.create()}.
2883
2884 @group Nested Error Exceptions: ExistsError
2885
2886 """
2887 - class ExistsError(Exception):
2888 """ Exception thrown when a database being created already exists.
2889 """
2890 msg = "Database %r already exists."
2891
2897
2898
2899
2900
2901 files = None
2902 name = ""
2903
2904
2905
2906
2907 _folders = []
2908 _sqlLine = ""
2909
2910
2911
2915 """
2916 Defines a database object.
2917
2918 @param name: Name of database to create.
2919 @type name: str
2920 @param volumes: List of volumes to spread database files over.
2921 @type volumes: str
2922 @param dbDir: Directory name on catalogue server to store
2923 database. This value is overridden for multi-file
2924 databases, which have the directory name as the
2925 database name, otherwise assumes non-survey folder.
2926 @type dbDir: str
2927 @param primarySize: Total initial size of files in the primary file
2928 group, as a string, e.g. "4 GB".
2929 @type primarySize: str
2930 @param logSize: Total initial size of files in the log file group,
2931 as a string, e.g. "4 GB".
2932 @type logSize: str
2933 @param filegroups: List of additional file group names and sizes, e.g.
2934 ("Detection", "4 GB").
2935 @type filegroups: list(tuple(str, str))
2936 @param filegrowth: File growth, e.g. "10%".
2937 @type filegrowth: str
2938
2939 """
2940 self.name = name
2941 self._sqlLine = "CREATE DATABASE %s ON PRIMARY " % self.name
2942 dbDir = (self.name if filegroups else
2943 dbDir or SystemConstants(name).nonSurveyDir)
2944
2945 dbDir += SystemConstants.catServerFileSep
2946 self._folders = [volume + dbDir for volume in volumes]
2947
2948 fgLine = "(name=%s, filename='%s', size=%s, maxsize=UNLIMITED, "
2949 fgLine += "filegrowth=%s)," % filegrowth.replace('%', '%%')
2950 self.files = []
2951
2952
2953 for count, volume in enumerate(volumes):
2954 if len(volumes) == 1:
2955 fgName = "Primary_FG"
2956 else:
2957 fgName = "pfg_%s" % (count + 1)
2958
2959 if filegroups:
2960 filePathName = volume + dbDir + \
2961 "PrimaryFileGroup%s.mdf" % (count + 1)
2962 else:
2963 filePathName = volume + dbDir + self.name + ".mdf"
2964
2965 self.files.append(filePathName)
2966 self._sqlLine += fgLine % (fgName, filePathName,
2967 self._splitSize(primarySize, len(volumes)))
2968
2969
2970 for filegroup, size in filegroups:
2971 self._sqlLine += " filegroup %s_FG " % filegroup
2972 if len(volumes) == 1:
2973 fgName = "%s_FG" % filegroup
2974 else:
2975 fgName = ''.join(takewhile(lambda c: c.islower(), filegroup))
2976 fgName += filegroup[len(fgName)].lower() + "fg_%s"
2977
2978 for count, volume in enumerate(volumes):
2979
2980
2981
2982 filePathName = volume + dbDir + filegroup + "%s.ndf" % (count + 1)
2983 self.files.append(filePathName)
2984 fgVolName = \
2985 (fgName % (count + 1) if len(volumes) > 1 else fgName)
2986
2987 self._sqlLine += fgLine % (fgVolName, filePathName,
2988 self._splitSize(size, len(volumes)))
2989
2990
2991 self._sqlLine = self._sqlLine.rstrip(',') + " LOG ON "
2992 for count, volume in enumerate(volumes):
2993 if len(volumes) == 1:
2994 fgName = "Log_FG"
2995 else:
2996 fgName = "lfg_%s" % (count + 1)
2997
2998 if filegroups:
2999 filePathName = volume + dbDir + \
3000 "LogFileGroup_data%s.ldf" % (count + 1)
3001 else:
3002 filePathName = volume + dbDir + self.name + ".ldf"
3003
3004 self.files.append(filePathName)
3005 self._sqlLine += fgLine % (fgName, filePathName,
3006 self._splitSize(logSize, len(volumes)))
3007
3008 self._sqlLine = self._sqlLine.rstrip(',')
3009
3010
3011
3013 """
3014 @return: Database name.
3015 @rtype: str
3016
3017 """
3018 return self.name
3019
3020
3021
3022 - def attach(self, adminDb, asName=None):
3023 """
3024 Re-attach the offline database that was previously detached. The
3025 current implementation is tied to MS SQL Server via T-SQL.
3026
3027 @param adminDb: An autocommiting admin (i.e. master) database
3028 connection to the server where the database will be
3029 attached.
3030 @type adminDb: DbSession
3031 @param asName: Optional alternative name to attach the database as,
3032 instead of its real name (used in the file names).
3033 @type asName: str
3034
3035 """
3036 sql = self._sqlLine + " FOR ATTACH"
3037 name = self.name
3038 if asName:
3039 sql = sql.replace("CREATE DATABASE %s" % name,
3040 "CREATE DATABASE %s" % asName)
3041 name = asName
3042
3043 try:
3044 adminDb._executeScript(sql)
3045 except odbc.ProgrammingError as error:
3046 if Database.ExistsError.msg % name in str(error):
3047 raise Database.ExistsError(name)
3048 try:
3049
3050
3051
3052 sql = "sp_attach_db '%s', '%s'" % (name, self.files[0])
3053 adminDb._executeScript(sql)
3054 except:
3055 raise error
3056
3057
3058 self.name = name
3059 self.setDefaultOptions(adminDb)
3060
3061
3062
3063 - def create(self, adminDb, overWrite=False):
3064 """
3065 Creates this database on a specified server.
3066
3067 @param adminDb: An autocommiting admin (i.e. master) database
3068 connection to the server where the database will be
3069 created.
3070 @type adminDb: DbSession
3071 @param overWrite: If True, overwrite any pre-existing database of this
3072 name.
3073 @type overWrite: bool
3074
3075 """
3076 self._makeFolders(adminDb)
3077
3078
3079 try:
3080 adminDb._executeScript(self._sqlLine)
3081 except odbc.ProgrammingError as error:
3082 if overWrite and (
3083 "file names listed could not be created" in str(error)
3084 or Database.ExistsError.msg % self.name in str(error)):
3085
3086 self.delete(adminDb)
3087 self._makeFolders(adminDb)
3088
3089 adminDb._executeScript(self._sqlLine)
3090 elif Database.ExistsError.msg % self.name in str(error):
3091 raise Database.ExistsError(self.name)
3092 else:
3093 raise
3094
3095 self.setDefaultOptions(adminDb)
3096
3097
3098
3099 - def delete(self, adminDb):
3100 """
3101 Delete this database on a specified server.
3102
3103 @param adminDb: An autocommiting admin (i.e. master) database
3104 connection to the server where the database will be
3105 deleted.
3106 @type adminDb: DbSession
3107
3108 """
3109
3110 numAttempts = 20
3111 delay = 600
3112 for attempt in range(numAttempts):
3113 try:
3114 self.detach(adminDb)
3115 except odbc.ProgrammingError as error:
3116 if "does not exist" in str(error):
3117 break
3118 else:
3119 msg = "<Info> Cannot delete existing database because it "\
3120 "is in use. Attempt %s of %s." % (attempt + 1, numAttempts)
3121 if attempt + 1 == numAttempts:
3122 msg += " Giving up..."
3123 Logger.addMessage(msg)
3124 raise
3125 else:
3126 msg += " Waiting for %d min..." % (delay / 60)
3127 Logger.addMessage(msg)
3128 time.sleep(delay)
3129 else:
3130 break
3131
3132
3133 for filePathName in self.files:
3134 adminDb.runOnServer("del " + filePathName)
3135 adminDb._isLogReq[adminDb.database] = True
3136
3137
3138 for folder in self._folders:
3139 adminDb.runOnServer("rmdir " + folder)
3140
3141
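The delete loop above follows a retry-with-delay pattern that can be sketched generically. `retry` is a hypothetical helper: errors the caller marks as fatal are re-raised at once, anything else is retried up to `numAttempts` times with a fixed delay, as when a database is still in use:

```python
import time

def retry(operation, isFatal, numAttempts=20, delay=600):
    # Retry a flaky operation, as Database.delete() does when the
    # database is still in use: re-raise fatal errors immediately,
    # otherwise sleep and retry, giving up on the final attempt.
    for attempt in range(numAttempts):
        try:
            return operation()
        except Exception as error:
            if isFatal(error) or attempt + 1 == numAttempts:
                raise
            time.sleep(delay)
```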
3142
3143 - def detach(self, adminDb):
3144 """
3145 Detach (take offline) the database. The current implementation is tied
3146 to MS SQL Server via T-SQL.
3147
3148 @param adminDb: An autocommiting admin (i.e. master) database
3149 connection to the server where the database will be
3150 detached.
3151 @type adminDb: DbSession
3152
3153 """
3154 adminDb._executeScript("EXEC sp_detach_db %r" % self.name)
3155
3156
3157
3158 - def getMirrorServer(self, server):
3159 """
3160 Returns the corresponding mirror server for the given server.
3161
3162 @param server: Name of server to find mirror of.
3163 @type server: str
3164
3165 @return: Name of mirror server.
3166 @rtype: str
3167
3168 """
3169
3170 try:
3171 originalNum = int(server.replace(SystemConstants.clusterName, ''))
3172 except ValueError:
3173 raise Exception("Database.mirror() can only be performed on "
3174 "cluster servers")
3175
3176 mirrorNum = 1 + ((originalNum - 1) ^ 1)
3177 return SystemConstants.clusterName + str(mirrorNum)
3178
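The pairing arithmetic above works by XOR-ing the zero-based server index with 1, which flips the low bit and so swaps adjacent pairs (1 and 2, 3 and 4, and so on). A minimal demonstration, with `mirror_number` as a hypothetical stand-in for the logic in getMirrorServer():

```python
def mirror_number(serverNum):
    # Servers are mirrored in adjacent pairs: 1<->2, 3<->4, ...
    # XOR-ing the zero-based index with 1 flips the low bit to
    # find the partner.
    return 1 + ((serverNum - 1) ^ 1)
```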
3179
3180
3181 - def mirror(self, adminDb):
3182 """
3183 Copy the database to the mirror server on the cluster.
3184
3185 @param adminDb: An autocommiting admin (i.e. master) database
3186 connection to the server which has the database to be
3187 copied.
3188 @type adminDb: DbSession
3189
3190 @return: Mirror server name.
3191 @rtype: str
3192
3193 """
3194 mirrorServer = self.getMirrorServer(adminDb.server)
3195
3196
3197 try:
3198 self.detach(adminDb)
3199 except odbc.DatabaseError as error:
3200 Logger.addExceptionWarning(error)
3201 Logger.addMessage("<WARNING> Cannot mirror database %s to %s as it"
3202 " needs to be detached from %s first."
3203 % (self.name, mirrorServer, adminDb.server))
3204
3205 self.setReleaseOptions(adminDb)
3206 else:
3207 try:
3208 mirrorDb = DbSession(
3209 database=mirrorServer + '.' + adminDb.database,
3210 autoCommit=adminDb._autoCommit,
3211 isTrialRun=adminDb.isTrialRun, userName=adminDb.userName,
3212 isPersistent=adminDb._isPersistent)
3213
3214
3215 self.delete(mirrorDb)
3216 self._makeFolders(mirrorDb)
3217 for filePathName in self.files:
3218 mirrorFile = mirrorDb.uncPath(filePathName)
3219 adminDb.runOnServer("copy %s %s"
3220 % (filePathName, mirrorFile))
3221
3222 self.attach(mirrorDb)
3223 self.setReleaseOptions(mirrorDb)
3224 finally:
3225 self.attach(adminDb)
3226 self.setReleaseOptions(adminDb)
3227
3228 return mirrorServer
3229
3230
3231
3232 - def release(self, adminDb, pubServer):
3233 """
3234 Copy the database to the given public server on the cluster.
3235
3236 @param adminDb: An autocommiting admin (i.e. master) database
3237 connection to the server which has the database to be
3238 copied.
3239 @type adminDb: DbSession
3240 @param pubServer: Name of server to copy to.
3241 @type pubServer: str
3242
3243 @todo: Merge with mirror somehow.
3244
3245 """
3246 pubVolumes = SystemConstants.catServerVolumes(pubServer)
3247
3248
3249 origVolumes = SystemConstants.catServerVolumes(adminDb.server)
3250 origFolders = self._folders[:]
3251 origFiles = self.files[:]
3252 origSqlLine = self._sqlLine
3253
3254
3255 try:
3256 self.detach(adminDb)
3257 except odbc.DatabaseError as error:
3258 Logger.addExceptionWarning(error)
3259 Logger.addMessage("<WARNING> Cannot copy database %s to %s as it "
3260 "needs to be detached from %s first."
3261 % (self.name, pubServer, adminDb.server))
3262 else:
3263 try:
3264
3265 for volume, pubVolume in zip(origVolumes, pubVolumes):
3266 self._sqlLine = self._sqlLine.replace(volume, pubVolume)
3267 self._folders = [path.replace(volume, pubVolume)
3268 for path in self._folders]
3269
3270 self.files = [path.replace(volume, pubVolume)
3271 for path in self.files]
3272
3273 pubDb = DbSession(
3274 database=pubServer + '.' + adminDb.database,
3275 autoCommit=adminDb._autoCommit,
3276 isTrialRun=adminDb.isTrialRun, userName=adminDb.userName,
3277 isPersistent=adminDb._isPersistent)
3278
3279
3280 self.delete(pubDb)
3281 self._makeFolders(pubDb)
3282 for origFile, pubFile in zip(origFiles, self.files):
3283 pubFile = pubDb.uncPath(pubFile)
3284 adminDb.runOnServer("copy %s %s" % (origFile, pubFile))
3285
3286 self.attach(pubDb)
3287 self.setReleaseOptions(pubDb)
3288 finally:
3289
3290 self._folders = origFolders[:]
3291 self.files = origFiles[:]
3292 self._sqlLine = origSqlLine
3293 self.attach(adminDb)
3294
3295
3296
3298 """
3299 Set MS SQL Server database options appropriate for bulk loading, over
3300 the load database default options.
3301
3302 @param adminDb: An autocommiting admin (i.e. master) database
3303 connection to the server where the database will be
3304 altered.
3305 @type adminDb: DbSession
3306
3307 """
3308 for option in ["AUTO_CREATE_STATISTICS OFF", "RECOVERY SIMPLE"]:
3309 adminDb._executeScript("ALTER DATABASE %s SET %s"
3310 % (self.name, option))
3311
3312
3313
3314 - def setDefaultOptions(self, adminDb):
3315 """
3316 Set the MS SQL Server database options that are appropriate for a
3317 load database over the MS SQL server default options.
3318
3319 @param adminDb: An autocommiting admin (i.e. master) database
3320 connection to the server where the database will be
3321 altered.
3322 @type adminDb: DbSession
3323
3324 """
3325 for option in ["AUTO_CREATE_STATISTICS ON",
3326 "RECOVERY BULK_LOGGED",
3327 "AUTO_SHRINK OFF",
3328 "TORN_PAGE_DETECTION ON",
3329 "AUTO_CLOSE OFF"]:
3330 adminDb._executeScript("ALTER DATABASE %s SET %s"
3331 % (self.name, option))
3332


    def setReleaseOptions(self, adminDb):
        """
        Set MS SQL Server database options appropriate for a release
        database, over the default load database options.

        @param adminDb: An auto-committing admin (i.e. master) database
                        connection to the server where the database will be
                        altered.
        @type  adminDb: DbSession

        """
        for option in ["AUTO_CREATE_STATISTICS ON", "RECOVERY SIMPLE"]:
            adminDb._executeScript("ALTER DATABASE %s SET %s"
                                   % (self.name, option))

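For illustration, the option-setting methods above each format one ALTER DATABASE statement per option. A minimal standalone sketch of the statements L{setReleaseOptions} would issue; the database name "WSA" and the helper name are assumed examples, not part of this module:

```python
def release_option_statements(dbName):
    # Sketch of the statement generation used by the option-setting
    # methods above; dbName stands in for self.name.
    return ["ALTER DATABASE %s SET %s" % (dbName, option)
            for option in ["AUTO_CREATE_STATISTICS ON", "RECOVERY SIMPLE"]]

print(release_option_statements("WSA")[0])
# ALTER DATABASE WSA SET AUTO_CREATE_STATISTICS ON
```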


    def _makeFolders(self, adminDb):
        """
        Creates the folders for the database files.

        @param adminDb: An auto-committing admin (i.e. master) database
                        connection to the server where the database will be
                        created.
        @type  adminDb: DbSession

        """
        for folder in self._folders:
            adminDb.runOnServer("mkdir " + folder)



        """
        Splits a size into a number of divisions, converting between GB and
        MB when necessary.

        @param size:      Initial size in GB or MB, e.g. "10 GB".
        @type  size:      str
        @param divisions: Number of divisions the size must be divided
                          between.
        @type  divisions: int

        @return: Divided integer size in either GB or MB.
        @rtype:  str

        """
        units = size[-2:]
        newSize = float(size[:-2].strip()) / divisions
        if newSize < 1.0:
            if units != "MB":
                # Convert a sub-1 GB division to MB
                newSize *= 1000.0
                units = "MB"
            else:
                # Never divide below 1 MB
                newSize = 1.0

        return "%s%s" % (int(newSize), units)

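The division rule above can be exercised as a self-contained sketch. The module-level name C{split_size} below is illustrative only, since the method's real name is not shown in this listing:

```python
def split_size(size, divisions):
    # Standalone re-implementation of the size-splitting rule above,
    # for illustration; each division is truncated to an integer size.
    units = size[-2:]                          # trailing "GB" or "MB"
    newSize = float(size[:-2].strip()) / divisions
    if newSize < 1.0:
        if units != "MB":
            newSize *= 1000.0                  # 1 GB treated as 1000 MB
            units = "MB"
        else:
            newSize = 1.0                      # never divide below 1 MB
    return "%s%s" % (int(newSize), units)

print(split_size("10 GB", 4))   # 2GB
print(split_size("1 GB", 4))    # 250MB
```

Note that 2.5 GB truncates to "2GB": the method trades precision for round file sizes.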


class Join(object):
    """
    Defines a database inner join between two or more tables. Use as the
    table argument value in L{DbSession.query()} type methods or when
    initialising a L{SelectSQL} object.

    """
    aliases = None
    joinCols = ''
    fromStr = ''
    whereStr = ''

    def __init__(self, tables, attributes, subJoin=None):
        """
        Defines an SQL inner join between given tables on given attributes.

        @param tables:     Unique set of all tables to be joined on the same
                           attribute set. Either just supply table names, or
                           a tuple containing table name and alias, e.g.
                           ["Multiframe", "ProgrammeFrame"] or
                           [("Multiframe", "M1"), ("Multiframe", "M2")].
        @type  tables:     sequence(str) or sequence(tuple)
        @param attributes: Unique attribute set on which to join. If a single
                           join column is required then just a string will do.
        @type  attributes: sequence(str) or str
        @param subJoin:    Join additional tables on just the given attribute.
        @type  subJoin:    tuple(list(str), str)

        """
        if len(tables) < 2:
            raise ValueError("Join class must be supplied with at least two "
                             "tables: tables = %s" % tables)

        firstTable = self._parseInput(tables, attributes)

        if subJoin:
            subTables, subCol = subJoin
            if isinstance(subTables, str):
                subTables = [subTables]

            self.fromStr += ", " + ", ".join(subTables)
            self.whereStr += " AND " + " AND ".join("%s.%s=%s.%s"
                % (firstTable, subCol, table.split('.')[-1], subCol)
                for table in subTables)


        """
        @return: The SQL FROM clause for this join.
        @rtype:  str

        """
        return self.fromStr


    def _setSQL(self, tables, predicates):
        """
        Sets the FROM and WHERE SQL statement member variables for this join.

        """
        self.whereStr = " AND ".join(predicates)
        self.fromStr = ", ".join((table if not isinstance(table, tuple) else
                                  "%s AS %s" % table) for table in tables)

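The FROM/WHERE construction in L{Join._setSQL} can be mirrored as a standalone sketch; the function name C{inner_join_sql} is hypothetical, for illustration only:

```python
def inner_join_sql(tables, predicates):
    # Mirror of Join._setSQL: a comma-separated FROM list, with
    # (name, alias) tuples rendered as "name AS alias", plus AND-ed
    # equality predicates destined for the WHERE clause.
    whereStr = " AND ".join(predicates)
    fromStr = ", ".join((table if not isinstance(table, tuple) else
                         "%s AS %s" % table) for table in tables)
    return fromStr, whereStr

fromStr, whereStr = inner_join_sql(
    [("Multiframe", "M1"), ("Multiframe", "M2")],
    ["M1.multiframeID=M2.multiframeID"])
print(fromStr)   # Multiframe AS M1, Multiframe AS M2
```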

class LeftJoin(Join):
    """
    Defines a database left outer join between two tables. Use as the table
    argument value in L{DbSession.query()} type methods or when initialising
    a L{SelectSQL} object.

    """
    def __init__(self, tables, attributes):
        """
        Defines an SQL left outer join between given tables on given
        attributes.

        @param tables:     Pair of tables to be left-joined on the same
                           attribute set. Either just supply table names, or
                           a tuple containing table name and alias, e.g.
                           ["Multiframe", "ProgrammeFrame"] or
                           [("Multiframe", "M1"), ("Multiframe", "M2")].
        @type  tables:     sequence(str) or sequence(tuple)
        @param attributes: Unique attribute set on which to join. If a single
                           join column is required then just a string will do.
        @type  attributes: sequence(str) or str

        """
        if len(tables) != 2:
            raise ValueError("LeftJoin class must be supplied with exactly "
                             "two tables: tables = %s" % tables)

        self._parseInput(tables, attributes)


    def _setSQL(self, tables, predicates):
        """
        Sets the FROM and WHERE SQL statement member variables for this join.

        """
        self.fromStr = " LEFT OUTER JOIN ".join(
            (table if not isinstance(table, tuple) else "%s AS %s" % table)
            for table in tables) + " ON " + " AND ".join(predicates)

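Unlike the inner join, L{LeftJoin._setSQL} emits ANSI LEFT OUTER JOIN syntax with the predicates in an ON clause rather than the WHERE clause. A standalone sketch (the name C{left_join_sql} is hypothetical):

```python
def left_join_sql(tables, predicates):
    # Mirror of LeftJoin._setSQL: "A LEFT OUTER JOIN B ON <predicates>",
    # with (name, alias) tuples rendered as "name AS alias".
    return " LEFT OUTER JOIN ".join(
        (table if not isinstance(table, tuple) else "%s AS %s" % table)
        for table in tables) + " ON " + " AND ".join(predicates)

print(left_join_sql(["Multiframe", "ProgrammeFrame"],
                    ["Multiframe.multiframeID=ProgrammeFrame.multiframeID"]))
```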

    """
    Defines a database left outer join between two tables on the first
    table's primary key. Use as the table argument value in
    L{DbSession.query()} type methods or when initialising a L{SelectSQL}
    object.

    """
    def __init__(self, leftTable, rightTable):
        """
        Defines an SQL left outer join between given tables on the first
        table's primary key.

        @param leftTable:  First table in the left join.
        @type  leftTable:  Schema.Table
        @param rightTable: Second table in the left join.
        @type  rightTable: Schema.Table

        """
        tables = [leftTable.name, rightTable.name]
        self._parseInput(tables, leftTable.primaryKey())


class SelectSQL(object):
    """
    Defines an SQL select statement. Initialise with components of an SQL
    query and the SQL select line is then returned when you convert the
    object to a string. Primarily used as part of the WHERE clause of
    another SQL statement.

    """
    tables = None
    selectStr = ''
    fromStr = ''
    whereStr = ''
    orderByStr = ''
    groupByStr = ''

    _sql = ''
    def __init__(self, select, table, where='', orderBy='', groupBy=''):


    def __str__(self):
        """
        @return: The SQL select statement line.
        @rtype:  str

        """
        return self._sql


    @staticmethod

        """
        Replaces tables derived from a sub-query in the SQL script's FROM
        string, so that the class can parse the information correctly.

        """
        nOpenBra = 0
        nCloseBra = 0
        isDerivedTable = False
        startPos = 0
        derivedTableDict = {}
        for chrNo, char in enumerate(fromStr):
            if char == '(':
                nOpenBra += 1

            if char == ')':
                nCloseBra += 1

            if nOpenBra > nCloseBra and not isDerivedTable:
                # Opening bracket of a new outermost derived table
                startPos = chrNo
                isDerivedTable = True

            if nCloseBra == nOpenBra and isDerivedTable:
                # Matching close bracket found: record the complete
                # sub-query against a unique placeholder name
                derivedTableNo = len(derivedTableDict) + 1
                derivedTableDict['@derivedTable%s' % derivedTableNo] = \
                    fromStr[startPos:chrNo + 1]

                isDerivedTable = False

        for dTable in derivedTableDict:
            fromStr = fromStr.replace(derivedTableDict[dTable], dTable)

        return derivedTableDict, fromStr

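The bracket-depth counting above handles nested sub-queries because only the outermost balanced span is recorded. A self-contained sketch (the name C{mask_derived_tables} is hypothetical; the listing does not show the static method's real name):

```python
def mask_derived_tables(fromStr):
    # Standalone sketch of the static method above: replace each
    # outermost parenthesised sub-query in a FROM clause with an
    # '@derivedTableN' placeholder, matching brackets by depth count.
    nOpenBra = nCloseBra = startPos = 0
    isDerivedTable = False
    derivedTableDict = {}
    for chrNo, char in enumerate(fromStr):
        if char == '(':
            nOpenBra += 1
        if char == ')':
            nCloseBra += 1
        if nOpenBra > nCloseBra and not isDerivedTable:
            startPos = chrNo            # outermost opening bracket
            isDerivedTable = True
        if nCloseBra == nOpenBra and isDerivedTable:
            derivedTableDict['@derivedTable%s'
                             % (len(derivedTableDict) + 1)] = \
                fromStr[startPos:chrNo + 1]
            isDerivedTable = False

    for dTable in derivedTableDict:
        fromStr = fromStr.replace(derivedTableDict[dTable], dTable)

    return derivedTableDict, fromStr

derived, masked = mask_derived_tables(
    "Multiframe, (SELECT multiframeID FROM ProgrammeFrame) AS P")
print(masked)   # Multiframe, @derivedTable1 AS P
```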


def bulkCopy(query, tableSchema, fromDb, toDb, fileTag, drive='',
             isSmallTable=True):
    """
    Copies table data via a bulk outgest and ingest to another table, which
    can be in a different database and on a different server. The
    destination table will be automatically created if it doesn't already
    exist.

    @param query:        Query of data selected to copy via outgest. Table
                         names must be given with full path. The selection
                         must be ordered by the ingest table's primary key
                         in ascending order.
    @type  query:        SelectSQL
    @param tableSchema:  Schema of the ingest table.
    @type  tableSchema:  schema.Table
    @param fromDb:       Connection to the database where outgest will occur.
    @type  fromDb:       DbSession
    @param toDb:         Connection to the database where ingest will occur.
    @type  toDb:         DbSession
    @param fileTag:      Unique tag to attach to temporary outgest/ingest
                         files.
    @type  fileTag:      str
    @param drive:        Optionally outgest to a different catalogue server
                         drive, instead of the share directory, e.g. "G:\\".
    @type  drive:        str
    @param isSmallTable: If False, then stdout is redirected to file to work
                         around a mystery bug in very large table outgests,
                         which is inefficient for small tables.
    @type  isSmallTable: bool

    @return: Number of rows ingested.
    @rtype:  int

    @warning: Make sure the ingest table schema has been checked first.

    @todo: Merge Outgester and Ingester classes into one BulkTransfer class
           with individual outgest, ingest and copy functions? Then we can
           handle the tableSchema initialisation a bit more neatly, and put
           fileTag into the initialiser. Then codes can choose whether or
           not to pre-initialise.

    """
    outPathName = \
        Outgester(fromDb, fileTag, honourTrialRun=True).outgestQuery(query,
            filePath=drive or toDb.sharePath(),
            redirectStdOut=not isSmallTable)

    ingester = Ingester(toDb, [tableSchema], tag=fileTag,
                        skipSchemaCheck=True)
    return ingester.ingestTable(tableSchema.name, outPathName, isOrdered=True)